Repository: beam Updated Branches: refs/heads/master 338012d14 -> 6ecbfb9e1
Ensure Composite Nodes produce no output Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e1893a4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e1893a4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e1893a4 Branch: refs/heads/master Commit: 0e1893a471bb87fbb48be94370a2c0b0872f23b3 Parents: 338012d Author: Thomas Groh <[email protected]> Authored: Wed Jan 18 11:45:25 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Mon Jan 23 16:24:23 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/runners/TransformHierarchy.java | 33 +++++++++++--------- .../sdk/runners/TransformHierarchyTest.java | 30 ++++++++++++++++-- 2 files changed, 46 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0e1893a4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 3676e1a..dc8f823 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -290,22 +290,27 @@ public class TransformHierarchy { for (TaggedPValue outputValue : output.expand()) { outputProducers.add(getProducer(outputValue.getValue())); } - if (outputProducers.contains(this) && outputProducers.size() != 1) { - Set<String> otherProducerNames = new HashSet<>(); - for (Node outputProducer : outputProducers) { - if (outputProducer != this) { - otherProducerNames.add(outputProducer.getFullName()); + if (outputProducers.contains(this)) { + if (!parts.isEmpty() || outputProducers.size() > 1) { + Set<String> otherProducerNames = new HashSet<>(); + for (Node outputProducer : outputProducers) { + if (outputProducer != this) { + otherProducerNames.add(outputProducer.getFullName()); + } } + throw new IllegalArgumentException( + String.format( + "Output of composite transform [%s] contains a primitive %s produced by it. " + + "Only primitive transforms are permitted to produce primitive outputs." + + "%n Outputs: %s" + + "%n Other Producers: %s" + + "%n Components: %s", + getFullName(), + POutput.class.getSimpleName(), + output.expand(), + otherProducerNames, + parts)); } - throw new IllegalArgumentException( - String.format( - "Output of transform [%s] contains a %s produced by it as well as other " - + "Transforms. A primitive transform must produce all of its outputs, and " - + "outputs of a composite transform must be produced by a component transform " - + "or be part of the input." - + "%n Other Outputs: %s" - + "%n Other Producers: %s", - getFullName(), POutput.class.getSimpleName(), output.expand(), otherProducerNames)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e1893a4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index d373caf..9a77b9b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -140,9 +140,7 @@ public class TransformHierarchyTest implements Serializable { } }); thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("produced by it as well as other Transforms"); - thrown.expectMessage("primitive transform must produce all of its outputs"); - thrown.expectMessage("composite transform must be produced by a component transform"); + thrown.expectMessage("contains a primitive POutput produced by it"); thrown.expectMessage("AddPc"); thrown.expectMessage("Create"); thrown.expectMessage(appended.expand().toString()); @@ -150,6 +148,32 @@ public class TransformHierarchyTest implements Serializable { } @Test + public void producingOwnOutputWithCompositeFails() { + final PCollection<Long> comp = + PCollection.createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + PTransform<PBegin, PCollection<Long>> root = + new PTransform<PBegin, PCollection<Long>>() { + @Override + public PCollection<Long> expand(PBegin input) { + return comp; + } + }; + hierarchy.pushNode("Composite", PBegin.in(pipeline), root); + + Create.Values<Integer> create = Create.of(1); + hierarchy.pushNode("Create", PBegin.in(pipeline), create); + hierarchy.setOutput(pipeline.apply(create)); + hierarchy.popNode(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("contains a primitive POutput produced by it"); + thrown.expectMessage("primitive transforms are permitted to produce"); + thrown.expectMessage("Composite"); + hierarchy.setOutput(comp); + } + + @Test public void visitVisitsAllPushed() { TransformHierarchy.Node root = hierarchy.getCurrent(); PBegin begin = PBegin.in(pipeline);
