[ 
https://issues.apache.org/jira/browse/BEAM-3409?focusedWorklogId=87881&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87881
 ]

ASF GitHub Bot logged work on BEAM-3409:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Apr/18 04:09
            Start Date: 05/Apr/18 04:09
    Worklog Time Spent: 10m 
      Work Description: jbonofre closed pull request #4790: [BEAM-3409] 
waitUntilFinish() doesn't wait for the teardown execution on Direct runner - 
fixing compilation issue on flink
URL: https://github.com/apache/beam/pull/4790
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 30648f6e582..fd36318e5af 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -49,4 +49,10 @@
    * additional tasks, such as flushing in-memory states.
    */
   void finishBundle();
+
+  /**
+   * @since 2.5.0
+   * @return the underlying fn instance.
+   */
+  DoFn<InputT, OutputT> getFn();
 }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index f89aa4e839c..d101cc57b27 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -60,6 +60,11 @@ public LateDataDroppingDoFnRunner(
     lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals);
   }
 
+  @Override
+  public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getFn() {
+    return doFnRunner.getFn();
+  }
+
   @Override
   public void startBundle() {
     doFnRunner.startBundle();
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index e4dfd132e2d..8c360ef0bd0 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -25,6 +25,7 @@
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -52,6 +53,11 @@ public ProcessFnRunner(
     this.sideInputReader = sideInputReader;
   }
 
+  @Override
+  public DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> 
getFn() {
+    return underlying.getFn();
+  }
+
   @Override
   public void startBundle() {
     underlying.startBundle();
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 8f21086794d..f3013eff0cb 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -48,4 +48,10 @@ void onTimer(String timerId, BoundedWindow window, Instant 
timestamp,
 
   /** Calls the underlying {@link DoFn.FinishBundle} method. */
   void finishBundle();
+
+  /**
+   * @since 2.5.0
+   * @return the underlying fn instance.
+   */
+  DoFn<InputT, OutputT> getFn();
 }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index d4c5775464b..7e60b033e54 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -120,6 +120,11 @@ public SimpleDoFnRunner(
     this.allowedLateness = windowingStrategy.getAllowedLateness();
   }
 
+  @Override
+  public DoFn<InputT, OutputT> getFn() {
+    return fn;
+  }
+
   @Override
   public void startBundle() {
     // This can contain user code. Wrap it in case it throws an exception.
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 591a6a2f059..8b7489b8798 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -25,6 +25,7 @@
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -51,7 +52,7 @@
   }
 
   private SimplePushbackSideInputDoFnRunner(
-      DoFnRunner<InputT, OutputT> underlying,
+          DoFnRunner<InputT, OutputT> underlying,
       Collection<PCollectionView<?>> views,
       ReadyCheckingSideInputReader sideInputReader) {
     this.underlying = underlying;
@@ -59,6 +60,11 @@ private SimplePushbackSideInputDoFnRunner(
     this.sideInputReader = sideInputReader;
   }
 
+  @Override
+  public DoFn<InputT, OutputT> getFn() {
+    return underlying.getFn();
+  }
+
   @Override
   public void startBundle() {
     notReadyWindows = new HashSet<>();
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index c68a94319e9..ef9b3db9a62 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -76,6 +76,11 @@ private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
     }
   }
 
+  @Override
+  public DoFn<InputT, OutputT> getFn() {
+    return doFnRunner.getFn();
+  }
+
   @Override
   public void startBundle() {
     doFnRunner.startBundle();
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index 6ac80558e56..dab7fdf80b7 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -32,6 +32,7 @@
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -247,6 +248,11 @@ public void testOnTimerCalled() {
     private boolean started = false;
     private boolean finished = false;
 
+    @Override
+    public DoFn<InputT, OutputT> getFn() {
+      return null;
+    }
+
     @Override
     public void startBundle() {
       started = true;
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index e537962e0ff..eed81b3d279 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -45,6 +45,10 @@ private DoFnLifecycleManagerRemovingTransformEvaluator(
     this.lifecycleManager = lifecycleManager;
   }
 
+  public ParDoEvaluator<InputT> getParDoEvaluator() {
+    return underlying;
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> element) throws Exception {
     try {
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 652f3880ddd..ab759287637 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -34,6 +35,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.local.ExecutionDriver;
 import org.apache.beam.runners.local.ExecutionDriver.DriverState;
@@ -272,17 +274,43 @@ private void shutdownIfNecessary(State newState) {
       return;
     }
     LOG.debug("Pipeline has terminated. Shutting down.");
-    pipelineState.compareAndSet(State.RUNNING, newState);
+
+    final Collection<Exception> errors = new ArrayList<>();
     // Stop accepting new work before shutting down the executor. This ensures 
that thread don't try
     // to add work to the shutdown executor.
-    serialExecutorServices.invalidateAll();
-    serialExecutorServices.cleanUp();
-    parallelExecutorService.shutdown();
-    executorService.shutdown();
+    try {
+      serialExecutorServices.invalidateAll();
+    } catch (final RuntimeException re) {
+      errors.add(re);
+    }
+    try {
+      serialExecutorServices.cleanUp();
+    } catch (final RuntimeException re) {
+      errors.add(re);
+    }
+    try {
+      parallelExecutorService.shutdown();
+    } catch (final RuntimeException re) {
+      errors.add(re);
+    }
+    try {
+      executorService.shutdown();
+    } catch (final RuntimeException re) {
+      errors.add(re);
+    }
     try {
       registry.cleanup();
-    } catch (Exception e) {
-      visibleUpdates.failed(e);
+    } catch (final Exception e) {
+      errors.add(e);
+    }
+    pipelineState.compareAndSet(State.RUNNING, newState); // ensure we hit a 
terminal node
+    if (!errors.isEmpty()) {
+      final IllegalStateException exception = new IllegalStateException(
+        "Error" + (errors.size() == 1 ? "" : "s") + " during executor 
shutdown:\n"
+        + errors.stream().map(Exception::getMessage)
+          .collect(Collectors.joining("\n- ", "- ", "")));
+      visibleUpdates.failed(exception);
+      throw exception;
     }
   }
 
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 7694b94f46f..c2b877f54df 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -169,6 +169,14 @@ private ParDoEvaluator(
     }
   }
 
+  public PushbackSideInputDoFnRunner<InputT, ?> getFnRunner() {
+    return fnRunner;
+  }
+
+  public DirectStepContext getStepContext() {
+    return stepContext;
+  }
+
   public BundleOutputManager getOutputManager() {
     return outputManager;
   }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index f4c489544b2..718178c1fab 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -17,18 +17,22 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.cache.CacheLoader;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ProcessFnRunner;
 import 
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
-import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -50,16 +54,33 @@
     implements TransformEvaluatorFactory {
   private final ParDoEvaluatorFactory<KeyedWorkItem<String, KV<InputT, 
RestrictionT>>, OutputT>
       delegateFactory;
+  private final ScheduledExecutorService ses;
   private final EvaluationContext evaluationContext;
 
   SplittableProcessElementsEvaluatorFactory(EvaluationContext 
evaluationContext) {
     this.evaluationContext = evaluationContext;
     this.delegateFactory =
-        new ParDoEvaluatorFactory<>(
-            evaluationContext,
-            SplittableProcessElementsEvaluatorFactory
-                .<InputT, OutputT, RestrictionT>processFnRunnerFactory(),
-            ParDoEvaluatorFactory.basicDoFnCacheLoader());
+      new ParDoEvaluatorFactory<>(
+        evaluationContext,
+        SplittableProcessElementsEvaluatorFactory.
+          <InputT, OutputT, RestrictionT>processFnRunnerFactory(),
+          new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
+            @Override
+            public DoFnLifecycleManager load(final AppliedPTransform<?, ?, ?> 
application) {
+              checkArgument(
+                ProcessElements.class.isInstance(application.getTransform()),
+                "No know extraction of the fn from " + application);
+              final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> 
transform =
+                (ProcessElements<InputT, OutputT, RestrictionT, TrackerT>)
+                  application.getTransform();
+              return 
DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn()));
+            }
+          });
+    this.ses = Executors.newSingleThreadScheduledExecutor(
+      new ThreadFactoryBuilder()
+        .setThreadFactory(MoreExecutors.platformThreadFactory())
+        
.setNameFormat("direct-splittable-process-element-checkpoint-executor_" + 
hashCode())
+        .build());
   }
 
   @Override
@@ -74,6 +95,7 @@
 
   @Override
   public void cleanup() throws Exception {
+    ses.shutdownNow(); // stop before cleaning
     delegateFactory.cleanup();
   }
 
@@ -88,41 +110,28 @@ public void cleanup() throws Exception {
     final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
         application.getTransform();
 
-    ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
-        transform.newProcessFn(transform.getFn());
-
-    DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
-    processFn =
-        ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
-            fnManager.<KeyedWorkItem<String, KV<InputT, RestrictionT>>, 
OutputT>get());
-
-    String stepName = evaluationContext.getStepName(application);
-    final DirectExecutionContext.DirectStepContext stepContext =
-        evaluationContext
-            .getExecutionContext(application, inputBundle.getKey())
-            .getStepContext(stepName);
-
-    final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
-        parDoEvaluator =
-            delegateFactory.createParDoEvaluator(
-                application,
-                inputBundle.getKey(),
-                (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>)
-                    inputBundle.getPCollection(),
-                transform.getSideInputs(),
-                transform.getMainOutputTag(),
-                transform.getAdditionalOutputTags().getAll(),
-                stepContext,
-                processFn,
-                fnManager);
-
-    processFn.setStateInternalsFactory(key -> (StateInternals) 
stepContext.stateInternals());
-
+    final DoFnLifecycleManagerRemovingTransformEvaluator
+      <KeyedWorkItem<String, KV<InputT, RestrictionT>>> evaluator =
+      delegateFactory.createEvaluator(
+        (AppliedPTransform) application,
+        (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>) 
inputBundle.getPCollection(),
+        inputBundle.getKey(),
+        application.getTransform().getSideInputs(),
+        application.getTransform().getMainOutputTag(),
+        application.getTransform().getAdditionalOutputTags().getAll());
+    final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> pde =
+      evaluator.getParDoEvaluator();
+    final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+      (ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+        ProcessFnRunner.class.cast(pde.getFnRunner()).getFn();
+
+    final DirectExecutionContext.DirectStepContext stepContext = 
pde.getStepContext();
+    processFn.setStateInternalsFactory(key -> stepContext.stateInternals());
     processFn.setTimerInternalsFactory(key -> stepContext.timerInternals());
 
     OutputWindowedValue<OutputT> outputWindowedValue =
         new OutputWindowedValue<OutputT>() {
-          private final OutputManager outputManager = 
parDoEvaluator.getOutputManager();
+          private final OutputManager outputManager = pde.getOutputManager();
 
           @Override
           public void outputWindowedValue(
@@ -150,21 +159,13 @@ public void outputWindowedValue(
             evaluationContext.getPipelineOptions(),
             outputWindowedValue,
             evaluationContext.createSideInputReader(transform.getSideInputs()),
-            // TODO: For better performance, use a higher-level executor?
-            // TODO: (BEAM-723) Create a shared ExecutorService for 
maintenance tasks in the
-            // DirectRunner.
-            Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder()
-                    .setThreadFactory(MoreExecutors.platformThreadFactory())
-                    .setDaemon(true)
-                    
.setNameFormat("direct-splittable-process-element-checkpoint-executor")
-                    .build()),
+            ses,
             // Setting small values here to stimulate frequent checkpointing 
and better exercise
             // splittable DoFn's in that respect.
             100,
             Duration.standardSeconds(1)));
 
-    return 
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, 
fnManager);
+    return evaluator;
   }
 
   private static <InputT, OutputT, RestrictionT>
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 830d0c17757..3f2a4772a97 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -19,6 +19,7 @@
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
@@ -39,6 +40,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -299,6 +301,39 @@ public void hang(ProcessContext context) throws 
InterruptedException {
     assertThat(result.getState(), is(State.RUNNING));
   }
 
+  private static final AtomicLong TEARDOWN_CALL = new AtomicLong(-1);
+
+  @Test
+  public void tearsDownFnsBeforeFinishing() {
+    TEARDOWN_CALL.set(-1);
+    final Pipeline pipeline = getPipeline();
+    pipeline.apply(Create.of("a"))
+      .apply(ParDo.of(new DoFn<String, String>() {
+        @ProcessElement
+        public void onElement(final ProcessContext ctx) {
+            // no-op
+        }
+
+        @Teardown
+        public void teardown() {
+          // just to not have a fast execution hiding an issue until we have a 
shutdown callback
+          try {
+            Thread.sleep(1000);
+          } catch (final InterruptedException e) {
+            fail();
+          }
+          TEARDOWN_CALL.set(System.nanoTime());
+        }
+      }));
+    final PipelineResult pipelineResult = pipeline.run();
+    pipelineResult.waitUntilFinish();
+
+    final long doneTs = System.nanoTime();
+    final long tearDownTs = TEARDOWN_CALL.get();
+    assertThat(tearDownTs, greaterThan(0L));
+    assertThat(doneTs, greaterThan(tearDownTs));
+  }
+
   @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index ccce127eeed..3dd9675ebaa 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -23,6 +23,7 @@
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -91,4 +92,9 @@ public void finishBundle() {
     // update metrics
     container.updateMetrics();
   }
+
+  @Override
+  public DoFn<InputT, OutputT> getFn() {
+    return delegate.getFn();
+  }
 }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index 394b80b0931..3517b4a18f2 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -26,6 +26,7 @@
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.Accumulator;
@@ -49,6 +50,11 @@
     this.metricsAccum = metricsAccum;
   }
 
