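Improve Work Rejection handling. The timing between checking for a shutdown state and submitting work is racy. When the underlying executor rejects submitted work with a RejectedExecutionException, re-check whether the rejection is acceptable because the executor service has since been shut down: in that case the rejection is suppressed and logged, and it is reported as an error (IllegalStateException) only if the service is still active.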
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db81205a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db81205a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db81205a Branch: refs/heads/master Commit: db81205af9b38245ff85ce2801af433cc31bce42 Parents: 7c169a6 Author: Thomas Groh <[email protected]> Authored: Tue Apr 11 11:33:53 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue Apr 11 13:44:34 2017 -0700 ---------------------------------------------------------------------- .../direct/TransformExecutorServices.java | 37 +++++++++++---- .../direct/TransformExecutorServicesTest.java | 48 ++++++++++++++++++++ 2 files changed, 77 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/db81205a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java index 6733758..53087bf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java @@ -21,8 +21,11 @@ import com.google.common.base.MoreObjects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Static factory methods for constructing instances of {@link TransformExecutorService}. 
@@ -37,7 +40,7 @@ final class TransformExecutorServices { * parallel. */ public static TransformExecutorService parallel(ExecutorService executor) { - return new ParallelEvaluationState(executor); + return new ParallelTransformExecutor(executor); } /** @@ -45,7 +48,7 @@ final class TransformExecutorServices { * serial. */ public static TransformExecutorService serial(ExecutorService executor) { - return new SerialEvaluationState(executor); + return new SerialTransformExecutor(executor); } /** @@ -55,18 +58,36 @@ final class TransformExecutorServices { * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are * processed in parallel. */ - private static class ParallelEvaluationState implements TransformExecutorService { + private static class ParallelTransformExecutor implements TransformExecutorService { + private static final Logger LOG = LoggerFactory.getLogger(ParallelTransformExecutor.class); + private final ExecutorService executor; private final AtomicBoolean active = new AtomicBoolean(true); - private ParallelEvaluationState(ExecutorService executor) { + private ParallelTransformExecutor(ExecutorService executor) { this.executor = executor; } @Override public void schedule(TransformExecutor<?> work) { if (active.get()) { - executor.submit(work); + try { + executor.submit(work); + } catch (RejectedExecutionException rejected) { + boolean stillActive = active.get(); + if (stillActive) { + throw new IllegalStateException( + String.format( + "Execution of Work %s was rejected, but the %s is still active", + work, ParallelTransformExecutor.class.getSimpleName())); + } else { + LOG.debug( + "Rejected execution of Work {} on executor {}. " + + "Suppressed exception because evaluator is not active", + work, + this); + } + } } } @@ -88,14 +109,14 @@ final class TransformExecutorServices { * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair. * Keyed computations are processed serially per step. 
*/ - private static class SerialEvaluationState implements TransformExecutorService { + private static class SerialTransformExecutor implements TransformExecutorService { private final ExecutorService executor; private AtomicReference<TransformExecutor<?>> currentlyEvaluating; private final Queue<TransformExecutor<?>> workQueue; private boolean active = true; - private SerialEvaluationState(ExecutorService executor) { + private SerialTransformExecutor(ExecutorService executor) { this.executor = executor; this.currentlyEvaluating = new AtomicReference<>(); this.workQueue = new ConcurrentLinkedQueue<>(); @@ -149,7 +170,7 @@ final class TransformExecutorServices { @Override public String toString() { - return MoreObjects.toStringHelper(SerialEvaluationState.class) + return MoreObjects.toStringHelper(SerialTransformExecutor.class) .add("currentlyEvaluating", currentlyEvaluating) .add("workQueue", workQueue) .toString(); http://git-wip-us.apache.org/repos/asf/beam/blob/db81205a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java index b085723..77652b2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java @@ -64,6 +64,31 @@ public class TransformExecutorServicesTest { } @Test + public void parallelRejectedStillActiveThrows() { + @SuppressWarnings("unchecked") + TransformExecutor<Object> first = mock(TransformExecutor.class); + + TransformExecutorService parallel = + TransformExecutorServices.parallel(executorService); + executorService.shutdown(); + 
thrown.expect(IllegalStateException.class); + thrown.expectMessage("still active"); + parallel.schedule(first); + } + + @Test + public void parallelRejectedShutdownSucceeds() { + @SuppressWarnings("unchecked") + TransformExecutor<Object> first = mock(TransformExecutor.class); + + TransformExecutorService parallel = + TransformExecutorServices.parallel(executorService); + executorService.shutdown(); + parallel.shutdown(); + parallel.schedule(first); + } + + @Test public void serialScheduleTwoWaitsForFirstToComplete() { @SuppressWarnings("unchecked") TransformExecutor<Object> first = mock(TransformExecutor.class); @@ -97,4 +122,27 @@ public class TransformExecutorServicesTest { serial.complete(second); } + + /** + * Tests that a Serial {@link TransformExecutorService} does not schedule follow up work if the + * executor is shut down when the initial work completes. + */ + @Test + public void serialShutdownCompleteActive() { + @SuppressWarnings("unchecked") + TransformExecutor<Object> first = mock(TransformExecutor.class); + @SuppressWarnings("unchecked") + TransformExecutor<Object> second = mock(TransformExecutor.class); + + TransformExecutorService serial = TransformExecutorServices.serial(executorService); + serial.schedule(first); + verify(first).run(); + + serial.schedule(second); + verify(second, never()).run(); + + serial.shutdown(); + serial.complete(first); + verify(second, never()).run(); + } }
