Ensure all Read outputs are consumed in Dataflow. Apply a no-op ParDo to any PTransform that is not consumed.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/418c304d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/418c304d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/418c304d Branch: refs/heads/gearpump-runner Commit: 418c304dbff1ce8c176d08c890780ec97245aaae Parents: 714fdd2 Author: Thomas Groh <[email protected]> Authored: Tue Apr 18 17:25:59 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Apr 19 10:53:30 2017 -0700 ---------------------------------------------------------------------- .../core/construction/UnconsumedReads.java | 72 +++++++++++++ .../core/construction/UnconsumedReadsTest.java | 105 +++++++++++++++++++ .../beam/runners/dataflow/DataflowRunner.java | 4 + .../runners/dataflow/DataflowRunnerTest.java | 24 +++++ 4 files changed, 205 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java new file mode 100644 index 0000000..c191eeb --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; + +/** + * Utilities for ensuring that all {@link Read} {@link PTransform PTransforms} are consumed by some + * {@link PTransform}. + */ +public class UnconsumedReads { + public static void ensureAllReadsConsumed(Pipeline pipeline) { + final Set<PCollection<?>> unconsumed = new HashSet<>(); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + unconsumed.removeAll(node.getInputs().values()); + } + + @Override + public void visitValue(PValue value, Node producer) { + if (producer.getTransform() instanceof Read.Bounded + || producer.getTransform() instanceof Read.Unbounded) { + unconsumed.add((PCollection<?>) value); + } + } + }); + int i = 0; + for (PCollection<?> unconsumedPCollection : unconsumed) { + consume(unconsumedPCollection, i); + i++; + } + } + + private static <T> void consume(PCollection<T> unconsumedPCollection, int uniq) { + // Multiple applications should never break due to stable unique names. 
+ String uniqueName = "DropInputs" + (uniq == 0 ? "" : uniq); + unconsumedPCollection.apply(uniqueName, ParDo.of(new NoOpDoFn<T>())); + } + + private static class NoOpDoFn<T> extends DoFn<T, T> { + @ProcessElement + public void doNothing(ProcessContext context) {} + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java new file mode 100644 index 0000000..1966a93 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertThat; + +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PValue; +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link UnconsumedReads}. + */ +@RunWith(JUnit4.class) +public class UnconsumedReadsTest { + @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Test + public void matcherProducesUnconsumedValueBoundedRead() { + Bounded<Long> transform = Read.from(CountingSource.upTo(20L)); + PCollection<Long> output = pipeline.apply(transform); + UnconsumedReads.ensureAllReadsConsumed(pipeline); + validateConsumed(); + } + + @Test + public void matcherProducesUnconsumedValueUnboundedRead() { + Unbounded<Long> transform = Read.from(CountingSource.unbounded()); + PCollection<Long> output = pipeline.apply(transform); + UnconsumedReads.ensureAllReadsConsumed(pipeline); + validateConsumed(); + } + + @Test + public void doesNotConsumeAlreadyConsumedRead() { + Unbounded<Long> transform = Read.from(CountingSource.unbounded()); + final PCollection<Long> output = pipeline.apply(transform); + final Flatten.PCollections<Long> consumer = Flatten.<Long>pCollections(); + PCollectionList.of(output).apply(consumer); + 
UnconsumedReads.ensureAllReadsConsumed(pipeline); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + // The output should only be consumed by a single consumer + if (node.getInputs().values().contains(output)) { + assertThat(node.getTransform(), Matchers.<PTransform<?, ?>>is(consumer)); + } + } + }); + } + + private void validateConsumed() { + final Set<PValue> consumedOutputs = new HashSet<PValue>(); + final Set<PValue> allReadOutputs = new HashSet<PValue>(); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + consumedOutputs.addAll(node.getInputs().values()); + } + + @Override + public void visitValue(PValue value, Node producer) { + if (producer.getTransform() instanceof Read.Bounded + || producer.getTransform() instanceof Read.Unbounded) { + allReadOutputs.add(value); + } + } + }); + assertThat(consumedOutputs, Matchers.hasItems(allReadOutputs.toArray(new PValue[0]))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 4eec6b8..2912fa7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -65,6 +65,7 @@ import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; 
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; +import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; @@ -690,6 +691,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @VisibleForTesting void replaceTransforms(Pipeline pipeline) { boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline); + // Ensure all outputs of all reads are consumed before potentially replacing any + // Read PTransforms + UnconsumedReads.ensureAllReadsConsumed(pipeline); pipeline.replaceAll(getOverrides(streaming)); } http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 79a96e7..36704bc 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -57,6 
+58,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -65,11 +67,13 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.TextIO.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -331,6 +335,26 @@ public class DataflowRunnerTest { .apply(TextIO.Write.to(options.getOutput()).withoutValidation()); } + /** + * Tests that all reads are consumed by at least one {@link PTransform}. 
+ */ + @Test + public void testUnconsumedReads() throws IOException { + DataflowPipelineOptions dataflowOptions = buildPipelineOptions(); + RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class); + Pipeline p = buildDataflowPipeline(dataflowOptions); + PCollection<String> unconsumed = p.apply(Read.from(options.getInput()).withoutValidation()); + DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p); + final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean(); + p.traverseTopologically(new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + unconsumedSeenAsInput.set(true); + } + }); + assertThat(unconsumedSeenAsInput.get(), is(true)); + } + @Test public void testRunReturnDifferentRequestId() throws IOException { DataflowPipelineOptions options = buildPipelineOptions();
