[ 
https://issues.apache.org/jira/browse/BEAM-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16315199#comment-16315199
 ] 

ASF GitHub Bot commented on BEAM-3187:
--------------------------------------

iemejia closed pull request #4130: [BEAM-3187] Ensure that teardown is called 
in case of Exception on the Spark runner
URL: https://github.com/apache/beam/pull/4130
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 3670cdeb3ed..f61863b6726 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -69,10 +69,6 @@
                   <goal>test</goal>
                 </goals>
                 <configuration>
-                  <!-- BEAM-3187 -->
-                  <excludes>
-                    
<exclude>org.apache.beam.sdk.transforms.ParDoLifecycleTest</exclude>
-                  </excludes>
                   <groups>
                     org.apache.beam.sdk.testing.ValidatesRunner,
                     org.apache.beam.runners.spark.UsesCheckpointRecovery
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 8b85155fd53..4376c386f4e 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -21,7 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
@@ -61,17 +61,13 @@
   Iterable<OutputT> processPartition(
       Iterator<WindowedValue<FnInputT>> partition) throws Exception {
 
-    // setup DoFn.
-    DoFnInvokers.invokerFor(doFn).invokeSetup();
-
     // skip if partition is empty.
     if (!partition.hasNext()) {
-      DoFnInvokers.invokerFor(doFn).invokeTeardown();
-      return Lists.newArrayList();
+      return new ArrayList<>();
     }
 
-    // call startBundle() before beginning to process the partition.
-    doFnRunner.startBundle();
+    // setup DoFn.
+    DoFnInvokers.invokerFor(doFn).invokeSetup();
     // process the partition; finishBundle() is called from within the output 
iterator.
     return this.getOutputIterable(partition, doFnRunner);
   }
@@ -87,7 +83,6 @@ private void clearOutput() {
   private Iterable<OutputT> getOutputIterable(
       final Iterator<WindowedValue<FnInputT>> iter,
       final DoFnRunner<FnInputT, FnOutputT> doFnRunner) {
-
     return new Iterable<OutputT>() {
       @Override
       public Iterator<OutputT> iterator() {
@@ -120,7 +115,8 @@ public TimerInternals timerInternals() {
     private final Iterator<WindowedValue<FnInputT>> inputIterator;
     private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
     private Iterator<OutputT> outputIterator;
-    private boolean calledFinish;
+    private boolean isBundleStarted;
+    private boolean isBundleFinished;
 
     ProcCtxtIterator(
         Iterator<WindowedValue<FnInputT>> iterator,
@@ -137,31 +133,41 @@ protected OutputT computeNext() {
       // collection (and iterator) is reset between each call to 
processElement, so the
       // collection only holds the output values for each call to 
processElement, rather
       // than for the whole partition (which would use too much memory).
-      while (true) {
-        if (outputIterator.hasNext()) {
-          return outputIterator.next();
-        } else if (inputIterator.hasNext()) {
-          clearOutput();
-          // grab the next element and process it.
-          doFnRunner.processElement(inputIterator.next());
-          outputIterator = getOutputIterator();
-        } else if (timerDataIterator.hasNext()) {
+      if (!isBundleStarted) {
+        isBundleStarted = true;
+        // call startBundle() before beginning to process the partition.
+        doFnRunner.startBundle();
+      }
+
+      try {
+        while (true) {
+          if (outputIterator.hasNext()) {
+            return outputIterator.next();
+          }
+
           clearOutput();
-          fireTimer(timerDataIterator.next());
-          outputIterator = getOutputIterator();
-        } else {
-          // no more input to consume, but finishBundle can produce more output
-          if (!calledFinish) {
-            clearOutput();
-            calledFinish = true;
-            doFnRunner.finishBundle();
-            // teardown DoFn.
-            DoFnInvokers.invokerFor(doFn).invokeTeardown();
+          if (inputIterator.hasNext()) {
+            // grab the next element and process it.
+            doFnRunner.processElement(inputIterator.next());
+            outputIterator = getOutputIterator();
+          } else if (timerDataIterator.hasNext()) {
+            fireTimer(timerDataIterator.next());
             outputIterator = getOutputIterator();
-            continue; // try to consume outputIterator from start of loop
+          } else {
+            // no more input to consume, but finishBundle can produce more 
output
+            if (!isBundleFinished) {
+              isBundleFinished = true;
+              doFnRunner.finishBundle();
+              outputIterator = getOutputIterator();
+              continue; // try to consume outputIterator from start of loop
+            }
+            DoFnInvokers.invokerFor(doFn).invokeTeardown();
+            return endOfData();
           }
-          return endOfData();
         }
+      } catch (final RuntimeException re) {
+        DoFnInvokers.invokerFor(doFn).invokeTeardown();
+        throw re;
       }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Spark runner does not respect ParDo's lifecycle in case of exceptions
> ---------------------------------------------------------------------
>
>                 Key: BEAM-3187
>                 URL: https://issues.apache.org/jira/browse/BEAM-3187
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.2.0
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>             Fix For: 2.3.0
>
>
> The lifecycle of the DoFn is not respected when an Exception is thrown in the 
> finishBundle method: the Spark runner does not call the DoFn's teardown 
> function, as it should, since teardown is the counterpart of setup.
> The same problem occurs if the exception is thrown in startBundle.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to