lostluck commented on code in PR #33337:
URL: https://github.com/apache/beam/pull/33337#discussion_r1881384317
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1555,48 +1614,143 @@ func (ss *stageState) updateWatermarks(em
*ElementManager) set[string] {
if minWatermarkHold < newOut {
newOut = minWatermarkHold
}
- refreshes := set[string]{}
+ // If the newOut is smaller, then don't change downstream watermarks.
+ if newOut <= ss.output {
+ return nil
+ }
+
// If bigger, advance the output watermark
- if newOut > ss.output {
- ss.output = newOut
- for _, outputCol := range ss.outputIDs {
- consumers := em.consumers[outputCol]
-
- for _, sID := range consumers {
-
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
- refreshes.insert(sID)
- }
- // Inform side input consumers, but don't update the
upstream watermark.
- for _, sID := range em.sideConsumers[outputCol] {
- refreshes.insert(sID.Global)
- }
- }
- // Garbage collect state, timers and side inputs, for all
windows
- // that are before the new output watermark.
- // They'll never be read in again.
- for _, wins := range ss.sideInputs {
- for win := range wins {
- //
TODO(#https://github.com/apache/beam/issues/31438):
- // Adjust with AllowedLateness
- // Clear out anything we've already used.
- if win.MaxTimestamp() < newOut {
- delete(wins, win)
+ preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut,
em)
+
+ // Garbage collect state, timers and side inputs, for all windows
+ // that are before the new output watermark, if they aren't in progress
+ // of being expired.
+ // They'll never be read in again.
+ for _, wins := range ss.sideInputs {
+ for win := range wins {
+ // TODO(#https://github.com/apache/beam/issues/31438):
+ // Adjust with AllowedLateness
+ // Clear out anything we've already used.
+ if win.MaxTimestamp() < newOut {
+ // If the expiry is in progress, skip this
window.
+ if ss.inProgressExpiredWindows[win] > 0 {
+ continue
}
+ delete(wins, win)
}
}
- for _, wins := range ss.state {
- for win := range wins {
- //
TODO(#https://github.com/apache/beam/issues/31438):
- // Adjust with AllowedLateness
- if win.MaxTimestamp() < newOut {
- delete(wins, win)
+ }
+ for _, wins := range ss.state {
+ for win := range wins {
+ // TODO(#https://github.com/apache/beam/issues/31438):
+ // Adjust with AllowedLateness
+ if win.MaxTimestamp() < newOut {
+ // If the expiry is in progress, skip
collecting this window.
+ if ss.inProgressExpiredWindows[win] > 0 {
+ continue
}
+ delete(wins, win)
}
}
}
+ // If there are windows to expire, we don't update the output watermark
yet.
+ if preventDownstreamUpdate {
+ return nil
+ }
+
+ // Update this stage's output watermark, and then propagate that to
downstream stages
+ refreshes := set[string]{}
+ ss.output = newOut
+ for _, outputCol := range ss.outputIDs {
+ consumers := em.consumers[outputCol]
+
+ for _, sID := range consumers {
+ em.stages[sID].updateUpstreamWatermark(outputCol,
ss.output)
+ refreshes.insert(sID)
+ }
+ // Inform side input consumers, but don't update the upstream
watermark.
+ for _, sID := range em.sideConsumers[outputCol] {
+ refreshes.insert(sID.Global)
+ }
+ }
return refreshes
}
+// createOnWindowExpirationBundles creates side channel bundles when windows
+// expire for all keys that were used in that window. Returns true if any
+// bundles are created, which means that the window must not yet be garbage
+// collected.
+//
+// Must be called within the stageState.mu's and the ElementManager.refreshCond
+// critical sections.
+func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em
*ElementManager) bool {
+ var preventDownstreamUpdate bool
+ for win, keys := range ss.keysToExpireByWindow {
+ // Check if the window has expired.
+ // TODO(#https://github.com/apache/beam/issues/31438):
+ // Adjust with AllowedLateness
+ if win.MaxTimestamp() >= newOut {
+ continue
+ }
+ // We can't advance the output watermark if there's garbage to
collect.
+ preventDownstreamUpdate = true
+ // Hold off on garbage collecting data for these windows while
these
+ // are in progress.
+ ss.inProgressExpiredWindows[win] += 1
+
+ // Produce bundle(s) for these keys and window, and side
channel them.
+ wm := win.MaxTimestamp()
+ rb := RunBundle{StageID: ss.ID, BundleID: "owe-" +
em.nextBundID(), Watermark: wm}
+
+ // Now we need to actually build the bundle.
+ var toProcess []element
+ busyKeys := set[string]{}
+ usedKeys := set[string]{}
+ for k := range keys {
+ if ss.inprogressKeys.present(k) {
+ busyKeys.insert(k)
+ continue
+ }
+ usedKeys.insert(k)
+ toProcess = append(toProcess, element{
+ window: win,
+ timestamp: wm,
+ pane: typex.NoFiringPane(),
+ holdTimestamp: wm,
+ transform: ss.onWindowExpiration.Transform,
+ family:
ss.onWindowExpiration.TimerFamily,
+ sequence: 1,
+ keyBytes: []byte(k),
+ elmBytes: nil,
+ })
Review Comment:
Correct — this is where we produce the timer values that the SDK will
consume to ultimately invoke the OnWindowExpiration callbacks.
This sort of thing usually lives in `PersistBundle`, but since these timers
aren't directly produced by users, we need to synthesize them here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]