Remove References to Instant#now in the DirectRunner. The DirectRunner should exclusively use the configured clock to determine the processing time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7585cfc3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7585cfc3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7585cfc3 Branch: refs/heads/master Commit: 7585cfc3693800b00c4ccc799c27f0311e9b0cc1 Parents: fcf6b1d Author: Thomas Groh <[email protected]> Authored: Fri Aug 5 09:58:05 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Aug 5 10:04:21 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/direct/EvaluationContext.java | 14 ++++++++++---- .../direct/ExecutorServiceParallelExecutor.java | 5 ++--- 2 files changed, 12 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 23c139d..94f28e2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -48,6 +48,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; +import org.joda.time.Instant; + import java.util.Collection; import java.util.EnumSet; import java.util.List; @@ -81,6 +83,7 @@ class EvaluationContext { /** The options that were used to create this {@link Pipeline}. 
*/ private final DirectOptions options; + private final Clock clock; private final BundleFactory bundleFactory; /** The current processing time and event time watermarks and timers. */ @@ -116,6 +119,7 @@ class EvaluationContext { Map<AppliedPTransform<?, ?, ?>, String> stepNames, Collection<PCollectionView<?>> views) { this.options = checkNotNull(options); + this.clock = options.getClock(); this.bundleFactory = checkNotNull(bundleFactory); checkNotNull(rootTransforms); checkNotNull(valueToConsumers); @@ -123,9 +127,7 @@ class EvaluationContext { checkNotNull(views); this.stepNames = stepNames; - this.watermarkManager = - WatermarkManager.create( - NanosOffsetClock.create(), rootTransforms, valueToConsumers); + this.watermarkManager = WatermarkManager.create(clock, rootTransforms, valueToConsumers); this.sideInputContainer = SideInputContainer.create(this, views); this.applicationStateInternals = new ConcurrentHashMap<>(); @@ -314,7 +316,7 @@ class EvaluationContext { AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) { StepAndKey stepAndKey = StepAndKey.of(application, key); return new DirectExecutionContext( - options.getClock(), + clock, key, (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey), watermarkManager.getWatermarks(application)); @@ -427,4 +429,8 @@ class EvaluationContext { } return true; } + + public Instant now() { + return clock.now(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 64836d8..a0a5ec0 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -40,7 +40,6 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -433,9 +432,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { .createKeyedBundle( null, keyTimers.getKey(), (PCollection) transform.getInput()) .add(WindowedValue.valueInEmptyWindows(work)) - .commit(Instant.now()); - state.set(ExecutorState.ACTIVE); + .commit(evaluationContext.now()); scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery)); + state.set(ExecutorState.ACTIVE); } } }
