arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3371048191


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -237,6 +243,17 @@ public void setProcessingThreadName(String 
processingThreadName) {
   @Override
   public void setFailed() {
     this.isFailed = true;
+    Runnable listener = onFailureListener;
+    if (listener != null) {
+      listener.run();
+    }
+  }
+
+  public void setOnFailureListener(@Nullable Runnable listener) {
+    this.onFailureListener = listener;
+    if (isFailed && listener != null) {

Review Comment:
   Moved to a shared boolean.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -258,37 +247,36 @@ private void processWork(
     // Before any processing starts, call any pending OnCommit callbacks.  
Nothing that requires
     // cleanup should be done before this, since we might exit early here.
     
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
+
     if (workItem.getSourceState().getOnlyFinalize()) {
-      Windmill.WorkItemCommitRequest.Builder outputBuilder = 
initializeOutputBuilder(key, workItem);
-      
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
-      work.setState(Work.State.COMMIT_QUEUED);
-      work.queueCommit(outputBuilder.build(), computationState);
+      handleOnlyFinalize(computationState, work, workItem);
       return;
     }
 
     long processingStartTimeNanos = System.nanoTime();
-    MapTask mapTask = computationState.getMapTask();
-    StageInfo stageInfo =
-        stageInfoMap.computeIfAbsent(
-            mapTask.getStageName(), s -> StageInfo.create(s, 
mapTask.getSystemName()));
+    StageInfo stageInfo = getStageInfo(computationState);
 
+    List<Work> worksToCleanup = null;
     try {
       if (work.isFailed()) {
         throw new WorkItemCancelledException(workItem.getShardingKey());
       }
 
-      // Execute the user code for the Work.
-      ExecuteWorkResult executeWorkResult = executeWork(work, stageInfo, 
computationState);
-      Windmill.WorkItemCommitRequest.Builder commitRequest = 
executeWorkResult.commitWorkRequest();
+      // Execute the user code for the Work batch.
+      ExecuteWorkResult executeWorkResult = executeWork(work, stageInfo, 
computationState, handle);
+      List<Work> workBatch = executeWorkResult.workBatch();
+      worksToCleanup = workBatch;
+      List<Windmill.WorkItemCommitRequest.Builder> outputBuilders =
+          executeWorkResult.outputBuilders();
+      Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks =

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to