Repository: beam Updated Branches: refs/heads/master 698b89e2b -> fd40d4b29
[BEAM-1377] Splittable DoFn in Dataflow streaming runner. Transform expansion and translation for the involved primitive transforms. Of course, the current PR will only work after the respective Dataflow worker and backend changes are released. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a06c8bfa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a06c8bfa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a06c8bfa Branch: refs/heads/master Commit: a06c8bfae6fb9e35deeb4adfdd7761889b12be89 Parents: 4f6032c Author: Eugene Kirpichov <[email protected]> Authored: Wed Feb 1 17:26:55 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Jun 20 16:27:12 2017 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 6 +- .../dataflow/DataflowPipelineTranslator.java | 40 +++++++++ .../beam/runners/dataflow/DataflowRunner.java | 14 +++ .../dataflow/SplittableParDoOverrides.java | 76 +++++++++++++++++ .../runners/dataflow/util/PropertyNames.java | 1 + .../DataflowPipelineTranslatorTest.java | 89 ++++++++++++++++++++ .../beam/sdk/transforms/SplittableDoFnTest.java | 22 ++++- 7 files changed, 246 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index f627f12..d1bce32 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -216,13 +216,17 @@ <execution> <id>validates-runner-tests</id> <configuration> + <!-- + UsesSplittableParDoWithWindowedSideInputs because of + https://issues.apache.org/jira/browse/BEAM-2476 + --> <excludedGroups> 
org.apache.beam.sdk.testing.LargeKeys$Above10MB, org.apache.beam.sdk.testing.UsesDistributionMetrics, org.apache.beam.sdk.testing.UsesGaugeMetrics, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, - org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs, org.apache.beam.sdk.testing.UsesUnboundedPCollections, org.apache.beam.sdk.testing.UsesTestStream, </excludedGroups> http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index afc34e6..bfd9b64 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -56,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly; @@ -886,6 +887,45 @@ public class DataflowPipelineTranslator { // IO Translation. registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); + + /////////////////////////////////////////////////////////////////////////// + // Splittable DoFn translation. 
+ + registerTransformTranslator( + SplittableParDo.ProcessKeyedElements.class, + new TransformTranslator<SplittableParDo.ProcessKeyedElements>() { + @Override + public void translate( + SplittableParDo.ProcessKeyedElements transform, TranslationContext context) { + translateTyped(transform, context); + } + + private <InputT, OutputT, RestrictionT> void translateTyped( + SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> transform, + TranslationContext context) { + StepTranslationContext stepContext = + context.addStep(transform, "SplittableProcessKeyed"); + + translateInputs( + stepContext, context.getInput(transform), transform.getSideInputs(), context); + BiMap<Long, TupleTag<?>> outputMap = + translateOutputs(context.getOutputs(transform), stepContext); + stepContext.addInput( + PropertyNames.SERIALIZED_FN, + byteArrayToJsonString( + serializeToByteArray( + DoFnInfo.forFn( + transform.getFn(), + transform.getInputWindowingStrategy(), + transform.getSideInputs(), + transform.getElementCoder(), + outputMap.inverse().get(transform.getMainOutputTag()), + outputMap)))); + stepContext.addInput( + PropertyNames.RESTRICTION_CODER, + CloudObjects.asCloudObject(transform.getRestrictionCoder())); + } + }); } private static void translateInputs( http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ea9db24..c584b31 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -325,6 +325,20 @@ public 
class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { new StreamingFnApiCreateOverrideFactory())); } overridesBuilder + // Support Splittable DoFn for now only in streaming mode. + // The order of the following overrides is important because they are applied in order. + + // By default Dataflow runner replaces single-output ParDo with a ParDoSingle override. + // However, we want a different expansion for single-output splittable ParDo. + .add( + PTransformOverride.of( + PTransformMatchers.splittableParDoSingle(), + new ReflectiveOneToOneOverrideFactory( + SplittableParDoOverrides.ParDoSingleViaMulti.class, this))) + .add( + PTransformOverride.of( + PTransformMatchers.splittableParDoMulti(), + new SplittableParDoOverrides.SplittableParDoOverrideFactory())) .add( // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and // must precede it http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java new file mode 100644 index 0000000..9322878 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import java.util.Map; +import org.apache.beam.runners.core.construction.ForwardingPTransform; +import org.apache.beam.runners.core.construction.PTransformReplacements; +import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.SplittableParDo; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** Transform overrides for supporting {@link SplittableParDo} in the Dataflow runner. */ +class SplittableParDoOverrides { + static class ParDoSingleViaMulti<InputT, OutputT> + extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> { + private final ParDo.SingleOutput<InputT, OutputT> original; + + public ParDoSingleViaMulti( + DataflowRunner ignored, ParDo.SingleOutput<InputT, OutputT> original) { + this.original = original; + } + + @Override + protected PTransform<PCollection<? extends InputT>, PCollection<OutputT>> delegate() { + return original; + } + + @Override + public PCollection<OutputT> expand(PCollection<? 
extends InputT> input) { + TupleTag<OutputT> mainOutput = new TupleTag<>(); + return input.apply(original.withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput); + } + } + + static class SplittableParDoOverrideFactory<InputT, OutputT, RestrictionT> + implements PTransformOverrideFactory< + PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> { + @Override + public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform( + AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> + appliedTransform) { + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(appliedTransform), + new SplittableParDo<>(appliedTransform.getTransform())); + } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) { + return ReplacementOutputs.tagged(outputs, newOutput); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java index f82f1f1..55e0c4e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java @@ -63,4 +63,5 @@ public class PropertyNames { public static final String USES_KEYED_STATE = "uses_keyed_state"; public static final String VALUE = "value"; public static final String DISPLAY_DATA = "display_data"; + public static final String RESTRICTION_CODER = "restriction_coder"; } 
http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 53215f6..948af1c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -18,11 +18,14 @@ package org.apache.beam.runners.dataflow; import static org.apache.beam.runners.dataflow.util.Structs.getString; +import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -66,11 +69,15 @@ import java.util.Set; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; +import org.apache.beam.runners.dataflow.util.CloudObject; +import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.runners.dataflow.util.OutputReference; import 
org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.Structs; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; @@ -91,7 +98,13 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -100,6 +113,8 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; +import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -896,6 +911,68 @@ public class DataflowPipelineTranslatorTest implements Serializable { not(equalTo("true"))); } + /** + * Smoke test to fail fast if translation of a splittable ParDo + * in streaming breaks. 
+ */ + @Test + public void testStreamingSplittableParDoTranslation() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowRunner runner = DataflowRunner.fromOptions(options); + options.setStreaming(true); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + + PCollection<String> windowedInput = pipeline + .apply(Create.of("a")) + .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))); + windowedInput.apply(ParDo.of(new TestSplittableFn())); + + runner.replaceTransforms(pipeline); + + Job job = + translator + .translate( + pipeline, + runner, + Collections.<DataflowPackage>emptyList()) + .getJob(); + + // The job should contain a SplittableParDo.ProcessKeyedElements step, translated as + // "SplittableProcessKeyed". + + List<Step> steps = job.getSteps(); + Step processKeyedStep = null; + for (Step step : steps) { + if (step.getKind().equals("SplittableProcessKeyed")) { + assertNull(processKeyedStep); + processKeyedStep = step; + } + } + assertNotNull(processKeyedStep); + + @SuppressWarnings({"unchecked", "rawtypes"}) + DoFnInfo<String, Integer> fnInfo = + (DoFnInfo<String, Integer>) + SerializableUtils.deserializeFromByteArray( + jsonStringToByteArray( + Structs.getString( + processKeyedStep.getProperties(), PropertyNames.SERIALIZED_FN)), + "DoFnInfo"); + assertThat(fnInfo.getDoFn(), instanceOf(TestSplittableFn.class)); + assertThat( + fnInfo.getWindowingStrategy().getWindowFn(), + Matchers.<WindowFn>equalTo(FixedWindows.of(Duration.standardMinutes(1)))); + Coder<?> restrictionCoder = + CloudObjects.coderFromCloudObject( + (CloudObject) + Structs.getObject( + processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER)); + + assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder); + } + @Test public void testToSingletonTranslationWithIsmSideInput() throws Exception { // A "change detector" test that 
makes sure the translation @@ -1090,4 +1167,16 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertTrue(String.format("Found duplicate output ids %s", outputIds), outputIds.size() == 0); } + + private static class TestSplittableFn extends DoFn<String, Integer> { + @ProcessElement + public void process(ProcessContext c, OffsetRangeTracker tracker) { + // noop + } + + @GetInitialRestriction + public OffsetRange getInitialRange(String element) { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 646d8d3..0c2bd1c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -33,6 +34,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; @@ -97,8 +100,25 @@ public class SplittableDoFnTest implements Serializable { } } + private 
static PipelineOptions streamingTestPipelineOptions() { + // Using testing options with streaming=true makes it possible to enable UsesSplittableParDo + // tests in Dataflow runner, because as of writing, it can run Splittable DoFn only in + // streaming mode. + // This is a no-op for other runners currently (Direct runner doesn't care, and other + // runners don't implement SDF at all yet). + // + // This is a workaround until https://issues.apache.org/jira/browse/BEAM-1620 + // is properly implemented and supports marking tests as streaming-only. + // + // https://issues.apache.org/jira/browse/BEAM-2483 specifically tracks the removal of the + // current workaround. + PipelineOptions options = testingPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + return options; + } + @Rule - public final transient TestPipeline p = TestPipeline.create(); + public final transient TestPipeline p = TestPipeline.fromOptions(streamingTestPipelineOptions()); @Test @Category({ValidatesRunner.class, UsesSplittableParDo.class})
