Output Keyed Bundles in GroupAlsoByWindowEvaluator This allows reuse of keys for downstream serialization.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cec9702 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cec9702 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cec9702 Branch: refs/heads/gearpump-runner Commit: 1cec9702e62b64252149645627d96889edfeb33e Parents: b41789e Author: Thomas Groh <[email protected]> Authored: Tue Nov 22 14:51:39 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Tue Nov 22 15:05:17 2016 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/beam/runners/direct/DirectRunner.java | 5 ++++- .../beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cec9702/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 0060e84..cb31947 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.GBKIntoKeyedWorkItems; +import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; @@ -310,7 +311,9 @@ public class DirectRunner KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create( ImmutableSet.<Class<? extends PTransform>>of( - GroupByKey.class, DirectGroupByKeyOnly.class)); + GBKIntoKeyedWorkItems.class, + DirectGroupByKeyOnly.class, + DirectGroupAlsoByWindow.class)); pipeline.traverseTopologically(keyedPValueVisitor); DisplayDataValidator.validatePipeline(pipeline); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cec9702/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index b946e4d..36c742b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -112,6 +112,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow> windowingStrategy; + private final StructuralKey<?> structuralKey; private final Collection<UncommittedBundle<?>> outputBundles; private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements; private final AggregatorContainer.Mutator aggregatorChanges; @@ -130,6 +131,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { this.evaluationContext = evaluationContext; this.application = application; + structuralKey = inputBundle.getKey(); stepContext = evaluationContext .getExecutionContext(application, inputBundle.getKey()) .getOrCreateStepContext( @@ -159,7 +161,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { K key = workItem.key(); UncommittedBundle<KV<K, Iterable<V>>> bundle = - evaluationContext.createBundle(application.getOutput()); + evaluationContext.createKeyedBundle(structuralKey, application.getOutput()); outputBundles.add(bundle); CopyOnAccessInMemoryStateInternals<K> stateInternals = (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
