Repository: incubator-beam Updated Branches: refs/heads/master 487052588 -> 874ddef05
Update Watermarks Outside of handleResult Remove excess mutual exclusion Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46bc6e14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46bc6e14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46bc6e14 Branch: refs/heads/master Commit: 46bc6e14b5e247905f567e617ffc017f98cc6f44 Parents: 4870525 Author: Thomas Groh <[email protected]> Authored: Mon May 2 17:26:47 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue May 10 10:49:25 2016 -0700 ---------------------------------------------------------------------- .../direct/InMemoryWatermarkManager.java | 113 +++++++++++++++---- .../direct/InProcessEvaluationContext.java | 14 ++- .../direct/WatermarkCallbackExecutor.java | 9 +- .../direct/InMemoryWatermarkManagerTest.java | 58 ++++++++-- .../direct/InProcessEvaluationContextTest.java | 8 +- .../direct/WatermarkCallbackExecutorTest.java | 4 +- 6 files changed, 169 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java index 87ea4d5..f8cf343 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; +import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.MoreObjects; @@ -57,6 +58,7 @@ import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -686,6 +688,17 @@ public class InMemoryWatermarkManager { private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks; /** + * A queue of pending updates to the state of this {@link InMemoryWatermarkManager}. + */ + private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates; + + /** + * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially + * stale data. + */ + private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes; + + /** * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the newly created * {@link InMemoryWatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the * minimum watermark, with no watermark holds or pending elements. @@ -707,6 +720,8 @@ public class InMemoryWatermarkManager { Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) { this.clock = clock; this.consumers = consumers; + this.pendingUpdates = new ConcurrentLinkedQueue<>(); + this.pendingRefreshes = new ConcurrentLinkedQueue<>(); transformToWatermarks = new HashMap<>(); @@ -810,27 +825,38 @@ public class InMemoryWatermarkManager { @Nullable CommittedBundle<?> completed, TimerUpdate timerUpdate, CommittedResult result, - @Nullable Instant earliestHold) { - AppliedPTransform<?, ?, ?> transform = result.getTransform(); - updatePending(completed, timerUpdate, result); - TransformWatermarks transformWms = transformToWatermarks.get(transform); - transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold); - refreshWatermarks(transform); + Instant earliestHold) { + pendingUpdates.offer(PendingWatermarkUpdate.create(completed, + timerUpdate, + result, + earliestHold)); } - private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) { - TransformWatermarks myWatermarks = transformToWatermarks.get(transform); - WatermarkUpdate updateResult = myWatermarks.refresh(); - if (updateResult.isAdvanced()) { - for (PValue outputPValue : transform.getOutput().expand()) { - Collection<AppliedPTransform<?, ?, ?>> downstreamTransforms = consumers.get(outputPValue); - if (downstreamTransforms != null) { - for (AppliedPTransform<?, ?, ?> downstreamTransform : downstreamTransforms) { - refreshWatermarks(downstreamTransform); - } - } - } - } + /** + * Applies all pending updates to this {@link InMemoryWatermarkManager}, causing the pending state + * of all {@link TransformWatermarks} to be advanced as far as possible. + */ + private void applyPendingUpdates() { + Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>(); + PendingWatermarkUpdate pending = pendingUpdates.poll(); + while (pending != null) { + applyPendingUpdate(pending); + updatedTransforms.add(pending.getTransform()); + pending = pendingUpdates.poll(); + } + pendingRefreshes.addAll(updatedTransforms); + } + + private void applyPendingUpdate(PendingWatermarkUpdate pending) { + CommittedResult result = pending.getResult(); + AppliedPTransform transform = result.getTransform(); + CommittedBundle<?> inputBundle = pending.getInputBundle(); + + updatePending(inputBundle, pending.getTimerUpdate(), result); + + TransformWatermarks transformWms = transformToWatermarks.get(transform); + transformWms.setEventTimeHold(inputBundle == null ? null : inputBundle.getKey(), + pending.getEarliestHold()); } /** @@ -871,6 +897,29 @@ public class InMemoryWatermarkManager { } /** + * Refresh the watermarks contained within this {@link InMemoryWatermarkManager}, causing all + * watermarks to be advanced as far as possible. + */ + synchronized void refreshAll() { + applyPendingUpdates(); + while (!pendingRefreshes.isEmpty()) { + refreshWatermarks(pendingRefreshes.poll()); + } + } + + private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) { + TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh); + WatermarkUpdate updateResult = myWatermarks.refresh(); + Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>(); + if (updateResult.isAdvanced()) { + for (PValue outputPValue : toRefresh.getOutput().expand()) { + additionalRefreshes.addAll(consumers.get(outputPValue)); + } + } + pendingRefreshes.addAll(additionalRefreshes); + } + + /** * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the * pending timers will be removed from this {@link InMemoryWatermarkManager}. */ @@ -1338,4 +1387,30 @@ public class InMemoryWatermarkManager { } return result; } + + @AutoValue + abstract static class PendingWatermarkUpdate { + @Nullable + public abstract CommittedBundle<?> getInputBundle(); + public abstract TimerUpdate getTimerUpdate(); + public abstract CommittedResult getResult(); + public abstract Instant getEarliestHold(); + + /** + * Gets the {@link AppliedPTransform} that generated this result. + */ + public AppliedPTransform<?, ?, ?> getTransform() { + return getResult().getTransform(); + } + + public static PendingWatermarkUpdate create( + CommittedBundle<?> inputBundle, + TimerUpdate timerUpdate, + CommittedResult result, Instant earliestHold) { + return new AutoValue_InMemoryWatermarkManager_PendingWatermarkUpdate(inputBundle, + timerUpdate, + result, + earliestHold); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index a6dffba..9eeafbb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -51,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; import javax.annotation.Nullable; @@ -128,7 +130,7 @@ class InProcessEvaluationContext { this.applicationStateInternals = new ConcurrentHashMap<>(); this.mergedCounters = new CounterSet(); - this.callbackExecutor = WatermarkCallbackExecutor.create(); + this.callbackExecutor = WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor()); } /** @@ -146,7 +148,7 @@ class InProcessEvaluationContext { * @param result the result of evaluating the input bundle * @return the committed bundles contained within the handled {@code result} */ - public synchronized CommittedResult handleResult( + public CommittedResult handleResult( @Nullable CommittedBundle<?> completedBundle, Iterable<TimerData> completedTimers, InProcessTransformResult result) { @@ -163,7 +165,6 @@ class InProcessEvaluationContext { result.getTimerUpdate().withCompletedTimers(completedTimers), committedResult, result.getWatermarkHold()); - fireAllAvailableCallbacks(); // Update counters if (result.getCounters() != null) { mergedCounters.merge(result.getCounters()); @@ -359,6 +360,12 @@ class InProcessEvaluationContext { return mergedCounters; } + @VisibleForTesting + void forceRefresh() { + watermarkManager.refreshAll(); + fireAllAvailableCallbacks(); + } + /** * Extracts all timers that have been fired and have not already been extracted. * @@ -366,6 +373,7 @@ class InProcessEvaluationContext { * for each time they are set. */ public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() { + forceRefresh(); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = watermarkManager.extractFiredTimers(); return fired; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java index 4a3a517..1c9b050 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java @@ -30,7 +30,6 @@ import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * Executes callbacks that occur based on the progression of the watermark per-step. @@ -52,17 +51,17 @@ class WatermarkCallbackExecutor { /** * Create a new {@link WatermarkCallbackExecutor}. */ - public static WatermarkCallbackExecutor create() { - return new WatermarkCallbackExecutor(); + public static WatermarkCallbackExecutor create(ExecutorService executor) { + return new WatermarkCallbackExecutor(executor); } private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> callbacks; private final ExecutorService executor; - private WatermarkCallbackExecutor() { + private WatermarkCallbackExecutor(ExecutorService executor) { this.callbacks = new ConcurrentHashMap<>(); - this.executor = Executors.newSingleThreadExecutor(); + this.executor = executor; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java index b45440d..7f202fb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java @@ -167,6 +167,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(output)), new Instant(8000L)); + manager.refreshAll(); TransformWatermarks updatedSourceWatermark = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -187,6 +188,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); // We didn't do anything for the first source, so we shouldn't have progressed the watermark TransformWatermarks firstSourceWatermark = @@ -219,7 +221,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(flattened.getProducingTransformInternal(), secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)), - null); + BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks transformAfterProcessing = manager.getWatermarks(flattened.getProducingTransformInternal()); manager.updateWatermarks(secondPcollectionBundle, @@ -227,7 +229,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(flattened.getProducingTransformInternal(), secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)), - null); + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); assertThat( transformAfterProcessing.getInputWatermark(), not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); @@ -245,7 +248,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(createdInts.getProducingTransformInternal(), null, Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)), - new Instant(Long.MAX_VALUE)); + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks firstSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat( @@ -276,7 +280,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(flattened.getProducingTransformInternal(), firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)), - null); + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks afterConsumingAllInput = manager.getWatermarks(flattened.getProducingTransformInternal()); assertThat( @@ -302,6 +307,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(createdBundle)), new Instant(Long.MAX_VALUE)); + manager.refreshAll(); TransformWatermarks createdAfterProducing = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat( @@ -317,7 +323,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(keyed.getProducingTransformInternal(), createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), - null); + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat( @@ -337,7 +344,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(filtered.getProducingTransformInternal(), createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), - null); + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks filteredProcessedWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); assertThat( @@ -375,6 +383,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), new Instant(500L)); + manager.refreshAll(); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat( @@ -418,6 +427,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), new Instant(1234L)); + manager.refreshAll(); TransformWatermarks filteredWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -433,6 +443,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); @@ -444,6 +455,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), new Instant(5678L)); + manager.refreshAll(); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); manager.updateWatermarks(fauxSecondKeyTimerBundle, @@ -452,6 +464,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); assertThat(filteredWatermarks.getOutputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); } @@ -469,6 +482,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(firstInput)), new Instant(0L)); + manager.refreshAll(); TransformWatermarks firstWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); @@ -481,6 +495,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(secondInput)), new Instant(-250L)); + manager.refreshAll(); TransformWatermarks secondWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L)))); @@ -513,6 +528,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), new Instant(500L)); + manager.refreshAll(); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat( @@ -589,7 +605,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(keyed.getProducingTransformInternal(), createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), - null); + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark)); @@ -605,6 +622,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(lateDataBundle)), new Instant(2_000_000L)); + manager.refreshAll(); TransformWatermarks bufferedLateWm = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L))); @@ -623,7 +641,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { result(keyed.getProducingTransformInternal(), lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)), - null); + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); } public void updateWatermarkWithDifferentWindowedValueInstances() { @@ -646,6 +665,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), null); + manager.refreshAll(); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat(onTimeWatermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); @@ -664,6 +684,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -702,6 +723,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(firstFilterOutput)), new Instant(10_000L)); + manager.refreshAll(); TransformWatermarks firstFilterWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); assertThat(firstFilterWatermarks.getInputWatermark(), not(earlierThan(new Instant(12_000L)))); @@ -714,6 +736,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -757,6 +780,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(createOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now())); @@ -787,6 +811,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filterOutputBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks filterAfterConsumed = manager.getWatermarks(filtered.getProducingTransformInternal()); assertThat( @@ -812,6 +837,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(createdBundle)), new Instant(1248L)); + manager.refreshAll(); TransformWatermarks filteredWms = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -833,6 +859,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); Instant startTime = clock.now(); clock.set(startTime.plus(250L)); // We're held based on the past timer @@ -871,6 +898,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredTimerResult)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); clock.set(startTime.plus(500L)); assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now()))); @@ -885,6 +913,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); clock.set(new Instant(Long.MAX_VALUE)); @@ -923,6 +952,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(createOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now()))); @@ -937,6 +967,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(createSecondOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); } @@ -950,6 +981,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>singleton(created)), new Instant(40_900L)); + manager.refreshAll(); CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4); Instant upstreamHold = new Instant(2048L); @@ -961,6 +993,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { created.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks downstreamWms = manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal()); @@ -982,6 +1015,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now()))); } @@ -1007,6 +1041,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { created.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.refreshAll(); TransformWatermarks downstreamWms = manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal()); @@ -1031,6 +1066,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.singleton(createdBundle)), new Instant(1500L)); + manager.refreshAll(); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME); @@ -1052,6 +1088,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); + manager.refreshAll(); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = manager.extractFiredTimers(); @@ -1069,6 +1106,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); + manager.refreshAll(); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -1118,6 +1156,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); + manager.refreshAll(); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = manager.extractFiredTimers(); @@ -1136,6 +1175,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); + manager.refreshAll(); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -1185,6 +1225,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); + manager.refreshAll(); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = manager.extractFiredTimers(); @@ -1204,6 +1245,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); + manager.refreshAll(); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java index 59c4d8e..b73e41a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java @@ -317,7 +317,6 @@ public class InProcessEvaluationContextTest { .build(); context.handleResult(null, ImmutableList.<TimerData>of(), result); - // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit // will likely be flaky if this logic is broken assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); @@ -325,6 +324,7 @@ public class InProcessEvaluationContextTest { InProcessTransformResult finishedResult = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); + context.forceRefresh(); // Obtain the value via blocking call assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); } @@ -336,6 +336,7 @@ public class InProcessEvaluationContextTest { context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); final CountDownLatch callLatch = new CountDownLatch(1); + context.extractFiredTimers(); Runnable callback = new Runnable() { @Override @@ -426,6 +427,7 @@ public class InProcessEvaluationContextTest { null, ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + context.extractFiredTimers(); assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); } @@ -450,6 +452,7 @@ public class InProcessEvaluationContextTest { null, ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + context.extractFiredTimers(); assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); } @@ -482,6 +485,7 @@ public class InProcessEvaluationContextTest { ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(consumers).build()); } + context.extractFiredTimers(); assertThat(context.isDone(), is(true)); } @@ -502,12 +506,14 @@ public class InProcessEvaluationContextTest { context.createRootBundle(created).commit(Instant.now()), ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + context.extractFiredTimers(); assertThat(context.isDone(), is(false)); context.handleResult( context.createRootBundle(created).commit(Instant.now()), ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + context.extractFiredTimers(); assertThat(context.isDone(), is(false)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java index d47cf5e..b6b2bf5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java @@ -40,6 +40,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -47,7 +48,8 @@ import java.util.concurrent.TimeUnit; */ @RunWith(JUnit4.class) public class WatermarkCallbackExecutorTest { - private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create(); + private WatermarkCallbackExecutor executor = + WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor()); private AppliedPTransform<?, ?, ?> create; private AppliedPTransform<?, ?, ?> sum;
