Repository: beam
Updated Branches:
  refs/heads/master 698b89e2b -> fd40d4b29


[BEAM-1377] Splittable DoFn in Dataflow streaming runner

Adds the transform expansion and translation for the primitive transforms
involved. Note that this change will only work once the corresponding
Dataflow worker and backend changes have been released.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a06c8bfa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a06c8bfa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a06c8bfa

Branch: refs/heads/master
Commit: a06c8bfae6fb9e35deeb4adfdd7761889b12be89
Parents: 4f6032c
Author: Eugene Kirpichov <[email protected]>
Authored: Wed Feb 1 17:26:55 2017 -0800
Committer: Eugene Kirpichov <[email protected]>
Committed: Tue Jun 20 16:27:12 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  6 +-
 .../dataflow/DataflowPipelineTranslator.java    | 40 +++++++++
 .../beam/runners/dataflow/DataflowRunner.java   | 14 +++
 .../dataflow/SplittableParDoOverrides.java      | 76 +++++++++++++++++
 .../runners/dataflow/util/PropertyNames.java    |  1 +
 .../DataflowPipelineTranslatorTest.java         | 89 ++++++++++++++++++++
 .../beam/sdk/transforms/SplittableDoFnTest.java | 22 ++++-
 7 files changed, 246 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index f627f12..d1bce32 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -216,13 +216,17 @@
           <execution>
             <id>validates-runner-tests</id>
             <configuration>
+              <!--
+                UsesSplittableParDoWithWindowedSideInputs is excluded because of
+                https://issues.apache.org/jira/browse/BEAM-2476
+              -->
               <excludedGroups>
                 org.apache.beam.sdk.testing.LargeKeys$Above10MB,
                 org.apache.beam.sdk.testing.UsesDistributionMetrics,
                 org.apache.beam.sdk.testing.UsesGaugeMetrics,
                 org.apache.beam.sdk.testing.UsesSetState,
                 org.apache.beam.sdk.testing.UsesMapState,
-                org.apache.beam.sdk.testing.UsesSplittableParDo,
+                
org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs,
                 org.apache.beam.sdk.testing.UsesUnboundedPCollections,
                 org.apache.beam.sdk.testing.UsesTestStream,
               </excludedGroups>

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index afc34e6..bfd9b64 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import 
org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
@@ -886,6 +887,45 @@ public class DataflowPipelineTranslator {
     // IO Translation.
 
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Splittable DoFn translation: ProcessKeyedElements becomes a "SplittableProcessKeyed" step.
+    // Besides inputs/outputs, the step gets a serialized DoFnInfo and the restriction coder.
+    registerTransformTranslator(
+        SplittableParDo.ProcessKeyedElements.class,
+        new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
+          @Override
+          public void translate(
+              SplittableParDo.ProcessKeyedElements transform, 
TranslationContext context) {
+            translateTyped(transform, context);  // raw type in; helper binds the type parameters
+          }
+          /** Adds the step with its inputs, outputs, serialized DoFnInfo and restriction coder. */
+          private <InputT, OutputT, RestrictionT> void translateTyped(
+              SplittableParDo.ProcessKeyedElements<InputT, OutputT, 
RestrictionT> transform,
+              TranslationContext context) {
+            StepTranslationContext stepContext =
+                context.addStep(transform, "SplittableProcessKeyed");
+            // Main input and side inputs, then outputs; outputMap maps Long output ids to tags.
+            translateInputs(
+                stepContext, context.getInput(transform), 
transform.getSideInputs(), context);
+            BiMap<Long, TupleTag<?>> outputMap =
+                translateOutputs(context.getOutputs(transform), stepContext);
+            stepContext.addInput(  // DoFnInfo: fn, windowing, side inputs, coder, outputs
+                PropertyNames.SERIALIZED_FN,
+                byteArrayToJsonString(
+                    serializeToByteArray(
+                        DoFnInfo.forFn(
+                            transform.getFn(),
+                            transform.getInputWindowingStrategy(),
+                            transform.getSideInputs(),
+                            transform.getElementCoder(),
+                            
outputMap.inverse().get(transform.getMainOutputTag()),  // id of the main output
+                            outputMap))));
+            stepContext.addInput(  // NOTE(review): presumably read by the updated worker - confirm
+                PropertyNames.RESTRICTION_CODER,
+                CloudObjects.asCloudObject(transform.getRestrictionCoder()));
+          }
+        });
   }
 
   private static void translateInputs(

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ea9db24..c584b31 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -325,6 +325,20 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
                 new StreamingFnApiCreateOverrideFactory()));
       }
       overridesBuilder
