damondouglas commented on code in PR #33337:
URL: https://github.com/apache/beam/pull/33337#discussion_r1881138472


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -194,9 +195,10 @@ type ElementManager struct {
 
        pcolParents map[string]string // Map from pcollectionID to stageIDs 
that produce the pcollection.
 
-       refreshCond       sync.Cond   // refreshCond protects the following 
fields with it's lock, and unblocks bundle scheduling.
-       inprogressBundles set[string] // Active bundleIDs
-       changedStages     set[string] // Stages that have changed and need 
their watermark refreshed.
+       refreshCond        sync.Cond   // refreshCond protects the following 
fields with it's lock, and unblocks bundle scheduling.
+       inprogressBundles  set[string] // Active bundleIDs
+       changedStages      set[string] // Stages that have changed and need 
their watermark refreshed.
+       sideChannelBundles []RunBundle // Represents ready to executed bundles 
prepared on the side by a stage instead of in the main loop, such as for 
onWindowExpiry, or for Triggers.

Review Comment:
   May we consider the following? I'm rewriting the comment as a learning
   exercise.
   
   `injectedChannelBundles []RunBundle // Represents ready-to-execute bundles
   prepared outside the main loop and injected by expired windows pending
   garbage collection; includes onWindowExpiry or Triggers.`



##########
CHANGES.md:
##########
@@ -68,6 +68,8 @@
 ## New Features / Improvements
 
 * X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* Support OnWindowExpiration in Prism 
([#32211](https://github.com/apache/beam/issues/32211)).
+  * This enables initial Java GroupIntoBatches support.

Review Comment:
   May we consider:
   
   This enables Java
   [`@OnWindowExpiration`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.OnWindowExpiration.html),
   which in turn enables initial
   [GroupIntoBatches](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/GroupIntoBatches.html)
   support.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -371,7 +385,7 @@ func (em *ElementManager) Bundles(ctx context.Context, 
upstreamCancelFn context.
                        em.changedStages.merge(changedByProcessingTime)
 
                        // If there are no changed stages or ready processing 
time events available, we wait until there are.
-                       for len(em.changedStages)+len(changedByProcessingTime) 
== 0 {
+                       for 
len(em.changedStages)+len(changedByProcessingTime)+len(em.sideChannelBundles) 
== 0 {

Review Comment:
   Change the code comment to `// If there are no changed stages, ready
   processing time events available, or injected channel bundles (i.e. event
   time events), we wait until there are.`
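To make the wait condition concrete, here is a minimal sketch of a condition-variable loop that blocks until any one of the three sources has work. The `manager` type, its fields, and the `inject` and `waitForWork` helpers are hypothetical stand-ins for illustration and are not the actual `ElementManager` API.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical, stripped-down stand-in for ElementManager.
type manager struct {
	mu              sync.Mutex
	refreshCond     *sync.Cond
	changedStages   map[string]bool
	injectedBundles []string // stand-in for []RunBundle
}

func newManager() *manager {
	m := &manager{changedStages: map[string]bool{}}
	m.refreshCond = sync.NewCond(&m.mu)
	return m
}

// hasWork reports whether any source of work is non-empty.
// It must be called with m.mu held.
func (m *manager) hasWork(changedByProcessingTime map[string]bool) bool {
	return len(m.changedStages)+len(changedByProcessingTime)+len(m.injectedBundles) > 0
}

// inject queues a bundle prepared outside the main loop and wakes any waiter.
func (m *manager) inject(bundleID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.injectedBundles = append(m.injectedBundles, bundleID)
	m.refreshCond.Broadcast()
}

// waitForWork blocks until at least one source has work, then hands back
// whatever injected bundles were queued.
func (m *manager) waitForWork() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	for !m.hasWork(nil) {
		m.refreshCond.Wait() // releases m.mu while blocked
	}
	out := m.injectedBundles
	m.injectedBundles = nil
	return out
}

func main() {
	m := newManager()
	go m.inject("owe-1")
	fmt.Println(m.waitForWork())
}
```

Without the third term in the condition, a loop like this could keep sleeping even though an injected bundle is ready to run.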



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1555,48 +1614,143 @@ func (ss *stageState) updateWatermarks(em 
*ElementManager) set[string] {
        if minWatermarkHold < newOut {
                newOut = minWatermarkHold
        }
-       refreshes := set[string]{}
+       // If the newOut is smaller, then don't change downstream watermarks.
+       if newOut <= ss.output {
+               return nil
+       }
+
        // If bigger, advance the output watermark
-       if newOut > ss.output {
-               ss.output = newOut
-               for _, outputCol := range ss.outputIDs {
-                       consumers := em.consumers[outputCol]
-
-                       for _, sID := range consumers {
-                               
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
-                               refreshes.insert(sID)
-                       }
-                       // Inform side input consumers, but don't update the 
upstream watermark.
-                       for _, sID := range em.sideConsumers[outputCol] {
-                               refreshes.insert(sID.Global)
-                       }
-               }
-               // Garbage collect state, timers and side inputs, for all 
windows
-               // that are before the new output watermark.
-               // They'll never be read in again.
-               for _, wins := range ss.sideInputs {
-                       for win := range wins {
-                               // 
TODO(#https://github.com/apache/beam/issues/31438):
-                               // Adjust with AllowedLateness
-                               // Clear out anything we've already used.
-                               if win.MaxTimestamp() < newOut {
-                                       delete(wins, win)
+       preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, 
em)
+
+       // Garbage collect state, timers and side inputs, for all windows
+       // that are before the new output watermark, if they aren't in progress
+       // of being expired.
+       // They'll never be read in again.
+       for _, wins := range ss.sideInputs {
+               for win := range wins {
+                       // TODO(#https://github.com/apache/beam/issues/31438):
+                       // Adjust with AllowedLateness
+                       // Clear out anything we've already used.
+                       if win.MaxTimestamp() < newOut {
+                               // If the expiry is in progress, skip this 
window.
+                               if ss.inProgressExpiredWindows[win] > 0 {
+                                       continue
                                }
+                               delete(wins, win)
                        }
                }
-               for _, wins := range ss.state {
-                       for win := range wins {
-                               // 
TODO(#https://github.com/apache/beam/issues/31438):
-                               // Adjust with AllowedLateness
-                               if win.MaxTimestamp() < newOut {
-                                       delete(wins, win)
+       }
+       for _, wins := range ss.state {
+               for win := range wins {
+                       // TODO(#https://github.com/apache/beam/issues/31438):
+                       // Adjust with AllowedLateness
+                       if win.MaxTimestamp() < newOut {
+                               // If the expiry is in progress, skip 
collecting this window.
+                               if ss.inProgressExpiredWindows[win] > 0 {
+                                       continue
                                }
+                               delete(wins, win)
                        }
                }
        }
+       // If there are windows to expire, we don't update the output watermark 
yet.
+       if preventDownstreamUpdate {
+               return nil
+       }
+
+       // Update this stage's output watermark, and then propagate that to 
downstream stages
+       refreshes := set[string]{}
+       ss.output = newOut
+       for _, outputCol := range ss.outputIDs {
+               consumers := em.consumers[outputCol]
+
+               for _, sID := range consumers {
+                       em.stages[sID].updateUpstreamWatermark(outputCol, 
ss.output)
+                       refreshes.insert(sID)
+               }
+               // Inform side input consumers, but don't update the upstream 
watermark.
+               for _, sID := range em.sideConsumers[outputCol] {
+                       refreshes.insert(sID.Global)
+               }
+       }
        return refreshes
 }
 
+// createOnWindowExpirationBundles creates side channel bundles when windows

Review Comment:
   Suggested wording: `// createOnWindowExpirationBundles injects channel bundles when windows`



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1555,48 +1614,143 @@ func (ss *stageState) updateWatermarks(em 
*ElementManager) set[string] {
        if minWatermarkHold < newOut {
                newOut = minWatermarkHold
        }
-       refreshes := set[string]{}
+       // If the newOut is smaller, then don't change downstream watermarks.
+       if newOut <= ss.output {
+               return nil
+       }
+
        // If bigger, advance the output watermark
-       if newOut > ss.output {
-               ss.output = newOut
-               for _, outputCol := range ss.outputIDs {
-                       consumers := em.consumers[outputCol]
-
-                       for _, sID := range consumers {
-                               
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
-                               refreshes.insert(sID)
-                       }
-                       // Inform side input consumers, but don't update the 
upstream watermark.
-                       for _, sID := range em.sideConsumers[outputCol] {
-                               refreshes.insert(sID.Global)
-                       }
-               }
-               // Garbage collect state, timers and side inputs, for all 
windows
-               // that are before the new output watermark.
-               // They'll never be read in again.
-               for _, wins := range ss.sideInputs {
-                       for win := range wins {
-                               // 
TODO(#https://github.com/apache/beam/issues/31438):
-                               // Adjust with AllowedLateness
-                               // Clear out anything we've already used.
-                               if win.MaxTimestamp() < newOut {
-                                       delete(wins, win)
+       preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, 
em)
+
+       // Garbage collect state, timers and side inputs, for all windows
+       // that are before the new output watermark, if they aren't in progress
+       // of being expired.
+       // They'll never be read in again.
+       for _, wins := range ss.sideInputs {
+               for win := range wins {
+                       // TODO(#https://github.com/apache/beam/issues/31438):
+                       // Adjust with AllowedLateness
+                       // Clear out anything we've already used.
+                       if win.MaxTimestamp() < newOut {
+                               // If the expiry is in progress, skip this 
window.
+                               if ss.inProgressExpiredWindows[win] > 0 {
+                                       continue
                                }
+                               delete(wins, win)
                        }
                }
-               for _, wins := range ss.state {
-                       for win := range wins {
-                               // 
TODO(#https://github.com/apache/beam/issues/31438):
-                               // Adjust with AllowedLateness
-                               if win.MaxTimestamp() < newOut {
-                                       delete(wins, win)
+       }
+       for _, wins := range ss.state {
+               for win := range wins {
+                       // TODO(#https://github.com/apache/beam/issues/31438):
+                       // Adjust with AllowedLateness
+                       if win.MaxTimestamp() < newOut {
+                               // If the expiry is in progress, skip 
collecting this window.
+                               if ss.inProgressExpiredWindows[win] > 0 {
+                                       continue
                                }
+                               delete(wins, win)
                        }
                }
        }
+       // If there are windows to expire, we don't update the output watermark 
yet.
+       if preventDownstreamUpdate {
+               return nil
+       }
+
+       // Update this stage's output watermark, and then propagate that to 
downstream stages
+       refreshes := set[string]{}
+       ss.output = newOut
+       for _, outputCol := range ss.outputIDs {
+               consumers := em.consumers[outputCol]
+
+               for _, sID := range consumers {
+                       em.stages[sID].updateUpstreamWatermark(outputCol, 
ss.output)
+                       refreshes.insert(sID)
+               }
+               // Inform side input consumers, but don't update the upstream 
watermark.
+               for _, sID := range em.sideConsumers[outputCol] {
+                       refreshes.insert(sID.Global)
+               }
+       }
        return refreshes
 }
 
+// createOnWindowExpirationBundles creates side channel bundles when windows
+// expire for all keys that were used in that window. Returns true if any
+// bundles are created, which means that the window must not yet be garbage
+// collected.
+//
+// Must be called within the stageState.mu's and the ElementManager.refreshCond
+// critical sections.
+func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em 
*ElementManager) bool {
+       var preventDownstreamUpdate bool
+       for win, keys := range ss.keysToExpireByWindow {
+               // Check if the window has expired.
+               // TODO(#https://github.com/apache/beam/issues/31438):
+               // Adjust with AllowedLateness
+               if win.MaxTimestamp() >= newOut {
+                       continue
+               }
+               // We can't advance the output watermark if there's garbage to 
collect.
+               preventDownstreamUpdate = true
+               // Hold off on garbage collecting data for these windows while 
these
+               // are in progress.
+               ss.inProgressExpiredWindows[win] += 1
+
+               // Produce bundle(s) for these keys and window, and side 
channel them.
+               wm := win.MaxTimestamp()
+               rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + 
em.nextBundID(), Watermark: wm}
+
+               // Now we need to actually build the bundle.
+               var toProcess []element
+               busyKeys := set[string]{}
+               usedKeys := set[string]{}
+               for k := range keys {
+                       if ss.inprogressKeys.present(k) {
+                               busyKeys.insert(k)
+                               continue
+                       }
+                       usedKeys.insert(k)
+                       toProcess = append(toProcess, element{
+                               window:        win,
+                               timestamp:     wm,
+                               pane:          typex.NoFiringPane(),
+                               holdTimestamp: wm,
+                               transform:     ss.onWindowExpiration.Transform,
+                               family:        
ss.onWindowExpiration.TimerFamily,
+                               sequence:      1,
+                               keyBytes:      []byte(k),
+                               elmBytes:      nil,
+                       })

Review Comment:
   Does this make the elements available to the SDK during its
   OnWindowExpiration callback for a given key and window?
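Reading the quoted hunk, each appended element carries a key and a window but a nil `elmBytes`, so it looks more like a timer-style firing per key than a data element. A minimal sketch of that per-key construction follows; the `expiryElement` type and `buildExpiryElements` function are hypothetical simplifications, and the sorting exists only to make the example deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// expiryElement is a hypothetical, reduced form of the element struct in the diff.
type expiryElement struct {
	window   string
	key      string
	elmBytes []byte // left nil: the firing carries a key and window, not a payload
}

// buildExpiryElements returns one firing per idle key, plus the keys that were
// skipped because another bundle is still processing them.
func buildExpiryElements(window string, keys, inprogress map[string]bool) (toProcess []expiryElement, busy []string) {
	sorted := make([]string, 0, len(keys))
	for k := range keys {
		sorted = append(sorted, k)
	}
	sort.Strings(sorted) // deterministic order, for the example only

	for _, k := range sorted {
		if inprogress[k] {
			busy = append(busy, k)
			continue
		}
		toProcess = append(toProcess, expiryElement{window: window, key: k})
	}
	return toProcess, busy
}

func main() {
	keys := map[string]bool{"a": true, "b": true, "c": true}
	inprogress := map[string]bool{"b": true}
	toProcess, busy := buildExpiryElements("w1", keys, inprogress)
	fmt.Println(len(toProcess), busy)
}
```

Under this reading, the SDK callback would fetch whatever it needs for the key and window from user state rather than from the element itself; that is an inference from the hunk, not something the PR states.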



##########
sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:
##########
@@ -182,8 +182,6 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (_ *
                                check("TimerFamilySpecs.TimeDomain.Urn", 
spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, 
pipepb.TimeDomain_PROCESSING_TIME)
                        }
 
-                       check("OnWindowExpirationTimerFamily", 
pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.

Review Comment:
   I ask this here as it's where the Pipeline graph is first analyzed. Is it 
the responsibility of the SDK or the Runner to check whether the input 
PCollection is a PCollection<KV>?



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -338,6 +350,8 @@ func (rb RunBundle) LogValue() slog.Value {
 // The returned channel is closed when the context is canceled, or there are 
no pending elements
 // remaining.
 func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn 
context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
+       // Make it easier for side channel bundles to get unique IDs.

Review Comment:
   Perhaps change to `// Make it easier for injected channel bundles to get
   unique IDs.`



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -628,6 +655,11 @@ type Block struct {
        Transform, Family string
 }
 
+// StaticTimerID represents the static user identifiers for a timer.
+type StaticTimerID struct {
+       Transform, TimerFamily string

Review Comment:
   ```
   // Transform identifies fused topologically sorted ids of Pipeline steps.
   Transform string
   
   // TimerFamily is the user identifier for a timer family, such as the
   // parameter annotation for the TimerMap in a Java SDK DoFn.
   TimerFamily string
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1068,6 +1113,12 @@ type stageState struct {
        strat                        winStrat        // Windowing Strategy for 
aggregation fireings.
        processingTimeTimersFamilies map[string]bool // Indicates which timer 
families use the processing time domain.
 
+       // onWindowExpiration management
+       onWindowExpiration       StaticTimerID                // The static ID 
of the OnWindowExpiration callback.
+       keysToExpireByWindow     map[typex.Window]set[string] // Tracks all 
keys ever used with a window, so they may be expired.
+       inProgressExpiredWindows map[typex.Window]int         // Tracks the 
number of bundles currently expiring these windows, so we don't prematurely 
collect them.

Review Comment:
   Suggest ending the comment with: `so we don't prematurely garbage collect them.`
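For context on why the count matters, here is a minimal sketch of a garbage collection pass guarded by a per-window counter. The `window` and `stage` types and integer timestamps are hypothetical simplifications. The `expiryDone` helper is an assumption: the quoted diff shows the increment and the guard, but not where the count is decremented.

```go
package main

import "fmt"

// window is a hypothetical stand-in for typex.Window, with an integer timestamp.
type window struct{ maxTimestamp int64 }

// stage is a hypothetical, reduced form of stageState.
type stage struct {
	state                    map[window][]string
	inProgressExpiredWindows map[window]int
}

// gc deletes state for windows that ended before the watermark, skipping any
// window that still has expiry bundles in flight.
func (s *stage) gc(watermark int64) {
	for win := range s.state {
		if win.maxTimestamp < watermark {
			if s.inProgressExpiredWindows[win] > 0 {
				continue
			}
			delete(s.state, win) // deleting during range is safe in Go
		}
	}
}

// expiryDone records that one expiry bundle for win has finished
// (assumed behavior, not shown in the quoted diff).
func (s *stage) expiryDone(win window) {
	s.inProgressExpiredWindows[win]--
	if s.inProgressExpiredWindows[win] <= 0 {
		delete(s.inProgressExpiredWindows, win)
	}
}

func main() {
	w := window{maxTimestamp: 10}
	s := &stage{
		state:                    map[window][]string{w: {"a"}},
		inProgressExpiredWindows: map[window]int{w: 1},
	}
	s.gc(20)
	fmt.Println("windows kept while expiring:", len(s.state))
	s.expiryDone(w)
	s.gc(20)
	fmt.Println("windows kept after expiry:", len(s.state))
}
```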



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1555,48 +1614,143 @@ func (ss *stageState) updateWatermarks(em 
*ElementManager) set[string] {
        if minWatermarkHold < newOut {
                newOut = minWatermarkHold
        }
-       refreshes := set[string]{}
+       // If the newOut is smaller, then don't change downstream watermarks.
+       if newOut <= ss.output {
+               return nil
+       }
+
        // If bigger, advance the output watermark
-       if newOut > ss.output {
-               ss.output = newOut
-               for _, outputCol := range ss.outputIDs {
-                       consumers := em.consumers[outputCol]
-
-                       for _, sID := range consumers {
-                               
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
-                               refreshes.insert(sID)
-                       }
-                       // Inform side input consumers, but don't update the 
upstream watermark.
-                       for _, sID := range em.sideConsumers[outputCol] {
-                               refreshes.insert(sID.Global)
-                       }
-               }
-               // Garbage collect state, timers and side inputs, for all 
windows
-               // that are before the new output watermark.
-               // They'll never be read in again.
-               for _, wins := range ss.sideInputs {
-                       for win := range wins {
-                               // 
TODO(#https://github.com/apache/beam/issues/31438):
-                               // Adjust with AllowedLateness
-                               // Clear out anything we've already used.
-                               if win.MaxTimestamp() < newOut {
-                                       delete(wins, win)
+       preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, 
em)
+
+       // Garbage collect state, timers and side inputs, for all windows
+       // that are before the new output watermark, if they aren't in progress
+       // of being expired.
+       // They'll never be read in again.
+       for _, wins := range ss.sideInputs {
+               for win := range wins {
+                       // TODO(#https://github.com/apache/beam/issues/31438):
+                       // Adjust with AllowedLateness
+                       // Clear out anything we've already used.
+                       if win.MaxTimestamp() < newOut {
+                               // If the expiry is in progress, skip this 
window.
+                               if ss.inProgressExpiredWindows[win] > 0 {
+                                       continue
                                }
+                               delete(wins, win)
                        }
                }
-               for _, wins := range ss.state {
-                       for win := range wins {
-                               // 
TODO(#https://github.com/apache/beam/issues/31438):
-                               // Adjust with AllowedLateness
-                               if win.MaxTimestamp() < newOut {
-                                       delete(wins, win)
+       }
+       for _, wins := range ss.state {
+               for win := range wins {
+                       // TODO(#https://github.com/apache/beam/issues/31438):
+                       // Adjust with AllowedLateness
+                       if win.MaxTimestamp() < newOut {
+                               // If the expiry is in progress, skip 
collecting this window.
+                               if ss.inProgressExpiredWindows[win] > 0 {
+                                       continue
                                }
+                               delete(wins, win)
                        }
                }
        }
+       // If there are windows to expire, we don't update the output watermark 
yet.
+       if preventDownstreamUpdate {
+               return nil
+       }
+
+       // Update this stage's output watermark, and then propagate that to 
downstream stages
+       refreshes := set[string]{}
+       ss.output = newOut
+       for _, outputCol := range ss.outputIDs {
+               consumers := em.consumers[outputCol]
+
+               for _, sID := range consumers {
+                       em.stages[sID].updateUpstreamWatermark(outputCol, 
ss.output)
+                       refreshes.insert(sID)
+               }
+               // Inform side input consumers, but don't update the upstream 
watermark.
+               for _, sID := range em.sideConsumers[outputCol] {
+                       refreshes.insert(sID.Global)
+               }
+       }
        return refreshes
 }
 
+// createOnWindowExpirationBundles creates side channel bundles when windows
+// expire for all keys that were used in that window. Returns true if any

Review Comment:
   I see why stageState's createOnWindowExpirationBundles method is called by
   its updateWatermarks method, which runs during the main bundle loop: that
   is when we know the new output watermark has passed the window's maximum
   timestamp (`win.MaxTimestamp() < newOut`).



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -386,6 +400,19 @@ func (em *ElementManager) Bundles(ctx context.Context, 
upstreamCancelFn context.
                                changedByProcessingTime = 
em.processTimeEvents.AdvanceTo(emNow)
                                em.changedStages.merge(changedByProcessingTime)
                        }
+                       // Run any side channel bundles first.

Review Comment:
   Change to `// Run any injected channel bundles first.`
   
   Question: do these run first because we want to prioritize processing
   expired windows? Also, I recall you mentioning that timers are processed
   when the watermark timestamp is >= the window's end timestamp, so would
   `@OnTimer` and `@OnTimerFamily` callbacks be processed here as well?
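As a sketch of what "first" could mean in practice, the snippet below drains an injected queue before the regular one. The `scheduler` type and `next` method are hypothetical. The rationale is also an assumption rather than something the PR confirms: since the diff returns early from updateWatermarks while windows are expiring, running those bundles promptly would let the output watermark advance sooner.

```go
package main

import "fmt"

// scheduler is a hypothetical two-queue stand-in for the bundle loop.
type scheduler struct {
	injected []string // bundles prepared outside the main loop
	regular  []string // bundles produced by the main loop
}

// next returns the next bundle ID to run, preferring injected bundles.
// The boolean is false when both queues are empty.
func (s *scheduler) next() (string, bool) {
	if len(s.injected) > 0 {
		b := s.injected[0]
		s.injected = s.injected[1:]
		return b, true
	}
	if len(s.regular) > 0 {
		b := s.regular[0]
		s.regular = s.regular[1:]
		return b, true
	}
	return "", false
}

func main() {
	s := &scheduler{
		injected: []string{"owe-1"},
		regular:  []string{"bundle-1", "bundle-2"},
	}
	for b, ok := s.next(); ok; b, ok = s.next() {
		fmt.Println(b)
	}
}
```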



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
