Add input type to TransformResult. This would likely have caught some hard-to-diagnose type safety errors during the development of StatefulParDoEvaluatorFactory, so adding it should hopefully catch similar bugs in the future.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7502adda Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7502adda Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7502adda Branch: refs/heads/master Commit: 7502adda3262bce9d6d4fe4499bde8d8b5273029 Parents: 9fbd2d2 Author: Kenneth Knowles <k...@google.com> Authored: Tue Nov 22 16:01:45 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Mon Nov 28 10:06:31 2016 -0800 ---------------------------------------------------------------------- .../direct/AbstractModelEnforcement.java | 2 +- .../direct/BoundedReadEvaluatorFactory.java | 2 +- .../beam/runners/direct/CommittedResult.java | 2 +- .../beam/runners/direct/CompletionCallback.java | 2 +- ...ecycleManagerRemovingTransformEvaluator.java | 2 +- .../runners/direct/EmptyTransformEvaluator.java | 4 +- .../beam/runners/direct/EvaluationContext.java | 2 +- .../direct/ExecutorServiceParallelExecutor.java | 2 +- .../runners/direct/FlattenEvaluatorFactory.java | 10 ++--- .../GroupAlsoByWindowEvaluatorFactory.java | 5 ++- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 2 +- .../direct/ImmutabilityEnforcementFactory.java | 2 +- .../beam/runners/direct/ModelEnforcement.java | 2 +- .../beam/runners/direct/ParDoEvaluator.java | 2 +- .../direct/PassthroughTransformEvaluator.java | 4 +- .../runners/direct/StepTransformResult.java | 38 +++++++++-------- .../direct/TestStreamEvaluatorFactory.java | 2 +- .../beam/runners/direct/TransformEvaluator.java | 2 +- .../beam/runners/direct/TransformExecutor.java | 4 +- .../beam/runners/direct/TransformResult.java | 16 +++++-- .../direct/UnboundedReadEvaluatorFactory.java | 3 +- .../runners/direct/ViewEvaluatorFactory.java | 2 +- .../runners/direct/WindowEvaluatorFactory.java | 6 ++- .../direct/BoundedReadEvaluatorFactoryTest.java | 10 ++--- ...leManagerRemovingTransformEvaluatorTest.java | 4 +- 
.../runners/direct/EvaluationContextTest.java | 20 ++++----- .../direct/FlattenEvaluatorFactoryTest.java | 6 +-- .../ImmutabilityEnforcementFactoryTest.java | 6 +-- .../beam/runners/direct/ParDoEvaluatorTest.java | 2 +- .../runners/direct/StepTransformResultTest.java | 25 ++++++----- .../direct/TestStreamEvaluatorFactoryTest.java | 10 ++--- .../runners/direct/TransformExecutorTest.java | 45 ++++++++++---------- .../UnboundedReadEvaluatorFactoryTest.java | 20 ++++++--- .../direct/WindowEvaluatorFactoryTest.java | 12 +++--- 34 files changed, 152 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java index 81f0f5f..f09164b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java @@ -33,6 +33,6 @@ abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> { @Override public void afterFinish( CommittedBundle<T> input, - TransformResult result, + TransformResult<T> result, Iterable<? 
extends CommittedBundle<?>> outputs) {} } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 66c55cd..65b622f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -161,7 +161,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() { + public TransformResult<BoundedSourceShard<OutputT>> finishBundle() { return resultBuilder.build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java index 5fcf7b3..4db7e18 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java @@ -59,7 +59,7 @@ abstract class CommittedResult { public abstract Set<OutputType> getProducedOutputTypes(); public static CommittedResult create( - TransformResult original, + TransformResult<?> original, CommittedBundle<?> unprocessedElements, Iterable<? 
extends CommittedBundle<?>> outputs, Set<OutputType> producedOutputs) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java index 2986df1..766259d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java @@ -28,7 +28,7 @@ interface CompletionCallback { * Handle a successful result, returning the committed outputs of the result. */ CommittedResult handleResult( - CommittedBundle<?> inputBundle, TransformResult result); + CommittedBundle<?> inputBundle, TransformResult<?> result); /** * Handle an input bundle that did not require processing. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java index faa0615..fb13b0f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java @@ -54,7 +54,7 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<InputT> finishBundle() throws Exception { try { return underlying.finishBundle(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java index 778c5aa..85e5e70 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java @@ -43,8 +43,8 @@ final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> { public void processElement(WindowedValue<T> element) throws Exception {} @Override - public TransformResult finishBundle() throws Exception { - return 
StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE) + public TransformResult<T> finishBundle() throws Exception { + return StepTransformResult.<T>withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE) .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index b814def..c1225f6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -161,7 +161,7 @@ class EvaluationContext { public CommittedResult handleResult( @Nullable CommittedBundle<?> completedBundle, Iterable<TimerData> completedTimers, - TransformResult result) { + TransformResult<?> result) { Iterable<? 
extends CommittedBundle<?>> committedBundles = commitBundles(result.getOutputBundles()); metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 05cdd34..b7908c5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -270,7 +270,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { @Override public final CommittedResult handleResult( - CommittedBundle<?> inputBundle, TransformResult result) { + CommittedBundle<?> inputBundle, TransformResult<?> result) { CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result); for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java index 57d5628..817e736 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java @@ -56,17 +56,17 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory { application) { final UncommittedBundle<InputT> outputBundle = evaluationContext.createBundle(application.getOutput()); - final TransformResult result = - StepTransformResult.withoutHold(application).addOutput(outputBundle).build(); + final TransformResult<InputT> result = + StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build(); return new FlattenEvaluator<>(outputBundle, result); } private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> { private final UncommittedBundle<InputT> outputBundle; - private final TransformResult result; + private final TransformResult<InputT> result; public FlattenEvaluator( - UncommittedBundle<InputT> outputBundle, TransformResult result) { + UncommittedBundle<InputT> outputBundle, TransformResult<InputT> result) { this.outputBundle = outputBundle; this.result = result; } @@ -77,7 +77,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() { + public TransformResult<InputT> finishBundle() { return result; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 36c742b..9d25bc6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -208,10 
+208,11 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception { // State is initialized within the constructor. It can never be null. CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); - return StepTransformResult.withHold(application, state.getEarliestWatermarkHold()) + return StepTransformResult.<KeyedWorkItem<K, V>>withHold( + application, state.getEarliestWatermarkHold()) .withState(state) .addOutput(outputBundles) .withTimerUpdate(stepContext.getTimerUpdate()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java index 0fa7ebd..4d691ea 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java @@ -143,7 +143,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() { + public TransformResult<KV<K, V>> finishBundle() { Builder resultBuilder = StepTransformResult.withoutHold(application); for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry : groupingMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java index 612922a..85fc374 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java @@ -74,7 +74,7 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory { @Override public void afterFinish( CommittedBundle<T> input, - TransformResult result, + TransformResult<T> result, Iterable<? extends CommittedBundle<?>> outputs) { for (MutationDetector detector : mutationElements.values()) { verifyUnmodified(detector); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java index 074619a..25226f7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java @@ -58,6 +58,6 @@ public interface ModelEnforcement<T> { */ void afterFinish( CommittedBundle<T> input, - TransformResult result, + TransformResult<T> result, Iterable<? 
extends CommittedBundle<?>> outputs); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 6f91319..254fa44 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -122,7 +122,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { } @Override - public TransformResult finishBundle() { + public TransformResult<InputT> finishBundle() { try { fnRunner.finishBundle(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java index c6e10e5..153af65 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java @@ -42,8 +42,8 @@ class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT } @Override - public TransformResult finishBundle() throws Exception { - return StepTransformResult.withoutHold(transform).addOutput(output).build(); + public TransformResult<InputT> finishBundle() throws Exception { + return 
StepTransformResult.<InputT>withoutHold(transform).addOutput(output).build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index 5719e44..d58b027 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -37,18 +37,20 @@ import org.joda.time.Instant; * An immutable {@link TransformResult}. */ @AutoValue -public abstract class StepTransformResult implements TransformResult { +public abstract class StepTransformResult<InputT> implements TransformResult<InputT> { - public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) { + public static <InputT> Builder<InputT> withHold( + AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) { return new Builder(transform, watermarkHold); } - public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) { + public static <InputT> Builder<InputT> withoutHold( + AppliedPTransform<?, ?, ?> transform) { return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE); } @Override - public TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates) { + public TransformResult<InputT> withLogicalMetricUpdates(MetricUpdates metricUpdates) { return new AutoValue_StepTransformResult( getTransform(), getOutputBundles(), @@ -64,10 +66,10 @@ public abstract class StepTransformResult implements TransformResult { /** * A builder for creating instances of {@link StepTransformResult}. 
*/ - public static class Builder { + public static class Builder<InputT> { private final AppliedPTransform<?, ?, ?> transform; private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder; - private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder; + private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder; private MetricUpdates metricUpdates; private CopyOnAccessInMemoryStateInternals<?> state; private TimerUpdate timerUpdate; @@ -85,8 +87,8 @@ public abstract class StepTransformResult implements TransformResult { this.metricUpdates = MetricUpdates.EMPTY; } - public StepTransformResult build() { - return new AutoValue_StepTransformResult( + public StepTransformResult<InputT> build() { + return new AutoValue_StepTransformResult<>( transform, bundlesBuilder.build(), unprocessedElementsBuilder.build(), @@ -98,49 +100,51 @@ public abstract class StepTransformResult implements TransformResult { producedOutputs); } - public Builder withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) { + public Builder<InputT> withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) { this.aggregatorChanges = aggregatorChanges; return this; } - public Builder withMetricUpdates(MetricUpdates metricUpdates) { + public Builder<InputT> withMetricUpdates(MetricUpdates metricUpdates) { this.metricUpdates = metricUpdates; return this; } - public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) { + public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) { this.state = state; return this; } - public Builder withTimerUpdate(TimerUpdate timerUpdate) { + public Builder<InputT> withTimerUpdate(TimerUpdate timerUpdate) { this.timerUpdate = timerUpdate; return this; } - public Builder addUnprocessedElements(WindowedValue<?>... unprocessed) { + public Builder<InputT> addUnprocessedElements(WindowedValue<InputT>... 
unprocessed) { unprocessedElementsBuilder.addAll(Arrays.asList(unprocessed)); return this; } - public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> unprocessed) { + public Builder<InputT> addUnprocessedElements( + Iterable<? extends WindowedValue<InputT>> unprocessed) { unprocessedElementsBuilder.addAll(unprocessed); return this; } - public Builder addOutput( + public Builder<InputT> addOutput( UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) { bundlesBuilder.add(outputBundle); bundlesBuilder.add(outputBundles); return this; } - public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) { + public Builder<InputT> addOutput( + Collection<UncommittedBundle<?>> outputBundles) { bundlesBuilder.addAll(outputBundles); return this; } - public Builder withAdditionalOutput(OutputType producedAdditionalOutput) { + public Builder<InputT> withAdditionalOutput(OutputType producedAdditionalOutput) { producedOutputs.add(producedAdditionalOutput); return this; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 2ab6adf..9df7cdc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -127,7 +127,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<TestStreamIndex<T>> finishBundle() throws Exception { return resultBuilder.build(); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java index 1624fcb..79c942b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java @@ -42,5 +42,5 @@ public interface TransformEvaluator<InputT> { * * @return an {@link TransformResult} containing the results of this bundle evaluation. */ - TransformResult finishBundle() throws Exception; + TransformResult<InputT> finishBundle() throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index fb31cc9..bbc0aae 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -159,11 +159,11 @@ class TransformExecutor<T> implements Runnable { * @return the {@link TransformResult} produced by * {@link TransformEvaluator#finishBundle()} */ - private TransformResult finishBundle( + private TransformResult<T> finishBundle( TransformEvaluator<T> evaluator, MetricsContainer metricsContainer, Collection<ModelEnforcement<T>> enforcements) throws Exception { - TransformResult result = evaluator.finishBundle() + 
TransformResult<T> result = evaluator.finishBundle() .withLogicalMetricUpdates(metricsContainer.getCumulative()); CommittedResult outputs = onComplete.handleResult(inputBundle, result); for (ModelEnforcement<T> enforcement : enforcements) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index ac1e395..b4797b0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; @@ -32,16 +33,25 @@ import org.joda.time.Instant; /** * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}. + * + * <p>Every transform evaluator has a defined input type, but {@link ParDo} has multiple outputs + * so there is not necesssarily a defined output type. */ -public interface TransformResult { +public interface TransformResult<InputT> { /** * Returns the {@link AppliedPTransform} that produced this result. + * + * <p>This is treated as an opaque identifier so evaluators can delegate to other evaluators + * that may not have compatible types. 
*/ AppliedPTransform<?, ?, ?> getTransform(); /** * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These * will be committed by the evaluation context as part of completing this result. + * + * <p>Note that the bundles need not have a uniform type, for example in the case of multi-output + * {@link ParDo}. */ Iterable<? extends UncommittedBundle<?>> getOutputBundles(); @@ -49,7 +59,7 @@ public interface TransformResult { * Returns elements that were provided to the {@link TransformEvaluator} as input but were not * processed. */ - Iterable<? extends WindowedValue<?>> getUnprocessedElements(); + Iterable<? extends WindowedValue<InputT>> getUnprocessedElements(); /** * Returns the {@link AggregatorContainer.Mutator} used by this {@link PTransform}, or null if @@ -97,5 +107,5 @@ public interface TransformResult { * Returns a new TransformResult based on this one but overwriting any existing logical metric * updates with {@code metricUpdates}. */ - TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates); + TransformResult<InputT> withLogicalMetricUpdates(MetricUpdates metricUpdates); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 24a91cb..a4aebc9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -229,7 +229,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() 
throws IOException { + public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle() + throws IOException { return resultBuilder.build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 2dd280a..b92ade1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -81,7 +81,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() { + public TransformResult<Iterable<InT>> finishBundle() { writer.add(elements); Builder resultBuilder = StepTransformResult.withoutHold(application); if (!elements.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index eb53b7f..991addf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -103,8 +103,10 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TransformResult finishBundle() throws Exception { - return 
StepTransformResult.withoutHold(transform).addOutput(outputBundle).build(); + public TransformResult<InputT> finishBundle() throws Exception { + return StepTransformResult.<InputT>withoutHold(transform) + .addOutput(outputBundle) + .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index e956c34..dee95a7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -110,7 +110,7 @@ public class BoundedReadEvaluatorFactoryTest { for (WindowedValue<?> shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } - TransformResult result = evaluator.finishBundle(); + TransformResult<?> result = evaluator.finishBundle(); assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat( Iterables.size(result.getOutputBundles()), @@ -154,11 +154,11 @@ public class BoundedReadEvaluatorFactoryTest { Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>(); for (CommittedBundle<?> shardBundle : unreadInputs) { - TransformEvaluator<?> evaluator = factory.forApplication(transform, null); + TransformEvaluator<Long> evaluator = factory.forApplication(transform, null); for (WindowedValue<?> shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } - TransformResult result = evaluator.finishBundle(); + TransformResult<Long> result = evaluator.finishBundle(); assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat( Iterables.size(result.getOutputBundles()), @@ -207,7 +207,7 @@ public class BoundedReadEvaluatorFactoryTest { for (WindowedValue<?> shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } - TransformResult result = evaluator.finishBundle(); + TransformResult<?> result = evaluator.finishBundle(); assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat( Iterables.size(result.getOutputBundles()), @@ -277,7 +277,7 @@ public class BoundedReadEvaluatorFactoryTest { when(context.createBundle(longs)).thenReturn(outputBundle); evaluator.processElement(shard); } - TransformResult result = evaluator.finishBundle(); + TransformResult<?> result = evaluator.finishBundle(); assertThat(Iterables.size(result.getOutputBundles()), equalTo(splits.size())); List<WindowedValue<?>> outputElems = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index 9e2732e..b5eec63 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -115,7 +115,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<Object> finishBundle() throws Exception { finishBundleCalled = true; return null; } @@ 
-128,7 +128,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<Object> finishBundle() throws Exception { throw new Exception(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index e1277ac..9a3959d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -250,7 +250,7 @@ public class EvaluationContextTest { AggregatorContainer.Mutator mutator = container.createMutator(); mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L); - TransformResult result = + TransformResult<?> result = StepTransformResult.withoutHold(created.getProducingTransformInternal()) .withAggregatorChanges(mutator) .build(); @@ -260,7 +260,7 @@ public class EvaluationContextTest { AggregatorContainer.Mutator mutatorAgain = container.createMutator(); mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L); - TransformResult secondResult = + TransformResult<?> secondResult = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) .withAggregatorChanges(mutatorAgain) .build(); @@ -286,7 +286,7 @@ public class EvaluationContextTest { bag.add(2); bag.add(4); - TransformResult stateResult = + TransformResult<?> stateResult = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) .withState(state) .build(); @@ -319,7 +319,7 @@ public class 
EvaluationContextTest { context.scheduleAfterOutputWouldBeProduced( downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); - TransformResult result = + TransformResult<?> result = StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) .build(); @@ -328,7 +328,7 @@ public class EvaluationContextTest { // will likely be flaky if this logic is broken assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); - TransformResult finishedResult = + TransformResult<?> finishedResult = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); context.forceRefresh(); @@ -338,7 +338,7 @@ public class EvaluationContextTest { @Test public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { - TransformResult finishedResult = + TransformResult<?> finishedResult = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); @@ -358,7 +358,7 @@ public class EvaluationContextTest { @Test public void extractFiredTimersExtractsTimers() { - TransformResult holdResult = + TransformResult<?> holdResult = StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) .build(); context.handleResult(null, ImmutableList.<TimerData>of(), holdResult); @@ -366,7 +366,7 @@ public class EvaluationContextTest { StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of()); TimerData toFire = TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); - TransformResult timerResult = + TransformResult<?> timerResult = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) @@ -382,7 +382,7 
@@ public class EvaluationContextTest { // timer hasn't fired assertThat(context.extractFiredTimers(), emptyIterable()); - TransformResult advanceResult = + TransformResult<?> advanceResult = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); // Should cause the downstream timer to fire context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult); @@ -460,7 +460,7 @@ public class EvaluationContextTest { context.handleResult( null, ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) + StepTransformResult.<Integer>withoutHold(created.getProducingTransformInternal()) .addOutput(rootBundle) .build()); @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 417aa64..cb27fbc 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -84,8 +84,8 @@ public class FlattenEvaluatorFactoryTest { rightSideEvaluator.processElement( WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096))); - TransformResult rightSideResult = rightSideEvaluator.finishBundle(); - TransformResult leftSideResult = leftSideEvaluator.finishBundle(); + TransformResult<Integer> rightSideResult = rightSideEvaluator.finishBundle(); + TransformResult<Integer> leftSideResult = leftSideEvaluator.finishBundle(); assertThat( rightSideResult.getOutputBundles(), @@ -131,7 +131,7 @@ public class FlattenEvaluatorFactoryTest { 
flattened.getProducingTransformInternal(), bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); - TransformResult leftSideResult = emptyEvaluator.finishBundle(); + TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle(); CommittedBundle<?> outputBundle = Iterables.getOnlyElement(leftSideResult.getOutputBundles()).commit(Instant.now()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index a7277fe..a65cd30 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -78,7 +78,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { enforcement.afterElement(element); enforcement.afterFinish( elements, - StepTransformResult.withoutHold(consumer).build(), + StepTransformResult.<byte[]>withoutHold(consumer).build(), Collections.<CommittedBundle<?>>emptyList()); } @@ -98,7 +98,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { enforcement.afterElement(element); enforcement.afterFinish( elements, - StepTransformResult.withoutHold(consumer).build(), + StepTransformResult.<byte[]>withoutHold(consumer).build(), Collections.<CommittedBundle<?>>emptyList()); } @@ -120,7 +120,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { thrown.expectMessage("Input values must not be mutated"); enforcement.afterFinish( elements, - StepTransformResult.withoutHold(consumer).build(), + 
StepTransformResult.<byte[]>withoutHold(consumer).build(), Collections.<CommittedBundle<?>>emptyList()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index eab92f4..85e99c5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -112,7 +112,7 @@ public class ParDoEvaluatorTest { evaluator.processElement(first); evaluator.processElement(second); evaluator.processElement(third); - TransformResult result = evaluator.finishBundle(); + TransformResult<Integer> result = evaluator.finishBundle(); assertThat( result.getUnprocessedElements(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java index 61f5812..a21d8f7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java @@ -56,34 +56,37 @@ public class StepTransformResultTest { @Test public void producedBundlesProducedOutputs() { UncommittedBundle<Integer> bundle = bundleFactory.createBundle(pc); - TransformResult result = StepTransformResult.withoutHold(transform).addOutput(bundle) - 
.build(); + TransformResult<Integer> result = + StepTransformResult.<Integer>withoutHold(transform).addOutput(bundle).build(); - assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle>containsInAnyOrder(bundle)); + assertThat( + result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(bundle)); } @Test public void withAdditionalOutputProducedOutputs() { - TransformResult result = StepTransformResult.withoutHold(transform) - .withAdditionalOutput(OutputType.PCOLLECTION_VIEW) - .build(); + TransformResult<Integer> result = + StepTransformResult.<Integer>withoutHold(transform) + .withAdditionalOutput(OutputType.PCOLLECTION_VIEW) + .build(); assertThat(result.getOutputTypes(), containsInAnyOrder(OutputType.PCOLLECTION_VIEW)); } @Test public void producedBundlesAndAdditionalOutputProducedOutputs() { - TransformResult result = StepTransformResult.withoutHold(transform) - .addOutput(bundleFactory.createBundle(pc)) - .withAdditionalOutput(OutputType.PCOLLECTION_VIEW) - .build(); + TransformResult<Integer> result = + StepTransformResult.<Integer>withoutHold(transform) + .addOutput(bundleFactory.createBundle(pc)) + .withAdditionalOutput(OutputType.PCOLLECTION_VIEW) + .build(); assertThat(result.getOutputTypes(), hasItem(OutputType.PCOLLECTION_VIEW)); } @Test public void noBundlesNoAdditionalOutputProducedOutputsFalse() { - TransformResult result = StepTransformResult.withoutHold(transform).build(); + TransformResult<Integer> result = StepTransformResult.<Integer>withoutHold(transform).build(); assertThat(result.getOutputTypes(), emptyIterable()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java index 94a0d41..3d31df6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java @@ -90,7 +90,7 @@ public class TestStreamEvaluatorFactoryTest { TransformEvaluator<TestStreamIndex<Integer>> firstEvaluator = factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle); firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements())); - TransformResult firstResult = firstEvaluator.finishBundle(); + TransformResult<TestStreamIndex<Integer>> firstResult = firstEvaluator.finishBundle(); WindowedValue<TestStreamIndex<Integer>> firstResidual = (WindowedValue<TestStreamIndex<Integer>>) @@ -103,7 +103,7 @@ public class TestStreamEvaluatorFactoryTest { TransformEvaluator<TestStreamIndex<Integer>> secondEvaluator = factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle); secondEvaluator.processElement(firstResidual); - TransformResult secondResult = secondEvaluator.finishBundle(); + TransformResult<TestStreamIndex<Integer>> secondResult = secondEvaluator.finishBundle(); WindowedValue<TestStreamIndex<Integer>> secondResidual = (WindowedValue<TestStreamIndex<Integer>>) @@ -116,7 +116,7 @@ public class TestStreamEvaluatorFactoryTest { TransformEvaluator<TestStreamIndex<Integer>> thirdEvaluator = factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle); thirdEvaluator.processElement(secondResidual); - TransformResult thirdResult = thirdEvaluator.finishBundle(); + TransformResult<TestStreamIndex<Integer>> thirdResult = thirdEvaluator.finishBundle(); WindowedValue<TestStreamIndex<Integer>> thirdResidual = (WindowedValue<TestStreamIndex<Integer>>) @@ -130,7 +130,7 @@ public class TestStreamEvaluatorFactoryTest { 
TransformEvaluator<TestStreamIndex<Integer>> fourthEvaluator = factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle); fourthEvaluator.processElement(thirdResidual); - TransformResult fourthResult = fourthEvaluator.finishBundle(); + TransformResult<TestStreamIndex<Integer>> fourthResult = fourthEvaluator.finishBundle(); assertThat(clock.now(), equalTo(start.plus(Duration.standardMinutes(10)))); WindowedValue<TestStreamIndex<Integer>> fourthResidual = @@ -144,7 +144,7 @@ public class TestStreamEvaluatorFactoryTest { TransformEvaluator<TestStreamIndex<Integer>> fifthEvaluator = factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle); fifthEvaluator.processElement(fourthResidual); - TransformResult fifthResult = fifthEvaluator.finishBundle(); + TransformResult<TestStreamIndex<Integer>> fifthResult = fifthEvaluator.finishBundle(); assertThat( Iterables.getOnlyElement(firstResult.getOutputBundles()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 0b7b882..85eff65 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -96,7 +95,7 @@ public class TransformExecutorTest { @Test public void callWithNullInputBundleFinishesBundleAndCompletes() throws 
Exception { - final TransformResult result = + final TransformResult<Object> result = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); final AtomicBoolean finishCalled = new AtomicBoolean(false); TransformEvaluator<Object> evaluator = @@ -107,7 +106,7 @@ public class TransformExecutorTest { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<Object> finishBundle() throws Exception { finishCalled.set(true); return result; } @@ -128,7 +127,7 @@ public class TransformExecutorTest { executor.run(); assertThat(finishCalled.get(), is(true)); - assertThat(completionCallback.handledResult, equalTo(result)); + assertThat(completionCallback.handledResult, Matchers.<TransformResult<?>>equalTo(result)); assertThat(completionCallback.handledException, is(nullValue())); } @@ -154,8 +153,8 @@ public class TransformExecutorTest { @Test public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception { - final TransformResult result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final TransformResult<String> result = + StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build(); final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>(); TransformEvaluator<String> evaluator = new TransformEvaluator<String>() { @@ -166,7 +165,7 @@ public class TransformExecutorTest { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<String> finishBundle() throws Exception { return result; } }; @@ -194,14 +193,14 @@ public class TransformExecutorTest { evaluatorCompleted.await(); assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo)); - assertThat(completionCallback.handledResult, equalTo(result)); + assertThat(completionCallback.handledResult, Matchers.<TransformResult<?>>equalTo(result)); assertThat(completionCallback.handledException, is(nullValue())); 
} @Test public void processElementThrowsExceptionCallsback() throws Exception { - final TransformResult result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final TransformResult<String> result = + StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build(); final Exception exception = new Exception(); TransformEvaluator<String> evaluator = new TransformEvaluator<String>() { @@ -211,7 +210,7 @@ public class TransformExecutorTest { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<String> finishBundle() throws Exception { return result; } }; @@ -248,7 +247,7 @@ public class TransformExecutorTest { public void processElement(WindowedValue<String> element) throws Exception {} @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<String> finishBundle() throws Exception { throw exception; } }; @@ -277,7 +276,7 @@ public class TransformExecutorTest { @Test public void callWithEnforcementAppliesEnforcement() throws Exception { - final TransformResult result = + final TransformResult<Object> result = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); TransformEvaluator<Object> evaluator = @@ -286,7 +285,7 @@ public class TransformExecutorTest { public void processElement(WindowedValue<Object> element) throws Exception {} @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<Object> finishBundle() throws Exception { return result; } }; @@ -317,7 +316,7 @@ public class TransformExecutorTest { assertThat( testEnforcement.afterElements, Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem)); - assertThat(testEnforcement.finishedBundles, contains(result)); + assertThat(testEnforcement.finishedBundles, Matchers.<TransformResult<?>>contains(result)); } @Test @@ -333,7 +332,7 @@ public class TransformExecutorTest { } }); - final 
TransformResult result = + final TransformResult<Object> result = StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); final CountDownLatch testLatch = new CountDownLatch(1); final CountDownLatch evaluatorLatch = new CountDownLatch(1); @@ -344,7 +343,7 @@ public class TransformExecutorTest { public void processElement(WindowedValue<Object> element) throws Exception {} @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<Object> finishBundle() throws Exception { testLatch.countDown(); evaluatorLatch.await(); return result; @@ -389,7 +388,7 @@ public class TransformExecutorTest { } }); - final TransformResult result = + final TransformResult<Object> result = StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); final CountDownLatch testLatch = new CountDownLatch(1); final CountDownLatch evaluatorLatch = new CountDownLatch(1); @@ -403,7 +402,7 @@ public class TransformExecutorTest { } @Override - public TransformResult finishBundle() throws Exception { + public TransformResult<Object> finishBundle() throws Exception { return result; } }; @@ -434,7 +433,7 @@ public class TransformExecutorTest { } private static class RegisteringCompletionCallback implements CompletionCallback { - private TransformResult handledResult = null; + private TransformResult<?> handledResult = null; private boolean handledEmpty = false; private Exception handledException = null; private final CountDownLatch onMethod; @@ -444,7 +443,7 @@ public class TransformExecutorTest { } @Override - public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult result) { + public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult<?> result) { handledResult = result; onMethod.countDown(); @SuppressWarnings("rawtypes") @@ -490,7 +489,7 @@ public class TransformExecutorTest { private static class TestEnforcement<T> implements ModelEnforcement<T> { private final 
List<WindowedValue<T>> beforeElements = new ArrayList<>(); private final List<WindowedValue<T>> afterElements = new ArrayList<>(); - private final List<TransformResult> finishedBundles = new ArrayList<>(); + private final List<TransformResult<?>> finishedBundles = new ArrayList<>(); @Override public void beforeElement(WindowedValue<T> element) { @@ -505,7 +504,7 @@ public class TransformExecutorTest { @Override public void afterFinish( CommittedBundle<T> input, - TransformResult result, + TransformResult<T> result, Iterable<? extends CommittedBundle<?>> outputs) { finishedBundles.add(result); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 8d38275..5a10134 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -159,9 +159,10 @@ public class UnboundedReadEvaluatorFactoryTest { longs.getProducingTransformInternal(), inputShards); evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements())); - TransformResult result = evaluator.finishBundle(); + TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle(); - WindowedValue<?> residual = Iterables.getOnlyElement(result.getUnprocessedElements()); + WindowedValue<? 
super UnboundedSourceShard<Long, ?>> residual = + Iterables.getOnlyElement(result.getUnprocessedElements()); assertThat( residual.getTimestamp(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); UnboundedSourceShard<Long, ?> residualShard = @@ -206,7 +207,8 @@ public class UnboundedReadEvaluatorFactoryTest { evaluator.processElement( (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value); } - TransformResult result = evaluator.finishBundle(); + TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result = + evaluator.finishBundle(); assertThat( output.commit(Instant.now()).getElements(), containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L))); @@ -248,7 +250,8 @@ public class UnboundedReadEvaluatorFactoryTest { evaluator.processElement( (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value); } - TransformResult result = evaluator.finishBundle(); + TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result = + evaluator.finishBundle(); // Read from the residual of the first read. This should not produce any output, but should // include a residual shard in the result. @@ -261,7 +264,8 @@ public class UnboundedReadEvaluatorFactoryTest { Iterables.getOnlyElement(result.getUnprocessedElements()); secondEvaluator.processElement(residual); - TransformResult secondResult = secondEvaluator.finishBundle(); + TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> secondResult = + secondEvaluator.finishBundle(); // Sanity check that nothing was output (The test would have to run for more than a day to do // so correctly.) 
@@ -308,7 +312,8 @@ public class UnboundedReadEvaluatorFactoryTest { TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator = factory.forApplication(sourceTransform, inputBundle); evaluator.processElement(shard); - TransformResult result = evaluator.finishBundle(); + TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result = + evaluator.finishBundle(); CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = inputBundle.withElements( @@ -350,7 +355,8 @@ public class UnboundedReadEvaluatorFactoryTest { TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator = factory.forApplication(sourceTransform, inputBundle); evaluator.processElement(shard); - TransformResult result = evaluator.finishBundle(); + TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result = + evaluator.finishBundle(); CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = inputBundle.withElements( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index 741f8f2..e2f987c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -118,7 +118,7 @@ public class WindowEvaluatorFactoryTest { UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle); - TransformResult result = runEvaluator(triggering, inputBundle, transform); + TransformResult<Long> result = runEvaluator(triggering, inputBundle, transform); assertThat( 
Iterables.getOnlyElement(result.getOutputBundles()), @@ -143,7 +143,7 @@ public class WindowEvaluatorFactoryTest { BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration)); BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH); - TransformResult result = runEvaluator(windowed, inputBundle, transform); + TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform); assertThat( Iterables.getOnlyElement(result.getOutputBundles()), @@ -178,7 +178,7 @@ public class WindowEvaluatorFactoryTest { CommittedBundle<Long> inputBundle = createInputBundle(); UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle); - TransformResult result = runEvaluator(windowed, inputBundle, transform); + TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform); assertThat( Iterables.getOnlyElement(result.getOutputBundles()), @@ -235,7 +235,7 @@ public class WindowEvaluatorFactoryTest { CommittedBundle<Long> inputBundle = createInputBundle(); UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle); - TransformResult result = runEvaluator(windowed, inputBundle, transform); + TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform); assertThat( Iterables.getOnlyElement(result.getOutputBundles()), @@ -301,7 +301,7 @@ public class WindowEvaluatorFactoryTest { return outputBundle; } - private TransformResult runEvaluator( + private TransformResult<Long> runEvaluator( PCollection<Long> windowed, CommittedBundle<Long> inputBundle, Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */) @@ -313,7 +313,7 @@ public class WindowEvaluatorFactoryTest { evaluator.processElement(valueInGlobalWindow); evaluator.processElement(valueInGlobalAndTwoIntervalWindows); evaluator.processElement(valueInIntervalWindow); - TransformResult result = evaluator.finishBundle(); + TransformResult<Long> result = 
evaluator.finishBundle(); return result; }