Handle Errors in the DirectRunner

When a worker dies because of an error, propagate that error and fail the Pipeline.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f03c04a7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f03c04a7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f03c04a7 Branch: refs/heads/master Commit: f03c04a787343c3710355c84a105582cdc815469 Parents: 28180c4 Author: Thomas Groh <tg...@google.com> Authored: Tue May 9 09:46:38 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue May 9 11:49:14 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/CompletionCallback.java | 6 +++++ .../direct/ExecutorServiceParallelExecutor.java | 27 +++++++++++++++----- .../beam/runners/direct/TransformExecutor.java | 12 +++++++-- .../apache/beam/runners/direct/MockClock.java | 2 +- .../runners/direct/TransformExecutorTest.java | 5 ++++ 5 files changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java index 0af22c8..417fa09 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java @@ -40,4 +40,10 @@ interface CompletionCallback { * Handle a result that terminated abnormally due to the provided {@link Exception}. */ void handleException(CommittedBundle<?> inputBundle, Exception t); + + /** + * Handle a result that terminated abnormally due to the provided {@link Error}. The pipeline + * should be shut down, and the Error propagated. 
+ */ + void handleError(Error err); } http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index b7f4732..71ab4cc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -285,8 +285,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { // there are no updates to process and no updates will ever be published because the // executor is shutdown return pipelineState.get(); - } else if (update != null && update.exception.isPresent()) { - throw update.exception.get(); + } else if (update != null && update.thrown.isPresent()) { + Throwable thrown = update.thrown.get(); + if (thrown instanceof Exception) { + throw (Exception) thrown; + } else if (thrown instanceof Error) { + throw (Error) thrown; + } else { + throw new Exception("Unknown Type of Throwable", thrown); + } } } return pipelineState.get(); @@ -380,6 +387,11 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { allUpdates.offer(ExecutorUpdate.fromException(e)); outstandingWork.decrementAndGet(); } + + @Override + public void handleError(Error err) { + visibleUpdates.add(VisibleExecutorUpdate.fromError(err)); + } } /** @@ -424,7 +436,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { * return normally or throw an exception. */ private static class VisibleExecutorUpdate { - private final Optional<? extends Exception> exception; + private final Optional<? 
extends Throwable> thrown; @Nullable private final State newState; @@ -432,6 +444,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { return new VisibleExecutorUpdate(null, e); } + public static VisibleExecutorUpdate fromError(Error err) { + return new VisibleExecutorUpdate(State.FAILED, err); + } + public static VisibleExecutorUpdate finished() { return new VisibleExecutorUpdate(State.DONE, null); } @@ -440,15 +456,14 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { return new VisibleExecutorUpdate(State.CANCELLED, null); } - private VisibleExecutorUpdate(State newState, @Nullable Exception exception) { - this.exception = Optional.fromNullable(exception); + private VisibleExecutorUpdate(State newState, @Nullable Throwable exception) { + this.thrown = Optional.fromNullable(exception); this.newState = newState; } public State getNewState() { return newState; } - } private class MonitorRunnable implements Runnable { http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index 8e1515b..56f8650 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a @@ -36,6 
+38,8 @@ import org.apache.beam.sdk.util.WindowedValue; * that it is being executed on. */ class TransformExecutor<T> implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(TransformExecutor.class); + public static <T> TransformExecutor<T> create( EvaluationContext context, TransformEvaluatorFactory factory, @@ -112,6 +116,10 @@ class TransformExecutor<T> implements Runnable { throw (RuntimeException) e; } throw new RuntimeException(e); + } catch (Error err) { + LOG.error("Error occurred within {}", this, err); + onComplete.handleError(err); + throw err; } finally { // Report the physical metrics from the end of this step. context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative()); @@ -162,8 +170,8 @@ class TransformExecutor<T> implements Runnable { TransformEvaluator<T> evaluator, MetricsContainer metricsContainer, Collection<ModelEnforcement<T>> enforcements) throws Exception { - TransformResult<T> result = evaluator.finishBundle() - .withLogicalMetricUpdates(metricsContainer.getCumulative()); + TransformResult<T> result = + evaluator.finishBundle().withLogicalMetricUpdates(metricsContainer.getCumulative()); CommittedResult outputs = onComplete.handleResult(inputBundle, result); for (ModelEnforcement<T> enforcement : enforcements) { enforcement.afterFinish(inputBundle, result, outputs.getOutputs()); http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java index 11ecbff..9275e3c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java @@ -28,7 +28,7 @@ import 
org.joda.time.Instant; * * <p>For uses of the {@link Clock} interface in unit tests. */ -public class MockClock implements Clock { +class MockClock implements Clock { private Instant now; http://git-wip-us.apache.org/repos/asf/beam/blob/f03c04a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index dc0ef7c..86412a0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -434,6 +434,11 @@ public class TransformExecutorTest { handledException = e; onMethod.countDown(); } + + @Override + public void handleError(Error err) { + throw err; + } } private static class TestEnforcementFactory implements ModelEnforcementFactory {