[ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=90921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90921
 ]

ASF GitHub Bot logged work on BEAM-2898:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/18 18:15
            Start Date: 13/Apr/18 18:15
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4783: [BEAM-2898] Support 
Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5faf95f8853..04147ba9bf5 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -231,6 +231,7 @@
                 org.apache.beam.sdk.testing.UsesSplittableParDo,
                 org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                 org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                org.apache.beam.sdk.testing.UsesImpulse,
                 org.apache.beam.sdk.testing.UsesTestStream
               </excludedGroups>
               <parallel>none</parallel>
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
index 40e08360087..128b826ecb0 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
@@ -64,6 +64,8 @@ private static PTransformMatcher boundedMatcher() {
             ReadTranslation.sourceIsBounded(transform) == 
PCollection.IsBounded.BOUNDED);
   }
 
+  // TODO: https://issues.apache.org/jira/browse/BEAM-3859 Support unbounded 
reads via impulse.
+
   private static class BoundedReadViaImpulse<T> extends PTransform<PBegin, 
PCollection<T>> {
     private final BoundedSource<T> source;
 
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 35e079151bd..abbba2a91a9 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -70,13 +70,13 @@ private GreedyPipelineFuser(Pipeline p) {
   }
 
   /**
-   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
+   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage 
ExecutableStages}.
    *
    * <p>This fuser expects each ExecutableStage to have exactly one input. 
This means that pipelines
    * must be rooted at Impulse, or other runner-executed primitive transforms, 
instead of primitive
    * Read nodes. The utilities in
-   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can 
be used to translate
-   * non-compliant pipelines.
+   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can 
be used to convert
+   * bounded pipelines using the Read primitive.
    */
   public static FusedPipeline fuse(Pipeline p) {
     GreedyPipelineFuser fuser = new GreedyPipelineFuser(p);
diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index d6673ae246d..b633f7c060a 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -104,6 +104,7 @@ def createValidatesRunnerTask(Map m) {
         excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
         excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
         excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
         excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
       }
     } else {
@@ -111,8 +112,8 @@ def createValidatesRunnerTask(Map m) {
         includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
         excludeCategories 
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
         excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDo'
         excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDo'
         excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
       }
     }
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 610bc9d200d..f65ab911cd4 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -93,7 +93,8 @@
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.LargeKeys$Above100MB,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
-                    org.apache.beam.sdk.testing.UsesTestStream,
+                    org.apache.beam.sdk.testing.UsesImpulse,
+                    org.apache.beam.sdk.testing.UsesTestStream
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 7898fc122e5..c0102052c7d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -44,6 +44,7 @@
 import 
org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -103,6 +104,8 @@
       FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new 
HashMap<>();
 
   static {
+    TRANSLATORS.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new 
ImpulseTranslatorBatch());
+
     TRANSLATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
         new CreatePCollectionViewTranslatorBatch());
 
@@ -135,6 +138,28 @@ private static String 
getCurrentTransformName(FlinkBatchTranslationContext conte
     return context.getCurrentTransform().getFullName();
   }
 
