lostluck commented on code in PR #31057:
URL: https://github.com/apache/beam/pull/31057#discussion_r1576857084


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
        return ret
 }
 
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+       Element []byte
+       Delay   time.Duration // The relative time delay.
+       Bounded bool          // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {
+       Data                 []Residual
+       TransformID, InputID string                // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.
+       MinOutputWatermarks  map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)

Review Comment:
   There's no such thing as a "per window watermark", and windows themselves 
are only meaningful at aggregation boundaries. As such, no, we won't add 
windows to this.
   
   Per the model/protos: the MinOutputWatermarks are set per output 
PCollection, based on the estimated watermarks provided by the user code. 
So that string key refers to the local output identifier.
   
   In practice, the watermark for a single transform is the same for all 
outputs.
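   
   As a rough illustration of the aggregation described above, here is a 
minimal sketch, not the code in this PR. `Millis` and `residualResponse` are 
hypothetical stand-ins (for `mtime.Time` and for whatever the SDK returns per 
residual), and `minOutputWatermarks` is an assumed helper name. It only shows 
how per-residual estimates might be folded into one map keyed by local output 
identifier, with no window dimension anywhere.

```go
package main

// Millis is a hypothetical stand-in for mtime.Time (event time in milliseconds).
type Millis int64

// residualResponse is a hypothetical, simplified view of one residual:
// the leftover element plus its estimated output watermarks, keyed by the
// local output identifier of the transform.
type residualResponse struct {
	Element          []byte
	OutputWatermarks map[string]Millis
}

// minOutputWatermarks folds the per-residual estimates into a single map
// keyed by local output identifier, keeping the minimum seen per output.
// Outputs that no residual reports are simply absent from the result.
func minOutputWatermarks(rs []residualResponse) map[string]Millis {
	mins := map[string]Millis{}
	for _, r := range rs {
		for out, wm := range r.OutputWatermarks {
			if cur, ok := mins[out]; !ok || wm < cur {
				mins[out] = wm
			}
		}
	}
	return mins
}
```

   Taking the minimum is the conservative choice here: the aggregated value 
for an output cannot be later than the estimate of any residual that still 
has to produce data for it.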


