Repository: beam
Updated Branches:
  refs/heads/master 30cb93ced -> 2c0cffaf7


Add mapOutputs to PTransformOverrideFactory

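The new mapOutputs method maps the outputs produced by applying a replacement
PTransform to the outputs produced by the original PTransform. Every override
factory in the direct runner implements it: most delegate to
ReplacementOutputs.singleton or ReplacementOutputs.tagged, while
WriteWithShardingFactory returns an empty map for its PDone output.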


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86f00db6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86f00db6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86f00db6

Branch: refs/heads/master
Commit: 86f00db6612e6055c4cc3899f77f196ee682ecf2
Parents: 30cb93c
Author: Thomas Groh <[email protected]>
Authored: Thu Feb 9 11:11:23 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Mon Feb 13 10:58:58 2017 -0800

----------------------------------------------------------------------
 .../direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java  | 9 +++++++++
 .../runners/direct/DirectGroupByKeyOverrideFactory.java     | 9 +++++++++
 .../beam/runners/direct/ParDoMultiOverrideFactory.java      | 9 +++++++++
 .../runners/direct/ParDoSingleViaMultiOverrideFactory.java  | 9 +++++++++
 .../beam/runners/direct/TestStreamEvaluatorFactory.java     | 9 +++++++++
 .../apache/beam/runners/direct/ViewEvaluatorFactory.java    | 9 +++++++++
 .../beam/runners/direct/WriteWithShardingFactory.java       | 9 +++++++++
 .../apache/beam/sdk/runners/PTransformOverrideFactory.java  | 8 ++++++++
 8 files changed, 71 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index caf61db..8de7b93 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -19,13 +19,16 @@ package org.apache.beam.runners.direct;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
@@ -47,4 +50,10 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, 
InputT>
       List<TaggedPValue> inputs, Pipeline p) {
     return (PCollection<KV<KeyT, InputT>>) 
Iterables.getOnlyElement(inputs).getValue();
   }
+
+  @Override
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, PCollection<KeyedWorkItem<KeyT, InputT>> 
newOutput) {
+    return ReplacementOutputs.singleton(outputs, newOutput);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index 8a5413b..eedee31 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -19,12 +19,15 @@ package org.apache.beam.runners.direct;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 
 /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
@@ -42,4 +45,10 @@ final class DirectGroupByKeyOverrideFactory<K, V>
       List<TaggedPValue> inputs, Pipeline p) {
     return (PCollection<KV<K, V>>) Iterables.getOnlyElement(inputs).getValue();
   }
+
+  @Override
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
+    return ReplacementOutputs.singleton(outputs, newOutput);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 483b7ce..ccbde7a 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -21,9 +21,11 @@ import static 
com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -47,6 +49,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypedPValue;
@@ -87,6 +90,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     return (PCollection<? extends InputT>) 
Iterables.getOnlyElement(inputs).getValue();
   }
 
+  @Override
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+    return ReplacementOutputs.tagged(outputs, newOutput);
+  }
+
   static class GbkThenStatefulParDo<K, InputT, OutputT>
       extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
     private final ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 6da5bb4..0ac8b04 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.direct;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -26,6 +28,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -49,6 +52,12 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
     return (PCollection<? extends InputT>) 
Iterables.getOnlyElement(inputs).getValue();
   }
 
+  @Override
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, PCollection<OutputT> newOutput) {
+    return ReplacementOutputs.singleton(outputs, newOutput);
+  }
+
   static class ParDoSingleViaMulti<InputT, OutputT>
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     private static final String MAIN_OUTPUT_TAG = "main";

http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index b81d7d5..082d33f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -27,8 +27,10 @@ import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.Pipeline;
@@ -48,6 +50,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
@@ -175,6 +178,12 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
       return p.begin();
     }
 
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        List<TaggedPValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+
     static class DirectTestStream<T> extends PTransform<PBegin, 
PCollection<T>> {
       private final TestStream<T> original;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 817fb33..6ccc156 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
@@ -36,6 +38,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
@@ -113,6 +116,12 @@ class ViewEvaluatorFactory implements 
TransformEvaluatorFactory {
         List<TaggedPValue> inputs, Pipeline p) {
       return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue();
     }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 9f5f4bd..966ce4e 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -22,7 +22,9 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Write;
@@ -42,6 +44,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Duration;
 
@@ -70,6 +73,12 @@ class WriteWithShardingFactory<InputT>
     return (PCollection<InputT>) Iterables.getOnlyElement(inputs).getValue();
   }
 
+  @Override
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, PDone newOutput) {
+    return Collections.emptyMap();
+  }
+
   private static class DynamicallyReshardedWrite<T> extends 
PTransform<PCollection<T>, PDone> {
     private final transient Write.Bound<T> original;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/86f00db6/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
index 0a167f3..e2b6009 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
@@ -21,12 +21,14 @@ package org.apache.beam.sdk.runners;
 
 import com.google.auto.value.AutoValue;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
@@ -49,6 +51,12 @@ public interface PTransformOverrideFactory<
   InputT getInput(List<TaggedPValue> inputs, Pipeline p);
 
   /**
+   * Returns a {@link Map} from the expanded values in {@code newOutput} to 
the values produced by
+   * the original transform.
+   */
+  Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, 
OutputT newOutput);
+
+  /**
    * A mapping between original {@link TaggedPValue} outputs and their 
replacements.
    */
   @AutoValue

Reply via email to