This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 705db250d51 [Dataflow Streaming] Activate SourceState Finalizers 
before submitting workitem to harness threads (#38921)
705db250d51 is described below

commit 705db250d514d9f11c8692ba1afc6e828d78873b
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Jun 12 06:09:49 2026 -0700

    [Dataflow Streaming] Activate SourceState Finalizers before submitting 
workitem to harness threads (#38921)
---
 .../worker/windmill/work/processing/StreamingWorkScheduler.java      | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 364608be82c..a3f23aebdf8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -216,6 +216,8 @@ public class StreamingWorkScheduler {
       Work.ProcessingContext processingContext,
       boolean drainMode,
       ImmutableList<LatencyAttribution> getWorkStreamLatencies) {
+    // Before any processing starts, call any pending OnCommit callbacks
+    
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
     computationState.activateWork(
         ExecutableWork.create(
             Work.create(
@@ -255,9 +257,6 @@ public class StreamingWorkScheduler {
     setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId);
     LOG.debug("Starting processing for {}:\n{}", computationId, work);
 
-    // Before any processing starts, call any pending OnCommit callbacks.  
Nothing that requires
-    // cleanup should be done before this, since we might exit early here.
-    
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
     if (workItem.getSourceState().getOnlyFinalize()) {
       Windmill.WorkItemCommitRequest.Builder outputBuilder = 
initializeOutputBuilder(key, workItem);
       
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));

Reply via email to