Propagate key through ParDo if DoFn is key-preserving
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d040b7f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d040b7f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d040b7f6 Branch: refs/heads/master Commit: d040b7f6a3cdefde829321015c75a800901cd88f Parents: b26ceaa Author: Kenneth Knowles <[email protected]> Authored: Thu Dec 8 11:44:48 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Tue Dec 20 11:18:04 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/runners/direct/ParDoEvaluator.java | 13 +++++++++++-- .../beam/runners/direct/ParDoEvaluatorFactory.java | 3 +++ .../SplittableProcessElementsEvaluatorFactory.java | 1 + .../apache/beam/runners/direct/ParDoEvaluatorTest.java | 1 + 4 files changed, 16 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index a915cf0..a5de4c6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -47,6 +47,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { AppliedPTransform<?, ?, ?> application, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy, Serializable fn, // may be OldDoFn or DoFn + StructuralKey<?> key, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, @@ -55,8 +56,16 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>(); for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { - outputBundles.put( - outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue())); + // Just trust the context's decision as to whether the output should be keyed. + // The logic for whether this ParDo is key-preserving and whether the input + // is keyed lives elsewhere. + if (evaluationContext.isKeyed(outputEntry.getValue())) { + outputBundles.put( + outputEntry.getKey(), evaluationContext.createKeyedBundle(key, outputEntry.getValue())); + } else { + outputBundles.put( + outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue())); + } } BundleOutputManager outputManager = BundleOutputManager.create(outputBundles); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index b4684e3..835e6ce 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -112,6 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( createParDoEvaluator( application, + inputBundleKey, sideInputs, mainOutputTag, sideOutputTags, @@ -123,6 +124,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator ParDoEvaluator<InputT, OutputT> createParDoEvaluator( AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, + StructuralKey<?> key, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, @@ -137,6 +139,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator application, application.getInput().getWindowingStrategy(), fn, + key, sideInputs, mainOutputTag, sideOutputTags, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index aae1149..18f3909 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -91,6 +91,7 @@ class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT> parDoEvaluator = delegateFactory.createParDoEvaluator( application, + inputBundle.getKey(), transform.getSideInputs(), transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 1a3207b..b3aceeb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -164,6 +164,7 @@ public class ParDoEvaluatorTest { transform, transform.getInput().getWindowingStrategy(), fn, + null /* key */, ImmutableList.<PCollectionView<?>>of(singletonView), mainOutputTag, sideOutputTags,
