This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5485467f230 Handle pending adjustment for processing time bundle correctly. (#36384)
5485467f230 is described below
commit 5485467f230100e7ac2c0b50bda72b5e38ed9826
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Oct 3 19:39:45 2025 -0400
Handle pending adjustment for processing time bundle correctly. (#36384)
---
runners/prism/java/build.gradle | 3 +-
.../prism/internal/engine/elementmanager.go | 44 +++++++++++++---------
2 files changed, 27 insertions(+), 20 deletions(-)
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index dbd7cc6cb5c..5e5ddbe139e 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -86,9 +86,8 @@ def sickbayTests = [
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',
- // negative WaitGroup counter when failing bundle
+ // Instead of 42, Prism got 84, which suggests two early panes of 42 are fired.
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
- 'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
// A regression introduced when we use number of pending elements rather than watermark to determine
// the bundle readiness of a stateless stage.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index f77844b6f6c..12b0cada750 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -477,12 +477,15 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
}
}
if ptimeEventsReady {
- bundleID, ok, reschedule :=
ss.startProcessingTimeBundle(em, emNow, nextBundID)
+ bundleID, ok, reschedule,
pendingAdjustment := ss.startProcessingTimeBundle(em, emNow, nextBundID)
// Handle the reschedule even when there's no bundle.
if reschedule {
em.changedStages.insert(stageID)
}
if ok {
+ if pendingAdjustment > 0 {
+
em.addPending(pendingAdjustment)
+ }
rb := RunBundle{StageID:
stageID, BundleID: bundleID, Watermark: watermark}
em.inprogressBundles.insert(rb.BundleID)
@@ -1218,7 +1221,7 @@ type stageKind interface {
holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane,
schedulable bool, pendingAdjustment int)
// buildProcessingTimeBundle handles building processing-time bundles
for the stage per it's kind.
buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
- holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane,
schedulable bool)
+ holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane,
schedulable bool, pendingAdjustment int)
// getPaneOrDefault based on the stage state, element metadata, and
bundle id.
getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w
typex.Window, keyBytes []byte, bundID string) typex.PaneInfo
}
@@ -1983,24 +1986,24 @@ keysPerBundle:
return toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable, accumulatingPendingAdjustment
}
-func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow
mtime.Time, genBundID func() string) (string, bool, bool) {
+func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow
mtime.Time, genBundID func() string) (string, bool, bool, int) {
ss.mu.Lock()
defer ss.mu.Unlock()
- toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow)
+ toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable, accumulatingPendingAdjustment :=
ss.kind.buildProcessingTimeBundle(ss, em, emNow)
if len(toProcess) == 0 {
// If we have nothing
- return "", false, stillSchedulable
+ return "", false, stillSchedulable,
accumulatingPendingAdjustment
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle, panesInBundle)
slog.Debug("started a processing time bundle", "stageID", ss.ID,
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
- return bundID, true, stillSchedulable
+ return bundID, true, stillSchedulable, accumulatingPendingAdjustment
}
// handleProcessingTimeTimer contains the common code for handling
processing-time timers for aggregation stages and stateful stages.
func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow
mtime.Time,
- processTimerFn func(e element, toProcess []element, holdsInBundle
map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane))
(elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
+ processTimerFn func(e element, toProcess []element, holdsInBundle
map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane, int))
(elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool,
int) {
// TODO: Determine if it's possible and a good idea to treat all
EventTime processing as a MinTime
// Special Case for ProcessingTime handling.
// Eg. Always queue EventTime elements at minTime.
@@ -2010,6 +2013,8 @@ func handleProcessingTimeTimer(ss *stageState, em
*ElementManager, emNow mtime.T
var toProcess []element
var panesInBundle []bundlePane
+ var pendingAdjustment int
+ accumulatingPendingAdjustment := 0
minTs := mtime.MaxTimestamp
holdsInBundle := map[mtime.Time]int{}
@@ -2044,7 +2049,8 @@ func handleProcessingTimeTimer(ss *stageState, em
*ElementManager, emNow mtime.T
minTs = e.timestamp
}
- toProcess, panesInBundle = processTimerFn(e, toProcess,
holdsInBundle, panesInBundle)
+ toProcess, panesInBundle, pendingAdjustment =
processTimerFn(e, toProcess, holdsInBundle, panesInBundle)
+ accumulatingPendingAdjustment += pendingAdjustment
}
nextTime = ss.processingTimeTimers.Peek()
@@ -2065,24 +2071,26 @@ func handleProcessingTimeTimer(ss *stageState, em
*ElementManager, emNow mtime.T
// Add a refresh if there are still processing time events to process.
stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp
|| len(notYet) > 0)
- return toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable
+ return toProcess, minTs, newKeys, holdsInBundle, panesInBundle,
stillSchedulable, accumulatingPendingAdjustment
}
// buildProcessingTimeBundle for stateful stages prepares bundles for
processing-time timers
-func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool) {
- return handleProcessingTimeTimer(ss, em, emNow, func(e element,
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle
[]bundlePane) ([]element, []bundlePane) {
+func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool, int) {
+ return handleProcessingTimeTimer(ss, em, emNow, func(e element,
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle
[]bundlePane) ([]element, []bundlePane, int) {
holdsInBundle[e.holdTimestamp]++
// We're going to process this timer!
toProcess = append(toProcess, e)
- return toProcess, nil
+ return toProcess, nil, 0
})
}
// buildProcessingTimeBundle for aggregation stages prepares bundles for
after-processing-time triggers
-func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool) {
- return handleProcessingTimeTimer(ss, em, emNow, func(e element,
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle
[]bundlePane) ([]element, []bundlePane) {
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool, int) {
+ return handleProcessingTimeTimer(ss, em, emNow, func(e element,
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle
[]bundlePane) ([]element, []bundlePane, int) {
// Different from `buildProcessingTimeBundle` for stateful
stage,
// triggers don't hold back the watermark, so no holds are in
the triggered bundle.
+ var pendingAdjustment int
+ var elems []element
state := ss.state[LinkID{}][e.window][string(e.keyBytes)]
endOfWindowReached := e.window.MaxTimestamp() < ss.input
ready := ss.strat.IsTriggerReady(triggerInput{
@@ -2095,7 +2103,7 @@ func (*aggregateStageKind) buildProcessingTimeBundle(ss
*stageState, em *Element
state.Pane = computeNextTriggeredPane(state.Pane,
endOfWindowReached)
// We're going to process this trigger!
- elems, _ := ss.buildTriggeredBundle(em,
string(e.keyBytes), e.window)
+ elems, pendingAdjustment = ss.buildTriggeredBundle(em,
string(e.keyBytes), e.window)
toProcess = append(toProcess, elems...)
ss.state[LinkID{}][e.window][string(e.keyBytes)] = state
@@ -2103,14 +2111,14 @@ func (*aggregateStageKind) buildProcessingTimeBundle(ss
*stageState, em *Element
panesInBundle = append(panesInBundle, bundlePane{})
}
- return toProcess, panesInBundle
+ return toProcess, panesInBundle, pendingAdjustment
})
}
// buildProcessingTimeBundle for stateless stages is not supposed to be called
currently
-func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool) {
+func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string],
map[mtime.Time]int, []bundlePane, bool, int) {
slog.Error("ordinary stages can't have processing time elements")
- return nil, mtime.MinTimestamp, nil, nil, nil, false
+ return nil, mtime.MinTimestamp, nil, nil, nil, false, 0
}
// makeInProgressBundle is common code to store a set of elements as a bundle
in progress.