Stop expanding PValues in DirectRunner visitors. A PValue always expands to itself, and these calls are unnecessary.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ef74a74 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ef74a74 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ef74a74 Branch: refs/heads/master Commit: 8ef74a744327c40fbb05030fd7657db8a865cb94 Parents: 5d619e8 Author: Thomas Groh <tg...@google.com> Authored: Fri Dec 9 15:52:15 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Mon Dec 12 13:51:29 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/DirectGraphVisitor.java | 14 ++++++-------- .../runners/direct/KeyedPValueTrackingVisitor.java | 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ef74a74/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 4f38bce..0283d03 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -99,14 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { if (!producers.containsKey(value)) { producers.put(value, appliedTransform); } - for (PValue expandedValue : value.expand()) { - if (expandedValue instanceof PCollectionView) { - views.add((PCollectionView<?>) expandedValue); - } - if (!producers.containsKey(expandedValue)) { - producers.put(value, appliedTransform); - } - } + if (value instanceof PCollectionView) { + views.add((PCollectionView<?>) value); + 
} + if (!producers.containsKey(value)) { + producers.put(value, appliedTransform); + } } private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ef74a74/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 4161f9e..7f85169 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -84,7 +84,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { if (producesKeyedOutputs.contains(producer.getTransform().getClass())) { - keyedValues.addAll(value.expand()); + keyedValues.add(value); } }