damondouglas commented on code in PR #31057:
URL: https://github.com/apache/beam/pull/31057#discussion_r1576847287


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
        return ret
 }
 
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+       Element []byte
+       Delay   time.Duration // The relative time delay.
+       Bounded bool          // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {
+       Data                 []Residual
+       TransformID, InputID string                // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.
+       MinOutputWatermarks  map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)

Review Comment:
   What does the string key of `MinOutputWatermarks` refer to? If it is the
bundle's window, could we say so in the comment? Also, does the `Min` in the
name mean that the minimum element timestamp is what we treat as the input or
output watermark? Finally, I struggled with `technically per Residual` and
`aggregated`. Is there an important nuance here, i.e. cases where the
watermarks are not 1:1 with each Residual? What is being aggregated?



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
        return ret
 }
 
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+       Element []byte
+       Delay   time.Duration // The relative time delay.
+       Bounded bool          // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {

Review Comment:
   Could we say this instead, using a doc link: "Residuals stores the
remaining [Residual]s, their aggregated watermarks, and the references needed
to query their associated transforms"?
   
   The phrase "process continuations" raised a question for me. In Java,
ProcessContinuation is the return type for all ProcessElement lifecycle
methods, whether or not the developer uses the void return type. Is
`Residuals` instantiated even when a DoFn does not explicitly set a process
timer? In other words, do the code paths converge on Residuals both when
remaining elements exist in a bundle and when process timers are explicitly
configured for a DoFn?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