+  private static class ImpulseTranslatorBatch implements
+      FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          PTransform<PBegin, PCollection<byte[]>>> {
+
+    @Override
+    public void translateNode(PTransform<PBegin, PCollection<byte[]>> 
transform,
+        FlinkBatchTranslationContext context) {
+      String name = transform.getName();
+      PCollection<byte[]> output = context.getOutput(transform);
+
+      TypeInformation<WindowedValue<byte[]>> typeInformation = 
context.getTypeInfo(output);
+      DataSource<WindowedValue<byte[]>> dataSource = new DataSource<>(
+          context.getExecutionEnvironment(),
+          new ImpulseInputFormat(),
+          typeInformation,
+          name);
+
+      context.setOutputDataSet(output, dataSource);
+    }
+
+  }
+
   private static class ReadSourceTranslatorBatch<T>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
         PTransform<PBegin, PCollection<T>>> {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/ImpulseInputFormat.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/ImpulseInputFormat.java
new file mode 100644
index 00000000000..6492f1d9392
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/ImpulseInputFormat.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+/**
+ * Flink input format that implements the Beam Impulse primitive.
+ *
+ * <p>The format always exposes exactly one input split and emits exactly one element from it: an
+ * empty {@code byte[]} in the global window. This lets pipelines be rooted at Impulse instead of
+ * at primitive Read nodes.
+ */
+public class ImpulseInputFormat extends RichInputFormat<WindowedValue<byte[]>, GenericInputSplit> {
+
+  // Whether the impulse element for the currently open split has not yet been emitted.
+  private boolean availableOutput = false;
+
+  public ImpulseInputFormat() {}
+
+  @Override
+  public void configure(Configuration configuration) {
+    // Nothing to configure.
+  }
+
+  /** Reports statistics describing a single record of width one. */
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
+    return new BaseStatistics() {
+      @Override
+      public long getTotalInputSize() {
+        return 1;
+      }
+
+      @Override
+      public long getNumberOfRecords() {
+        return 1;
+      }
+
+      @Override
+      public float getAverageRecordWidth() {
+        return 1;
+      }
+    };
+  }
+
+  /**
+   * Returns a single split regardless of {@code numSplits}, because only one global impulse is
+   * ever emitted. Split numbers are zero-based, so the only split is partition 0 of 1.
+   */
+  @Override
+  public GenericInputSplit[] createInputSplits(int numSplits) {
+    return new GenericInputSplit[] {new GenericInputSplit(0, 1)};
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] genericInputSplits) {
+    return new DefaultInputSplitAssigner(genericInputSplits);
+  }
+
+  /** Arms the format so that the next {@link #nextRecord} call emits the impulse. */
+  @Override
+  public void open(GenericInputSplit genericInputSplit) {
+    availableOutput = true;
+  }
+
+  @Override
+  public boolean reachedEnd() {
+    return !availableOutput;
+  }
+
+  /**
+   * Emits the impulse: an empty {@code byte[]} in the global window.
+   *
+   * @param windowedValue reuse object offered by Flink. It is ignored because this format never
+   *     writes into it; returning it unmodified would emit its stale contents instead of the
+   *     impulse.
+   * @return a freshly created impulse element
+   * @throws IllegalStateException if no impulse is pending (the split was not opened, or its
+   *     impulse was already emitted)
+   */
+  @Override
+  public WindowedValue<byte[]> nextRecord(WindowedValue<byte[]> windowedValue) {
+    checkState(
+        availableOutput, "No impulse pending: the split was not opened or was already consumed.");
+    availableOutput = false;
+    return WindowedValue.valueInGlobalWindow(new byte[0]);
+  }
+
+  @Override
+  public void close() {
+    // Nothing to release.
+  }
+}
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 48b16ef52d2..e3aeb22d8d6 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -75,6 +75,7 @@
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                    org.apache.beam.sdk.testing.UsesImpulse,
                     org.apache.beam.sdk.testing.UsesTestStream,
                     org.apache.beam.sdk.testing.UsesCustomWindowMerging
                   </excludedGroups>
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index a9b90dc999d..e17635a6929 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -76,6 +76,7 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                    org.apache.beam.sdk.testing.UsesImpulse,
                     org.apache.beam.sdk.testing.UsesTestStream,
                     org.apache.beam.sdk.testing.UsesCustomWindowMerging
                   </excludedGroups>
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesImpulse.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesImpulse.java
new file mode 100644
index 00000000000..3ccbe553160
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesImpulse.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/** Category for tests that use {@link org.apache.beam.sdk.transforms.Impulse} 
transformations. */
+public class UsesImpulse {}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ImpulseTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ImpulseTest.java
new file mode 100644
index 00000000000..e0ab2e52ac7
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ImpulseTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesImpulse;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for Impulse. */
+@RunWith(JUnit4.class)
+public class ImpulseTest {
+  @Rule
+  public transient TestPipeline p = TestPipeline.create();
+
+  @Test
+  @Category({ValidatesRunner.class, UsesImpulse.class})
+  public void testImpulse() {
+    PCollection<Integer> result = p.apply(Impulse.create())
+        .apply(FlatMapElements.into(TypeDescriptors.integers())
+            .via(impulse -> Arrays.asList(1, 2, 3)));
+    PAssert.that(result).containsInAnyOrder(1, 2, 3);
+    p.run().waitUntilFinish();
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 90921)
    Time Spent: 4h 50m  (was: 4h 40m)

> Flink supports chaining/fusion of single-SDK stages
> ---------------------------------------------------
>
>                 Key: BEAM-2898
>                 URL: https://issues.apache.org/jira/browse/BEAM-2898
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Henning Rohde
>            Priority: Major
>              Labels: portability
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> The Fn API supports fused stages, which avoid unnecessarily round-tripping 
> data over the Fn API between stages. The Flink runner should use that 
> capability for better performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to