Abacn commented on code in PR #36631:
URL: https://github.com/apache/beam/pull/36631#discussion_r2519126746
##########
runners/google-cloud-dataflow-java/build.gradle:
##########
@@ -520,8 +519,7 @@ task validatesRunnerV2 {
excludedTests: [
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
- // TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
- 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',
Review Comment:
This change only touches the legacy runner, yet this excluded test is being
removed. Can we confirm the test now passes with runner v2? One can trigger
the PostCommit ValidatesRunner test by adding any change to
https://github.com/apache/beam/blob/master/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json
and pushing a commit.
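For reference, the trigger files are small JSON stubs and any trivial edit
counts; something like the following (contents illustrative only, the exact
fields in the file don't matter):

```json
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
  "modification": 2
}
```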
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -415,18 +415,21 @@ private ExecuteWorkResult executeWork(
// Release the execution state for another thread to use.
computationState.releaseComputationWorkExecutor(computationWorkExecutor);
+ computationWorkExecutor = null;
work.setState(Work.State.COMMIT_QUEUED);
outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));
return ExecuteWorkResult.create(
outputBuilder, stateReader.getBytesRead() +
localSideInputStateFetcher.getBytesRead());
} catch (Throwable t) {
- // If processing failed due to a thrown exception, close the executionState. Do not
- // return/release the executionState back to computationState as that will lead to this
- // executionState instance being reused.
- LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
- computationWorkExecutor.invalidate();
+ if (computationWorkExecutor != null) {
Review Comment:
Looks like this fixes an NPE. Was this an existing bug, or one exposed by this
change (to MapTaskExecutor)?
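In other words, the ownership pattern now looks roughly like this (a
simplified sketch; `WorkExecutor`, `acquire()`, and `release()` stand in for
the actual worker types and methods, only the control flow matters here):

```java
public class ExecutorOwnershipSketch {

  // Stand-in for the worker's ComputationWorkExecutor.
  static class WorkExecutor {
    void process() {
      // Stand-in for executing the work item; may throw.
    }

    void invalidate() {
      // Stand-in for tearing down a failed executor so it is never reused.
    }
  }

  static WorkExecutor acquire() {
    return new WorkExecutor();
  }

  static void release(WorkExecutor e) {
    // Stand-in for computationState.releaseComputationWorkExecutor(...).
  }

  static void executeWork() {
    WorkExecutor executor = acquire();
    try {
      executor.process();
      release(executor); // hand the executor back for reuse by other threads
      executor = null;   // record that this thread no longer owns it
      // ... anything below here can still throw ...
    } catch (Throwable t) {
      // Only invalidate an executor this thread still owns. Without the
      // null-out above, the failure path could invalidate an executor that
      // was already released for reuse; with it, this guard avoids the NPE.
      if (executor != null) {
        executor.invalidate();
      }
      throw t;
    }
  }
}
```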
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java:
##########
@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
Networks.replaceDirectedNetworkNodes(
network, createOutputReceiversTransform(stageName, counterSet));
- // Swap out all the ParallelInstruction nodes with Operation nodes
- Networks.replaceDirectedNetworkNodes(
- network,
- createOperationTransformForParallelInstructionNodes(
- stageName, network, options, readerFactory, sinkFactory, executionContext));
+ // Swap out all the ParallelInstruction nodes with Operation nodes. While updating the network,
+ // we keep track of the created Operations so that if an exception is encountered we can
+ // properly abort started operations.
+ ArrayList<Operation> createdOperations = new ArrayList<>();
+ try {
+ Networks.replaceDirectedNetworkNodes(
+ network,
+ createOperationTransformForParallelInstructionNodes(
+ stageName,
+ network,
+ options,
+ readerFactory,
+ sinkFactory,
+ executionContext,
+ createdOperations));
+ } catch (RuntimeException exn) {
+ for (Operation o : createdOperations) {
+ try {
+ o.abort();
Review Comment:
We are now invoking teardown for all ParDos when one throws. Given this
change, would it be straightforward to also handle the case where the DoFn
finishes normally (e.g., by changing `catch` to `finally`), or would that need
further consideration?
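Roughly what I mean, as a simplified sketch (`Operation` and
`buildOperations` stand in for the actual worker types; the flag shows why a
bare `catch` -> `finally` swap isn't quite enough on its own):

```java
import java.util.ArrayList;
import java.util.List;

public class TeardownSketch {

  // Stand-in for the worker's Operation type.
  interface Operation {
    void abort();
  }

  // Stand-in for Networks.replaceDirectedNetworkNodes(...): adds operations
  // to `out` and may throw partway through.
  static void buildOperations(List<Operation> out) {
    out.add(() -> System.out.println("op aborted"));
    throw new RuntimeException("node transform failed");
  }

  public static void main(String[] args) {
    List<Operation> createdOperations = new ArrayList<>();
    boolean succeeded = false;
    try {
      buildOperations(createdOperations);
      succeeded = true;
    } finally {
      // A bare catch -> finally swap would run abort() on the success path
      // too, where the operations are still owned by the returned executor.
      // So a flag (or moving success-path teardown into the executor's own
      // close/finish path) is needed, which is the "further consideration".
      if (!succeeded) {
        for (Operation o : createdOperations) {
          try {
            o.abort();
          } catch (RuntimeException e) {
            // Suppress so the original exception still propagates.
          }
        }
      }
    }
  }
}
```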