Repository: beam Updated Branches: refs/heads/master 30cb93ced -> 2c0cffaf7
Add mapOutputs to PTransformOverrideFactory. This maps the outputs produced by applying a replacement PTransform to the outputs produced by the original PTransform. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86f00db6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86f00db6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86f00db6 Branch: refs/heads/master Commit: 86f00db6612e6055c4cc3899f77f196ee682ecf2 Parents: 30cb93c Author: Thomas Groh <[email protected]> Authored: Thu Feb 9 11:11:23 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Mon Feb 13 10:58:58 2017 -0800 ---------------------------------------------------------------------- .../direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java | 9 +++++++++ .../runners/direct/DirectGroupByKeyOverrideFactory.java | 9 +++++++++ .../beam/runners/direct/ParDoMultiOverrideFactory.java | 9 +++++++++ .../runners/direct/ParDoSingleViaMultiOverrideFactory.java | 9 +++++++++ .../beam/runners/direct/TestStreamEvaluatorFactory.java | 9 +++++++++ .../apache/beam/runners/direct/ViewEvaluatorFactory.java | 9 +++++++++ .../beam/runners/direct/WriteWithShardingFactory.java | 9 +++++++++ .../apache/beam/sdk/runners/PTransformOverrideFactory.java | 8 ++++++++ 8 files changed, 71 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index caf61db..8de7b93 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -19,13 +19,16 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.List; +import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.ReplacementOutputs; import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; /** @@ -47,4 +50,10 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT> List<TaggedPValue> inputs, Pipeline p) { return (PCollection<KV<KeyT, InputT>>) Iterables.getOnlyElement(inputs).getValue(); } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollection<KeyedWorkItem<KeyT, InputT>> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index 8a5413b..eedee31 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -19,12 +19,15 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.ReplacementOutputs; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */ @@ -42,4 +45,10 @@ final class DirectGroupByKeyOverrideFactory<K, V> List<TaggedPValue> inputs, Pipeline p) { return (PCollection<KV<K, V>>) Iterables.getOnlyElement(inputs).getValue(); } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollection<KV<K, Iterable<V>>> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 483b7ce..ccbde7a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -21,9 +21,11 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.Iterables; import java.util.List; +import java.util.Map; 
import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.ReplacementOutputs; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -47,6 +49,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypedPValue; @@ -87,6 +90,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT> return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue(); } + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollectionTuple newOutput) { + return ReplacementOutputs.tagged(outputs, newOutput); + } + static class GbkThenStatefulParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> { private final ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo; http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java index 6da5bb4..0ac8b04 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.ReplacementOutputs; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; @@ -26,6 +28,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -49,6 +52,12 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT> return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue(); } + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollection<OutputT> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + static class ParDoSingleViaMulti<InputT, OutputT> extends PTransform<PCollection<? 
extends InputT>, PCollection<OutputT>> { private static final String MAIN_OUTPUT_TAG = "main"; http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index b81d7d5..082d33f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -27,8 +27,10 @@ import com.google.common.collect.Iterables; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.apache.beam.runners.core.ReplacementOutputs; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.Pipeline; @@ -48,6 +50,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; @@ -175,6 +178,12 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { return p.begin(); } + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollection<T> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + static class DirectTestStream<T> extends 
PTransform<PBegin, PCollection<T>> { private final TestStream<T> original; http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 817fb33..6ccc156 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.ReplacementOutputs; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.StepTransformResult.Builder; @@ -36,6 +38,7 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; /** @@ -113,6 +116,12 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { List<TaggedPValue> inputs, Pipeline p) { return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue(); } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } } /** 
http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 9f5f4bd..966ce4e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -22,7 +22,9 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Write; @@ -42,6 +44,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Duration; @@ -70,6 +73,12 @@ class WriteWithShardingFactory<InputT> return (PCollection<InputT>) Iterables.getOnlyElement(inputs).getValue(); } + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PDone newOutput) { + return Collections.emptyMap(); + } + private static class DynamicallyReshardedWrite<T> extends PTransform<PCollection<T>, PDone> { private final transient Write.Bound<T> original; http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java index 0a167f3..e2b6009 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java @@ -21,12 +21,14 @@ package org.apache.beam.sdk.runners; import com.google.auto.value.AutoValue; import java.util.List; +import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; /** @@ -49,6 +51,12 @@ public interface PTransformOverrideFactory< InputT getInput(List<TaggedPValue> inputs, Pipeline p); /** + * Returns a {@link Map} from the expanded values in {@code newOutput} to the values produced by + * the original transform. + */ + Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, OutputT newOutput); + + /** * A mapping between original {@link TaggedPValue} outputs and their replacements. */ @AutoValue
