Abacn commented on code in PR #36631:
URL: https://github.com/apache/beam/pull/36631#discussion_r2519126746


##########
runners/google-cloud-dataflow-java/build.gradle:
##########
@@ -520,8 +519,7 @@ task validatesRunnerV2 {
    excludedTests: [
      'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',

-      // TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
-      'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',

Review Comment:
   The change only touches the legacy runner, yet this excluded test is removed. Can we confirm the test now passes on Runner v2? You can trigger the PostCommit ValidatesRunner suite by adding any change to https://github.com/apache/beam/blob/master/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json and pushing a commit.
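   For reference, the trigger flow described above looks roughly like this, demonstrated in a throwaway repo so the commands are safe to run anywhere; in a real Beam checkout you would make the change on your PR branch and push it. The assumption (based on the "any change" wording) is that the file's content is not parsed for meaning, so any edit works:

   ```shell
   set -e
   # Throwaway repo standing in for a Beam checkout (assumption: any content
   # change to the trigger file kicks off the PostCommit suite).
   tmp=$(mktemp -d)
   cd "$tmp"
   git init -q
   git config user.email "[email protected]"
   git config user.name "example"
   mkdir -p .github/trigger_files
   TRIGGER=.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json
   echo '{}' > "$TRIGGER"
   git add "$TRIGGER"
   git commit -qm "Trigger Java ValidatesRunner Dataflow V2 PostCommit"
   ```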



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -415,18 +415,21 @@ private ExecuteWorkResult executeWork(

       // Release the execution state for another thread to use.
       computationState.releaseComputationWorkExecutor(computationWorkExecutor);
+      computationWorkExecutor = null;

       work.setState(Work.State.COMMIT_QUEUED);
       outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));

       return ExecuteWorkResult.create(
           outputBuilder, stateReader.getBytesRead() + localSideInputStateFetcher.getBytesRead());
     } catch (Throwable t) {
-      // If processing failed due to a thrown exception, close the executionState. Do not
-      // return/release the executionState back to computationState as that will lead to this
-      // executionState instance being reused.
-      LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
-      computationWorkExecutor.invalidate();
+      if (computationWorkExecutor != null) {

Review Comment:
   Looks like this fixes an NPE. Was this an existing bug, or was it exposed by this change (to MapTaskExecutor)?
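   For context, the pattern in the diff above (null the local after releasing, then guard the `catch`) can be sketched as follows. `Executor` and `Pool` are hypothetical stand-ins, not Beam's actual classes:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   class ReleasePattern {
       static class Executor {
           boolean invalidated = false;
           void invalidate() { invalidated = true; }
       }

       static class Pool {
           final Deque<Executor> free = new ArrayDeque<>();
           void release(Executor e) { free.push(e); }
       }

       // Returns true if the executor was invalidated on failure.
       static boolean execute(Pool pool, Runnable work) {
           Executor executor = new Executor();
           try {
               work.run();
               // Release for reuse, then null the local so the catch block
               // cannot touch an executor another thread may now own.
               pool.release(executor);
               executor = null;
               return false;
           } catch (RuntimeException t) {
               // Only invalidate if we still own the executor; a failure
               // after release must not poison a reused executor.
               if (executor != null) {
                   executor.invalidate();
                   return true;
               }
               return false;
           }
       }

       public static void main(String[] args) {
           Pool pool = new Pool();
           System.out.println(execute(pool, () -> {}));
           System.out.println(execute(pool, () -> { throw new RuntimeException("boom"); }));
       }
   }
   ```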



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java:
##########
@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
     Networks.replaceDirectedNetworkNodes(
         network, createOutputReceiversTransform(stageName, counterSet));
 
-    // Swap out all the ParallelInstruction nodes with Operation nodes
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForParallelInstructionNodes(
-            stageName, network, options, readerFactory, sinkFactory, executionContext));
+    // Swap out all the ParallelInstruction nodes with Operation nodes. While updating the network,
+    // we keep track of the created Operations so that if an exception is encountered we can
+    // properly abort started operations.
+    ArrayList<Operation> createdOperations = new ArrayList<>();
+    try {
+      Networks.replaceDirectedNetworkNodes(
+          network,
+          createOperationTransformForParallelInstructionNodes(
+              stageName,
+              network,
+              options,
+              readerFactory,
+              sinkFactory,
+              executionContext,
+              createdOperations));
+    } catch (RuntimeException exn) {
+      for (Operation o : createdOperations) {
+        try {
+          o.abort();

Review Comment:
   we are now evaluating teardown for all pardos when one throw. Given the 
change would it be straightforward to fix for the case DoFn finished normally 
(like changing `catch` to `finally` ), or it would need further consideration?
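   A minimal sketch of the `finally` variant of this suggestion, with a hypothetical `Op` standing in for Beam's `Operation`: all *started* ops are cleaned up on both the success and the failure path, and a failure during cleanup does not mask the original exception:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class TeardownSketch {
       // Hypothetical stand-in for an Operation: start may throw, teardown cleans up.
       static class Op {
           final Runnable body;
           boolean tornDown = false;
           Op(Runnable body) { this.body = body; }
           void start() { body.run(); }
           void teardown() { tornDown = true; }
       }

       // Start every op in order; tear down all started ops regardless of
       // outcome. `finally` (instead of `catch`) covers normal completion too.
       static void runAll(List<Op> ops) {
           List<Op> started = new ArrayList<>();
           try {
               for (Op op : ops) {
                   op.start();
                   started.add(op);
               }
           } finally {
               for (Op op : started) {
                   try {
                       op.teardown();
                   } catch (RuntimeException suppressed) {
                       // A failing teardown must not mask the original exception.
                   }
               }
           }
       }

       public static void main(String[] args) {
           Op a = new Op(() -> {});
           Op b = new Op(() -> { throw new RuntimeException("boom"); });
           try {
               runAll(List.of(a, b));
           } catch (RuntimeException expected) {
               System.out.println("a torn down: " + a.tornDown);
           }
       }
   }
   ```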



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
