Repository: beam Updated Branches: refs/heads/master 2d9bf2747 -> 752a4c9d9
Support waitUntilFinish, cancel in the DirectRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caacf297 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caacf297 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caacf297 Branch: refs/heads/master Commit: caacf297023d0d6e4a6bfe2f7ccd6edb73914d89 Parents: 2d9bf27 Author: Thomas Groh <[email protected]> Authored: Mon Oct 17 11:21:02 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Mar 22 09:43:47 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 43 +++--- .../direct/ExecutorServiceParallelExecutor.java | 137 ++++++++++++++----- .../beam/runners/direct/PipelineExecutor.java | 23 +++- .../direct/TransformExecutorService.java | 6 + .../direct/TransformExecutorServices.java | 22 ++- .../beam/runners/direct/DirectRunnerTest.java | 55 ++++++++ .../src/main/resources/beam/findbugs-filter.xml | 34 +++++ 7 files changed, 264 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 06aa3b1..4992c6a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -22,7 +22,6 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -420,14 +419,34 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false, * this method will never return. * - * <p>See also {@link PipelineExecutor#awaitCompletion()}. + * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}. */ @Override public State waitUntilFinish() { - if (!state.isTerminal()) { + return waitUntilFinish(Duration.ZERO); + } + + @Override + public State cancel() { + this.state = executor.getPipelineState(); + if (!this.state.isTerminal()) { + executor.stop(); + this.state = executor.getPipelineState(); + } + return executor.getPipelineState(); + } + + @Override + public State waitUntilFinish(Duration duration) { + State startState = this.state; + if (!startState.isTerminal()) { try { - executor.awaitCompletion(); - state = State.DONE; + state = executor.waitUntilFinish(duration); + } catch (UserCodeException uce) { + // Emulates the behavior of Pipeline#run(), where a stack trace caused by a + // UserCodeException is truncated and replaced with the stack starting at the call to + // waitToFinish + throw new Pipeline.PipelineExecutionException(uce.getCause()); } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -438,19 +457,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { throw new RuntimeException(e); } } - return state; - } - - @Override - public State cancel() throws IOException { - throw new UnsupportedOperationException("DirectPipelineResult does not support cancel."); - } - - @Override - public State waitUntilFinish(Duration duration) { - throw new UnsupportedOperationException( - "DirectPipelineResult does not support waitUntilFinish with a Duration parameter. See" - + " BEAM-596."); + return this.state; } } http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 935104a..8b9f995 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -25,6 +25,8 @@ import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; @@ -48,6 +50,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.UserCodeException; @@ -55,6 +58,8 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,6 +104,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { * {@link CompletionCallback} decrement this value. */ private final AtomicLong outstandingWork = new AtomicLong(); + private AtomicReference<State> pipelineState = new AtomicReference<>(State.RUNNING); public static ExecutorServiceParallelExecutor create( int targetParallelism, @@ -138,7 +144,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { // Executing TransformExecutorServices have a strong reference to their TransformExecutorService // which stops the TransformExecutorServices from being prematurely garbage collected executorServices = - CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader()); + CacheBuilder.newBuilder() + .weakValues() + .removalListener(shutdownExecutorServiceListener()) + .build(serialTransformExecutorServiceCacheLoader()); this.allUpdates = new ConcurrentLinkedQueue<>(); this.visibleUpdates = new LinkedBlockingQueue<>(); @@ -159,6 +168,19 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { }; } + private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorServiceListener() { + return new RemovalListener<StepAndKey, TransformExecutorService>() { + @Override + public void onRemoval( + RemovalNotification<StepAndKey, TransformExecutorService> notification) { + TransformExecutorService service = notification.getValue(); + if (service != null) { + service.shutdown(); + } + } + }; + } + @Override public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { int numTargetSplits = Math.max(3, targetParallelism); @@ -179,7 +201,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } @SuppressWarnings("unchecked") - public void scheduleConsumption( + private void scheduleConsumption( AppliedPTransform<?, ?, ?> consumer, CommittedBundle<?> bundle, CompletionCallback onComplete) { @@ -219,7 +241,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { onComplete, transformExecutor); outstandingWork.incrementAndGet(); - transformExecutor.schedule(callable); + if (!pipelineState.get().isTerminal()) { + transformExecutor.schedule(callable); + } } private boolean isKeyed(PValue pvalue) { @@ -234,20 +258,66 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } @Override - public void awaitCompletion() throws Exception { - VisibleExecutorUpdate update; - do { - // Get an update; don't block forever if another thread has handled it - update = visibleUpdates.poll(2L, TimeUnit.SECONDS); - if (update == null && executorService.isShutdown()) { + public State waitUntilFinish(Duration duration) throws Exception { + Instant completionTime; + if (duration.equals(Duration.ZERO)) { + completionTime = new Instant(Long.MAX_VALUE); + } else { + completionTime = Instant.now().plus(duration); + } + + VisibleExecutorUpdate update = null; + while (Instant.now().isBefore(completionTime) + && (update == null || isTerminalStateUpdate(update))) { + // Get an update; don't block forever if another thread has handled it. The call to poll will + // wait the entire timeout; this call primarily exists to relinquish any core. + update = visibleUpdates.poll(25L, TimeUnit.MILLISECONDS); + if (update == null && pipelineState.get().isTerminal()) { // there are no updates to process and no updates will ever be published because the // executor is shutdown - return; + return pipelineState.get(); } else if (update != null && update.exception.isPresent()) { throw update.exception.get(); } - } while (update == null || !update.isDone()); + } + return pipelineState.get(); + } + + @Override + public State getPipelineState() { + return pipelineState.get(); + } + + private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) { + return !(update.getNewState() == null && update.getNewState().isTerminal()); + } + + @Override + public void stop() { + shutdownIfNecessary(State.CANCELLED); + while (!visibleUpdates.offer(VisibleExecutorUpdate.cancelled())) { + // Make sure "This Pipeline was Cancelled" notification arrives. + visibleUpdates.poll(); + } + } + + private void shutdownIfNecessary(State newState) { + if (!newState.isTerminal()) { + return; + } + LOG.debug("Pipeline has terminated. Shutting down."); + pipelineState.compareAndSet(State.RUNNING, newState); + // Stop accepting new work before shutting down the executor. This ensures that thread don't try + // to add work to the shutdown executor. + executorServices.invalidateAll(); + executorServices.cleanUp(); + parallelExecutorService.shutdown(); executorService.shutdown(); + try { + registry.cleanup(); + } catch (Exception e) { + visibleUpdates.add(VisibleExecutorUpdate.fromException(e)); + } } /** @@ -341,29 +411,35 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } /** - * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to + * An update of interest to the user. Used in {@link #waitUntilFinish} to decide whether to * return normally or throw an exception. */ private static class VisibleExecutorUpdate { private final Optional<? extends Exception> exception; - private final boolean done; + @Nullable + private final State newState; public static VisibleExecutorUpdate fromException(Exception e) { - return new VisibleExecutorUpdate(false, e); + return new VisibleExecutorUpdate(null, e); } public static VisibleExecutorUpdate finished() { - return new VisibleExecutorUpdate(true, null); + return new VisibleExecutorUpdate(State.DONE, null); + } + + public static VisibleExecutorUpdate cancelled() { + return new VisibleExecutorUpdate(State.CANCELLED, null); } - private VisibleExecutorUpdate(boolean done, @Nullable Exception exception) { + private VisibleExecutorUpdate(State newState, @Nullable Exception exception) { this.exception = Optional.fromNullable(exception); - this.done = done; + this.newState = newState; } - public boolean isDone() { - return done; + public State getNewState() { + return newState; } + } private class MonitorRunnable implements Runnable { @@ -475,22 +551,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } private boolean shouldShutdown() { - boolean shouldShutdown = exceptionThrown || evaluationContext.isDone(); - if (shouldShutdown) { - LOG.debug("Pipeline has terminated. Shutting down."); - executorService.shutdown(); - try { - registry.cleanup(); - } catch (Exception e) { - visibleUpdates.add(VisibleExecutorUpdate.fromException(e)); - } - if (evaluationContext.isDone()) { - while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { - visibleUpdates.poll(); - } - } + State nextState = State.UNKNOWN; + if (exceptionThrown) { + nextState = State.FAILED; + } else if (evaluationContext.isDone()) { + visibleUpdates.offer(VisibleExecutorUpdate.finished()); + nextState = State.DONE; } - return shouldShutdown; + shutdownIfNecessary(nextState); + return pipelineState.get().isTerminal(); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java index f900a22..82f59a7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java @@ -19,8 +19,11 @@ package org.apache.beam.runners.direct; import java.util.Collection; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.joda.time.Duration; /** * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both @@ -40,8 +43,24 @@ interface PipelineExecutor { * root {@link AppliedPTransform AppliedPTransforms} have completed, and all * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally. * - * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the + * <p>Waits for up to the provided duration, or forever if the provided duration is less than or + * equal to zero. + * + * @return The terminal state of the Pipeline. + * @throws Exception whenever an executor thread throws anything, transfers to the * waiting thread and rethrows it */ - void awaitCompletion() throws Exception; + State waitUntilFinish(Duration duration) throws Exception; + + /** + * Gets the current state of the {@link Pipeline}. + */ + State getPipelineState(); + + /** + * Shuts down the executor. + * + * <p>The executor may continue to run for a short time after this method returns. + */ + void stop(); } http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java index 837b858..c6f770f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java @@ -32,4 +32,10 @@ interface TransformExecutorService { * {@link TransformExecutor TransformExecutors} to be evaluated. */ void complete(TransformExecutor<?> completed); + + /** + * Cancel any outstanding work, if possible. Any future calls to schedule should ignore any + * work. + */ + void shutdown(); } http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java index 876da9d..6733758 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java @@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -56,6 +57,7 @@ final class TransformExecutorServices { */ private static class ParallelEvaluationState implements TransformExecutorService { private final ExecutorService executor; + private final AtomicBoolean active = new AtomicBoolean(true); private ParallelEvaluationState(ExecutorService executor) { this.executor = executor; @@ -63,12 +65,19 @@ final class TransformExecutorServices { @Override public void schedule(TransformExecutor<?> work) { - executor.submit(work); + if (active.get()) { + executor.submit(work); + } } @Override public void complete(TransformExecutor<?> completed) { } + + @Override + public void shutdown() { + active.set(false); + } } /** @@ -84,6 +93,7 @@ final class TransformExecutorServices { private AtomicReference<TransformExecutor<?>> currentlyEvaluating; private final Queue<TransformExecutor<?>> workQueue; + private boolean active = true; private SerialEvaluationState(ExecutorService executor) { this.executor = executor; @@ -113,12 +123,20 @@ final class TransformExecutorServices { updateCurrentlyEvaluating(); } + @Override + public void shutdown() { + synchronized (this) { + active = false; + } + workQueue.clear(); + } + private void updateCurrentlyEvaluating() { if (currentlyEvaluating.get() == null) { // Only synchronize if we need to update what's currently evaluating synchronized (this) { TransformExecutor<?> newWork = workQueue.poll(); - if (newWork != null) { + if (active && newWork != null) { if (currentlyEvaluating.compareAndSet(null, newWork)) { executor.submit(newWork); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index d2b6d1d..e601fcf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -32,9 +32,16 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -49,6 +56,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -65,6 +73,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TypeDescriptor; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; @@ -222,6 +231,52 @@ public class DirectRunnerTest implements Serializable { } @Test + public void cancelShouldStopPipeline() throws Exception { + PipelineOptions opts = TestPipeline.testingPipelineOptions(); + opts.as(DirectOptions.class).setBlockOnRun(false); + opts.setRunner(DirectRunner.class); + + final Pipeline p = Pipeline.create(opts); + p.apply(CountingInput.unbounded().withRate(1L, Duration.standardSeconds(1))); + + final BlockingQueue<PipelineResult> resultExchange = new ArrayBlockingQueue<>(1); + Runnable cancelRunnable = new Runnable() { + @Override + public void run() { + try { + resultExchange.take().cancel(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + }; + + Callable<PipelineResult> runPipelineRunnable = new Callable<PipelineResult>() { + @Override + public PipelineResult call() { + PipelineResult res = p.run(); + try { + resultExchange.put(res); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + return res; + } + }; + + ExecutorService executor = Executors.newCachedThreadPool(); + executor.submit(cancelRunnable); + Future<PipelineResult> result = executor.submit(runPipelineRunnable); + + // If cancel doesn't work, this will hang forever + result.get().waitUntilFinish(); + } + + @Test public void transformDisplayDataExceptionShouldFail() { DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 714fd00..2799b00 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -155,6 +155,40 @@ </Match> <Match> + <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$MonitorRunnable" /> + <Method name="shouldShutdown" /> + <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" /> + <!-- visibleUpdates is a non-capacity-limited LinkedBlockingQueue, which + can never refuse an offered update --> + </Match> + + <Match> + <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/> + <Or> + <Field name="bcast" /> + <Field name="value" /> + </Or> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + <!-- + Spark's Broadcast variables are a distributed and cached objects + and should not be treated as "normal" objects. + --> + </Match> + + <Match> + <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/> + <Or> + <Field name="bcast" /> + <Field name="value" /> + </Or> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + <!-- + Spark's Broadcast variables are a distributed and cached objects + and should not be treated as "normal" objects. + --> + </Match> + + <Match> <Class name="org.apache.beam.runners.spark.metrics.sink.CsvSink"/> <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/> <!-- Intentionally overriding parent name because inheritors should replace the parent. -->
