Separate View Evaluation and Overrides

This simplifies the View Evaluation by separating it from the logic of overriding CreatePCollectionView.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69d0b307 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69d0b307 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69d0b307 Branch: refs/heads/master Commit: 69d0b3070eaee55f650b15ceda3608cc27807caf Parents: 5f72b83 Author: Thomas Groh <[email protected]> Authored: Mon Mar 27 18:03:30 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Mar 31 18:08:46 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 1 - .../direct/TransformEvaluatorRegistry.java | 3 +- .../runners/direct/ViewEvaluatorFactory.java | 79 ++---------- .../runners/direct/ViewOverrideFactory.java | 115 +++++++++++++++++ .../direct/ViewEvaluatorFactoryTest.java | 11 +- .../runners/direct/ViewOverrideFactoryTest.java | 124 +++++++++++++++++++ 6 files changed, 249 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 11fe3f5..bd210c3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -35,7 +35,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; -import 
org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 5ad8709..ae7ad93 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; +import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten.PCollections; @@ -54,7 +55,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt)) .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt)) .put(PCollections.class, new FlattenEvaluatorFactory(ctxt)) - .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt)) + .put(WriteView.class, new ViewEvaluatorFactory(ctxt)) .put(Window.Assign.class, new WindowEvaluatorFactory(ctxt)) // Runner-specific primitives used in expansion of GroupByKey .put(DirectGroupByKeyOnly.class, 
new GroupByKeyOnlyEvaluatorFactory(ctxt)) http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 9dcbf9e..dc74d3e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -20,32 +20,25 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.core.construction.ForwardingPTransform; -import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.StepTransformResult.Builder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the - * {@link CreatePCollectionView} primitive {@link PTransform}. 
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link CreatePCollectionView} + * primitive {@link PTransform}. * * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for - * the {@link WriteView} {@link PTransform}, which is part of the - * {@link DirectCreatePCollectionView} composite transform. This transform is an override for the - * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is - * written. + * the {@link WriteView} {@link PTransform}, which is part of the {@link DirectRunner} override. + * This transform is an override for the {@link CreatePCollectionView} transform that applies + * windowing and triggers before the view is written. */ class ViewEvaluatorFactory implements TransformEvaluatorFactory { private final EvaluationContext context; @@ -91,67 +84,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { if (!elements.isEmpty()) { resultBuilder = resultBuilder.withAdditionalOutput(OutputType.PCOLLECTION_VIEW); } - return resultBuilder - .build(); + return resultBuilder.build(); } }; } - public static class ViewOverrideFactory<ElemT, ViewT> - extends SingleInputOutputOverrideFactory< - PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> { - @Override - public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform( - CreatePCollectionView<ElemT, ViewT> transform) { - return new DirectCreatePCollectionView<>(transform); - } - } - - /** - * An in-process override for {@link CreatePCollectionView}. 
- */ - private static class DirectCreatePCollectionView<ElemT, ViewT> - extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> { - private final CreatePCollectionView<ElemT, ViewT> og; - - private DirectCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) { - this.og = og; - } - - @Override - public PCollectionView<ViewT> expand(PCollection<ElemT> input) { - return input.apply(WithKeys.<Void, ElemT>of((Void) null)) - .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())) - .apply(GroupByKey.<Void, ElemT>create()) - .apply(Values.<Iterable<ElemT>>create()) - .apply(new WriteView<ElemT, ViewT>(og)); - } - - @Override - protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() { - return og; - } - } - - /** - * An in-process implementation of the {@link CreatePCollectionView} primitive. - * - * <p>This implementation requires the input {@link PCollection} to be an iterable - * of {@code WindowedValue<ElemT>}, which is provided - * to {@link PCollectionView#getViewFn()} for conversion to {@link ViewT}. 
- */ - public static final class WriteView<ElemT, ViewT> - extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { - private final CreatePCollectionView<ElemT, ViewT> og; - - WriteView(CreatePCollectionView<ElemT, ViewT> og) { - this.og = og; - } - - @Override - @SuppressWarnings("deprecation") - public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) { - return og.getView(); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java new file mode 100644 index 0000000..64e1218 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct; + +import com.google.common.collect.Iterables; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.construction.ForwardingPTransform; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; + +/** + * A {@link PTransformOverrideFactory} that provides overrides for the {@link CreatePCollectionView} + * {@link PTransform}. + */ +class ViewOverrideFactory<ElemT, ViewT> + implements PTransformOverrideFactory< + PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> { + @Override + public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform( + CreatePCollectionView<ElemT, ViewT> transform) { + return new GroupAndWriteView<>(transform); + } + + @Override + public PCollection<ElemT> getInput(List<TaggedPValue> inputs, Pipeline p) { + return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue(); + } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) { + return Collections.emptyMap(); + } + + /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. 
*/ + static class GroupAndWriteView<ElemT, ViewT> + extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> { + private final CreatePCollectionView<ElemT, ViewT> og; + + private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) { + this.og = og; + } + + @Override + public PCollectionView<ViewT> expand(PCollection<ElemT> input) { + return input + .apply(WithKeys.<Void, ElemT>of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())) + .apply(GroupByKey.<Void, ElemT>create()) + .apply(Values.<Iterable<ElemT>>create()) + .apply(new WriteView<ElemT, ViewT>(og)); + } + + @Override + protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() { + return og; + } + } + + /** + * The {@link DirectRunner} implementation of the {@link CreatePCollectionView} primitive. + * + * <p>This implementation requires the input {@link PCollection} to be an iterable of {@code + * WindowedValue<ElemT>}, which is provided to {@link PCollectionView#getViewFn()} for conversion + * to {@link ViewT}. 
+ */ + static final class WriteView<ElemT, ViewT> + extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { + private final CreatePCollectionView<ElemT, ViewT> og; + + WriteView(CreatePCollectionView<ElemT, ViewT> og) { + this.og = og; + } + + @Override + @SuppressWarnings("deprecation") + public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) { + return og.getView(); + } + + @SuppressWarnings("deprecation") + public PCollectionView<ViewT> getView() { + return og.getView(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index b56bd74..fe55a5f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; @@ -27,7 +26,6 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; -import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; @@ -71,7 +69,7 @@ public class 
ViewEvaluatorFactoryTest { .apply(GroupByKey.<Void, String>create()) .apply(Values.<Iterable<String>>create()); PCollectionView<Iterable<String>> view = - concat.apply(new ViewEvaluatorFactory.WriteView<>(createView)); + concat.apply(new ViewOverrideFactory.WriteView<>(createView)); EvaluationContext context = mock(EvaluationContext.class); TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>(); @@ -94,13 +92,6 @@ public class ViewEvaluatorFactoryTest { WindowedValue.valueInGlobalWindow("foo"), WindowedValue.valueInGlobalWindow("bar"))); } - @Test - public void overrideFactoryGetInputSucceeds() { - ViewOverrideFactory<String, String> factory = new ViewOverrideFactory<>(); - PCollection<String> input = p.apply(Create.of("foo", "bar")); - assertThat(factory.getInput(input.expand(), p), equalTo(input)); - } - private static class TestViewWriter<ElemT, ViewT> implements PCollectionViewWriter<ElemT, ViewT> { private Iterable<WindowedValue<ElemT>> latest; http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java new file mode 100644 index 0000000..6dcc13c --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableSet; +import java.io.Serializable; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ViewOverrideFactory}. 
*/ +@RunWith(JUnit4.class) +public class ViewOverrideFactoryTest implements Serializable { + @Rule + public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + private transient ViewOverrideFactory<Integer, List<Integer>> factory = + new ViewOverrideFactory<>(); + + @Test + public void replacementSucceeds() { + PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3)); + final PCollectionView<List<Integer>> view = + PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder()); + PTransform<PCollection<Integer>, PCollectionView<List<Integer>>> replacementTransform = + factory.getReplacementTransform(CreatePCollectionView.<Integer, List<Integer>>of(view)); + PCollectionView<List<Integer>> afterReplacement = ints.apply(replacementTransform); + assertThat( + "The CreatePCollectionView replacement should return the same View", + afterReplacement, + equalTo(view)); + + PCollection<Set<Integer>> outputViewContents = + p.apply("CreateSingleton", Create.of(0)) + .apply( + "OutputContents", + ParDo.of( + new DoFn<Integer, Set<Integer>>() { + @ProcessElement + public void outputSideInput(ProcessContext context) { + context.output(ImmutableSet.copyOf(context.sideInput(view))); + } + }) + .withSideInputs(view)); + PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3)); + + p.run(); + } + + @Test + public void replacementGetViewReturnsOriginal() { + final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3)); + final PCollectionView<List<Integer>> view = + PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder()); + PTransform<PCollection<Integer>, PCollectionView<List<Integer>>> replacement = + factory.getReplacementTransform(CreatePCollectionView.<Integer, List<Integer>>of(view)); + ints.apply(replacement); + final AtomicBoolean writeViewVisited = new AtomicBoolean(); + p.traverseTopologically( + new PipelineVisitor.Defaults() { + 
@Override + public void visitPrimitiveTransform(Node node) { + if (node.getTransform() instanceof WriteView) { + assertThat( + "There should only be one WriteView primitive in the graph", + writeViewVisited.getAndSet(true), + is(false)); + PCollectionView replacementView = ((WriteView) node.getTransform()).getView(); + assertThat(replacementView, Matchers.<PCollectionView>theInstance(view)); + assertThat(node.getInputs(), hasSize(1)); + } + } + }); + + assertThat(writeViewVisited.get(), is(true)); + } + + @Test + public void overrideFactoryGetInputSucceeds() { + ViewOverrideFactory<String, String> factory = new ViewOverrideFactory<>(); + PCollection<String> input = p.apply(Create.of("foo", "bar")); + assertThat(factory.getInput(input.expand(), p), equalTo(input)); + } +}
