[
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=133312&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133312
]
ASF GitHub Bot logged work on BEAM-4658:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Aug/18 21:44
Start Date: 09/Aug/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #6187: [BEAM-4658] Follow
up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index 3215691363c..2ddd80afae8 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -81,6 +81,7 @@
private static final CompatibilityChecker DEFAULT_COMPATIBILITY_CHECKER =
GreedyPCollectionFusers::unknownTransformCompatibility;
+ /** Returns true if the PTransform node for the given input PCollection can
be fused across. */
public static boolean canFuse(
PTransformNode transformNode,
Environment environment,
@@ -92,6 +93,10 @@ public static boolean canFuse(
.canFuse(transformNode, environment, candidate, stagePCollections,
pipeline);
}
+ /**
+ * Returns true if the two PTransforms are compatible such that they can be
executed in the same
+ * environment.
+ */
public static boolean isCompatible(
PTransformNode left, PTransformNode right, QueryablePipeline pipeline) {
CompatibilityChecker leftChecker =
@@ -184,7 +189,8 @@ private static boolean canFuseParDo(
private static boolean parDoCompatibility(
PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
- // Implicitly true if we are attempting to fuse against oneself. This is
for timer PCollection which create a loop.
+ // Implicitly true if we are attempting to fuse against oneself. This case
comes up for
+ // PCollections representing timers since they create a self-loop in the
graph.
return parDo.equals(other)
// This is a convenience rather than a strict requirement. In general,
a ParDo that consumes
// side inputs can be fused with other transforms in the same
environment which are not
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
index 36dd3284a6a..0c6bf3a36bc 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
@@ -21,8 +21,9 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
/**
- * A reference to a timer. This includes the PTransform that references the
timer as well as the
- * PCollection referenced. Both are necessary in order to fully resolve a
timer.
+ * Contains references to components relevant for runners during execution for
timers. The
+ * referenced PTransform specifies the timer specification while the
PCollection specifies the
+ * encoding representation.
*/
@AutoValue
public abstract class TimerReference {
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
index 4920e17cba9..41b1e6c1d4a 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
@@ -49,9 +49,9 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws
Exception {
PTransform.newBuilder()
.putInputs("input", "input.out")
.putInputs("side_input", "sideInput.in")
- .putInputs("timer", "timer.out")
+ .putInputs("timer", "timer.pc")
.putOutputs("output", "output.out")
- .putOutputs("timer", "timer.out")
+ .putOutputs("timer", "timer.pc")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
@@ -66,7 +66,7 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws
Exception {
.build();
PCollection input =
PCollection.newBuilder().setUniqueName("input.out").build();
PCollection sideInput =
PCollection.newBuilder().setUniqueName("sideInput.in").build();
- PCollection timer =
PCollection.newBuilder().setUniqueName("timer.out").build();
+ PCollection timer =
PCollection.newBuilder().setUniqueName("timer.pc").build();
PCollection output =
PCollection.newBuilder().setUniqueName("output.out").build();
Components components =
@@ -75,7 +75,7 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws
Exception {
.putTransforms("other_pt",
PTransform.newBuilder().setUniqueName("other").build())
.putPcollections("input.out", input)
.putPcollections("sideInput.in", sideInput)
- .putPcollections("timer.out", timer)
+ .putPcollections("timer.pc", timer)
.putPcollections("output.out", output)
.putEnvironments("foo", env)
.build();
@@ -88,7 +88,7 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws
Exception {
UserStateReference.of(
transformNode, "user_state", PipelineNode.pCollection("input.out",
input));
TimerReference timerRef =
- TimerReference.of(transformNode, "timer",
PipelineNode.pCollection("timer.out", timer));
+ TimerReference.of(transformNode, "timer",
PipelineNode.pCollection("timer.pc", timer));
ImmutableExecutableStage stage =
ImmutableExecutableStage.ofFullComponents(
components,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 133312)
Time Spent: 4h 50m (was: 4h 40m)
> Update pipeline representation in runner support libraries to handle timers
> ---------------------------------------------------------------------------
>
> Key: BEAM-4658
> URL: https://issues.apache.org/jira/browse/BEAM-4658
> Project: Beam
> Issue Type: Sub-task
> Components: runner-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Fix For: 2.7.0
>
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> Expose relevant timer information on *ExecutableStage* to runners
> Update fusion logic to handle timers.
> Provide execution time interface to fire timers into *RemoteBundle*s and also
> to receive new timers that are being set.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)