scwhittle commented on code in PR #36631:
URL: https://github.com/apache/beam/pull/36631#discussion_r2522421372


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java:
##########
@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
     Networks.replaceDirectedNetworkNodes(
         network, createOutputReceiversTransform(stageName, counterSet));
 
-    // Swap out all the ParallelInstruction nodes with Operation nodes
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForParallelInstructionNodes(
-            stageName, network, options, readerFactory, sinkFactory, 
executionContext));
+    // Swap out all the ParallelInstruction nodes with Operation nodes. While 
updating the network,
+    // we keep track of
+    // the created Operations so that if an exception is encountered we can 
properly abort started
+    // operations.
+    ArrayList<Operation> createdOperations = new ArrayList<>();
+    try {
+      Networks.replaceDirectedNetworkNodes(
+          network,
+          createOperationTransformForParallelInstructionNodes(
+              stageName,
+              network,
+              options,
+              readerFactory,
+              sinkFactory,
+              executionContext,
+              createdOperations));
+    } catch (RuntimeException exn) {
+      for (Operation o : createdOperations) {
+        try {
+          o.abort();

Review Comment:
   This happens when creating the executor. If there is no error creating it, then we
don't want to abort here, because the executor is then returned and reused many
times.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java:
##########
@@ -746,15 +766,108 @@ public void testCreateFlattenOperation() throws 
Exception {
                 options,
                 readerRegistry,
                 sinkRegistry,
-                BatchModeExecutionContext.forTesting(options, counterSet, 
"testStage"))
+                BatchModeExecutionContext.forTesting(options, counterSet, 
"testStage"),
+                createdOperations)
             .apply(instructionNode);
     assertThat(operationNode, instanceOf(OperationNode.class));
     assertThat(((OperationNode) operationNode).getOperation(), 
instanceOf(FlattenOperation.class));
     FlattenOperation flattenOperation =
         (FlattenOperation) ((OperationNode) operationNode).getOperation();
+    assertThat(createdOperations, contains(flattenOperation));
 
     assertEquals(1, flattenOperation.receivers.length);
     assertEquals(0, flattenOperation.receivers[0].getReceiverCount());
     assertEquals(Operation.InitializationState.UNSTARTED, 
flattenOperation.initializationState);
   }
+
+  static class TestTeardownDoFn extends DoFn<String, String> {
+    static AtomicInteger setupCalls = new AtomicInteger();
+    static AtomicInteger teardownCalls = new AtomicInteger();
+
+    private final boolean throwExceptionOnSetup;
+    private boolean setupCalled = false;
+
+    TestTeardownDoFn(boolean throwExceptionOnSetup) {
+      this.throwExceptionOnSetup = throwExceptionOnSetup;
+    }
+
+    @Setup
+    public void setup() {
+      assertFalse(setupCalled);
+      setupCalled = true;
+      setupCalls.addAndGet(1);
+      if (throwExceptionOnSetup) {
+        throw new RuntimeException("Test setup exception");
+      }
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      fail("no elements should be processed");
+    }
+
+    @Teardown
+    public void teardown() {
+      assertTrue(setupCalled);
+      setupCalled = false;
+      teardownCalls.addAndGet(1);
+    }
+  }
+
+  @Test
+  public void testCreateMapTaskExecutorException() throws Exception {
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            createReadInstruction("Read"),
+            createParDoInstruction(0, 0, "DoFn1", "DoFn1", new 
TestTeardownDoFn(false)),
+            createParDoInstruction(0, 0, "DoFn2", "DoFn2", new 
TestTeardownDoFn(false)),
+            createParDoInstruction(0, 0, "ErrorFn", "", new 
TestTeardownDoFn(true)),
+            createParDoInstruction(0, 0, "DoFn3", "DoFn3", new 
TestTeardownDoFn(false)),
+            createFlattenInstruction(1, 0, 2, 0, "Flatten"),
+            createWriteInstruction(3, 0, "Write"));
+
+    MapTask mapTask = new MapTask();
+    mapTask.setStageName(STAGE);
+    mapTask.setSystemName("systemName");
+    mapTask.setInstructions(instructions);
+    mapTask.setFactory(Transport.getJsonFactory());
+
+    assertThrows(
+        "Test setup exception",
+        RuntimeException.class,
+        () ->
+            mapTaskExecutorFactory.create(
+                mapTaskToNetwork.apply(mapTask),
+                options,
+                STAGE,
+                readerRegistry,
+                sinkRegistry,
+                BatchModeExecutionContext.forTesting(options, counterSet, 
"testStage"),
+                counterSet,
+                idGenerator));
+    assertEquals(3, TestTeardownDoFn.setupCalls.getAndSet(0));
+    // We only tear-down the instruction we were unable to create.  The other

Review Comment:
   See this test for comments on which functions have setup and teardown
called.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -415,18 +415,21 @@ private ExecuteWorkResult executeWork(
 
       // Release the execution state for another thread to use.
       computationState.releaseComputationWorkExecutor(computationWorkExecutor);
+      computationWorkExecutor = null;
 
       work.setState(Work.State.COMMIT_QUEUED);
       
outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));
 
       return ExecuteWorkResult.create(
           outputBuilder, stateReader.getBytesRead() + 
localSideInputStateFetcher.getBytesRead());
     } catch (Throwable t) {
-      // If processing failed due to a thrown exception, close the 
executionState. Do not
-      // return/release the executionState back to computationState as that 
will lead to this
-      // executionState instance being reused.
-      LOG.debug("Invalidating executor after work item {} failed", 
workItem.getWorkToken(), t);
-      computationWorkExecutor.invalidate();
+      if (computationWorkExecutor != null) {

Review Comment:
   The null possibility only arises now because we set the variable to null above.

   The previous bug was that if an exception was thrown after
line 417 (`computationState.releaseComputationWorkExecutor(computationWorkExecutor);`), we
could invalidate an executor that we had already released and that was possibly
doing other work or sitting in the cache. I don't think such an exception is
expected, since we aren't doing much in that gap, but this seems safer in case the
code changes in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to