Repository: beam Updated Branches: refs/heads/master 453e37bc6 -> fbaac0fc8
Migrate DirectRunner one-to-one factories Use SingleInputOutputOverrideFactory to reduce prevalence of boilerplate. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9361fa6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9361fa6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9361fa6 Branch: refs/heads/master Commit: a9361fa6b68d3d6f6333872a32fd5f923f9f9673 Parents: 926385c Author: Thomas Groh <[email protected]> Authored: Tue Feb 21 09:55:47 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Wed Feb 22 09:36:17 2017 -0800 ---------------------------------------------------------------------- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 23 ++----------------- .../direct/DirectGroupByKeyOverrideFactory.java | 22 ++---------------- .../ParDoSingleViaMultiOverrideFactory.java | 24 +++----------------- .../runners/direct/ViewEvaluatorFactory.java | 24 +++----------------- .../DirectGroupByKeyOverrideFactoryTest.java | 8 +++---- .../ParDoSingleViaMultiOverrideFactoryTest.java | 3 ++- 6 files changed, 16 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index a957a17..bb90a6c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -17,26 +17,19 @@ */ package org.apache.beam.runners.direct; -import com.google.common.collect.Iterables; -import java.util.List; -import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** * Provides an implementation of {@link SplittableParDo.GBKIntoKeyedWorkItems} for the Direct * Runner. */ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT> - implements PTransformOverrideFactory< + extends SingleInputOutputOverrideFactory< PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>, GBKIntoKeyedWorkItems<KeyT, InputT>> { @Override @@ -44,16 +37,4 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT> getReplacementTransform(GBKIntoKeyedWorkItems<KeyT, InputT> transform) { return new DirectGroupByKey.DirectGroupByKeyOnly<>(); } - - @Override - public PCollection<KV<KeyT, InputT>> getInput( - List<TaggedPValue> inputs, Pipeline p) { - return (PCollection<KV<KeyT, InputT>>) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollection<KeyedWorkItem<KeyT, InputT>> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index 1651987..f3b718f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -17,38 +17,20 @@ */ package org.apache.beam.runners.direct; -import com.google.common.collect.Iterables; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */ final class DirectGroupByKeyOverrideFactory<K, V> - implements PTransformOverrideFactory< + extends SingleInputOutputOverrideFactory< PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> { @Override public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> getReplacementTransform( GroupByKey<K, V> transform) { return new DirectGroupByKey<>(transform); } - - @Override - public PCollection<KV<K, V>> getInput( - List<TaggedPValue> inputs, Pipeline p) { - return (PCollection<KV<K, V>>) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollection<KV<K, Iterable<V>>> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java index 990efb3..f859729 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java @@ -17,19 +17,13 @@ */ package org.apache.beam.runners.direct; -import com.google.common.collect.Iterables; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -38,26 +32,14 @@ import org.apache.beam.sdk.values.TupleTagList; * it in terms of multi-output {@link ParDo}. */ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT> - implements PTransformOverrideFactory< - PCollection<? extends InputT>, PCollection<OutputT>, Bound<InputT, OutputT>> { + extends SingleInputOutputOverrideFactory< + PCollection<? extends InputT>, PCollection<OutputT>, Bound<InputT, OutputT>> { @Override public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> getReplacementTransform( Bound<InputT, OutputT> transform) { return new ParDoSingleViaMulti<>(transform); } - @Override - public PCollection<? extends InputT> getInput( - List<TaggedPValue> inputs, Pipeline p) { - return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollection<OutputT> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } - static class ParDoSingleViaMulti<InputT, OutputT> extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { private static final String MAIN_OUTPUT_TAG = "main"; http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 49faaa9..0fa6254 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -20,15 +20,12 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.StepTransformResult.Builder; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; @@ -38,8 +35,6 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the @@ -102,26 +97,13 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { } public static class ViewOverrideFactory<ElemT, ViewT> - implements PTransformOverrideFactory< - PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> { - + extends SingleInputOutputOverrideFactory< + PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> { @Override public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform( CreatePCollectionView<ElemT, ViewT> transform) { return new DirectCreatePCollectionView<>(transform); } - - @Override - public PCollection<ElemT> getInput( - List<TaggedPValue> inputs, Pipeline p) { - return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java index 03f1dda..c9fdda0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java @@ -32,12 +32,12 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link DirectGBKIntoKeyedWorkItemsOverrideFactory}. - */ +/** Tests for {@link DirectGBKIntoKeyedWorkItemsOverrideFactory}. */ @RunWith(JUnit4.class) public class DirectGroupByKeyOverrideFactoryTest { - private DirectGroupByKeyOverrideFactory factory = new DirectGroupByKeyOverrideFactory(); + private DirectGroupByKeyOverrideFactory<String, Integer> factory = + new DirectGroupByKeyOverrideFactory<>(); + @Test public void getInputSucceeds() { TestPipeline p = TestPipeline.create(); http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java index 8f170dd..59577a8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java @@ -33,7 +33,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class ParDoSingleViaMultiOverrideFactoryTest { - private ParDoSingleViaMultiOverrideFactory factory = new ParDoSingleViaMultiOverrideFactory(); + private ParDoSingleViaMultiOverrideFactory<Integer, Integer> factory = + new ParDoSingleViaMultiOverrideFactory<>(); @Test public void getInputSucceeds() {
