Rename ConsumerTrackingPipelineVisitor to DirectGraphVisitor. Reduce visibility of Visitor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/662416a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/662416a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/662416a4 Branch: refs/heads/gearpump-runner Commit: 662416a4e176cca252c0d6fde1bf4252aeaa56c0 Parents: 8162cd2 Author: Thomas Groh <[email protected]> Authored: Fri Dec 2 10:07:05 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Fri Dec 2 14:02:25 2016 -0800 ---------------------------------------------------------------------- .../direct/ConsumerTrackingPipelineVisitor.java | 145 ----------- .../beam/runners/direct/DirectGraphVisitor.java | 145 +++++++++++ .../beam/runners/direct/DirectRunner.java | 8 +- .../ConsumerTrackingPipelineVisitorTest.java | 239 ------------------- .../runners/direct/DirectGraphVisitorTest.java | 239 +++++++++++++++++++ .../runners/direct/EvaluationContextTest.java | 6 +- .../ImmutabilityCheckingBundleFactoryTest.java | 2 +- .../runners/direct/WatermarkManagerTest.java | 8 +- 8 files changed, 396 insertions(+), 396 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java deleted file mode 100644 index b9e77c5..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; - -/** - * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the - * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume - * input after the upstream transform has produced and committed output. 
- */ -public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults { - private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>(); - - private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers = - ArrayListMultimap.create(); - - private Set<PCollectionView<?>> views = new HashSet<>(); - private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>(); - private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); - private Set<PValue> toFinalize = new HashSet<>(); - private int numTransforms = 0; - private boolean finalized = false; - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - checkState( - !finalized, - "Attempting to traverse a pipeline (node %s) with a %s " - + "which has already visited a Pipeline and is finalized", - node.getFullName(), - ConsumerTrackingPipelineVisitor.class.getSimpleName()); - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { - checkState( - !finalized, - "Attempting to traverse a pipeline (node %s) with a %s which is already finalized", - node.getFullName(), - ConsumerTrackingPipelineVisitor.class.getSimpleName()); - if (node.isRootNode()) { - finalized = true; - } - } - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - toFinalize.removeAll(node.getInput().expand()); - AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node); - stepNames.put(appliedTransform, genStepName()); - if (node.getInput().expand().isEmpty()) { - rootTransforms.add(appliedTransform); - } else { - for (PValue value : node.getInput().expand()) { - primitiveConsumers.put(value, appliedTransform); - } - } - } - - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) { - toFinalize.add(value); - - AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer); - if 
(!producers.containsKey(value)) { - producers.put(value, appliedTransform); - } - for (PValue expandedValue : value.expand()) { - if (expandedValue instanceof PCollectionView) { - views.add((PCollectionView<?>) expandedValue); - } - if (!producers.containsKey(expandedValue)) { - producers.put(value, appliedTransform); - } - } - } - - private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) { - @SuppressWarnings({"rawtypes", "unchecked"}) - AppliedPTransform<?, ?, ?> application = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); - return application; - } - - private String genStepName() { - return String.format("s%s", numTransforms++); - } - - /** - * Returns all of the {@link PValue PValues} that have been produced but not consumed. These - * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the - * {@link Pipeline} is executed. - */ - public void finishSpecifyingRemainder() { - checkState( - finalized, - "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed"); - for (PValue unfinalized : toFinalize) { - unfinalized.finishSpecifying(); - } - } - - /** - * Get the graph constructed by this {@link ConsumerTrackingPipelineVisitor}, which provides - * lookups for producers and consumers of {@link PValue PValues}. 
- */ - public DirectGraph getGraph() { - checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed"); - return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java new file mode 100644 index 0000000..cd9d120 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; + +/** + * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the + * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume + * input after the upstream transform has produced and committed output. 
+ */ +class DirectGraphVisitor extends PipelineVisitor.Defaults { + private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>(); + + private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers = + ArrayListMultimap.create(); + + private Set<PCollectionView<?>> views = new HashSet<>(); + private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>(); + private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); + private Set<PValue> toFinalize = new HashSet<>(); + private int numTransforms = 0; + private boolean finalized = false; + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + checkState( + !finalized, + "Attempting to traverse a pipeline (node %s) with a %s " + + "which has already visited a Pipeline and is finalized", + node.getFullName(), + getClass().getSimpleName()); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformHierarchy.Node node) { + checkState( + !finalized, + "Attempting to traverse a pipeline (node %s) with a %s which is already finalized", + node.getFullName(), + getClass().getSimpleName()); + if (node.isRootNode()) { + finalized = true; + } + } + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + toFinalize.removeAll(node.getInput().expand()); + AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node); + stepNames.put(appliedTransform, genStepName()); + if (node.getInput().expand().isEmpty()) { + rootTransforms.add(appliedTransform); + } else { + for (PValue value : node.getInput().expand()) { + primitiveConsumers.put(value, appliedTransform); + } + } + } + + @Override + public void visitValue(PValue value, TransformHierarchy.Node producer) { + toFinalize.add(value); + + AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer); + if (!producers.containsKey(value)) { + producers.put(value, appliedTransform); + } + 
for (PValue expandedValue : value.expand()) { + if (expandedValue instanceof PCollectionView) { + views.add((PCollectionView<?>) expandedValue); + } + if (!producers.containsKey(expandedValue)) { + producers.put(value, appliedTransform); + } + } + } + + private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) { + @SuppressWarnings({"rawtypes", "unchecked"}) + AppliedPTransform<?, ?, ?> application = AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); + return application; + } + + private String genStepName() { + return String.format("s%s", numTransforms++); + } + + /** + * Returns all of the {@link PValue PValues} that have been produced but not consumed. These + * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the + * {@link Pipeline} is executed. + */ + public void finishSpecifyingRemainder() { + checkState( + finalized, + "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed"); + for (PValue unfinalized : toFinalize) { + unfinalized.finishSpecifying(); + } + } + + /** + * Get the graph constructed by this {@link DirectGraphVisitor}, which provides + * lookups for producers and consumers of {@link PValue PValues}. 
+ */ + public DirectGraph getGraph() { + checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed"); + return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 0ad5836..2f84356 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -298,9 +298,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { @Override public DirectPipelineResult run(Pipeline pipeline) { MetricsEnvironment.setMetricsSupported(true); - ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor(); - pipeline.traverseTopologically(consumerTrackingVisitor); - consumerTrackingVisitor.finishSpecifyingRemainder(); + DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); + pipeline.traverseTopologically(graphVisitor); + graphVisitor.finishSpecifyingRemainder(); @SuppressWarnings("rawtypes") KeyedPValueTrackingVisitor keyedPValueVisitor = @@ -313,7 +313,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { DisplayDataValidator.validatePipeline(pipeline); - DirectGraph graph = consumerTrackingVisitor.getGraph(); + DirectGraph graph = graphVisitor.getGraph(); EvaluationContext context = EvaluationContext.create( getPipelineOptions(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java deleted file mode 100644 index 02fe007..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.hamcrest.Matchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ConsumerTrackingPipelineVisitor}. 
- */ -@RunWith(JUnit4.class) -public class ConsumerTrackingPipelineVisitorTest implements Serializable { - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - private transient TestPipeline p = TestPipeline.create(); - private transient ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor(); - - @Test - public void getViewsReturnsViews() { - PCollectionView<List<String>> listView = - p.apply("listCreate", Create.of("foo", "bar")) - .apply( - ParDo.of( - new DoFn<String, String>() { - @ProcessElement - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })) - .apply(View.<String>asList()); - PCollectionView<Object> singletonView = - p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton()); - p.traverseTopologically(visitor); - assertThat( - visitor.getGraph().getViews(), - Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView)); - } - - @Test - public void getRootTransformsContainsPBegins() { - PCollection<String> created = p.apply(Create.of("foo", "bar")); - PCollection<Long> counted = p.apply(CountingInput.upTo(1234L)); - PCollection<Long> unCounted = p.apply(CountingInput.unbounded()); - p.traverseTopologically(visitor); - assertThat( - visitor.getGraph().getRootTransforms(), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - created.getProducingTransformInternal(), - counted.getProducingTransformInternal(), - unCounted.getProducingTransformInternal())); - } - - @Test - public void getRootTransformsContainsEmptyFlatten() { - PCollection<String> empty = - PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections()); - p.traverseTopologically(visitor); - assertThat( - visitor.getGraph().getRootTransforms(), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - empty.getProducingTransformInternal())); - } - - @Test - public void 
getValueToConsumersSucceeds() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - PCollection<String> transformed = - created.apply( - ParDo.of( - new DoFn<String, String>() { - @ProcessElement - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })); - - PCollection<String> flattened = - PCollectionList.of(created).and(transformed).apply(Flatten.<String>pCollections()); - - p.traverseTopologically(visitor); - - assertThat( - visitor.getGraph().getPrimitiveConsumers(created), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - transformed.getProducingTransformInternal(), - flattened.getProducingTransformInternal())); - assertThat( - visitor.getGraph().getPrimitiveConsumers(transformed), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - flattened.getProducingTransformInternal())); - assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable()); - } - - @Test - public void getValueToConsumersWithDuplicateInputSucceeds() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - - PCollection<String> flattened = - PCollectionList.of(created).and(created).apply(Flatten.<String>pCollections()); - - p.traverseTopologically(visitor); - - assertThat( - visitor.getGraph().getPrimitiveConsumers(created), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - flattened.getProducingTransformInternal(), - flattened.getProducingTransformInternal())); - assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable()); - } - - @Test - public void getUnfinalizedPValuesContainsDanglingOutputs() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - PCollection<String> transformed = - created.apply( - ParDo.of( - new DoFn<String, String>() { - @ProcessElement - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - 
c.output(Integer.toString(c.element().length())); - } - })); - - assertThat(transformed.isFinishedSpecifyingInternal(), is(false)); - - p.traverseTopologically(visitor); - visitor.finishSpecifyingRemainder(); - assertThat(transformed.isFinishedSpecifyingInternal(), is(true)); - } - - @Test - public void getStepNamesContainsAllTransforms() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - PCollection<String> transformed = - created.apply( - ParDo.of( - new DoFn<String, String>() { - @ProcessElement - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })); - PDone finished = - transformed.apply( - new PTransform<PInput, PDone>() { - @Override - public PDone apply(PInput input) { - return PDone.in(input.getPipeline()); - } - }); - - p.traverseTopologically(visitor); - DirectGraph graph = visitor.getGraph(); - assertThat(graph.getStepName(graph.getProducer(created)), equalTo("s0")); - assertThat(graph.getStepName(graph.getProducer(transformed)), equalTo("s1")); - // finished doesn't have a producer, because it's not a PValue. - // TODO: Demonstrate that PCollectionList/Tuple and other composite PValues are either safe to - // use, or make them so. 
- } - - @Test - public void traverseMultipleTimesThrows() { - p.apply(Create.of(1, 2, 3)); - - p.traverseTopologically(visitor); - thrown.expect(IllegalStateException.class); - thrown.expectMessage(ConsumerTrackingPipelineVisitor.class.getSimpleName()); - thrown.expectMessage("is finalized"); - p.traverseTopologically(visitor); - } - - @Test - public void traverseIndependentPathsSucceeds() { - p.apply("left", Create.of(1, 2, 3)); - p.apply("right", Create.of("foo", "bar", "baz")); - - p.traverseTopologically(visitor); - } - - @Test - public void getGraphWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("get a graph"); - visitor.getGraph(); - } - - @Test - public void finishSpecifyingRemainderWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("finishSpecifyingRemainder"); - visitor.finishSpecifyingRemainder(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java new file mode 100644 index 0000000..d218a81 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DirectGraphVisitor}. 
+ */ +@RunWith(JUnit4.class) +public class DirectGraphVisitorTest implements Serializable { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + private transient TestPipeline p = TestPipeline.create(); + private transient DirectGraphVisitor visitor = new DirectGraphVisitor(); + + @Test + public void getViewsReturnsViews() { + PCollectionView<List<String>> listView = + p.apply("listCreate", Create.of("foo", "bar")) + .apply( + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) + throws Exception { + c.output(Integer.toString(c.element().length())); + } + })) + .apply(View.<String>asList()); + PCollectionView<Object> singletonView = + p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton()); + p.traverseTopologically(visitor); + assertThat( + visitor.getGraph().getViews(), + Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView)); + } + + @Test + public void getRootTransformsContainsPBegins() { + PCollection<String> created = p.apply(Create.of("foo", "bar")); + PCollection<Long> counted = p.apply(CountingInput.upTo(1234L)); + PCollection<Long> unCounted = p.apply(CountingInput.unbounded()); + p.traverseTopologically(visitor); + assertThat( + visitor.getGraph().getRootTransforms(), + Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( + created.getProducingTransformInternal(), + counted.getProducingTransformInternal(), + unCounted.getProducingTransformInternal())); + } + + @Test + public void getRootTransformsContainsEmptyFlatten() { + PCollection<String> empty = + PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections()); + p.traverseTopologically(visitor); + assertThat( + visitor.getGraph().getRootTransforms(), + Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( + empty.getProducingTransformInternal())); + } + + @Test + public void getValueToConsumersSucceeds() { + PCollection<String> 
created = p.apply(Create.of("1", "2", "3")); + PCollection<String> transformed = + created.apply( + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) + throws Exception { + c.output(Integer.toString(c.element().length())); + } + })); + + PCollection<String> flattened = + PCollectionList.of(created).and(transformed).apply(Flatten.<String>pCollections()); + + p.traverseTopologically(visitor); + + assertThat( + visitor.getGraph().getPrimitiveConsumers(created), + Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( + transformed.getProducingTransformInternal(), + flattened.getProducingTransformInternal())); + assertThat( + visitor.getGraph().getPrimitiveConsumers(transformed), + Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( + flattened.getProducingTransformInternal())); + assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable()); + } + + @Test + public void getValueToConsumersWithDuplicateInputSucceeds() { + PCollection<String> created = p.apply(Create.of("1", "2", "3")); + + PCollection<String> flattened = + PCollectionList.of(created).and(created).apply(Flatten.<String>pCollections()); + + p.traverseTopologically(visitor); + + assertThat( + visitor.getGraph().getPrimitiveConsumers(created), + Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( + flattened.getProducingTransformInternal(), + flattened.getProducingTransformInternal())); + assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable()); + } + + @Test + public void getUnfinalizedPValuesContainsDanglingOutputs() { + PCollection<String> created = p.apply(Create.of("1", "2", "3")); + PCollection<String> transformed = + created.apply( + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) + throws Exception { + c.output(Integer.toString(c.element().length())); + } + })); + + 
assertThat(transformed.isFinishedSpecifyingInternal(), is(false)); + + p.traverseTopologically(visitor); + visitor.finishSpecifyingRemainder(); + assertThat(transformed.isFinishedSpecifyingInternal(), is(true)); + } + + @Test + public void getStepNamesContainsAllTransforms() { + PCollection<String> created = p.apply(Create.of("1", "2", "3")); + PCollection<String> transformed = + created.apply( + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void processElement(DoFn<String, String>.ProcessContext c) + throws Exception { + c.output(Integer.toString(c.element().length())); + } + })); + PDone finished = + transformed.apply( + new PTransform<PInput, PDone>() { + @Override + public PDone apply(PInput input) { + return PDone.in(input.getPipeline()); + } + }); + + p.traverseTopologically(visitor); + DirectGraph graph = visitor.getGraph(); + assertThat(graph.getStepName(graph.getProducer(created)), equalTo("s0")); + assertThat(graph.getStepName(graph.getProducer(transformed)), equalTo("s1")); + // finished doesn't have a producer, because it's not a PValue. + // TODO: Demonstrate that PCollectionList/Tuple and other composite PValues are either safe to + // use, or make them so. 
+ } + + @Test + public void traverseMultipleTimesThrows() { + p.apply(Create.of(1, 2, 3)); + + p.traverseTopologically(visitor); + thrown.expect(IllegalStateException.class); + thrown.expectMessage(DirectGraphVisitor.class.getSimpleName()); + thrown.expectMessage("is finalized"); + p.traverseTopologically(visitor); + } + + @Test + public void traverseIndependentPathsSucceeds() { + p.apply("left", Create.of(1, 2, 3)); + p.apply("right", Create.of("foo", "bar", "baz")); + + p.traverseTopologically(visitor); + } + + @Test + public void getGraphWithoutVisitingThrows() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("completely traversed"); + thrown.expectMessage("get a graph"); + visitor.getGraph(); + } + + @Test + public void finishSpecifyingRemainderWithoutVisitingThrows() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("completely traversed"); + thrown.expectMessage("finishSpecifyingRemainder"); + visitor.finishSpecifyingRemainder(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 1c2bf14..17cdea1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -101,11 +101,11 @@ public class EvaluationContextTest { view = created.apply(View.<Integer>asIterable()); unbounded = p.apply(CountingInput.unbounded()); - ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor(); - p.traverseTopologically(cVis); + DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); + 
p.traverseTopologically(graphVisitor); bundleFactory = ImmutableListBundleFactory.create(); - graph = cVis.getGraph(); + graph = graphVisitor.getGraph(); context = EvaluationContext.create( runner.getPipelineOptions(), NanosOffsetClock.create(), bundleFactory, graph); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index e7e1e62..6ab8aea 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -56,7 +56,7 @@ public class ImmutabilityCheckingBundleFactoryTest { TestPipeline p = TestPipeline.create(); created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of())); transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>())); - ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor(); + DirectGraphVisitor visitor = new DirectGraphVisitor(); p.traverseTopologically(visitor); factory = ImmutabilityCheckingBundleFactory.create( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 5cde4d6..076e0fb 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -140,7 +140,7 @@ public class WatermarkManagerTest implements Serializable { consumers.put(flattened, Collections.<AppliedPTransform<?, ?, ?>>emptyList()); clock = MockClock.fromInstant(new Instant(1000)); - ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor(); + DirectGraphVisitor visitor = new DirectGraphVisitor(); p.traverseTopologically(visitor); graph = visitor.getGraph(); @@ -309,9 +309,9 @@ public class WatermarkManagerTest implements Serializable { PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); PCollection<Integer> multiConsumer = PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections()); - ConsumerTrackingPipelineVisitor trackingVisitor = new ConsumerTrackingPipelineVisitor(); - p.traverseTopologically(trackingVisitor); - DirectGraph graph = trackingVisitor.getGraph(); + DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); + p.traverseTopologically(graphVisitor); + DirectGraph graph = graphVisitor.getGraph(); AppliedPTransform<?, ?, ?> theFlatten = graph.getProducer(multiConsumer);
