lostluck commented on code in PR #36069:
URL: https://github.com/apache/beam/pull/36069#discussion_r2341959970
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t
         // Ensure the heap invariants are maintained.
         heap.Init(&dnt.elements)
     }
+    return toProcess, accumulationDiff
+}
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
Review Comment:
Since this mutates the stage state, please also document that the stage.mu lock must be held. Same for buildTriggeredBundle above.
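For example, something along these lines on the declaration:

```go
// startTriggeredBundle ...
//
// Must be called with ss.mu held, since it mutates the stage state.
// The same requirement applies to buildTriggeredBundle above.
func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
```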
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1302,17 +1313,54 @@ func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPen
     ready := ss.strat.IsTriggerReady(triggerInput{
         newElementCount:    1,
         endOfWindowReached: endOfWindowReached,
+        emNow:              em.ProcessingTimeNow(),
     }, &state)
     if ready {
         state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached)
+    } else {
+        if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil {
+            for _, t := range pts {
+                ts := (&state).getTriggerState(t)
+                if ts.extra == nil || t.shouldFire((&state)) {
+                    // Skipping inserting a processing time timer if the firing time
+                    // is not set or it already should fire.
+                    // When the after processing time triggers should fire, there are
+                    // two scenarios:
+                    // (1) the entire trigger of this window is ready to fire. In this
+                    // case, `ready` should be true and we won't reach here.
+                    // (2) we are still waiting for other triggers (subtriggers) to
+                    // fire (e.g. AfterAll).
+                    continue
+                }
+                firingTime := ts.extra.(afterProcessingTimeState).firingTime
+                notYetHolds := map[mtime.Time]int{}
+                timer := element{
+                    window:        e.window,
+                    timestamp:     firingTime,
+                    holdTimestamp: e.window.MaxTimestamp(),
+                    pane:          typex.NoFiringPane(),
+                    transform:     ss.ID, // Use stage id to fake transform id
+                    family:        "AfterProcessingTime",
+                    tag:           "",
+                    sequence:      1,
+                    elmBytes:      nil,
+                    keyBytes:      e.keyBytes,
+                }
+                // TODO: how to deal with watermark holds for this implicit processing time timer
+                // ss.watermarkHolds.Add(timer.holdTimestamp, 1)
+                ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds)
+                em.processTimeEvents.Schedule(firingTime, ss.ID)
+                em.wakeUpAt(firingTime)
Review Comment:
I'd recommend just putting the em.processTimeEvents.Schedule call *inside* the wakeUpAt call, and replacing all the existing paired calls. Perhaps a better name: `scheduleProcessingTimeEventForStage`. That avoids issues where one is called but the other is not.
------
On the other hand, if we keep them separate we have a better chance of reducing the number of spare goroutines/timers we're keeping around. E.g. on such a firing, we see when the next processing time event is and re-schedule it. That feels like a loop we can build and initialize as part of the bundle setup... I'll give it some more thought.
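For the combined option, a rough sketch of the shape I mean (it just pairs the two existing calls behind one method; the name is only a suggestion):

```go
// scheduleProcessingTimeEventForStage registers a processing time event for the
// given stage and arranges the corresponding wake-up in one place, so one call
// can never happen without the other.
func (em *ElementManager) scheduleProcessingTimeEventForStage(t mtime.Time, stageID string) {
    em.processTimeEvents.Schedule(t, stageID)
    em.wakeUpAt(t)
}
```

Call sites like the one above would then collapse to `em.scheduleProcessingTimeEventForStage(firingTime, ss.ID)`.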
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t
         // Ensure the heap invariants are maintained.
         heap.Init(&dnt.elements)
     }
+    return toProcess, accumulationDiff
+}
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
+    toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win)
+
+    if len(toProcess) == 0 {
+        return "", false, accumulationDiff
Review Comment:
Should accumulationDiff ever be non-zero when a bundle isn't going to be run?
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
     for _, v := range notYet {
         ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
         em.processTimeEvents.Schedule(v.firing, ss.ID)
+        em.wakeUpAt(v.firing)
     }
     // Add a refresh if there are still processing time events to process.
     stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0)
-    if len(toProcess) == 0 {
-        // If we have nothing
-        return "", false, stillSchedulable
+    return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
+}
+
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) {
+    // var toProcess []element
+    minTs := mtime.MaxTimestamp
+    holdsInBundle := map[mtime.Time]int{}
+
+    var notYet []fireElement
+
+    nextTime := ss.processingTimeTimers.Peek()
+    keyCounts := map[string]int{}
+    newKeys := set[string]{}
+
+    for nextTime <= emNow {
+        elems := ss.processingTimeTimers.FireAt(nextTime)
+        for _, e := range elems {
+            // Check if we're already executing this timer's key.
+            if ss.inprogressKeys.present(string(e.keyBytes)) {
+                notYet = append(notYet, fireElement{firing: nextTime, timer: e})
+                continue
+            }
+
+            // If we are set to have OneKeyPerBundle, and we already have a key for this bundle, we process it later.
+            if len(keyCounts) > 0 && OneKeyPerBundle {
+                notYet = append(notYet, fireElement{firing: nextTime, timer: e})
+                continue
+            }
+            // If we are set to have OneElementPerKey, and we already have an element for this key we set this to process later.
+            if v := keyCounts[string(e.keyBytes)]; v > 0 && OneElementPerKey {
+                notYet = append(notYet, fireElement{firing: nextTime, timer: e})
+                continue
+            }
+            keyCounts[string(e.keyBytes)]++
+            newKeys.insert(string(e.keyBytes))
+            if e.timestamp < minTs {
+                minTs = e.timestamp
+            }
+            // TODO: how to deal with watermark holds for this implicit processing time timer
+            // holdsInBundle[e.holdTimestamp]++
Review Comment:
Correct me if I'm wrong, but I think this is the only difference preventing us from having a single shared method between the general stateful handler and the aggregation handler?
(That is, a single method on the stage state that both kinds call.)
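If so, a shared builder on the stage state with the hold accounting as the only injected piece might look roughly like this (the name, the loop advance, and the hook shape are all assumptions for illustration, not from the PR):

```go
// buildProcessingTimeElements walks the processing time timers due at emNow and
// splits them into elements to process now and timers that must wait, deferring
// only the watermark-hold accounting to the caller-supplied hook.
func (ss *stageState) buildProcessingTimeElements(emNow mtime.Time, trackHold func(element)) (elementHeap, mtime.Time, set[string], []fireElement) {
    var toProcess elementHeap
    var notYet []fireElement
    minTs := mtime.MaxTimestamp
    newKeys := set[string]{}
    keyCounts := map[string]int{}
    for nextTime := ss.processingTimeTimers.Peek(); nextTime <= emNow; nextTime = ss.processingTimeTimers.Peek() {
        for _, e := range ss.processingTimeTimers.FireAt(nextTime) {
            // Defer timers whose key is already in progress, or that would
            // violate the OneKeyPerBundle / OneElementPerKey constraints.
            if ss.inprogressKeys.present(string(e.keyBytes)) ||
                (len(keyCounts) > 0 && OneKeyPerBundle) ||
                (keyCounts[string(e.keyBytes)] > 0 && OneElementPerKey) {
                notYet = append(notYet, fireElement{firing: nextTime, timer: e})
                continue
            }
            keyCounts[string(e.keyBytes)]++
            newKeys.insert(string(e.keyBytes))
            if e.timestamp < minTs {
                minTs = e.timestamp
            }
            trackHold(e) // stateful kind bumps holdsInBundle; aggregation kind can no-op for now.
            toProcess = append(toProcess, e)
        }
    }
    return toProcess, minTs, newKeys, notYet
}
```

The stateful kind would pass a hook that records holdsInBundle, and the aggregation kind a no-op until the watermark-hold question above is settled.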
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
     for _, v := range notYet {
         ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
         em.processTimeEvents.Schedule(v.firing, ss.ID)
+        em.wakeUpAt(v.firing)
     }
     // Add a refresh if there are still processing time events to process.
     stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0)
-    if len(toProcess) == 0 {
-        // If we have nothing
-        return "", false, stillSchedulable
+    return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
+}
+
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) {
Review Comment:
I think we're safe in commenting here that `aggregateStageKind` builds processing time bundles only when there are processing time triggers.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -2154,14 +2304,16 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
     // then we can't yet process this stage.
     inputW := ss.input
     _, upstreamW := ss.UpstreamWatermark()
-    if inputW == upstreamW {
+    if inputW == upstreamW && !ss.watermarkAdvanced {
         slog.Debug("bundleReady: unchanged upstream watermark",
             slog.String("stage", ss.ID),
             slog.Group("watermark",
                 slog.Any("upstream", upstreamW),
                 slog.Any("input", inputW)))
         return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady
     }
+
+    ss.watermarkAdvanced = false
Review Comment:
The part I don't love here is that bundleReady previously didn't mutate any state, and now it's effectively bypassing the previous guardrails.
If we have both a good explanation of why we need this behavior and passing tests, then I'm happy with it. We have the what, but why is it correct/necessary?
This applies whether we move this property to something passed into the bundleReady call, or it remains a field.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -2154,14 +2304,16 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
     // then we can't yet process this stage.
     inputW := ss.input
     _, upstreamW := ss.UpstreamWatermark()
-    if inputW == upstreamW {
+    if inputW == upstreamW && !ss.watermarkAdvanced {
Review Comment:
We need to update the comment just above here explaining why this bypass is
needed.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t
         // Ensure the heap invariants are maintained.
         heap.Init(&dnt.elements)
     }
+    return toProcess, accumulationDiff
+}
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
Review Comment:
Alternatively, since this is ultimately only called by runTriggeredBundle,
we should just move it all into there instead. I can be convinced otherwise.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1996,6 +2145,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
     // If bigger, advance the input watermark.
     if newIn > ss.input {
         ss.input = newIn
+        ss.watermarkAdvanced = true
Review Comment:
I don't love this toggle, but please name it "inputWatermarkAdvanced" instead.
It also looks like this mostly exists to carry this specific property to the bundleReady call for that bypass. I'd prefer that we return it explicitly and pass it into bundleReady instead.
stageState contains persistent state, while "the input watermark has advanced" is a transient property between the two calls. As a field it could be reset at the wrong time or we could forget to set it, whereas passing it explicitly resets it for free.
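Roughly the shape I'm thinking of (a sketch only; the elided plumbing and the exact bundleReady return tuple are inferred from the surrounding code, not spelled out in the PR):

```go
// updateWatermarks additionally reports whether the input watermark advanced,
// instead of latching that fact on the stageState.
func (ss *stageState) updateWatermarks(em *ElementManager) (refreshes set[string], inputWatermarkAdvanced bool) {
    // ... existing logic ...
    if newIn > ss.input {
        ss.input = newIn
        inputWatermarkAdvanced = true
    }
    // ... existing logic ...
    return refreshes, inputWatermarkAdvanced
}

// bundleReady consumes the transient fact as a parameter, so it can't be left
// stale on the stage or forgotten between the two calls.
func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time, inputWatermarkAdvanced bool) (mtime.Time, bool, bool, bool) {
    // ...
    if inputW == upstreamW && !inputWatermarkAdvanced {
        // Unchanged upstream watermark and no new input: not ready yet.
    }
    // ...
}
```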
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1810,14 +1874,29 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
     ss.mu.Lock()
     defer ss.mu.Unlock()
+    toProcess, minTs, newKeys, holdsInBundle, stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow)
+
+    if len(toProcess) == 0 {
+        // If we have nothing
+        return "", false, stillSchedulable
+    }
+    bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle)
+    return bundID, true, stillSchedulable
+}
+
+func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) {
Review Comment:
Feel free to drop the named return values, since we aren't using them here. Same for the others. Named returns are more valuable for documentation when the function is exported, or for the tricky-but-useful defer special cases. Here we can just put a comment in the implementation when applicable, and avoid the `_` names, since we aren't using any of the named returns.
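i.e. just the signature plus a comment, something like:

```go
// buildProcessingTimeBundle returns the elements to process, their minimum
// timestamp, the keys newly in progress, the watermark holds for the bundle,
// and whether the stage remains schedulable for processing time.
func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, bool) {
```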
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t
         // Ensure the heap invariants are maintained.
         heap.Init(&dnt.elements)
     }
+    return toProcess, accumulationDiff
+}
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
Review Comment:
Also document what the returns are please:
...returns the bundleID, whether the bundle is OK, and any necessary
adjustments to the pending count due to accumulation.
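For example (wording only a suggestion):

```go
// startTriggeredBundle returns the bundle ID, whether the bundle is OK to
// start, and any necessary adjustment to the pending count due to
// accumulation.
func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
```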
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1869,17 +1948,87 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
     for _, v := range notYet {
         ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
         em.processTimeEvents.Schedule(v.firing, ss.ID)
+        em.wakeUpAt(v.firing)
     }
     // Add a refresh if there are still processing time events to process.
     stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0)
-    if len(toProcess) == 0 {
-        // If we have nothing
-        return "", false, stillSchedulable
+    return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable
+}
+
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool) {
+    // var toProcess []element
Review Comment:
rm comment
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1153,6 +1161,8 @@ type stageState struct {
     inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds.
     processingTimeTimers    *timerHandler
+
+    watermarkAdvanced bool // whether the watermark for this stage has advanced
Review Comment:
Commented elsewhere, but what this tracks is whether the input watermark has advanced since the last bundle. Previously we just needed the output watermark time.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1466,22 +1509,43 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win t
         // Ensure the heap invariants are maintained.
         heap.Init(&dnt.elements)
     }
+    return toProcess, accumulationDiff
+}
+func (ss *stageState) startTriggeredBundle(key []byte, win typex.Window, genBundID func() string) (string, bool, int) {
+    toProcess, accumulationDiff := ss.buildTriggeredBundle(key, win)
+
+    if len(toProcess) == 0 {
+        return "", false, accumulationDiff
+    }
     if ss.inprogressKeys == nil {
         ss.inprogressKeys = set[string]{}
     }
-    ss.makeInProgressBundle(
-        func() string { return rb.BundleID },
+    bundID := ss.makeInProgressBundle(
+        genBundID,
         toProcess,
         ss.input,
         singleSet(string(key)),
         nil,
     )
-    ss.bundlesToInject = append(ss.bundlesToInject, rb)
-    // Bundle is marked in progress here to prevent a race condition.
-    em.refreshCond.L.Lock()
-    em.inprogressBundles.insert(rb.BundleID)
-    em.refreshCond.L.Unlock()
+
+    return bundID, true, accumulationDiff
+}
+
+// runTriggeredBundle must be called with the stage.mu lock held.
+// When in discarding mode, returns 0.
+// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
Review Comment:
Might be good to elaborate here, since I had to re-think this through.
Perhaps...
```suggestion
// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
// When in discarding mode, returns 0, as the pending work already includes these elements.
// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]