[
https://issues.apache.org/jira/browse/BEAM-4135?focusedWorklogId=93061&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93061
]
ASF GitHub Bot logged work on BEAM-4135:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Apr/18 03:56
Start Date: 20/Apr/18 03:56
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #5177: [BEAM-4135] Stop
taking the whole result in WatermarkManager
URL: https://github.com/apache/beam/pull/5177
This is a PR merged from a forked repository.
Because GitHub does not display the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 8f0dd423125..bfa65cd2d8e 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -31,7 +31,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.annotation.Nullable;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -141,7 +140,7 @@ public void initialize(
* @return the committed bundles contained within the handled {@code result}
*/
public CommittedResult<AppliedPTransform<?, ?, ?>> handleResult(
- @Nullable CommittedBundle<?> completedBundle,
+ CommittedBundle<?> completedBundle,
Iterable<TimerData> completedTimers,
TransformResult<?> result) {
Iterable<? extends CommittedBundle<?>> committedBundles =
@@ -162,9 +161,7 @@ public void initialize(
CopyOnAccessInMemoryStateInternals theirState = result.getState();
if (theirState != null) {
CopyOnAccessInMemoryStateInternals committedState = theirState.commit();
- StepAndKey stepAndKey =
- StepAndKey.of(
- result.getTransform(), completedBundle == null ? null :
completedBundle.getKey());
+ StepAndKey stepAndKey = StepAndKey.of(result.getTransform(),
completedBundle.getKey());
if (!committedState.isEmpty()) {
applicationStateInternals.put(stepAndKey, committedState);
} else {
@@ -176,7 +173,9 @@ public void initialize(
watermarkManager.updateWatermarks(
completedBundle,
result.getTimerUpdate().withCompletedTimers(completedTimers),
- committedResult,
+ committedResult.getExecutable(),
+ committedResult.getUnprocessedInputs().orNull(),
+ committedResult.getOutputs(),
result.getWatermarkHold());
return committedResult;
}
@@ -188,7 +187,7 @@ public void initialize(
* {@link Optional}.
*/
private Optional<? extends CommittedBundle<?>> getUnprocessedInput(
- @Nullable CommittedBundle<?> completedBundle, TransformResult<?> result)
{
+ CommittedBundle<?> completedBundle, TransformResult<?> result) {
if (completedBundle == null ||
Iterables.isEmpty(result.getUnprocessedElements())) {
return Optional.absent();
}
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
index 9ada00e5c8f..e3632697164 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
@@ -162,12 +162,12 @@ private void fireTimers() {
transformTimers.getKey(),
(PCollection)
Iterables.getOnlyElement(
-
transformTimers.getTransform().getInputs().values()))
+
transformTimers.getExecutable().getInputs().values()))
.add(WindowedValue.valueInGlobalWindow(work))
.commit(evaluationContext.now());
outstandingWork.incrementAndGet();
bundleProcessor.process(
- bundle, transformTimers.getTransform(), new
TimerIterableCompletionCallback(delivery));
+ bundle, transformTimers.getExecutable(), new
TimerIterableCompletionCallback(delivery));
state.set(ExecutorState.ACTIVE);
}
} catch (Exception e) {
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 882bdc5cbc9..86e904655ce 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -74,9 +74,9 @@
* {@link AppliedPTransform AppliedPTransforms} and a map of {@link
PCollection PCollections} to
* all the {@link AppliedPTransform AppliedPTransforms} that consume them at
construction time.
*
- * <p>Whenever a root {@link AppliedPTransform transform} produces elements,
the
+ * <p>Whenever a root {@link AppliedPTransform executable} produces elements,
the
* {@link WatermarkManager} is provided with the produced elements and the
output watermark
- * of the producing {@link AppliedPTransform transform}. The
+ * of the producing {@link AppliedPTransform executable}. The
* {@link WatermarkManager watermark manager} is responsible for computing the
watermarks
* of all {@link AppliedPTransform transforms} that consume one or more
* {@link PCollection PCollections}.
@@ -813,35 +813,35 @@ private TransformWatermarks getValueWatermark(CollectionT
value) {
return getTransformWatermark(graph.getProducer(value));
}
- private TransformWatermarks getTransformWatermark(ExecutableT transform) {
- TransformWatermarks wms = transformToWatermarks.get(transform);
+ private TransformWatermarks getTransformWatermark(ExecutableT executable) {
+ TransformWatermarks wms = transformToWatermarks.get(executable);
if (wms == null) {
- List<Watermark> inputCollectionWatermarks =
getInputWatermarks(transform);
+ List<Watermark> inputCollectionWatermarks =
getInputWatermarks(executable);
AppliedPTransformInputWatermark inputWatermark =
new AppliedPTransformInputWatermark(inputCollectionWatermarks);
AppliedPTransformOutputWatermark outputWatermark =
new AppliedPTransformOutputWatermark(inputWatermark);
SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
- new
SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform));
+ new
SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(executable));
SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
new
SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
wms =
new TransformWatermarks(
- transform,
+ executable,
inputWatermark,
outputWatermark,
inputProcessingWatermark,
outputProcessingWatermark);
- transformToWatermarks.put(transform, wms);
+ transformToWatermarks.put(executable, wms);
}
return wms;
}
- private Collection<Watermark> getInputProcessingWatermarks(ExecutableT
transform) {
+ private Collection<Watermark> getInputProcessingWatermarks(ExecutableT
executable) {
ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
- Collection<CollectionT> inputs = graph.getPerElementInputs(transform);
+ Collection<CollectionT> inputs = graph.getPerElementInputs(executable);
if (inputs.isEmpty()) {
inputWmsBuilder.add(THE_END_OF_TIME);
}
@@ -853,9 +853,9 @@ private TransformWatermarks
getTransformWatermark(ExecutableT transform) {
return inputWmsBuilder.build();
}
- private List<Watermark> getInputWatermarks(ExecutableT transform) {
+ private List<Watermark> getInputWatermarks(ExecutableT executable) {
ImmutableList.Builder<Watermark> inputWatermarksBuilder =
ImmutableList.builder();
- Collection<CollectionT> inputs = graph.getPerElementInputs(transform);
+ Collection<CollectionT> inputs = graph.getPerElementInputs(executable);
if (inputs.isEmpty()) {
inputWatermarksBuilder.add(THE_END_OF_TIME);
}
@@ -873,10 +873,10 @@ private TransformWatermarks
getTransformWatermark(ExecutableT transform) {
* AppliedPTransform PTransform} has not processed any elements, return a
watermark of {@link
* BoundedWindow#TIMESTAMP_MIN_VALUE}.
*
- * @return a snapshot of the input watermark and output watermark for the
provided transform
+ * @return a snapshot of the input watermark and output watermark for the
provided executable
*/
- public TransformWatermarks getWatermarks(ExecutableT transform) {
- return transformToWatermarks.get(transform);
+ public TransformWatermarks getWatermarks(ExecutableT executable) {
+ return transformToWatermarks.get(executable);
}
public void initialize(Map<ExecutableT, ? extends
Iterable<CommittedBundle<?>>> initialBundles) {
@@ -896,9 +896,9 @@ public void initialize(Map<ExecutableT, ? extends
Iterable<CommittedBundle<?>>>
}
/**
- * Updates the watermarks of a transform with one or more inputs.
+ * Updates the watermarks of an executable with one or more inputs.
*
- * <p>Each transform has two monotonically increasing watermarks: the input
watermark, which can,
+ * <p>Each executable has two monotonically increasing watermarks: the input
watermark, which can,
* at any time, be updated to equal:
* <pre>
* MAX(CurrentInputWatermark, MIN(PendingElements,
InputPCollectionWatermarks))
@@ -908,24 +908,27 @@ public void initialize(Map<ExecutableT, ? extends
Iterable<CommittedBundle<?>>>
* MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
* </pre>.
*
+ * <p>Updates to watermarks may not be immediately visible.
+ *
* @param completed the input that has completed
- * @param timerUpdate the timers that were added, removed, and completed as
part of producing
- * this update
- * @param result the result that was produced by processing the input
- * @param earliestHold the earliest watermark hold in the transform's state.
{@code null} if there
- * is no hold
+ * @param timerUpdate the timers that were added, removed, and completed as
part of producing this
+ * update
+ * @param executable the executable applied to {@code completed} to produce
the outputs
+ * @param unprocessedInputs inputs that could not be processed
+ * @param outputs outputs that were produced by the application of the
{@code executable} to the
+ * input
+ * @param earliestHold the earliest watermark hold in the executable's state.
*/
public void updateWatermarks(
@Nullable CommittedBundle<?> completed,
TimerUpdate timerUpdate,
- CommittedResult<ExecutableT> result,
+ ExecutableT executable,
+ @Nullable CommittedBundle<?> unprocessedInputs,
+ Iterable<? extends CommittedBundle<?>> outputs,
Instant earliestHold) {
- pendingUpdates.offer(PendingWatermarkUpdate.create(
- result.getExecutable(),
- completed,
- timerUpdate,
- result,
- earliestHold));
+ pendingUpdates.offer(
+ PendingWatermarkUpdate.create(
+ executable, completed, timerUpdate, unprocessedInputs, outputs,
earliestHold));
tryApplyPendingUpdates();
}
@@ -952,10 +955,10 @@ private void applyAllPendingUpdates() {
}
}
- @GuardedBy("refreshLock")
/**
* Applies up to {@code numUpdates}, or all available updates if numUpdates
is non-positive.
*/
+ @GuardedBy("refreshLock")
private void applyNUpdates(int numUpdates) {
for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates
<= 0); i++) {
PendingWatermarkUpdate<ExecutableT> pending = pendingUpdates.poll();
@@ -964,38 +967,48 @@ private void applyNUpdates(int numUpdates) {
}
}
+ /** Apply a {@link PendingWatermarkUpdate} to the {@link WatermarkManager}.
*/
private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT> pending)
{
- CommittedResult<ExecutableT> result = pending.getResult();
- ExecutableT transform = result.getExecutable();
+ ExecutableT executable = pending.getExecutable();
CommittedBundle<?> inputBundle = pending.getInputBundle();
- updatePending(inputBundle, pending.getTimerUpdate(), result);
+ updatePending(
+ inputBundle,
+ pending.getTimerUpdate(),
+ executable,
+ pending.getUnprocessedInputs(),
+ pending.getOutputs());
- TransformWatermarks transformWms = transformToWatermarks.get(transform);
- transformWms.setEventTimeHold(inputBundle == null ? null :
inputBundle.getKey(),
- pending.getEarliestHold());
+ TransformWatermarks transformWms = transformToWatermarks.get(executable);
+ transformWms.setEventTimeHold(
+ inputBundle == null ? null : inputBundle.getKey(),
pending.getEarliestHold());
}
/**
* First adds all produced elements to the queue of pending elements for
each consumer, then adds
* all pending timers to the collection of pending timers, then removes all
completed and deleted
* timers from the collection of pending timers, then removes all completed
elements from the
- * pending queue of the transform.
+ * pending queue of the executable.
*
* <p>It is required that all newly pending elements are added to the queue
of pending elements
* for each consumer prior to the completed elements being removed, as doing
otherwise could cause
* a Watermark to appear in a state in which the upstream (completed)
element does not hold the
* watermark but the element it produced is not yet pending. This can cause
the watermark to
* erroneously advance.
+ *
+ * <p>See {@link #updateWatermarks(CommittedBundle, TimerUpdate, Object,
CommittedBundle,
+ * Iterable, Instant)} for information about the parameters of this method.
*/
private void updatePending(
CommittedBundle<?> input,
TimerUpdate timerUpdate,
- CommittedResult<ExecutableT> result) {
+ ExecutableT executable,
+ @Nullable CommittedBundle<?> unprocessedInputs,
+ Iterable<? extends CommittedBundle<?>> outputs) {
// Newly pending elements must be added before completed elements are
removed, as the two
// do not share a Mutex within this call and thus can be interleaved with
external calls to
// refresh.
- for (CommittedBundle<?> bundle : result.getOutputs()) {
+ for (CommittedBundle<?> bundle : outputs) {
for (ExecutableT consumer :
// TODO: Remove this cast once CommittedBundle returns a CollectionT
graph.getPerElementConsumers((CollectionT) bundle.getPCollection()))
{
@@ -1004,10 +1017,10 @@ private void updatePending(
}
}
- TransformWatermarks completedTransform =
transformToWatermarks.get(result.getExecutable());
- if (result.getUnprocessedInputs().isPresent()) {
+ TransformWatermarks completedTransform =
transformToWatermarks.get(executable);
+ if (unprocessedInputs != null) {
// Add the unprocessed inputs
- completedTransform.addPending(result.getUnprocessedInputs().get());
+ completedTransform.addPending(unprocessedInputs);
}
completedTransform.updateTimers(timerUpdate);
if (input != null) {
@@ -1034,8 +1047,8 @@ synchronized void refreshAll() {
private Set<ExecutableT> refreshAllOf(Set<ExecutableT> toRefresh) {
Set<ExecutableT> newRefreshes = new HashSet<>();
- for (ExecutableT transform : toRefresh) {
- newRefreshes.addAll(refreshWatermarks(transform));
+ for (ExecutableT executable : toRefresh) {
+ newRefreshes.addAll(refreshWatermarks(executable));
}
return newRefreshes;
}
@@ -1179,7 +1192,7 @@ public void removeHold(Object key) {
* A reference to the input and output watermarks of an {@link
AppliedPTransform}.
*/
public class TransformWatermarks {
- private final ExecutableT transform;
+ private final ExecutableT executable;
private final AppliedPTransformInputWatermark inputWatermark;
private final AppliedPTransformOutputWatermark outputWatermark;
@@ -1191,12 +1204,12 @@ public void removeHold(Object key) {
private Instant latestSynchronizedOutputWm;
private TransformWatermarks(
- ExecutableT transform,
+ ExecutableT executable,
AppliedPTransformInputWatermark inputWatermark,
AppliedPTransformOutputWatermark outputWatermark,
SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
SynchronizedProcessingTimeOutputWatermark
outputSynchProcessingWatermark) {
- this.transform = transform;
+ this.executable = executable;
this.inputWatermark = inputWatermark;
this.outputWatermark = outputWatermark;
@@ -1284,7 +1297,7 @@ private void addPending(CommittedBundle<?> bundle) {
for (Map.Entry<StructuralKey<?>, List<TimerData>> firedTimers :
timersPerKey.entrySet()) {
keyFiredTimers.add(
- new FiredTimers<>(transform, firedTimers.getKey(),
firedTimers.getValue()));
+ new FiredTimers<>(executable, firedTimers.getKey(),
firedTimers.getValue()));
}
return keyFiredTimers;
}
@@ -1470,26 +1483,26 @@ public boolean equals(Object other) {
/**
* A pair of {@link TimerData} and key which can be delivered to the
appropriate
- * {@link AppliedPTransform}. A timer fires at the transform that set it
with a specific key when
+ * {@link AppliedPTransform}. A timer fires at the executable that set it
with a specific key when
* the time domain in which it lives progresses past a specified time, as
determined by the
* {@link WatermarkManager}.
*/
public static class FiredTimers<ExecutableT> {
- /** The transform the timers were set at and will be delivered to. */
- private final ExecutableT transform;
+ /** The executable the timers were set at and will be delivered to. */
+ private final ExecutableT executable;
/** The key the timers were set for and will be delivered to. */
private final StructuralKey<?> key;
private final Collection<TimerData> timers;
private FiredTimers(
- ExecutableT transform, StructuralKey<?> key, Collection<TimerData>
timers) {
- this.transform = transform;
+ ExecutableT executable, StructuralKey<?> key, Collection<TimerData>
timers) {
+ this.executable = executable;
this.key = key;
this.timers = timers;
}
- public ExecutableT getTransform() {
- return transform;
+ public ExecutableT getExecutable() {
+ return executable;
}
public StructuralKey<?> getKey() {
@@ -1522,17 +1535,6 @@ public int compare(CommittedBundle<?> o1,
CommittedBundle<?> o2) {
}
}
- public Set<ExecutableT> getCompletedTransforms() {
- Set<ExecutableT> result = new HashSet<>();
- for (Map.Entry<ExecutableT, TransformWatermarks> wms :
- transformToWatermarks.entrySet()) {
- if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) {
- result.add(wms.getKey());
- }
- }
- return result;
- }
-
@AutoValue
abstract static class PendingWatermarkUpdate<ExecutableT> {
abstract ExecutableT getExecutable();
@@ -1542,18 +1544,22 @@ public int compare(CommittedBundle<?> o1,
CommittedBundle<?> o2) {
abstract TimerUpdate getTimerUpdate();
- abstract CommittedResult<ExecutableT> getResult();
+ @Nullable
+ abstract CommittedBundle<?> getUnprocessedInputs();
+
+ abstract Iterable<? extends CommittedBundle<?>> getOutputs();
abstract Instant getEarliestHold();
public static <ExecutableT> PendingWatermarkUpdate<ExecutableT> create(
ExecutableT executable,
- CommittedBundle<?> inputBundle,
+ @Nullable CommittedBundle<?> inputBundle,
TimerUpdate timerUpdate,
- CommittedResult<ExecutableT> result,
+ @Nullable CommittedBundle<?> unprocessedInputs,
+ Iterable<? extends CommittedBundle<?>> outputs,
Instant earliestHold) {
- return new AutoValue_WatermarkManager_PendingWatermarkUpdate(
- executable, inputBundle, timerUpdate, result, earliestHold);
+ return new AutoValue_WatermarkManager_PendingWatermarkUpdate<>(
+ executable, inputBundle, timerUpdate, unprocessedInputs, outputs,
earliestHold);
}
}
}
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 21dac7fbb51..03764c1a1ba 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -28,20 +28,16 @@
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.CommittedResult.OutputType;
import
org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
@@ -145,8 +141,7 @@ public void processElement(ProcessContext c) throws
Exception {
*/
@Test
public void getWatermarkForUntouchedTransform() {
- TransformWatermarks watermarks =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks watermarks =
manager.getWatermarks(graph.getProducer(createdInts));
assertThat(watermarks.getInputWatermark(),
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
assertThat(watermarks.getOutputWatermark(),
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
@@ -159,11 +154,12 @@ public void getWatermarkForUntouchedTransform() {
@Test
public void getWatermarkForUpdatedSourceTransform() {
CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(output)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.singleton(output),
new Instant(8000L));
manager.refreshAll();
TransformWatermarks updatedSourceWatermark =
@@ -180,11 +176,12 @@ public void getWatermarkForUpdatedSourceTransform() {
public void getWatermarkForMultiInputTransform() {
CommittedBundle<Integer> secondPcollectionBundle =
multiWindowedBundle(intsToFlatten, -1);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(intsToFlatten),
- null,
-
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
+ graph.getProducer(intsToFlatten),
+ null,
+ Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -217,20 +214,18 @@ public void getWatermarkForMultiInputTransform() {
manager.updateWatermarks(
secondPcollectionBundle,
TimerUpdate.empty(),
- result(
- graph.getProducer(flattened),
- secondPcollectionBundle.withElements(Collections.emptyList()),
-
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
+ graph.getProducer(flattened),
+ secondPcollectionBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks transformAfterProcessing =
manager.getWatermarks(graph.getProducer(flattened));
manager.updateWatermarks(
secondPcollectionBundle,
TimerUpdate.empty(),
- result(
- graph.getProducer(flattened),
- secondPcollectionBundle.withElements(Collections.emptyList()),
-
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
+ graph.getProducer(flattened),
+ secondPcollectionBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
assertThat(
@@ -245,11 +240,12 @@ public void getWatermarkForMultiInputTransform() {
timestampedBundle(createdInts, TimestampedValue.of(5,
firstCollectionTimestamp));
// the source is done, but elements are still buffered. The source output
watermark should be
// past the end of the global window
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks firstSourceWatermarks =
@@ -280,10 +276,9 @@ public void getWatermarkForMultiInputTransform() {
manager.updateWatermarks(
firstPcollectionBundle,
TimerUpdate.empty(),
- result(
- graph.getProducer(flattened),
- firstPcollectionBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
+ graph.getProducer(flattened),
+ firstPcollectionBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks afterConsumingAllInput =
@@ -332,11 +327,9 @@ public void getWatermarkMultiIdenticalInput() {
tstMgr.updateWatermarks(
root,
TimerUpdate.empty(),
- CommittedResult.create(
-
StepTransformResult.withoutHold(graph.getProducer(created)).build(),
- Optional.absent(),
- Collections.singleton(createBundle),
- EnumSet.allOf(OutputType.class)),
+ graph.getProducer(created),
+ null,
+ Collections.singleton(createBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
tstMgr.refreshAll();
@@ -346,11 +339,9 @@ public void getWatermarkMultiIdenticalInput() {
tstMgr.updateWatermarks(
createBundle,
TimerUpdate.empty(),
- CommittedResult.create(
- StepTransformResult.withoutHold(theFlatten).build(),
- Optional.absent(),
- Collections.emptyList(),
- EnumSet.allOf(OutputType.class)),
+ theFlatten,
+ null,
+ Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
tstMgr.refreshAll();
@@ -359,11 +350,9 @@ public void getWatermarkMultiIdenticalInput() {
tstMgr.updateWatermarks(
createBundle,
TimerUpdate.empty(),
- CommittedResult.create(
- StepTransformResult.withoutHold(theFlatten).build(),
- Optional.absent(),
- Collections.emptyList(),
- EnumSet.allOf(OutputType.class)),
+ theFlatten,
+ null,
+ Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
tstMgr.refreshAll();
assertThat(flattenWms.getInputWatermark(),
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -378,11 +367,12 @@ public void getWatermarkForMultiConsumedCollection() {
CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
TimestampedValue.of(1, new Instant(1_000_000L)),
TimestampedValue.of(2, new Instant(1234L)),
TimestampedValue.of(3, new Instant(-1000L)));
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createdBundle),
new Instant(Long.MAX_VALUE));
manager.refreshAll();
TransformWatermarks createdAfterProducing =
@@ -398,10 +388,9 @@ public void getWatermarkForMultiConsumedCollection() {
manager.updateWatermarks(
createdBundle,
TimerUpdate.empty(),
- result(
- graph.getProducer(keyed),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ graph.getProducer(keyed),
+ createdBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(keyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks keyedWatermarks =
@@ -421,10 +410,9 @@ public void getWatermarkForMultiConsumedCollection() {
manager.updateWatermarks(
createdBundle,
TimerUpdate.empty(),
- result(
- graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+ graph.getProducer(filtered),
+ createdBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks filteredProcessedWatermarks =
@@ -447,11 +435,12 @@ public void updateWatermarkWithWatermarkHolds() {
TimestampedValue.of(1, new Instant(1_000_000L)),
TimestampedValue.of(2, new Instant(1234L)),
TimestampedValue.of(3, new Instant(-1000L)));
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createdBundle),
new Instant(Long.MAX_VALUE));
CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
@@ -461,10 +450,9 @@ public void updateWatermarkWithWatermarkHolds() {
manager.updateWatermarks(
createdBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(keyed),
createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ Collections.<CommittedBundle<?>>singleton(keyBundle),
new Instant(500L));
manager.refreshAll();
TransformWatermarks keyedWatermarks =
@@ -493,28 +481,27 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
.add(WindowedValue.timestampedValueInGlobalWindow(2, new
Instant(1234L)))
.commit(clock.now());
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- ImmutableList.of(firstKeyBundle, secondKeyBundle)),
+ graph.getProducer(createdInts),
+ null,
+ ImmutableList.of(firstKeyBundle, secondKeyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(
firstKeyBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(filtered),
firstKeyBundle.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ Collections.emptyList(),
new Instant(-1000L));
manager.updateWatermarks(
secondKeyBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(filtered),
secondKeyBundle.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ Collections.emptyList(),
new Instant(1234L));
manager.refreshAll();
@@ -530,10 +517,9 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
manager.updateWatermarks(
fauxFirstKeyTimerBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(filtered),
fauxFirstKeyTimerBundle.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -545,10 +531,9 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
manager.updateWatermarks(
fauxSecondKeyTimerBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(filtered),
fauxSecondKeyTimerBundle.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ Collections.emptyList(),
new Instant(5678L));
manager.refreshAll();
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new
Instant(5678L)));
@@ -556,10 +541,9 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
manager.updateWatermarks(
fauxSecondKeyTimerBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(filtered),
fauxSecondKeyTimerBundle.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
assertThat(filteredWatermarks.getOutputWatermark(),
@@ -574,10 +558,12 @@ public void updateWatermarkWithKeyedWatermarkHolds() {
public void updateOutputWatermarkShouldBeMonotonic() {
CommittedBundle<?> firstInput =
bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
- manager.updateWatermarks(null, TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(firstInput)),
+ manager.updateWatermarks(
+ null,
+ TimerUpdate.empty(),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(firstInput),
new Instant(0L));
manager.refreshAll();
TransformWatermarks firstWatermarks =
@@ -588,9 +574,9 @@ public void updateOutputWatermarkShouldBeMonotonic() {
bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
+ graph.getProducer(createdInts),
null,
- Collections.<CommittedBundle<?>>singleton(secondInput)),
+ Collections.<CommittedBundle<?>>singleton(secondInput),
new Instant(-250L));
manager.refreshAll();
TransformWatermarks secondWatermarks =
@@ -610,9 +596,9 @@ public void updateWatermarkWithHoldsShouldBeMonotonic() {
TimestampedValue.of(3, new Instant(-1000L)));
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
+ graph.getProducer(createdInts),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ Collections.<CommittedBundle<?>>singleton(createdBundle),
new Instant(Long.MAX_VALUE));
CommittedBundle<KV<String, Integer>> keyBundle =
@@ -622,10 +608,9 @@ public void updateWatermarkWithHoldsShouldBeMonotonic() {
manager.updateWatermarks(
createdBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(keyed),
createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ Collections.<CommittedBundle<?>>singleton(keyBundle),
new Instant(500L));
manager.refreshAll();
TransformWatermarks keyedWatermarks =
@@ -657,20 +642,22 @@ public void updateWatermarkWithUnprocessedElements() {
.add(third)
.commit(clock.now());
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createdBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
TimestampedValue.of(KV.of("MyKey", 1),
BoundedWindow.TIMESTAMP_MIN_VALUE));
- manager.updateWatermarks(createdBundle,
+ manager.updateWatermarks(
+ createdBundle,
TimerUpdate.empty(),
- result(graph.getProducer(keyed),
- createdBundle.withElements(ImmutableList.of(second, third)),
- Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ graph.getProducer(keyed),
+ createdBundle.withElements(ImmutableList.of(second, third)),
+ Collections.<CommittedBundle<?>>singleton(keyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks keyedWatermarks =
manager.getWatermarks(graph.getProducer(keyed));
@@ -692,20 +679,20 @@ public void
updateWatermarkWithCompletedElementsNotPending() {
.add(second)
.commit(clock.now());
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createdBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(
neverCreatedBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(filtered),
neverCreatedBundle.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -723,11 +710,12 @@ public void updateWatermarkWithLateData() {
CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new
Instant(1234L)));
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createdBundle),
sourceWatermark);
CommittedBundle<KV<String, Integer>> keyBundle =
@@ -738,10 +726,9 @@ public void updateWatermarkWithLateData() {
manager.updateWatermarks(
createdBundle,
TimerUpdate.empty(),
- result(
graph.getProducer(keyed),
createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ Collections.<CommittedBundle<?>>singleton(keyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks onTimeWatermarks =
@@ -756,10 +743,9 @@ public void updateWatermarkWithLateData() {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- result(
graph.getProducer(createdInts),
createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
+ Collections.<CommittedBundle<?>>singleton(lateDataBundle),
new Instant(2_000_000L));
manager.refreshAll();
TransformWatermarks bufferedLateWm =
@@ -778,10 +764,9 @@ public void updateWatermarkWithLateData() {
manager.updateWatermarks(
lateDataBundle,
TimerUpdate.empty(),
- result(
- graph.getProducer(keyed),
- lateDataBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
+ graph.getProducer(keyed),
+ lateDataBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(lateKeyedBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
}
@@ -789,12 +774,14 @@ public void updateWatermarkWithLateData() {
public void updateWatermarkWithDifferentWindowedValueInstances() {
manager.updateWatermarks(
null,
- TimerUpdate.empty(), result(graph.getProducer(createdInts), null,
+ TimerUpdate.empty(),
+ graph.getProducer(createdInts),
+ null,
Collections.<CommittedBundle<?>>singleton(
bundleFactory
.createBundle(createdInts)
.add(WindowedValue.valueInGlobalWindow(1))
- .commit(Instant.now()))),
+ .commit(Instant.now())),
BoundedWindow.TIMESTAMP_MAX_VALUE);
CommittedBundle<Integer> createdBundle =
bundleFactory.createBundle(createdInts)
@@ -803,10 +790,9 @@ public void
updateWatermarkWithDifferentWindowedValueInstances() {
manager.updateWatermarks(
createdBundle,
TimerUpdate.empty(),
- result(
- graph.getProducer(keyed),
- createdBundle.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ graph.getProducer(keyed),
+ createdBundle.withElements(Collections.emptyList()),
+ Collections.emptyList(),
null);
manager.refreshAll();
TransformWatermarks onTimeWatermarks =
@@ -821,11 +807,12 @@ public void
updateWatermarkWithDifferentWindowedValueInstances() {
@Test
public void getWatermarksAfterOnlyEmptyOutput() {
CommittedBundle<Integer> emptyCreateOutput =
multiWindowedBundle(createdInts);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks updatedSourceWatermarks =
@@ -852,21 +839,21 @@ public void getWatermarksAfterOnlyEmptyOutput() {
@Test
public void getWatermarksAfterHoldAndEmptyOutput() {
CommittedBundle<Integer> firstCreateOutput =
multiWindowedBundle(createdInts, 1, 2);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(firstCreateOutput),
new Instant(12_000L));
CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
manager.updateWatermarks(
firstCreateOutput,
TimerUpdate.empty(),
- result(
- graph.getProducer(filtered),
- firstCreateOutput.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
+ graph.getProducer(filtered),
+ firstCreateOutput.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
new Instant(10_000L));
manager.refreshAll();
TransformWatermarks firstFilterWatermarks =
@@ -875,11 +862,12 @@ public void getWatermarksAfterHoldAndEmptyOutput() {
assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new
Instant(10_000L))));
CommittedBundle<Integer> emptyCreateOutput =
multiWindowedBundle(createdInts);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks updatedSourceWatermarks =
@@ -919,11 +907,12 @@ public void
getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
CommittedBundle<Integer> createOutput =
bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createOutput)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks createAfterUpdate =
@@ -953,10 +942,9 @@ public void
getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
manager.updateWatermarks(
createOutput,
TimerUpdate.empty(),
- result(
- graph.getProducer(filtered),
- createOutput.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
+ graph.getProducer(filtered),
+ createOutput.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks filterAfterConsumed =
@@ -978,11 +966,12 @@ public void
getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
// @Test
public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts,
1, 2, 4, 8);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createdBundle),
new Instant(1248L));
manager.refreshAll();
@@ -1004,10 +993,9 @@ public void
getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
manager.updateWatermarks(
createdBundle,
timers,
- result(
- graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+ graph.getProducer(filtered),
+ createdBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
Instant startTime = clock.now();
@@ -1041,10 +1029,9 @@ public void
getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
manager.updateWatermarks(
filteredTimerBundle,
TimerUpdate.builder(key).withCompletedTimers(Collections.singleton(pastTimer)).build(),
- result(
- graph.getProducer(filtered),
- filteredTimerBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
+ graph.getProducer(filtered),
+ filteredTimerBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(filteredTimerResult),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -1058,10 +1045,9 @@ public void
getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
manager.updateWatermarks(
filteredTimerResult,
TimerUpdate.empty(),
- result(
- graph.getProducer(filteredTimesTwo),
- filteredTimerResult.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ graph.getProducer(filteredTimesTwo),
+ filteredTimerResult.withElements(Collections.emptyList()),
+ Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(),
equalTo(clock.now()));
@@ -1096,26 +1082,27 @@ public void
getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
CommittedBundle<Integer> createOutput =
bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createOutput)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks createAfterUpdate =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks createAfterUpdate =
manager.getWatermarks(graph.getProducer(createdInts));
assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(),
not(laterThan(clock.now())));
- assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(),
- not(laterThan(clock.now())));
+ assertThat(
+ createAfterUpdate.getSynchronizedProcessingOutputTime(),
not(laterThan(clock.now())));
CommittedBundle<Integer> createSecondOutput =
bundleFactory.createBundle(createdInts).commit(new Instant(750L));
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createSecondOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -1125,11 +1112,12 @@ public void
getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
@Test
public void
synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2,
3);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(created)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(created),
new Instant(40_900L));
manager.refreshAll();
@@ -1142,15 +1130,13 @@ public void
synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers(
TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
.setTimer(upstreamProcessingTimer)
.build(),
- result(
- graph.getProducer(filtered),
- created.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+ graph.getProducer(filtered),
+ created.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks downstreamWms =
- manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+ TransformWatermarks downstreamWms =
manager.getWatermarks(graph.getProducer(filteredTimesTwo));
assertThat(downstreamWms.getSynchronizedProcessingInputTime(),
equalTo(clock.now()));
clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1167,10 +1153,9 @@ public void
synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers(
TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
.withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
.build(),
- result(
- graph.getProducer(filtered),
- otherCreated.withElements(Collections.emptyList()),
- Collections.emptyList()),
+ graph.getProducer(filtered),
+ otherCreated.withElements(Collections.emptyList()),
+ Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -1183,9 +1168,9 @@ public void
synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.<CommittedBundle<?>>singleton(created)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.<CommittedBundle<?>>singleton(created),
new Instant(29_919_235L));
Instant upstreamHold = new Instant(2048L);
@@ -1195,15 +1180,13 @@ public void
synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
manager.updateWatermarks(
created,
TimerUpdate.empty(),
- result(
- graph.getProducer(filtered),
- created.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+ graph.getProducer(filtered),
+ created.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks downstreamWms =
- manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+ TransformWatermarks downstreamWms =
manager.getWatermarks(graph.getProducer(filteredTimesTwo));
assertThat(downstreamWms.getSynchronizedProcessingInputTime(),
equalTo(clock.now()));
clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1219,11 +1202,12 @@ public void
extractFiredTimersReturnsFiredEventTimeTimers() {
// Advance WM of keyed past the first timer, but ahead of the second and
third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.singleton(createdBundle),
new Instant(1500L));
manager.refreshAll();
@@ -1244,10 +1228,9 @@ public void
extractFiredTimersReturnsFiredEventTimeTimers() {
manager.updateWatermarks(
createdBundle,
update,
- result(
- graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
-
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+ graph.getProducer(filtered),
+ createdBundle.withElements(Collections.emptyList()),
+
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1260,7 +1243,9 @@ public void
extractFiredTimersReturnsFiredEventTimeTimers() {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts), null, Collections.emptyList()),
+ graph.getProducer(createdInts),
+ null,
+ Collections.emptyList(),
new Instant(50_000L));
manager.refreshAll();
Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
@@ -1281,11 +1266,12 @@ public void
extractFiredTimersReturnsFiredProcessingTimeTimers() {
// Advance WM of keyed past the first timer, but ahead of the second and
third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.singleton(createdBundle),
new Instant(1500L));
TimerData earliestTimer =
@@ -1305,10 +1291,9 @@ public void
extractFiredTimersReturnsFiredProcessingTimeTimers() {
manager.updateWatermarks(
createdBundle,
update,
- result(
- graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
-
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+ graph.getProducer(filtered),
+ createdBundle.withElements(Collections.emptyList()),
+
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1322,7 +1307,9 @@ public void
extractFiredTimersReturnsFiredProcessingTimeTimers() {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts), null, Collections.emptyList()),
+ graph.getProducer(createdInts),
+ null,
+ Collections.emptyList(),
new Instant(50_000L));
manager.refreshAll();
Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
@@ -1343,11 +1330,12 @@ public void
extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
// Advance WM of keyed past the first timer, but ahead of the second and
third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.singleton(createdBundle),
new Instant(1500L));
TimerData earliestTimer = TimerData.of(
@@ -1367,10 +1355,9 @@ public void
extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
manager.updateWatermarks(
createdBundle,
update,
- result(
- graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
-
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+ graph.getProducer(filtered),
+ createdBundle.withElements(Collections.emptyList()),
+
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1384,7 +1371,9 @@ public void
extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts), null, Collections.emptyList()),
+ graph.getProducer(createdInts),
+ null,
+ Collections.emptyList(),
new Instant(50_000L));
manager.refreshAll();
Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> secondFiredTimers =
@@ -1419,7 +1408,9 @@ public void processingTimeTimersCanBeReset() {
manager.updateWatermarks(
null,
initialUpdate,
- result(graph.getProducer(createdInts), null, Collections.emptyList()),
+ graph.getProducer(createdInts),
+ null,
+ Collections.emptyList(),
new Instant(5000L));
manager.refreshAll();
@@ -1427,7 +1418,9 @@ public void processingTimeTimersCanBeReset() {
manager.updateWatermarks(
null,
overridingUpdate,
- result(graph.getProducer(createdInts), null, Collections.emptyList()),
+ graph.getProducer(createdInts),
+ null,
+ Collections.emptyList(),
new Instant(10000L));
// Set clock past the timers.
@@ -1462,10 +1455,9 @@ public void eventTimeTimersCanBeReset() {
manager.updateWatermarks(
createdBundle,
initialUpdate,
- result(
- graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
-
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+ graph.getProducer(filtered),
+ createdBundle.withElements(Collections.emptyList()),
+
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1473,19 +1465,19 @@ public void eventTimeTimersCanBeReset() {
manager.updateWatermarks(
createdBundle,
overridingUpdate,
- result(
- graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
-
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+ graph.getProducer(filtered),
+ createdBundle.withElements(Collections.emptyList()),
+
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
new Instant(1000L));
manager.refreshAll();
// Set WM past the timers.
- manager.updateWatermarks(null,
+ manager.updateWatermarks(
+ null,
TimerUpdate.empty(),
- result(graph.getProducer(createdInts),
- null,
- Collections.singleton(createdBundle)),
+ graph.getProducer(createdInts),
+ null,
+ Collections.singleton(createdBundle),
new Instant(3000L));
manager.refreshAll();
@@ -1709,23 +1701,4 @@ public void describeTo(Description description) {
}
return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
}
-
- private CommittedResult<AppliedPTransform<?, ?, ?>> result(
- AppliedPTransform<?, ?, ?> transform,
- @Nullable CommittedBundle<?> unprocessedBundle,
- Iterable<? extends CommittedBundle<?>> bundles) {
- Optional<? extends CommittedBundle<?>> unprocessedElements;
- if (unprocessedBundle == null ||
Iterables.isEmpty(unprocessedBundle.getElements())) {
- unprocessedElements = Optional.absent();
- } else {
- unprocessedElements = Optional.of(unprocessedBundle);
- }
- return CommittedResult.create(
- StepTransformResult.withoutHold(transform).build(),
- unprocessedElements,
- bundles,
- Iterables.isEmpty(bundles)
- ? EnumSet.noneOf(OutputType.class)
- : EnumSet.of(OutputType.BUNDLE));
- }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 93061)
Time Spent: 1h 50m (was: 1h 40m)
> Remove Use of Java SDK Types in the DirectRunner "engine"
> ---------------------------------------------------------
>
> Key: BEAM-4135
> URL: https://issues.apache.org/jira/browse/BEAM-4135
> Project: Beam
> Issue Type: New Feature
> Components: runner-direct
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> The "engine" consists of the components which determine where to schedule
> work and route it to the appropriate processors, such as WatermarkManager,
> DirectBundleProcessor, and their associated classes.
>
> These engine components never inspect the actual characteristics of the
> packaged work (e.g. the PCollection is a token, rather than a rich object),
> so they should not require use of a PCollection directly - instead, they can
> be generic.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)