[ 
https://issues.apache.org/jira/browse/BEAM-2421?focusedWorklogId=80921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80921
 ]

ASF GitHub Bot logged work on BEAM-2421:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Mar/18 18:48
            Start Date: 15/Mar/18 18:48
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4776: [BEAM-2421] Add 
impulse override for read transforms
URL: https://github.com/apache/beam/pull/4776
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
index f72227d09b9..40e08360087 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
@@ -20,9 +20,19 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -30,6 +40,8 @@
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Read from a Java {@link BoundedSource} via the {@link Impulse} and {@link 
ParDo} primitive
@@ -42,6 +54,16 @@
     return new BoundedReadViaImpulse<>(source);
   }
 
+  public static PTransformOverride boundedOverride() {
+    return PTransformOverride.of(boundedMatcher(), new 
BoundedOverrideFactory<>());
+  }
+
+  private static PTransformMatcher boundedMatcher() {
+    return 
PTransformMatchers.urnEqualTo(PTransformTranslation.READ_TRANSFORM_URN)
+        .and(transform ->
+            ReadTranslation.sourceIsBounded(transform) == 
PCollection.IsBounded.BOUNDED);
+  }
+
   private static class BoundedReadViaImpulse<T> extends PTransform<PBegin, 
PCollection<T>> {
     private final BoundedSource<T> source;
 
@@ -54,13 +76,37 @@ private BoundedReadViaImpulse(BoundedSource<T> source) {
       return input
           .apply(Impulse.create())
           .apply(ParDo.of(new SplitBoundedSourceFn<>(source, 
DEFAULT_BUNDLE_SIZE)))
-          .setCoder((Coder<BoundedSource<T>>) SerializableCoder.of((Class) 
BoundedSource.class))
+          .setCoder(new BoundedSourceCoder<>())
           .apply(Reshuffle.viaRandomKey())
           .apply(ParDo.of(new ReadFromBoundedSourceFn<>()))
           .setCoder(source.getOutputCoder());
     }
   }
 
+  private static class BoundedOverrideFactory<T> implements 
PTransformOverrideFactory<
+      PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
+
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> 
getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>> transform) {
+      PBegin input = PBegin.in(transform.getPipeline());
+      BoundedSource<T> source;
+      try {
+        source = ReadTranslation.boundedSourceFromTransform(transform);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return PTransformReplacement.of(input, bounded(source));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> 
outputs,
+                                                     PCollection<T> newOutput) 
{
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+
+  }
+
   @VisibleForTesting
   static class SplitBoundedSourceFn<T> extends DoFn<byte[], BoundedSource<T>> {
     private final BoundedSource<T> source;
@@ -92,4 +138,37 @@ public void readSoruce(ProcessContext ctxt) throws 
IOException {
       }
     }
   }
+
+  /**
+   * A {@link Coder} for {@link BoundedSource}s that wraps a {@link 
SerializableCoder}. We cannot
+   * safely use an unwrapped SerializableCoder because
+   * {@link SerializableCoder#structuralValue(Serializable)} assumes that 
coded elements support
+   * object equality (https://issues.apache.org/jira/browse/BEAM-3807). By 
default, Coders compare
+   * equality by serialized bytes, which we want in this case. It is usually 
safe to depend on coded
+   * representation here because we only compare objects on bundle commit, 
which compares
+   * serializations of the same object instance.
+   *
+   * <p>BoundedSources are generally not used as PCollection elements, so we 
do not expose this
+   * coder for wider use.
+   */
+  @VisibleForTesting
+  static class BoundedSourceCoder<T> extends CustomCoder<BoundedSource<T>> {
+    private final Coder<BoundedSource<T>> coder;
+
+    BoundedSourceCoder() {
+      coder = (Coder<BoundedSource<T>>) SerializableCoder.of((Class) 
BoundedSource.class);
+    }
+
+    @Override
+    public void encode(BoundedSource<T> value, OutputStream outStream) throws 
CoderException,
+        IOException {
+      coder.encode(value, outStream);
+    }
+
+    @Override
+    public BoundedSource<T> decode(InputStream inStream) throws 
CoderException, IOException {
+      return coder.decode(inStream);
+    }
+
+  }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 3f053c46192..fa09bacfa52 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -69,6 +69,15 @@ private GreedyPipelineFuser(Pipeline p) {
     fusePipeline(groupSiblings(rootConsumers));
   }
 
