Repository: incubator-beam Updated Branches: refs/heads/master baae9013c -> 4d5303d8a
Add CommittedResult. Return it as the output of InProcessEvaluationContext#handleResult(). This allows a richer return type to improve possible behaviors when a result is returned. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2618aa68 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2618aa68 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2618aa68 Branch: refs/heads/master Commit: 2618aa68ecad95f77a6a42f8eddfe63e511edf23 Parents: a9387fc Author: Thomas Groh <[email protected]> Authored: Thu Apr 28 12:22:47 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Apr 28 13:18:26 2016 -0700 ---------------------------------------------------------------------- .../sdk/runners/inprocess/CommittedResult.java | 47 ++++++++++++ .../runners/inprocess/CompletionCallback.java | 2 +- .../ExecutorServiceParallelExecutor.java | 16 ++-- .../inprocess/InProcessEvaluationContext.java | 4 +- .../runners/inprocess/TransformExecutor.java | 4 +- .../runners/inprocess/CommittedResultTest.java | 78 ++++++++++++++++++++ .../InProcessEvaluationContextTest.java | 4 +- .../inprocess/TransformExecutorTest.java | 4 +- 8 files changed, 142 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java new file mode 100644 index 0000000..3ad0ae6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied + * + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.runners.inprocess; + +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import com.google.auto.value.AutoValue; + +/** + * A {@link InProcessTransformResult} that has been committed. + */ +@AutoValue +abstract class CommittedResult { + /** + * Returns the {@link AppliedPTransform} that produced this result. + */ + public abstract AppliedPTransform<?, ?, ?> getTransform(); + + /** + * Returns the outputs produced by the transform. + */ + public abstract Iterable<? extends CommittedBundle<?>> getOutputs(); + + public static CommittedResult create( + InProcessTransformResult original, Iterable<? 
extends CommittedBundle<?>> outputs) { + return new AutoValue_CommittedResult(original.getTransform(), + outputs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java index 90c488e..30a2b92 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java @@ -26,7 +26,7 @@ interface CompletionCallback { /** * Handle a successful result, returning the committed outputs of the result. */ - Iterable<? extends CommittedBundle<?>> handleResult( + CommittedResult handleResult( CommittedBundle<?> inputBundle, InProcessTransformResult result); /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 3463d08..19bf35d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -216,14 +216,14 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { */ private class DefaultCompletionCallback implements CompletionCallback { @Override - public Iterable<? 
extends CommittedBundle<?>> handleResult( + public CommittedResult handleResult( CommittedBundle<?> inputBundle, InProcessTransformResult result) { - Iterable<? extends CommittedBundle<?>> resultBundles = + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result); - for (CommittedBundle<?> outputBundle : resultBundles) { + for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } - return resultBundles; + return committedResult; } @Override @@ -246,14 +246,14 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } @Override - public Iterable<? extends CommittedBundle<?>> handleResult( + public CommittedResult handleResult( CommittedBundle<?> inputBundle, InProcessTransformResult result) { - Iterable<? extends CommittedBundle<?>> resultBundles = + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle<?> outputBundle : resultBundles) { + for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } - return resultBundles; + return committedResult; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 3990f0d..7c0dcee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -145,7 +145,7 @@ class 
InProcessEvaluationContext { * @param result the result of evaluating the input bundle * @return the committed bundles contained within the handled {@code result} */ - public synchronized Iterable<? extends CommittedBundle<?>> handleResult( + public synchronized CommittedResult handleResult( @Nullable CommittedBundle<?> completedBundle, Iterable<TimerData> completedTimers, InProcessTransformResult result) { @@ -176,7 +176,7 @@ class InProcessEvaluationContext { applicationStateInternals.remove(stepAndKey); } } - return committedBundles; + return CommittedResult.create(result, committedBundles); } private Iterable<? extends CommittedBundle<?>> commitBundles( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java index 3a7bedc..a93c7b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java @@ -158,9 +158,9 @@ class TransformExecutor<T> implements Callable<InProcessTransformResult> { TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements) throws Exception { InProcessTransformResult result = evaluator.finishBundle(); - Iterable<? 
extends CommittedBundle<?>> outputs = onComplete.handleResult(inputBundle, result); + CommittedResult outputs = onComplete.handleResult(inputBundle, result); for (ModelEnforcement<T> enforcement : enforcements) { - enforcement.afterFinish(inputBundle, result, outputs); + enforcement.afterFinish(inputBundle, result, outputs.getOutputs()); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java new file mode 100644 index 0000000..7fad647 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied + * + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.runners.inprocess; + +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +/** + * Tests for {@link CommittedResult}. + */ +@RunWith(JUnit4.class) +public class CommittedResultTest implements Serializable { + private transient TestPipeline p = TestPipeline.create(); + private transient AppliedPTransform<?, ?, ?> transform = + AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() { + }); + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + + @Test + public void getTransformExtractsFromResult() { + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); + + assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform)); + } + + @Test + public void getOutputsEqualInput() { + List<? 
extends InProcessPipelineRunner.CommittedBundle<?>> outputs = + ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED)).commit(Instant.now()), + bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, + WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED)).commit(Instant.now())); + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs); + + assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java index ee56954..d1ea51a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -460,7 +460,7 @@ public class InProcessEvaluationContextTest { UncommittedBundle<Integer> rootBundle = context.createRootBundle(created); rootBundle.add(WindowedValue.valueInGlobalWindow(1)); - Iterable<? 
extends CommittedBundle<?>> handleResult = + CommittedResult handleResult = context.handleResult( null, ImmutableList.<TimerData>of(), @@ -469,7 +469,7 @@ public class InProcessEvaluationContextTest { .build()); @SuppressWarnings("unchecked") CommittedBundle<Integer> committedBundle = - (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult); + (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult.getOutputs()); context.handleResult( null, ImmutableList.<TimerData>of(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java index d3d70e0..31cb29a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java @@ -487,11 +487,11 @@ public class TransformExecutorTest { } @Override - public Iterable<? extends CommittedBundle<?>> handleResult( + public CommittedResult handleResult( CommittedBundle<?> inputBundle, InProcessTransformResult result) { handledResult = result; onMethod.countDown(); - return Collections.emptyList(); + return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList()); } @Override
