scwhittle commented on code in PR #36631:
URL: https://github.com/apache/beam/pull/36631#discussion_r2522421372
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java:
##########
@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
Networks.replaceDirectedNetworkNodes(
network, createOutputReceiversTransform(stageName, counterSet));
- // Swap out all the ParallelInstruction nodes with Operation nodes
- Networks.replaceDirectedNetworkNodes(
- network,
- createOperationTransformForParallelInstructionNodes(
-            stageName, network, options, readerFactory, sinkFactory, executionContext));
+    // Swap out all the ParallelInstruction nodes with Operation nodes. While updating the network,
+    // we keep track of the created Operations so that if an exception is encountered we can
+    // properly abort started operations.
+ ArrayList<Operation> createdOperations = new ArrayList<>();
+ try {
+ Networks.replaceDirectedNetworkNodes(
+ network,
+ createOperationTransformForParallelInstructionNodes(
+ stageName,
+ network,
+ options,
+ readerFactory,
+ sinkFactory,
+ executionContext,
+ createdOperations));
+ } catch (RuntimeException exn) {
+ for (Operation o : createdOperations) {
+ try {
+ o.abort();
Review Comment:
This code runs when creating the executor. If there is no error creating it, we
don't want to abort here, because the executor is then returned and reused many
times.
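
For illustration, the cleanup shape this diff is going for looks roughly like the
sketch below. This is a minimal standalone sketch, not the PR's code: the
`Operation` interface here is a stand-in, and suppressing abort failures onto the
original exception before rethrowing is an assumption, since the diff is truncated
at `o.abort();`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class AbortOnFailureSketch {
  /** Hypothetical stand-in for the worker's Operation type. */
  interface Operation {
    void abort();
  }

  /**
   * Creates all operations; if any creation throws, aborts the operations that
   * were already created so they do not leak, then rethrows the original failure.
   */
  static List<Operation> createAll(List<Supplier<Operation>> factories) {
    List<Operation> created = new ArrayList<>();
    try {
      for (Supplier<Operation> factory : factories) {
        created.add(factory.get());
      }
      return created;
    } catch (RuntimeException exn) {
      for (Operation o : created) {
        try {
          o.abort();
        } catch (RuntimeException abortExn) {
          // Keep the creation failure as the primary exception.
          exn.addSuppressed(abortExn);
        }
      }
      throw exn;
    }
  }
}
```

The important property, matching the comment above: this cleanup only runs on the
failure path during creation. On success the operations are handed back to the
caller untouched and stay alive for reuse.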
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java:
##########
@@ -746,15 +766,108 @@ public void testCreateFlattenOperation() throws Exception {
options,
readerRegistry,
sinkRegistry,
-            BatchModeExecutionContext.forTesting(options, counterSet, "testStage"))
+            BatchModeExecutionContext.forTesting(options, counterSet, "testStage"),
+ createdOperations)
.apply(instructionNode);
assertThat(operationNode, instanceOf(OperationNode.class));
assertThat(((OperationNode) operationNode).getOperation(),
instanceOf(FlattenOperation.class));
FlattenOperation flattenOperation =
(FlattenOperation) ((OperationNode) operationNode).getOperation();
+ assertThat(createdOperations, contains(flattenOperation));
assertEquals(1, flattenOperation.receivers.length);
assertEquals(0, flattenOperation.receivers[0].getReceiverCount());
assertEquals(Operation.InitializationState.UNSTARTED,
flattenOperation.initializationState);
}
+
+ static class TestTeardownDoFn extends DoFn<String, String> {
+ static AtomicInteger setupCalls = new AtomicInteger();
+ static AtomicInteger teardownCalls = new AtomicInteger();
+
+ private final boolean throwExceptionOnSetup;
+ private boolean setupCalled = false;
+
+ TestTeardownDoFn(boolean throwExceptionOnSetup) {
+ this.throwExceptionOnSetup = throwExceptionOnSetup;
+ }
+
+ @Setup
+ public void setup() {
+ assertFalse(setupCalled);
+ setupCalled = true;
+ setupCalls.addAndGet(1);
+ if (throwExceptionOnSetup) {
+ throw new RuntimeException("Test setup exception");
+ }
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ fail("no elements should be processed");
+ }
+
+ @Teardown
+ public void teardown() {
+ assertTrue(setupCalled);
+ setupCalled = false;
+ teardownCalls.addAndGet(1);
+ }
+ }
+
+ @Test
+ public void testCreateMapTaskExecutorException() throws Exception {
+ List<ParallelInstruction> instructions =
+ Arrays.asList(
+ createReadInstruction("Read"),
+            createParDoInstruction(0, 0, "DoFn1", "DoFn1", new TestTeardownDoFn(false)),
+            createParDoInstruction(0, 0, "DoFn2", "DoFn2", new TestTeardownDoFn(false)),
+            createParDoInstruction(0, 0, "ErrorFn", "", new TestTeardownDoFn(true)),
+            createParDoInstruction(0, 0, "DoFn3", "DoFn3", new TestTeardownDoFn(false)),
+ createFlattenInstruction(1, 0, 2, 0, "Flatten"),
+ createWriteInstruction(3, 0, "Write"));
+
+ MapTask mapTask = new MapTask();
+ mapTask.setStageName(STAGE);
+ mapTask.setSystemName("systemName");
+ mapTask.setInstructions(instructions);
+ mapTask.setFactory(Transport.getJsonFactory());
+
+ assertThrows(
+ "Test setup exception",
+ RuntimeException.class,
+ () ->
+ mapTaskExecutorFactory.create(
+ mapTaskToNetwork.apply(mapTask),
+ options,
+ STAGE,
+ readerRegistry,
+ sinkRegistry,
+                BatchModeExecutionContext.forTesting(options, counterSet, "testStage"),
+ counterSet,
+ idGenerator));
+ assertEquals(3, TestTeardownDoFn.setupCalls.getAndSet(0));
+ // We only tear-down the instruction we were unable to create. The other
Review Comment:
See the comments in this test for which functions get setup called and which get
teardown called.
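
As a reading aid for the test above, the counter pattern it relies on (static
`AtomicInteger`s bumped in the lifecycle methods, then asserted and reset with
`getAndSet(0)`) looks roughly like this simplified, self-contained JUnit sketch.
The class and the teardown decision inside the lambda are illustrative only, not
the PR's test or Beam's DoFn lifecycle handling.

```java
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public class LifecycleCountingSketchTest {
  /** Counts lifecycle calls in static counters so assertions can span instances. */
  static class CountingFn {
    static final AtomicInteger setupCalls = new AtomicInteger();
    static final AtomicInteger teardownCalls = new AtomicInteger();
    private final boolean failSetup;

    CountingFn(boolean failSetup) {
      this.failSetup = failSetup;
    }

    void setup() {
      setupCalls.incrementAndGet();
      if (failSetup) {
        throw new RuntimeException("Test setup exception");
      }
    }

    void teardown() {
      teardownCalls.incrementAndGet();
    }
  }

  @Test
  public void setupFailureIsCountedAndCountersReset() {
    CountingFn ok = new CountingFn(false);
    CountingFn bad = new CountingFn(true);

    assertThrows(
        RuntimeException.class,
        () -> {
          ok.setup();
          try {
            bad.setup();
          } catch (RuntimeException e) {
            // In this sketch only the fn whose setup failed is torn down here.
            bad.teardown();
            throw e;
          }
        });

    // getAndSet(0) both asserts the counts and resets the static counters
    // so they do not leak into other tests.
    assertEquals(2, CountingFn.setupCalls.getAndSet(0));
    assertEquals(1, CountingFn.teardownCalls.getAndSet(0));
  }
}
```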
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -415,18 +415,21 @@ private ExecuteWorkResult executeWork(
// Release the execution state for another thread to use.
computationState.releaseComputationWorkExecutor(computationWorkExecutor);
+ computationWorkExecutor = null;
work.setState(Work.State.COMMIT_QUEUED);
outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));
return ExecuteWorkResult.create(
          outputBuilder, stateReader.getBytesRead() + localSideInputStateFetcher.getBytesRead());
} catch (Throwable t) {
-      // If processing failed due to a thrown exception, close the executionState. Do not
-      // return/release the executionState back to computationState as that will lead to this
-      // executionState instance being reused.
-      LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
- computationWorkExecutor.invalidate();
+ if (computationWorkExecutor != null) {
Review Comment:
The null possibility only exists now because we set the variable to null above.
The previous bug was that if an exception was thrown after line 417,
`computationState.releaseComputationWorkExecutor(computationWorkExecutor);`, we
could invalidate an executor that we had already released and that might already
be doing other work or be sitting in the cache. I don't think such an exception
is expected, since we aren't doing much in that gap, but the null check seems
safer in case the code changes in the future.
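
The pattern being suggested boils down to clearing the local reference once
ownership of the executor has been handed back, and only invalidating in the
catch block while we still own it. A minimal sketch under those assumptions
(`WorkExecutor`, `ExecutorPool`, and `processOneWorkItem` are hypothetical names,
not the actual StreamingWorkScheduler types):

```java
public class ReleaseOrInvalidateSketch {
  /** Hypothetical stand-in for ComputationWorkExecutor. */
  interface WorkExecutor {
    void execute() throws Exception;
    void invalidate();
  }

  /** Hypothetical stand-in for the computation state's executor cache. */
  interface ExecutorPool {
    WorkExecutor acquire();
    void release(WorkExecutor executor);
  }

  static void processOneWorkItem(ExecutorPool pool) throws Exception {
    WorkExecutor executor = pool.acquire();
    try {
      executor.execute();
      // Hand the executor back for reuse, then drop our reference so the catch
      // block below cannot touch an executor we no longer own.
      pool.release(executor);
      executor = null;
    } catch (Throwable t) {
      // Only invalidate if the failure happened before we released the executor.
      if (executor != null) {
        executor.invalidate();
      }
      throw t;
    }
  }
}
```

On the success path the executor goes back to the pool untouched; the null check
only matters if a future change introduces code that can throw between the
release and the return.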