+  /**
+   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
+   *
+   * <p>This fuser expects each ExecutableStage to have exactly one input. 
This means that pipelines
+   * must be rooted at Impulse, or other runner-executed primitive transforms, 
instead of primitive
+   * Read nodes. The utilities in
+   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can 
be used to translate
+   * non-compliant pipelines.
+   */
   public static FusedPipeline fuse(Pipeline p) {
     GreedyPipelineFuser fuser = new GreedyPipelineFuser(p);
     return FusedPipeline.of(fuser.stages, fuser.unfusedTransforms);
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
index 7ef6ca23b11..3a5129240ab 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
@@ -20,6 +20,7 @@
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Iterables;
@@ -28,19 +29,24 @@
 import java.util.List;
 import 
org.apache.beam.runners.core.construction.JavaReadViaImpulse.ReadFromBoundedSourceFn;
 import 
org.apache.beam.runners.core.construction.JavaReadViaImpulse.SplitBoundedSourceFn;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -72,7 +78,8 @@ public void testSplitSourceFn() {
                  * Split the source of 1 million longs into bundles of size 
300 thousand bytes.
                  * This should produce some small number of bundles, but more 
than one.
                  */
-                ParDo.of(new 
SplitBoundedSourceFn<>(CountingSource.upTo(1_000_000L), 300_000L)));
+                ParDo.of(new 
SplitBoundedSourceFn<>(CountingSource.upTo(1_000_000L), 300_000L)))
+                .setCoder(new JavaReadViaImpulse.BoundedSourceCoder<>());
 
     PAssert.that(splits)
         .satisfies(
@@ -89,14 +96,50 @@ public void testSplitSourceFn() {
   public void testReadFromSourceFn() {
     BoundedSource<Long> source = CountingSource.upTo(10L);
     PCollection<BoundedSource<Long>> sourcePC =
-        (PCollection)
-            p.apply(Create.of(source).withCoder(SerializableCoder.of((Class) 
BoundedSource.class)));
-    PCollection<Long> elems = sourcePC.apply(ParDo.of(new 
ReadFromBoundedSourceFn<>()));
+          p.apply(Create.of(source)
+              .withCoder(new JavaReadViaImpulse.BoundedSourceCoder<>()));
+    PCollection<Long> elems = sourcePC.apply(ParDo.of(new 
ReadFromBoundedSourceFn<>()))
+        .setCoder(VarLongCoder.of());
 
     PAssert.that(elems).containsInAnyOrder(0L, 9L, 8L, 1L, 2L, 7L, 6L, 3L, 4L, 
5L);
     p.run();
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadToImpulseOverride() {
+    BoundedSource<Long> source = CountingSource.upTo(10L);
+    // Use an explicit read transform to ensure the override is exercised.
+    PCollection<Long> input = p.apply(Read.from(source));
+    PAssert.that(input).containsInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 
9L);
+    
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
+    p.traverseTopologically(new Pipeline.PipelineVisitor() {
+      @Override
+      public void enterPipeline(Pipeline p) {}
+
+      @Override
+      public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
+        assertNotReadTransform(node.getTransform());
+        return CompositeBehavior.ENTER_TRANSFORM;
+      }
+
+      @Override
+      public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+
+      @Override
+      public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+        assertNotReadTransform(node.getTransform());
+      }
+
+      @Override
+      public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+
+      @Override
+      public void leavePipeline(Pipeline pipeline) {}
+    });
+    p.run();
+  }
+
   @Test
   public void testOutputCoder() {
     p.enableAbandonedNodeEnforcement(false);
@@ -106,6 +149,13 @@ public void testOutputCoder() {
         equalTo(BigEndianIntegerCoder.of()));
   }
 
+  private static void assertNotReadTransform(PTransform<?, ?> transform) {
+    if (transform != null) {
+      String urn = PTransformTranslation.urnForTransformOrNull(transform);
+      assertThat(urn, not(equalTo(PTransformTranslation.READ_TRANSFORM_URN)));
+    }
+  }
+
   private static class BigEndianIntegerSource extends BoundedSource<Integer> {
     @Override
     public List<? extends BoundedSource<Integer>> split(
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 76bdddedd0b..6387bf0435b 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -975,7 +975,12 @@ public void compositesIgnored() {
                     .build())
             .build();
     FusedPipeline fused =
-        
GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+        GreedyPipelineFuser.fuse(
+            Pipeline.newBuilder()
+                .addRootTransformIds("impulse")
+                .addRootTransformIds("compositeMultiLang")
+                .setComponents(components)
+                .build());
 
     // Impulse is the runner transform
     assertThat(fused.getRunnerExecutedTransforms(), hasSize(1));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
index 350d0d98f7a..9821596052d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
@@ -32,4 +32,12 @@
 @Experimental(Kind.CORE_RUNNERS_ONLY)
 public interface PTransformMatcher {
   boolean matches(AppliedPTransform<?, ?, ?> application);
+
+  default PTransformMatcher and(PTransformMatcher matcher) {
+    return application -> this.matches(application) && 
matcher.matches(application);
+  }
+
+  default PTransformMatcher or(PTransformMatcher matcher) {
+    return application -> this.matches(application) || 
matcher.matches(application);
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 80921)
    Time Spent: 4.5h  (was: 4h 20m)

> Migrate Apache Beam to use impulse primitive as the only root primitive
> -----------------------------------------------------------------------
>
>                 Key: BEAM-2421
>                 URL: https://issues.apache.org/jira/browse/BEAM-2421
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded 
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> Impulse -> SDF(Source) -> ParDo -> ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to