+          // Support Splittable DoFn for now only in streaming mode.
+          // The order of the following overrides is important because they 
are applied in order.
+
+          // By default Dataflow runner replaces single-output ParDo with a 
ParDoSingle override.
+          // However, we want a different expansion for single-output 
splittable ParDo.
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDoSingle(),
+                  new ReflectiveOneToOneOverrideFactory(
+                      SplittableParDoOverrides.ParDoSingleViaMulti.class, 
this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDoMulti(),
+                  new 
SplittableParDoOverrides.SplittableParDoOverrideFactory()))
           .add(
               // Streaming Bounded Read is implemented in terms of Streaming 
Unbounded Read, and
               // must precede it

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
new file mode 100644
index 0000000..9322878
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import java.util.Map;
+import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/** Transform overrides for supporting {@link SplittableParDo} in the Dataflow 
runner. */
+class SplittableParDoOverrides {
+  static class ParDoSingleViaMulti<InputT, OutputT>  // re-expands a single-output ParDo as multi-output
+      extends ForwardingPTransform<PCollection<? extends InputT>, 
PCollection<OutputT>> {
+    private final ParDo.SingleOutput<InputT, OutputT> original;
+    // Runner arg is ignored; NOTE(review): ctor shape presumably required by ReflectiveOneToOneOverrideFactory.
+    public ParDoSingleViaMulti(
+        DataflowRunner ignored, ParDo.SingleOutput<InputT, OutputT> original) {
+      this.original = original;
+    }
+    /** Returns the wrapped single-output ParDo. */
+    @Override
+    protected PTransform<PCollection<? extends InputT>, PCollection<OutputT>> 
delegate() {
+      return original;
+    }
+    /** Applies {@code original} with a fresh main output tag and no additional outputs. */
+    @Override
+    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+      TupleTag<OutputT> mainOutput = new TupleTag<>();  // fresh tag; only this output is returned
+      return input.apply(original.withOutputTags(mainOutput, 
TupleTagList.empty())).get(mainOutput);
+    }
+  }
+  /** Override factory (used for splittable multi-output ParDos) wrapping the ParDo in {@link SplittableParDo}. */
+  static class SplittableParDoOverrideFactory<InputT, OutputT, RestrictionT>  // NOTE(review): RestrictionT is unused
+      implements PTransformOverrideFactory<
+          PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, 
OutputT>> {
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PCollectionTuple> 
getReplacementTransform(
+        AppliedPTransform<PCollection<InputT>, PCollectionTuple, 
ParDo.MultiOutput<InputT, OutputT>>
+            appliedTransform) {
+      return PTransformReplacement.of(  // same main input; original ParDo wrapped in SplittableParDo
+          PTransformReplacements.getSingletonMainInput(appliedTransform),
+          new SplittableParDo<>(appliedTransform.getTransform()));
+    }
+    /** Maps the replacement's outputs back to the original outputs via ReplacementOutputs.tagged. */
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
+      return ReplacementOutputs.tagged(outputs, newOutput);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index f82f1f1..55e0c4e 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -63,4 +63,5 @@ public class PropertyNames {
   public static final String USES_KEYED_STATE = "uses_keyed_state";
   public static final String VALUE = "value";
   public static final String DISPLAY_DATA = "display_data";
+  public static final String RESTRICTION_CODER = "restriction_coder";  // cloud-object restriction coder of a splittable step
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 53215f6..948af1c 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -18,11 +18,14 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -66,11 +69,15 @@ import java.util.Set;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import 
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.Structs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -91,7 +98,13 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -100,6 +113,8 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -896,6 +911,68 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
         not(equalTo("true")));
   }
 
+  /**
+   * Smoke test to fail fast if translation of a splittable ParDo
+   * in streaming breaks, checking the step's DoFnInfo and restriction coder.
+   */
+  @Test
+  public void testStreamingSplittableParDoTranslation() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    options.setStreaming(true);  // NOTE(review): set after fromOptions(); relies on runner reading options later
+    DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);
+    // Build a pipeline applying a splittable DoFn to fixed-windowed input.
+    Pipeline pipeline = Pipeline.create(options);
+    // 1-minute FixedWindows; checked against the DoFnInfo windowing strategy below.
+    PCollection<String> windowedInput = pipeline
+        .apply(Create.of("a"))
+        
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+    windowedInput.apply(ParDo.of(new TestSplittableFn()));
+    // Apply the runner's overrides so the splittable ParDo is expanded before translation.
+    runner.replaceTransforms(pipeline);
+
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                runner,
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    // The job should contain a SplittableParDo.ProcessKeyedElements step, 
translated as
+    // "SplittableProcessKeyed".
+    // assertNull rejects a second matching step; assertNotNull requires one.
+    List<Step> steps = job.getSteps();
+    Step processKeyedStep = null;
+    for (Step step : steps) {
+      if (step.getKind().equals("SplittableProcessKeyed")) {
+        assertNull(processKeyedStep);
+        processKeyedStep = step;
+      }
+    }
+    assertNotNull(processKeyedStep);
+    // Decode the DoFnInfo stored under SERIALIZED_FN and check its fn and window fn.
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    DoFnInfo<String, Integer> fnInfo =
+        (DoFnInfo<String, Integer>)
+            SerializableUtils.deserializeFromByteArray(
+                jsonStringToByteArray(
+                    Structs.getString(
+                        processKeyedStep.getProperties(), 
PropertyNames.SERIALIZED_FN)),
+                "DoFnInfo");
+    assertThat(fnInfo.getDoFn(), instanceOf(TestSplittableFn.class));
+    assertThat(
+        fnInfo.getWindowingStrategy().getWindowFn(),
+        
Matchers.<WindowFn>equalTo(FixedWindows.of(Duration.standardMinutes(1))));
+    Coder<?> restrictionCoder =
+        CloudObjects.coderFromCloudObject(
+            (CloudObject)
+                Structs.getObject(
+                    processKeyedStep.getProperties(), 
PropertyNames.RESTRICTION_CODER));
+    // OffsetRange restrictions are expected to be encoded with SerializableCoder.
+    assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+  }
+
   @Test
   public void testToSingletonTranslationWithIsmSideInput() throws Exception {
     // A "change detector" test that makes sure the translation
@@ -1090,4 +1167,16 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     assertTrue(String.format("Found duplicate output ids %s", outputIds),
         outputIds.size() == 0);
   }
