lostluck commented on code in PR #36069:
URL: https://github.com/apache/beam/pull/36069#discussion_r2341959970


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key []byte, win t
                // Ensure the heap invariants are maintained.
                heap.Init(&dnt.elements)
        }
+       return toProcess, accumulationDiff
+}
 
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, 
genBundID func() string) (string, bool, int) {

Review Comment:
   Since this is mutating the stage state, please also document that we need 
the stage.mu lock held. Same for buildTriggeredBundle above.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1302,17 +1313,54 @@ func (*aggregateStageKind) addPending(ss *stageState, 
em *ElementManager, newPen
                ready := ss.strat.IsTriggerReady(triggerInput{
                        newElementCount:    1,
                        endOfWindowReached: endOfWindowReached,
+                       emNow:              em.ProcessingTimeNow(),
                }, &state)
 
                if ready {
                        state.Pane = computeNextTriggeredPane(state.Pane, 
endOfWindowReached)
+               } else {
+                       if pts := ss.strat.GetAfterProcessingTimeTriggers(); 
pts != nil {
+                               for _, t := range pts {
+                                       ts := (&state).getTriggerState(t)
+                                       if ts.extra == nil || 
t.shouldFire((&state)) {
+                                               // Skipping inserting a 
processing time timer if the firing time
+                                               // is not set or it already 
should fire.
+                                               // When the after processing 
time triggers should fire, there are
+                                               // two scenarios:
+                                               // (1) the entire trigger of 
this window is ready to fire. In this
+                                               //     case, `ready` should be 
true and we won't reach here.
+                                               // (2) we are still waiting for 
other triggers (subtriggers) to
+                                               //     fire (e.g. AfterAll).
+                                               continue
+                                       }
+                                       firingTime := 
ts.extra.(afterProcessingTimeState).firingTime
+                                       notYetHolds := map[mtime.Time]int{}
+                                       timer := element{
+                                               window:        e.window,
+                                               timestamp:     firingTime,
+                                               holdTimestamp: 
e.window.MaxTimestamp(),
+                                               pane:          
typex.NoFiringPane(),
+                                               transform:     ss.ID, // Use 
stage id to fake transform id
+                                               family:        
"AfterProcessingTime",
+                                               tag:           "",
+                                               sequence:      1,
+                                               elmBytes:      nil,
+                                               keyBytes:      e.keyBytes,
+                                       }
+                                       // TODO: how to deal with watermark 
holds for this implicit processing time timer
+                                       // 
ss.watermarkHolds.Add(timer.holdTimestamp, 1)
+                                       
ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds)
+                                       
em.processTimeEvents.Schedule(firingTime, ss.ID)
+                                       em.wakeUpAt(firingTime)

Review Comment:
   I'd recommend just putting the em.processTime.Schedule call *inside* the 
wakeUpAt call, and replacing all the existing calls. Perhaps a better name: 
`scheduleProcessingTimeEventForStage`
   
   Avoids issues where one is called but the other is not.
   
   ------
   
   On the other hand: If we keep them separate we have a better chance of 
reducing the number of spare goroutines/timers we're keeping around. Eg. On 
such a firing, we see when the next processing time event is, and re-schedule 
it. That feels like a loop we can build and initialize as part of the Bundles 
set up...  I'll give it some more thought.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key []byte, win t
                // Ensure the heap invariants are maintained.
                heap.Init(&dnt.elements)
        }
+       return toProcess, accumulationDiff
+}
 
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, 
genBundID func() string) (string, bool, int) {
+       toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win)
+
+       if len(toProcess) == 0 {
+               return "", false, accumulationDiff

Review Comment:
   Should accumulationDiff ever be non-zero when a bundle isn't going to be run?



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
        for _, v := range notYet {
                ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
                em.processTimeEvents.Schedule(v.firing, ss.ID)
+               em.wakeUpAt(v.firing)
        }
 
        // Add a refresh if there are still processing time events to process.
        stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp 
|| len(notYet) > 0)
 
-       if len(toProcess) == 0 {
-               // If we have nothing
-               return "", false, stillSchedulable
+       return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
+}
+
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ 
set[string], _ map[mtime.Time]int, schedulable bool) {
+       // var toProcess []element
+       minTs := mtime.MaxTimestamp
+       holdsInBundle := map[mtime.Time]int{}
+
+       var notYet []fireElement
+
+       nextTime := ss.processingTimeTimers.Peek()
+       keyCounts := map[string]int{}
+       newKeys := set[string]{}
+
+       for nextTime <= emNow {
+               elems := ss.processingTimeTimers.FireAt(nextTime)
+               for _, e := range elems {
+                       // Check if we're already executing this timer's key.
+                       if ss.inprogressKeys.present(string(e.keyBytes)) {
+                               notYet = append(notYet, fireElement{firing: 
nextTime, timer: e})
+                               continue
+                       }
+
+                       // If we are set to have OneKeyPerBundle, and we 
already have a key for this bundle, we process it later.
+                       if len(keyCounts) > 0 && OneKeyPerBundle {
+                               notYet = append(notYet, fireElement{firing: 
nextTime, timer: e})
+                               continue
+                       }
+                       // If we are set to have OneElementPerKey, and we 
already have an element for this key we set this to process later.
+                       if v := keyCounts[string(e.keyBytes)]; v > 0 && 
OneElementPerKey {
+                               notYet = append(notYet, fireElement{firing: 
nextTime, timer: e})
+                               continue
+                       }
+                       keyCounts[string(e.keyBytes)]++
+                       newKeys.insert(string(e.keyBytes))
+                       if e.timestamp < minTs {
+                               minTs = e.timestamp
+                       }
+                       // TODO: how to deal with watermark holds for this 
implicit processing time timer
+                       // holdsInBundle[e.holdTimestamp]++

Review Comment:
   Correct me if I'm wrong, I think this is the only difference preventing us 
from a single shared method between the general stateful handler and the 
aggregation handler? 
   
    (That is, a single method on ss.state that both kinds call)



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
        for _, v := range notYet {
                ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
                em.processTimeEvents.Schedule(v.firing, ss.ID)
+               em.wakeUpAt(v.firing)
        }
 
        // Add a refresh if there are still processing time events to process.
        stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp 
|| len(notYet) > 0)
 
-       if len(toProcess) == 0 {
-               // If we have nothing
-               return "", false, stillSchedulable
+       return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
+}
+
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ 
set[string], _ map[mtime.Time]int, schedulable bool) {

Review Comment:
   I think we're safe in commenting here that "aggegateStageKind" builds 
processing time Bundles only when there are processing time triggers.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -2154,14 +2304,16 @@ func (ss *stageState) bundleReady(em *ElementManager, 
emNow mtime.Time) (mtime.T
        // then we can't yet process this stage.
        inputW := ss.input
        _, upstreamW := ss.UpstreamWatermark()
-       if inputW == upstreamW {
+       if inputW == upstreamW && !ss.watermarkAdvanced {
                slog.Debug("bundleReady: unchanged upstream watermark",
                        slog.String("stage", ss.ID),
                        slog.Group("watermark",
                                slog.Any("upstream", upstreamW),
                                slog.Any("input", inputW)))
                return mtime.MinTimestamp, false, ptimeEventsReady, 
injectedReady
        }
+
+       ss.watermarkAdvanced = false

Review Comment:
   The part I don't love about this here is that bundleReady wasn't doing any 
mutations to the state previously and now it's effectively bypassing the 
previous guardrails.
   
   If we have both a good explanation why we need this behavior, and passing 
tests, then I'm happy with it. We have a what, but "why" is it 
correct/necessary?
   
   This applies whether we move this property to be something we're passing 
into the bundleReady call or if it remains a field or not.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -2154,14 +2304,16 @@ func (ss *stageState) bundleReady(em *ElementManager, 
emNow mtime.Time) (mtime.T
        // then we can't yet process this stage.
        inputW := ss.input
        _, upstreamW := ss.UpstreamWatermark()
-       if inputW == upstreamW {
+       if inputW == upstreamW && !ss.watermarkAdvanced {

Review Comment:
   We need to update the comment just above here explaining why this bypass is 
needed.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key []byte, win t
                // Ensure the heap invariants are maintained.
                heap.Init(&dnt.elements)
        }
+       return toProcess, accumulationDiff
+}
 
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, 
genBundID func() string) (string, bool, int) {

Review Comment:
   Alternatively, since this is ultimately only called by runTriggeredBundle, 
we should just move it all into there instead. I can be convinced otherwise.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1996,6 +2145,7 @@ func (ss *stageState) updateWatermarks(em 
*ElementManager) set[string] {
        // If bigger, advance the input watermark.
        if newIn > ss.input {
                ss.input = newIn
+               ss.watermarkAdvanced = true

Review Comment:
   I don't love this toggle, but please name it "inputWatermarkAdvanced" 
instead. 
   
   It also looks like this is mostly just to mark this specific property to get 
it to the bundleReady call for that bypass. I'd prefer if we can make it 
explicitly returned out, and passed into bundleReady instead. 
   
   stageState contains persistent state, while the "inputWatermark has 
advanced" is a transient property between the two calls. It feels brittle that 
it could be reset or forgotten to be set, compared to implicitly reset when we 
pass it explicitly.
   



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1810,14 +1874,29 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
        ss.mu.Lock()
        defer ss.mu.Unlock()
 
+       toProcess, minTs, newKeys, holdsInBundle, stillSchedulable := 
ss.kind.buildProcessingTimeBundle(ss, em, emNow)
+
+       if len(toProcess) == 0 {
+               // If we have nothing
+               return "", false, stillSchedulable
+       }
+       bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle)
+       return bundID, true, stillSchedulable
+}
+
+func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ 
set[string], _ map[mtime.Time]int, schedulable bool) {

Review Comment:
   Feel free to drop the named return values, since we aren't using them here. 
Same for the others. Named returns are more valuable for documentation purposes 
when the function is exported, or for tricky but useful defer special cases. 
   
   Here we can just have a comment in the implementation anyway, when 
applicable. And avoid the _ names because we aren't using any of the named 
returns.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key []byte, win t
                // Ensure the heap invariants are maintained.
                heap.Init(&dnt.elements)
        }
+       return toProcess, accumulationDiff
+}
 
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, 
genBundID func() string) (string, bool, int) {

Review Comment:
   Also document what the returns are please:
   
   ...returns the bundleID, whether the bundle is OK, and any necessary 
adjustments to the pending count due to accumulation.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em 
*ElementManager, emNow mtime.
        for _, v := range notYet {
                ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
                em.processTimeEvents.Schedule(v.firing, ss.ID)
+               em.wakeUpAt(v.firing)
        }
 
        // Add a refresh if there are still processing time events to process.
        stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp 
|| len(notYet) > 0)
 
-       if len(toProcess) == 0 {
-               // If we have nothing
-               return "", false, stillSchedulable
+       return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
+}
+
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ 
set[string], _ map[mtime.Time]int, schedulable bool) {
+       // var toProcess []element

Review Comment:
   rm comment



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1153,6 +1161,8 @@ type stageState struct {
        inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to 
associated holds.
 
        processingTimeTimers *timerHandler
+
+       watermarkAdvanced bool // whether the watermark for this stage has 
advanced

Review Comment:
   Commented elsewhere, but what this is whether the input watermark has 
advanced or not since the last bundle. Previously we just needed the output 
watermark time.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em 
*ElementManager, key []byte, win t
                // Ensure the heap invariants are maintained.
                heap.Init(&dnt.elements)
        }
+       return toProcess, accumulationDiff
+}
 
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, 
genBundID func() string) (string, bool, int) {
+       toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win)
+
+       if len(toProcess) == 0 {
+               return "", false, accumulationDiff
+       }
        if ss.inprogressKeys == nil {
                ss.inprogressKeys = set[string]{}
        }
-       ss.makeInProgressBundle(
-               func() string { return rb.BundleID },
+       bundID := ss.makeInProgressBundle(
+               genBundID,
                toProcess,
                ss.input,
                singleSet(string(key)),
                nil,
        )
-       ss.bundlesToInject = append(ss.bundlesToInject, rb)
-       // Bundle is marked in progress here to prevent a race condition.
-       em.refreshCond.L.Lock()
-       em.inprogressBundles.insert(rb.BundleID)
-       em.refreshCond.L.Unlock()
+
+       return bundID, true, accumulationDiff
+}
+
+// runTriggeredBundle must be called with the stage.mu lock held.
+// When in discarding mode, returns 0.
+// When in accumulating mode, returns the number of fired elements to maintain 
a correct pending count.

Review Comment:
   Might be good to elaborate here, since I had to re-think this through.
   
   Perhaps...
   ```suggestion
   // Returns the accumulation diff that the pending work needs to be adjusted 
by, as completed work is subtracted from the pending count.
   // When in discarding mode, returns 0, as the pending work already includes 
these elements.
   // When in accumulating mode, returns the number of fired elements, since 
those elements remain pending even after this bundle is fired.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to