This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 705db250d51 [Dataflow Streaming] Activate SourceState Finalizers
before submitting workitem to harness threads (#38921)
705db250d51 is described below
commit 705db250d514d9f11c8692ba1afc6e828d78873b
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Jun 12 06:09:49 2026 -0700
[Dataflow Streaming] Activate SourceState Finalizers before submitting
workitem to harness threads (#38921)
---
.../worker/windmill/work/processing/StreamingWorkScheduler.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 364608be82c..a3f23aebdf8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -216,6 +216,8 @@ public class StreamingWorkScheduler {
Work.ProcessingContext processingContext,
boolean drainMode,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) {
+ // Before any processing starts, call any pending OnCommit callbacks
+ commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
computationState.activateWork(
ExecutableWork.create(
Work.create(
@@ -255,9 +257,6 @@ public class StreamingWorkScheduler {
setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId);
LOG.debug("Starting processing for {}:\n{}", computationId, work);
- // Before any processing starts, call any pending OnCommit callbacks. Nothing that requires
- // cleanup should be done before this, since we might exit early here.
- commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
if (workItem.getSourceState().getOnlyFinalize()) {
Windmill.WorkItemCommitRequest.Builder outputBuilder =
initializeOutputBuilder(key, workItem);
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));