+
+  private static class TestSplittableFn extends DoFn<String, Integer> {  // minimal splittable DoFn, stub bodies
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      // noop: intentionally empty stub
+    }
+    // Declares OffsetRange as the restriction type (tracked by OffsetRangeTracker in process).
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(String element) {
+      return null;  // NOTE(review): stub; presumably only the OffsetRange type matters for translation
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 646d8d3..0c2bd1c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -33,6 +34,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -97,8 +100,25 @@ public class SplittableDoFnTest implements Serializable {
     }
   }
 
+  private static PipelineOptions streamingTestPipelineOptions() {  // testing options with streaming enabled
+    // Using testing options with streaming=true makes it possible to enable 
UsesSplittableParDo
+    // tests in Dataflow runner, because as of writing, it can run Splittable 
DoFn only in
+    // streaming mode.
+    // This is a no-op for other runners currently (Direct runner doesn't 
care, and other
+    // runners don't implement SDF at all yet).
+    //
+    // This is a workaround until 
https://issues.apache.org/jira/browse/BEAM-1620
+    // is properly implemented and supports marking tests as streaming-only.
+    //
+    // https://issues.apache.org/jira/browse/BEAM-2483 specifically tracks the 
removal of the
+    // current workaround.
+    PipelineOptions options = testingPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(true);
+    return options;
+  }
+  // The pipeline under test is built from the streaming-enabled testing options.
   @Rule
-  public final transient TestPipeline p = TestPipeline.create();
+  public final transient TestPipeline p = 
TestPipeline.fromOptions(streamingTestPipelineOptions());
 
   @Test
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})

Reply via email to