[
https://issues.apache.org/jira/browse/BEAM-3244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274394#comment-16274394
]
ASF GitHub Bot commented on BEAM-3244:
--------------------------------------
iemejia closed pull request #4174: [BEAM-3244] Ensure execution of teardown
method on Flink's DoFnOperator
URL: https://github.com/apache/beam/pull/4174
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a6ab44bcea5..7840c328c9a 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -87,10 +87,6 @@
<goal>test</goal>
</goals>
<configuration>
- <!-- BEAM-3244 -->
- <excludes>
- <exclude>org.apache.beam.sdk.transforms.ParDoLifecycleTest</exclude>
- </excludes>
<groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
<excludedGroups>
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index d203ffb67dc..fcee0549706 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -361,6 +361,16 @@ public void onProcessingTime(long timestamp) throws Exception {
SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
}
+ @Override
+ public void dispose() throws Exception {
+ try {
+ super.dispose();
+ checkFinishBundleTimer.cancel(true);
+ } finally {
+ doFnInvoker.invokeTeardown();
+ }
+ }
+
@Override
public void close() throws Exception {
super.close();
@@ -379,8 +389,6 @@ public void close() throws Exception {
}
}
}
- checkFinishBundleTimer.cancel(true);
- doFnInvoker.invokeTeardown();
}
private long getPushbackWatermarkHold() {
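
The effect of moving the teardown call from close() into a try/finally in
dispose() can be illustrated with a small standalone sketch. Everything in it
is hypothetical (TeardownSketch, RecordingInvoker, SketchOperator and the two
failure flags are not Beam or Flink classes), and it assumes that the host
calls close() only on the normal path but calls dispose() on both the normal
and the failure path:

```java
import java.util.ArrayList;
import java.util.List;

class TeardownSketch {
    /** Hypothetical stand-in for a DoFn invoker; it only records calls. */
    static class RecordingInvoker {
        final List<String> calls = new ArrayList<>();

        void invokeTeardown() {
            calls.add("teardown");
        }
    }

    /** Hypothetical operator mirroring the close()/dispose() split in the diff. */
    static class SketchOperator {
        final RecordingInvoker invoker = new RecordingInvoker();
        final boolean failOnClose;
        final boolean failOnRelease;

        SketchOperator(boolean failOnClose, boolean failOnRelease) {
            this.failOnClose = failOnClose;
            this.failOnRelease = failOnRelease;
        }

        void close() {
            if (failOnClose) {
                throw new IllegalStateException("bundle failed");
            }
            invoker.calls.add("close");
        }

        void dispose() {
            try {
                // Stand-in for releasing resources such as timers.
                if (failOnRelease) {
                    throw new IllegalStateException("release failed");
                }
                invoker.calls.add("dispose");
            } finally {
                // Runs whether or not the release step above threw.
                invoker.invokeTeardown();
            }
        }
    }

    /** Simulates a host that always calls dispose(), even when close() throws. */
    static List<String> run(boolean failOnClose, boolean failOnRelease) {
        SketchOperator op = new SketchOperator(failOnClose, failOnRelease);
        try {
            try {
                op.close();
            } finally {
                op.dispose();
            }
        } catch (RuntimeException e) {
            op.invoker.calls.add("failed: " + e.getMessage());
        }
        return op.invoker.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(false, false));
        System.out.println(run(true, false));
        System.out.println(run(false, true));
    }
}
```

In this sketch "teardown" is recorded in all three runs. Had the teardown call
stayed at the end of close(), the run where close() throws would never have
reached it.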
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Flink runner does not respect ParDo's lifecycle in case of exceptions
> ---------------------------------------------------------------------
>
> Key: BEAM-3244
> URL: https://issues.apache.org/jira/browse/BEAM-3244
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.2.0
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
>
> The lifecycle of the DoFn is not respected when any of the lifecycle methods
> called after setup throws an exception.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)