[
https://issues.apache.org/jira/browse/BEAM-2421?focusedWorklogId=80921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80921
]
ASF GitHub Bot logged work on BEAM-2421:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Mar/18 18:48
Start Date: 15/Mar/18 18:48
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #4776: [BEAM-2421] Add
impulse override for read transforms
URL: https://github.com/apache/beam/pull/4776
This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
index f72227d09b9..40e08360087 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
@@ -20,9 +20,19 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
@@ -30,6 +40,8 @@
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* Read from a Java {@link BoundedSource} via the {@link Impulse} and {@link
ParDo} primitive
@@ -42,6 +54,16 @@
return new BoundedReadViaImpulse<>(source);
}
+ public static PTransformOverride boundedOverride() {
+ return PTransformOverride.of(boundedMatcher(), new
BoundedOverrideFactory<>());
+ }
+
+ private static PTransformMatcher boundedMatcher() {
+ return
PTransformMatchers.urnEqualTo(PTransformTranslation.READ_TRANSFORM_URN)
+ .and(transform ->
+ ReadTranslation.sourceIsBounded(transform) ==
PCollection.IsBounded.BOUNDED);
+ }
+
private static class BoundedReadViaImpulse<T> extends PTransform<PBegin,
PCollection<T>> {
private final BoundedSource<T> source;
@@ -54,13 +76,37 @@ private BoundedReadViaImpulse(BoundedSource<T> source) {
return input
.apply(Impulse.create())
.apply(ParDo.of(new SplitBoundedSourceFn<>(source,
DEFAULT_BUNDLE_SIZE)))
- .setCoder((Coder<BoundedSource<T>>) SerializableCoder.of((Class)
BoundedSource.class))
+ .setCoder(new BoundedSourceCoder<>())
.apply(Reshuffle.viaRandomKey())
.apply(ParDo.of(new ReadFromBoundedSourceFn<>()))
.setCoder(source.getOutputCoder());
}
}
+ private static class BoundedOverrideFactory<T> implements
PTransformOverrideFactory<
+ PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
+
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>>
getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin,
PCollection<T>>> transform) {
+ PBegin input = PBegin.in(transform.getPipeline());
+ BoundedSource<T> source;
+ try {
+ source = ReadTranslation.boundedSourceFromTransform(transform);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return PTransformReplacement.of(input, bounded(source));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue>
outputs,
+ PCollection<T> newOutput)
{
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+
+ }
+
@VisibleForTesting
static class SplitBoundedSourceFn<T> extends DoFn<byte[], BoundedSource<T>> {
private final BoundedSource<T> source;
@@ -92,4 +138,37 @@ public void readSoruce(ProcessContext ctxt) throws
IOException {
}
}
}
+
+ /**
+ * A {@link Coder} for {@link BoundedSource}s that wraps a {@link
SerializableCoder}. We cannot
+ * safely use an unwrapped SerializableCoder because
+ * {@link SerializableCoder#structuralValue(Serializable)} assumes that
coded elements support
+ * object equality (https://issues.apache.org/jira/browse/BEAM-3807). By
default, Coders compare
+ * equality by serialized bytes, which we want in this case. It is usually
safe to depend on coded
+ * representation here because we only compare objects on bundle commit,
which compares
+ * serializations of the same object instance.
+ *
+ * <p>BoundedSources are generally not used as PCollection elements, so we
do not expose this
+ * coder for wider use.
+ */
+ @VisibleForTesting
+ static class BoundedSourceCoder<T> extends CustomCoder<BoundedSource<T>> {
+ private final Coder<BoundedSource<T>> coder;
+
+ BoundedSourceCoder() {
+ coder = (Coder<BoundedSource<T>>) SerializableCoder.of((Class)
BoundedSource.class);
+ }
+
+ @Override
+ public void encode(BoundedSource<T> value, OutputStream outStream) throws
CoderException,
+ IOException {
+ coder.encode(value, outStream);
+ }
+
+ @Override
+ public BoundedSource<T> decode(InputStream inStream) throws
CoderException, IOException {
+ return coder.decode(inStream);
+ }
+
+ }
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 3f053c46192..fa09bacfa52 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -69,6 +69,15 @@ private GreedyPipelineFuser(Pipeline p) {
fusePipeline(groupSiblings(rootConsumers));
}
+ /**
+ * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
+ *
+ * <p>This fuser expects each ExecutableStage to have exactly one input.
This means that pipelines
+ * must be rooted at Impulse, or other runner-executed primitive transforms,
instead of primitive
+ * Read nodes. The utilities in
+ * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can
be used to translate
+ * non-compliant pipelines.
+ */
public static FusedPipeline fuse(Pipeline p) {
GreedyPipelineFuser fuser = new GreedyPipelineFuser(p);
return FusedPipeline.of(fuser.stages, fuser.unfusedTransforms);
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
index 7ef6ca23b11..3a5129240ab 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
@@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import com.google.common.collect.Iterables;
@@ -28,19 +29,24 @@
import java.util.List;
import
org.apache.beam.runners.core.construction.JavaReadViaImpulse.ReadFromBoundedSourceFn;
import
org.apache.beam.runners.core.construction.JavaReadViaImpulse.SplitBoundedSourceFn;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -72,7 +78,8 @@ public void testSplitSourceFn() {
* Split the source of 1 million longs into bundles of size
300 thousand bytes.
* This should produce some small number of bundles, but more
than one.
*/
- ParDo.of(new
SplitBoundedSourceFn<>(CountingSource.upTo(1_000_000L), 300_000L)));
+ ParDo.of(new
SplitBoundedSourceFn<>(CountingSource.upTo(1_000_000L), 300_000L)))
+ .setCoder(new JavaReadViaImpulse.BoundedSourceCoder<>());
PAssert.that(splits)
.satisfies(
@@ -89,14 +96,50 @@ public void testSplitSourceFn() {
public void testReadFromSourceFn() {
BoundedSource<Long> source = CountingSource.upTo(10L);
PCollection<BoundedSource<Long>> sourcePC =
- (PCollection)
- p.apply(Create.of(source).withCoder(SerializableCoder.of((Class)
BoundedSource.class)));
- PCollection<Long> elems = sourcePC.apply(ParDo.of(new
ReadFromBoundedSourceFn<>()));
+ p.apply(Create.of(source)
+ .withCoder(new JavaReadViaImpulse.BoundedSourceCoder<>()));
+ PCollection<Long> elems = sourcePC.apply(ParDo.of(new
ReadFromBoundedSourceFn<>()))
+ .setCoder(VarLongCoder.of());
PAssert.that(elems).containsInAnyOrder(0L, 9L, 8L, 1L, 2L, 7L, 6L, 3L, 4L,
5L);
p.run();
}
+ @Test
+ @Category(NeedsRunner.class)
+ public void testReadToImpulseOverride() {
+ BoundedSource<Long> source = CountingSource.upTo(10L);
+ // Use an explicit read transform to ensure the override is exercised.
+ PCollection<Long> input = p.apply(Read.from(source));
+ PAssert.that(input).containsInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L,
9L);
+
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
+ p.traverseTopologically(new Pipeline.PipelineVisitor() {
+ @Override
+ public void enterPipeline(Pipeline p) {}
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node
node) {
+ assertNotReadTransform(node.getTransform());
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ assertNotReadTransform(node.getTransform());
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+
+ @Override
+ public void leavePipeline(Pipeline pipeline) {}
+ });
+ p.run();
+ }
+
@Test
public void testOutputCoder() {
p.enableAbandonedNodeEnforcement(false);
@@ -106,6 +149,13 @@ public void testOutputCoder() {
equalTo(BigEndianIntegerCoder.of()));
}
+ private static void assertNotReadTransform(PTransform<?, ?> transform) {
+ if (transform != null) {
+ String urn = PTransformTranslation.urnForTransformOrNull(transform);
+ assertThat(urn, not(equalTo(PTransformTranslation.READ_TRANSFORM_URN)));
+ }
+ }
+
private static class BigEndianIntegerSource extends BoundedSource<Integer> {
@Override
public List<? extends BoundedSource<Integer>> split(
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 76bdddedd0b..6387bf0435b 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -975,7 +975,12 @@ public void compositesIgnored() {
.build())
.build();
FusedPipeline fused =
-
GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+ GreedyPipelineFuser.fuse(
+ Pipeline.newBuilder()
+ .addRootTransformIds("impulse")
+ .addRootTransformIds("compositeMultiLang")
+ .setComponents(components)
+ .build());
// Impulse is the runner transform
assertThat(fused.getRunnerExecutedTransforms(), hasSize(1));
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
index 350d0d98f7a..9821596052d 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
@@ -32,4 +32,12 @@
@Experimental(Kind.CORE_RUNNERS_ONLY)
public interface PTransformMatcher {
boolean matches(AppliedPTransform<?, ?, ?> application);
+
+ default PTransformMatcher and(PTransformMatcher matcher) {
+ return application -> this.matches(application) &&
matcher.matches(application);
+ }
+
+ default PTransformMatcher or(PTransformMatcher matcher) {
+ return application -> this.matches(application) ||
matcher.matches(application);
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 80921)
Time Spent: 4.5h (was: 4h 20m)
> Migrate Apache Beam to use impulse primitive as the only root primitive
> -----------------------------------------------------------------------
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
> Issue Type: Improvement
> Components: beam-model
> Reporter: Luke Cwik
> Priority: Major
> Labels: portability
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)