Handle Errors in the DirectRunner

When a worker dies because of an error, propagate that error and fail
the Pipeline.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f03c04a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f03c04a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f03c04a7

Branch: refs/heads/master
Commit: f03c04a787343c3710355c84a105582cdc815469
Parents: 28180c4
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 9 09:46:38 2017 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue May 9 11:49:14 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CompletionCallback.java |  6 +++++
 .../direct/ExecutorServiceParallelExecutor.java | 27 +++++++++++++++-----
 .../beam/runners/direct/TransformExecutor.java  | 12 +++++++--
 .../apache/beam/runners/direct/MockClock.java   |  2 +-
 .../runners/direct/TransformExecutorTest.java   |  5 ++++
 5 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 0af22c8..417fa09 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -40,4 +40,10 @@ interface CompletionCallback {
    * Handle a result that terminated abnormally due to the provided {@link 
Exception}.
    */
   void handleException(CommittedBundle<?> inputBundle, Exception t);
+
+  /**
+   * Handle a result that terminated abnormally due to the provided {@link 
Error}. The pipeline
+   * should be shut down, and the Error propagated.
+  */
+  void handleError(Error err);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index b7f4732..71ab4cc 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -285,8 +285,15 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
         // there are no updates to process and no updates will ever be 
published because the
         // executor is shutdown
         return pipelineState.get();
-      } else if (update != null && update.exception.isPresent()) {
-        throw update.exception.get();
+      } else if (update != null && update.thrown.isPresent()) {
+        Throwable thrown = update.thrown.get();
+        if (thrown instanceof Exception) {
+          throw (Exception) thrown;
+        } else if (thrown instanceof Error) {
+          throw (Error) thrown;
+        } else {
+          throw new Exception("Unknown Type of Throwable", thrown);
+        }
       }
     }
     return pipelineState.get();
@@ -380,6 +387,11 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
       allUpdates.offer(ExecutorUpdate.fromException(e));
       outstandingWork.decrementAndGet();
     }
+
+    @Override
+    public void handleError(Error err) {
+      visibleUpdates.add(VisibleExecutorUpdate.fromError(err));
+    }
   }
 
   /**
@@ -424,7 +436,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
    * return normally or throw an exception.
    */
   private static class VisibleExecutorUpdate {
-    private final Optional<? extends Exception> exception;
+    private final Optional<? extends Throwable> thrown;
     @Nullable
     private final State newState;
 
@@ -432,6 +444,10 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
       return new VisibleExecutorUpdate(null, e);
     }
 
+    public static VisibleExecutorUpdate fromError(Error err) {
+      return new VisibleExecutorUpdate(State.FAILED, err);
+    }
+
     public static VisibleExecutorUpdate finished() {
       return new VisibleExecutorUpdate(State.DONE, null);
     }
@@ -440,15 +456,14 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
       return new VisibleExecutorUpdate(State.CANCELLED, null);
     }
 
-    private VisibleExecutorUpdate(State newState, @Nullable Exception 
exception) {
-      this.exception = Optional.fromNullable(exception);
+    private VisibleExecutorUpdate(State newState, @Nullable Throwable 
exception) {
+      this.thrown = Optional.fromNullable(exception);
       this.newState = newState;
     }
 
     public State getNewState() {
       return newState;
     }
-
   }
 
   private class MonitorRunnable implements Runnable {

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 8e1515b..56f8650 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link Callable} responsible for constructing a {@link 
TransformEvaluator} from a
@@ -36,6 +38,8 @@ import org.apache.beam.sdk.util.WindowedValue;
  * that it is being executed on.
  */
 class TransformExecutor<T> implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransformExecutor.class);
+
   public static <T> TransformExecutor<T> create(
       EvaluationContext context,
       TransformEvaluatorFactory factory,
@@ -112,6 +116,10 @@ class TransformExecutor<T> implements Runnable {
         throw (RuntimeException) e;
       }
       throw new RuntimeException(e);
+    } catch (Error err) {
+      LOG.error("Error occurred within {}", this, err);
+      onComplete.handleError(err);
+      throw err;
     } finally {
       // Report the physical metrics from the end of this step.
       context.getMetrics().commitPhysical(inputBundle, 
metricsContainer.getCumulative());
@@ -162,8 +170,8 @@ class TransformExecutor<T> implements Runnable {
       TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
       Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
-    TransformResult<T> result = evaluator.finishBundle()
-        .withLogicalMetricUpdates(metricsContainer.getCumulative());
+    TransformResult<T> result =
+        
evaluator.finishBundle().withLogicalMetricUpdates(metricsContainer.getCumulative());
     CommittedResult outputs = onComplete.handleResult(inputBundle, result);
     for (ModelEnforcement<T> enforcement : enforcements) {
       enforcement.afterFinish(inputBundle, result, outputs.getOutputs());

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
index 11ecbff..9275e3c 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
@@ -28,7 +28,7 @@ import org.joda.time.Instant;
  *
  * <p>For uses of the {@link Clock} interface in unit tests.
  */
-public class MockClock implements Clock {
+class MockClock implements Clock {
 
   private Instant now;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index dc0ef7c..86412a0 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -434,6 +434,11 @@ public class TransformExecutorTest {
       handledException = e;
       onMethod.countDown();
     }
+
+    @Override
+    public void handleError(Error err) {
+      throw err;
+    }
   }
 
   private static class TestEnforcementFactory implements 
ModelEnforcementFactory {

Reply via email to