This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5485467f230 Handle pending adjustment for processing time bundle correctly. (#36384)
5485467f230 is described below

commit 5485467f230100e7ac2c0b50bda72b5e38ed9826
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Oct 3 19:39:45 2025 -0400

    Handle pending adjustment for processing time bundle correctly. (#36384)
---
 runners/prism/java/build.gradle                    |  3 +-
 .../prism/internal/engine/elementmanager.go        | 44 +++++++++++++---------
 2 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index dbd7cc6cb5c..5e5ddbe139e 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -86,9 +86,8 @@ def sickbayTests = [
     
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
     
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',
 
-    // negative WaitGroup counter when failing bundle
+    // Instead of 42, Prism got 84, which suggests two early panes of 42 are 
fired.
     
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
-    'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
 
     // A regression introduced when we use number of pending elements rather 
than watermark to determine
     // the bundle readiness of a stateless stage.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index f77844b6f6c..12b0cada750 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -477,12 +477,15 @@ func (em *ElementManager) Bundles(ctx context.Context, 
upstreamCancelFn context.
                                        }
                                }
                                if ptimeEventsReady {
-                                       bundleID, ok, reschedule := 
ss.startProcessingTimeBundle(em, emNow, nextBundID)
+                                       bundleID, ok, reschedule, 
pendingAdjustment := ss.startProcessingTimeBundle(em, emNow, nextBundID)
                                        // Handle the reschedule even when 
there's no bundle.
                                        if reschedule {
                                                em.changedStages.insert(stageID)
                                        }
                                        if ok {
+                                               if pendingAdjustment > 0 {
+                                                       
em.addPending(pendingAdjustment)
+                                               }
                                                rb := RunBundle{StageID: 
stageID, BundleID: bundleID, Watermark: watermark}
 
                                                
em.inprogressBundles.insert(rb.BundleID)
@@ -1218,7 +1221,7 @@ type stageKind interface {
                holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, 
schedulable bool, pendingAdjustment int)
        // buildProcessingTimeBundle handles building processing-time bundles 
for the stage per it's kind.
        buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow 
mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string],
-               holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, 
schedulable bool)
+               holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, 
schedulable bool, pendingAdjustment int)
        // getPaneOrDefault based on the stage state, element metadata, and 
bundle id.
        getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w 
typex.Window, keyBytes []byte, bundID string) typex.PaneInfo
 }
@@ -1983,24 +1986,24 @@ keysPerBundle:
        return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable, accumulatingPendingAdjustment
 }
 
-func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow 
mtime.Time, genBundID func() string) (string, bool, bool) {
+func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow 
mtime.Time, genBundID func() string) (string, bool, bool, int) {
        ss.mu.Lock()
        defer ss.mu.Unlock()
 
-       toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow)
+       toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable, accumulatingPendingAdjustment := 
ss.kind.buildProcessingTimeBundle(ss, em, emNow)
 
        if len(toProcess) == 0 {
                // If we have nothing
-               return "", false, stillSchedulable
+               return "", false, stillSchedulable, 
accumulatingPendingAdjustment
        }
        bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle, panesInBundle)
        slog.Debug("started a processing time bundle", "stageID", ss.ID, 
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
-       return bundID, true, stillSchedulable
+       return bundID, true, stillSchedulable, accumulatingPendingAdjustment
 }
 
 // handleProcessingTimeTimer contains the common code for handling 
processing-time timers for aggregation stages and stateful stages.
 func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow 
