This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fe5c2888ae4 [Prism] Add previousInput watermark and use it in 
bundleReady (#36137)
fe5c2888ae4 is described below

commit fe5c2888ae43678d2498b07e588c0037502220a2
Author: Shunping Huang <[email protected]>
AuthorDate: Sat Sep 13 01:11:45 2025 -0400

    [Prism] Add previousInput watermark and use it in bundleReady (#36137)
    
    * Add preRefreshedInput watermark and use it in bundleReady.
    
    * Address the reviewer feedback.
---
 .../pkg/beam/runners/prism/internal/engine/elementmanager.go  | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 8c8b71ca414..18f10f45e6c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1153,6 +1153,7 @@ type stageState struct {
        input              mtime.Time // input watermark for the parallel input.
        output             mtime.Time // Output watermark for the whole stage
        estimatedOutput    mtime.Time // Estimated watermark output from DoFns
+       previousInput      mtime.Time // input watermark before the latest 
watermark refresh
 
        pending    elementHeap                          // pending input 
elements for this stage that are to be processesd
        inprogress map[string]elements                  // inprogress elements 
by active bundles, keyed by bundle
@@ -2014,6 +2015,8 @@ func (ss *stageState) updateWatermarks(em 
*ElementManager) set[string] {
                newIn = minPending
        }
 
+       ss.previousInput = ss.input
+
        // If bigger, advance the input watermark.
        if newIn > ss.input {
                ss.input = newIn
@@ -2171,11 +2174,13 @@ func (ss *stageState) bundleReady(em *ElementManager, 
emNow mtime.Time) (mtime.T
        ptimeEventsReady := ss.processingTimeTimers.Peek() <= emNow || emNow == 
mtime.MaxTimestamp
        injectedReady := len(ss.bundlesToInject) > 0
 
-       // If the upstream watermark and the input watermark are the same,
-       // then we can't yet process this stage.
+       // If the upstream watermark does not change, we can't yet process this 
stage.
+       // To check whether upstream water is unchanged, we evaluate if the 
input watermark, and
+       // the input watermark before the latest refresh are the same.
        inputW := ss.input
        _, upstreamW := ss.UpstreamWatermark()
-       if inputW == upstreamW {
+       previousInputW := ss.previousInput
+       if inputW == upstreamW && previousInputW == inputW {
                slog.Debug("bundleReady: unchanged upstream watermark",
                        slog.String("stage", ss.ID),
                        slog.Group("watermark",

Reply via email to