damondouglas commented on code in PR #31057:
URL: https://github.com/apache/beam/pull/31057#discussion_r1576847287
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
return ret
}
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+ Element []byte
+ Delay time.Duration // The relative time delay.
+ Bounded bool // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {
+ Data []Residual
+ TransformID, InputID string // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.
+ MinOutputWatermarks map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)
Review Comment:
Does the string key reference the bundle's window? If so, could we add that to
the comment? Also, does the `Min` in its name reflect that the minimum element
timestamp is what we consider the input or output watermark? Finally, I
struggled with `technically per Residual` and `aggregated`. Is there an
important nuance here, i.e., cases where it is not 1:1 per Residual? What is
being aggregated?
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
return ret
}
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+ Element []byte
+ Delay time.Duration // The relative time delay.
+ Bounded bool // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {
Review Comment:
Could we say instead (including the use of a symbol link): "Residuals stores
remaining [Residual]s, their aggregated watermarks, and references needed to
query their associated transforms."
This question was prompted by "process continuations", which reminds me of
Java, where ProcessContinuation is the return type of every ProcessElement
lifecycle method, whether or not the developer uses the void return type:
is `Residuals` instantiated even when a DoFn does not explicitly set a
process timer? In other words, do the code paths converge on `Residuals`
both when remaining elements exist in a bundle and when process timers are
explicitly configured for a DoFn?
--
This is an automated message from the Apache Git Service.