+  @Override
+  public DoFn<InputT, OutputT> getFn() {
+    return delegate.getFn();
+  }
+
   @Override
   public void startBundle() {
     try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index f7dcb650b4b..abfc9a36ed7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -443,6 +443,11 @@ public void finishBundle() {
     stateKeyObjectCache.clear();
   }
 
+  @Override
+  public DoFn<InputT, OutputT> getFn() {
+    return doFnInvoker.getFn();
+  }
+
   /**
    * Outputs the given element to the specified set of consumers wrapping any 
exceptions.
    */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 87881)
    Time Spent: 7h 10m  (was: 7h)

> Unexpected behavior of DoFn teardown method running in unit tests 
> ------------------------------------------------------------------
>
>                 Key: BEAM-3409
>                 URL: https://issues.apache.org/jira/browse/BEAM-3409
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.3.0
>            Reporter: Alexey Romanenko
>            Assignee: Romain Manni-Bucau
>            Priority: Blocker
>              Labels: test
>             Fix For: 2.5.0
>
>          Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> While writing a unit test, I found strange behaviour in the Teardown method 
> of a DoFn implementation when it runs in unit tests using TestPipeline.
> To be more precise, the test doesn’t wait until the teardown() method has 
> finished; it just exits from this method after about 1 sec (on my machine) 
> even if it should take longer (a very simple example: running an infinite 
> loop inside this method, or putting the thread to sleep). At the same time, 
> when I run the same code from main() with an ordinary Pipeline and the direct 
> runner, it’s ok and works as expected - the teardown() method runs to 
> completion regardless of how much time it takes.
> I created two test cases to reproduce this issue - the first one runs with 
> main() and the second one runs with junit. They use the same implementation 
> of DoFn (class LongTearDownFn) and expect that the teardown method will run 
> for at least SLEEP_TIME ms. When running as a junit test, this is not 
> the case (see output log).
> - run with main()
> https://github.com/aromanenko-dev/beam-samples/blob/master/runners-tests/src/main/java/TearDown.java
> - run with junit
> https://github.com/aromanenko-dev/beam-samples/blob/master/runners-tests/src/test/java/TearDownTest.java



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to