mtime.Time,
-       processTimerFn func(e element, toProcess []element, holdsInBundle 
map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane)) 
(elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) {
+       processTimerFn func(e element, toProcess []element, holdsInBundle 
map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane, int)) 
(elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool, 
int) {
        // TODO: Determine if it's possible and a good idea to treat all 
EventTime processing as a MinTime
        // Special Case for ProcessingTime handling.
        // Eg. Always queue EventTime elements at minTime.
@@ -2010,6 +2013,8 @@ func handleProcessingTimeTimer(ss *stageState, em 
*ElementManager, emNow mtime.T
 
        var toProcess []element
        var panesInBundle []bundlePane
+       var pendingAdjustment int
+       accumulatingPendingAdjustment := 0
        minTs := mtime.MaxTimestamp
        holdsInBundle := map[mtime.Time]int{}
 
@@ -2044,7 +2049,8 @@ func handleProcessingTimeTimer(ss *stageState, em 
*ElementManager, emNow mtime.T
                                minTs = e.timestamp
                        }
 
-                       toProcess, panesInBundle = processTimerFn(e, toProcess, 
holdsInBundle, panesInBundle)
+                       toProcess, panesInBundle, pendingAdjustment = 
processTimerFn(e, toProcess, holdsInBundle, panesInBundle)
+                       accumulatingPendingAdjustment += pendingAdjustment
                }
 
                nextTime = ss.processingTimeTimers.Peek()
@@ -2065,24 +2071,26 @@ func handleProcessingTimeTimer(ss *stageState, em 
*ElementManager, emNow mtime.T
        // Add a refresh if there are still processing time events to process.
        stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp 
|| len(notYet) > 0)
 
-       return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable
+       return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, 
stillSchedulable, accumulatingPendingAdjustment
 }
 
 // buildProcessingTimeBundle for stateful stages prepares bundles for 
processing-time timers
-func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool) {
-       return handleProcessingTimeTimer(ss, em, emNow, func(e element, 
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle 
[]bundlePane) ([]element, []bundlePane) {
+func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool, int) {
+       return handleProcessingTimeTimer(ss, em, emNow, func(e element, 
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle 
[]bundlePane) ([]element, []bundlePane, int) {
                holdsInBundle[e.holdTimestamp]++
                // We're going to process this timer!
                toProcess = append(toProcess, e)
-               return toProcess, nil
+               return toProcess, nil, 0
        })
 }
 
 // buildProcessingTimeBundle for aggregation stages prepares bundles for 
after-processing-time triggers
-func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool) {
-       return handleProcessingTimeTimer(ss, em, emNow, func(e element, 
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle 
[]bundlePane) ([]element, []bundlePane) {
+func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool, int) {
+       return handleProcessingTimeTimer(ss, em, emNow, func(e element, 
toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle 
[]bundlePane) ([]element, []bundlePane, int) {
                // Different from `buildProcessingTimeBundle` for stateful 
stage,
                // triggers don't hold back the watermark, so no holds are in 
the triggered bundle.
+               var pendingAdjustment int
+               var elems []element
                state := ss.state[LinkID{}][e.window][string(e.keyBytes)]
                endOfWindowReached := e.window.MaxTimestamp() < ss.input
                ready := ss.strat.IsTriggerReady(triggerInput{
@@ -2095,7 +2103,7 @@ func (*aggregateStageKind) buildProcessingTimeBundle(ss 
*stageState, em *Element
                        state.Pane = computeNextTriggeredPane(state.Pane, 
endOfWindowReached)
 
                        // We're going to process this trigger!
-                       elems, _ := ss.buildTriggeredBundle(em, 
string(e.keyBytes), e.window)
+                       elems, pendingAdjustment = ss.buildTriggeredBundle(em, 
string(e.keyBytes), e.window)
                        toProcess = append(toProcess, elems...)
 
                        ss.state[LinkID{}][e.window][string(e.keyBytes)] = state
@@ -2103,14 +2111,14 @@ func (*aggregateStageKind) buildProcessingTimeBundle(ss 
*stageState, em *Element
                        panesInBundle = append(panesInBundle, bundlePane{})
                }
 
-               return toProcess, panesInBundle
+               return toProcess, panesInBundle, pendingAdjustment
        })
 }
 
 // buildProcessingTimeBundle for stateless stages is not supposed to be called 
currently
-func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool) {
+func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em 
*ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], 
map[mtime.Time]int, []bundlePane, bool, int) {
        slog.Error("ordinary stages can't have processing time elements")
-       return nil, mtime.MinTimestamp, nil, nil, nil, false
+       return nil, mtime.MinTimestamp, nil, nil, nil, false, 0
 }
 
 // makeInProgressBundle is common code to store a set of elements as a bundle 
in progress.

Reply via email to