Repository: incubator-beam Updated Branches: refs/heads/master e63311fa4 -> 51e1e59b8
Refactor CompletionCallbacks. The default and timer-based completion callbacks are identical except for their calls to evaluationContext.handleResult; factor the shared code into a common location. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2adf45f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2adf45f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2adf45f3 Branch: refs/heads/master Commit: 2adf45f31a0253be0ab3f3cc74b65e0aee584e37 Parents: 9b9d73f Author: Thomas Groh <[email protected]> Authored: Tue May 3 13:22:13 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue May 3 14:33:38 2016 -0700 ---------------------------------------------------------------------- .../direct/ExecutorServiceParallelExecutor.java | 51 ++++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2adf45f3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 18af363..9f26e5a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } /** - * The default {@link CompletionCallback}. 
The default completion callback is used to complete - * transform evaluations that are triggered due to the arrival of elements from an upstream - * transform, or for a source transform. + * The base implementation of {@link CompletionCallback} that provides implementations for + * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and + * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of + * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}. */ - private class DefaultCompletionCallback implements CompletionCallback { + private abstract class CompletionCallbackBase implements CompletionCallback { + protected abstract CommittedResult getCommittedResult( + CommittedBundle<?> inputBundle, + InProcessTransformResult result); + @Override - public CommittedResult handleResult( + public final CommittedResult handleResult( CommittedBundle<?> inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result); + CommittedResult committedResult = getCommittedResult(inputBundle, result); for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } @@ -227,18 +231,33 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } @Override - public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { + public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { allUpdates.offer(ExecutorUpdate.fromThrowable(t)); } } /** + * The default {@link CompletionCallback}. The default completion callback is used to complete + * transform evaluations that are triggered due to the arrival of elements from an upstream + * transform, or for a source transform. 
+ */ + private class DefaultCompletionCallback extends CompletionCallbackBase { + @Override + public CommittedResult getCommittedResult( + CommittedBundle<?> inputBundle, InProcessTransformResult result) { + return evaluationContext.handleResult(inputBundle, + Collections.<TimerData>emptyList(), + result); + } + } + + /** * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the * timers used to create the input to the {@link InProcessEvaluationContext evaluation context} * as part of the result. */ - private class TimerCompletionCallback implements CompletionCallback { + private class TimerCompletionCallback extends CompletionCallbackBase { private final Iterable<TimerData> timers; private TimerCompletionCallback(Iterable<TimerData> timers) { @@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } @Override - public CommittedResult handleResult( + public CommittedResult getCommittedResult( CommittedBundle<?> inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); - } - return committedResult; - } - - @Override - public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + return evaluationContext.handleResult(inputBundle, timers, result); } }
