This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fe5c2888ae4 [Prism] Add previousInput watermark and use it in
bundleReady (#36137)
fe5c2888ae4 is described below
commit fe5c2888ae43678d2498b07e588c0037502220a2
Author: Shunping Huang <[email protected]>
AuthorDate: Sat Sep 13 01:11:45 2025 -0400
[Prism] Add previousInput watermark and use it in bundleReady (#36137)
* Add preRefreshedInput watermark and use it in bundleReady.
* Address the reviewer feedback.
---
.../pkg/beam/runners/prism/internal/engine/elementmanager.go | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 8c8b71ca414..18f10f45e6c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1153,6 +1153,7 @@ type stageState struct {
input mtime.Time // input watermark for the parallel input.
output mtime.Time // Output watermark for the whole stage
estimatedOutput mtime.Time // Estimated watermark output from DoFns
+ previousInput mtime.Time // input watermark before the latest watermark refresh
pending elementHeap // pending input elements for this stage that are to be processed
inprogress map[string]elements // inprogress elements
by active bundles, keyed by bundle
@@ -2014,6 +2015,8 @@ func (ss *stageState) updateWatermarks(em
*ElementManager) set[string] {
newIn = minPending
}
+ ss.previousInput = ss.input
+
// If bigger, advance the input watermark.
if newIn > ss.input {
ss.input = newIn
@@ -2171,11 +2174,13 @@ func (ss *stageState) bundleReady(em *ElementManager,
emNow mtime.Time) (mtime.T
ptimeEventsReady := ss.processingTimeTimers.Peek() <= emNow || emNow ==
mtime.MaxTimestamp
injectedReady := len(ss.bundlesToInject) > 0
- // If the upstream watermark and the input watermark are the same,
- // then we can't yet process this stage.
+ // If the upstream watermark has not changed, we can't yet process this stage.
+ // To check whether the upstream watermark is unchanged, we check whether the
+ // input watermark and the input watermark before the latest refresh are the same.
inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
- if inputW == upstreamW {
+ previousInputW := ss.previousInput
+ if inputW == upstreamW && previousInputW == inputW {
slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",