Update Timers and State in the InProcess ParDoEvaluator. If the ParDo accessed state, put the committed value into the transform result, and likewise with timers.
Add a #commitState method to InProcessStepContext to return the committed state. Implement stateInternals() and timerInternals() to provide actual implementations of StateInternals and TimerInternals. Use concrete types due to implementation requirements. stateInternals() and timerInternals() construct response values the first time they are called based on the underlying data structure; #commitState returns null if and only if stateInternals was not used by the transform, and #getTimerUpdate likewise returns an empty TimerUpdate if timerInternals was not used by the transform. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115504122 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca98da2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca98da2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca98da2a Branch: refs/heads/master Commit: ca98da2a372210325e1c8985292b4040d1ac8c62 Parents: 639e9d9 Author: tgroh <[email protected]> Authored: Wed Feb 24 16:01:50 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:28 2016 -0800 ---------------------------------------------------------------------- .../inprocess/InProcessExecutionContext.java | 108 +++++++++ .../inprocess/InProcessPipelineRunner.java | 47 +--- .../inprocess/ParDoInProcessEvaluator.java | 34 ++- .../inprocess/ParDoMultiEvaluatorFactory.java | 25 +-- .../inprocess/ParDoSingleEvaluatorFactory.java | 30 +-- .../util/InMemoryWatermarkManager.java | 17 ++ .../ParDoMultiEvaluatorFactoryTest.java | 223 ++++++++++++++++++- .../ParDoSingleEvaluatorFactoryTest.java | 184 ++++++++++++++- 8 files changed, 556 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java 
---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java new file mode 100644 index 0000000..6342cd4 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.util.Clock; +import com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate; +import com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TransformWatermarks; +import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessTimerInternals; +import com.google.cloud.dataflow.sdk.util.BaseExecutionContext; +import com.google.cloud.dataflow.sdk.util.ExecutionContext; +import com.google.cloud.dataflow.sdk.util.TimerInternals; +import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; + +/** + * Execution Context for the {@link InProcessPipelineRunner}. + * + * This implementation is not thread safe. 
A new {@link InProcessExecutionContext} must be created + * for each thread that requires it. + */ +class InProcessExecutionContext + extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> { + private final Clock clock; + private final Object key; + private final CopyOnAccessInMemoryStateInternals<Object> existingState; + private final TransformWatermarks watermarks; + + public InProcessExecutionContext(Clock clock, Object key, + CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) { + this.clock = clock; + this.key = key; + this.existingState = existingState; + this.watermarks = watermarks; + } + + @Override + protected InProcessStepContext createStepContext( + String stepName, String transformName, StateSampler stateSampler) { + return new InProcessStepContext(this, stepName, transformName); + } + + /** + * Step Context for the {@link InProcessPipelineRunner}. + */ + public class InProcessStepContext + extends com.google.cloud.dataflow.sdk.util.BaseExecutionContext.StepContext { + private CopyOnAccessInMemoryStateInternals<Object> stateInternals; + private InProcessTimerInternals timerInternals; + + public InProcessStepContext( + ExecutionContext executionContext, String stepName, String transformName) { + super(executionContext, stepName, transformName); + } + + @Override + public CopyOnAccessInMemoryStateInternals<Object> stateInternals() { + if (stateInternals == null) { + stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState); + } + return stateInternals; + } + + @Override + public InProcessTimerInternals timerInternals() { + if (timerInternals == null) { + timerInternals = + InProcessTimerInternals.create(clock, watermarks, TimerUpdate.builder(key)); + } + return timerInternals; + } + + /** + * Commits the state of this step, and returns the committed state. If the step has not + * accessed any state, return null. 
+ */ + public CopyOnAccessInMemoryStateInternals<?> commitState() { + if (stateInternals != null) { + return stateInternals.commit(); + } + return null; + } + + /** + * Gets the timer update of the {@link TimerInternals} of this {@link InProcessStepContext}, + * which is empty if the {@link TimerInternals} were never accessed. + */ + public TimerUpdate getTimerUpdate() { + if (timerInternals == null) { + return TimerUpdate.empty(); + } + return timerInternals.getTimerUpdate(); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 26c5061..124de46 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -28,16 +28,12 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; -import com.google.cloud.dataflow.sdk.util.BaseExecutionContext; import com.google.cloud.dataflow.sdk.util.ExecutionContext; import com.google.cloud.dataflow.sdk.util.SideInputReader; -import com.google.cloud.dataflow.sdk.util.TimerInternals; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.util.common.CounterSet; -import 
com.google.cloud.dataflow.sdk.util.common.worker.StateSampler; -import com.google.cloud.dataflow.sdk.util.state.StateInternals; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.PValue; @@ -159,46 +155,6 @@ public class InProcessPipelineRunner { } /** - * Execution Context for the InMemoryPipelineRunner. - * - * This implementation is not thread safe. A new InMemoryExecutionContext must be created for each - * thread that requires it. - */ - public static class InProcessExecutionContext - extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> { - @Override - protected InProcessStepContext createStepContext( - String stepName, String transformName, StateSampler stateSampler) { - return new InProcessStepContext(this, stepName, transformName); - } - - /** - * Step Context for the InMemoryPipelineRunner. - */ - public class InProcessStepContext - extends com.google.cloud.dataflow.sdk.util.BaseExecutionContext.StepContext { - public InProcessStepContext( - InProcessExecutionContext executionContext, String stepName, String transformName) { - super(executionContext, stepName, transformName); - } - - @Override - public StateInternals stateInternals() { - // TODO get or create state for current key. - throw new UnsupportedOperationException("StateInternals not yet meaningfully supported"); - } - - @Override - public TimerInternals timerInternals() { - // TODO: Have the executionContext/evaluationContext pass this in - throw new UnsupportedOperationException("TimerInternals not yet meaningfully supported"); - } - } - - } - - - /** * The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within * the current evaluation. */ @@ -235,7 +191,8 @@ public class InProcessPipelineRunner { /** * Get an {@link ExecutionContext} for the provided application. 
*/ - InProcessExecutionContext getExecutionContext(AppliedPTransform<?, ?, ?> application); + InProcessExecutionContext getExecutionContext( + AppliedPTransform<?, ?, ?> application, @Nullable Object key); /** * Get the Step Name for the provided application. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java index f0b2ca2..2a21e8c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -15,49 +15,63 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.DoFnRunner; import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.TupleTag; -import org.joda.time.Instant; - import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -class ParDoInProcessEvaluator<T> { +class 
ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { private final DoFnRunner<T, ?> fnRunner; private final AppliedPTransform<PCollection<T>, ?, ?> transform; private final CounterSet counters; private final Collection<UncommittedBundle<?>> outputBundles; + private final InProcessStepContext stepContext; - public ParDoInProcessEvaluator(DoFnRunner<T, ?> fnRunner, - AppliedPTransform<PCollection<T>, ?, ?> transform, CounterSet counters, - Collection<UncommittedBundle<?>> outputBundles) { + public ParDoInProcessEvaluator( + DoFnRunner<T, ?> fnRunner, + AppliedPTransform<PCollection<T>, ?, ?> transform, + CounterSet counters, + Collection<UncommittedBundle<?>> outputBundles, + InProcessStepContext stepContext) { this.fnRunner = fnRunner; this.transform = transform; this.counters = counters; this.outputBundles = outputBundles; + this.stepContext = stepContext; } + @Override public void processElement(WindowedValue<T> element) { fnRunner.processElement(element); } + @Override public InProcessTransformResult finishBundle() { fnRunner.finishBundle(); - // TODO Use a real value - Instant hold = BoundedWindow.TIMESTAMP_MAX_VALUE; - return StepTransformResult.withHold(transform, hold) + StepTransformResult.Builder resultBuilder; + CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); + if (state != null) { + resultBuilder = + StepTransformResult.withHold(transform, state.getEarliestWatermarkHold()) + .withState(state); + } else { + resultBuilder = StepTransformResult.withoutHold(transform); + } + return resultBuilder .addOutput(outputBundles) + .withTimerUpdate(stepContext.getTimerUpdate()) .withCounters(counters) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java index ad68a6b..e3ae1a0 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java @@ -15,10 +15,9 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -27,7 +26,6 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti; import com.google.cloud.dataflow.sdk.util.DoFnRunner; import com.google.cloud.dataflow.sdk.util.DoFnRunners; -import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionTuple; @@ -46,20 +44,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle, 
InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"unchecked", "rawtypes"}) - final ParDoInProcessEvaluator<T> multiEvaluator = - createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); - return new TransformEvaluator<T>() { - @Override - public void processElement(WindowedValue<T> value) { - multiEvaluator.processElement(value); - } - - @Override - public InProcessTransformResult finishBundle() { - return multiEvaluator.finishBundle(); - } - }; + return createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); } private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator( @@ -74,7 +59,8 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { outputEntry.getKey(), evaluationContext.createBundle(inputBundle, outputEntry.getValue())); } - InProcessExecutionContext executionContext = evaluationContext.getExecutionContext(application); + InProcessExecutionContext executionContext = + evaluationContext.getExecutionContext(application, inputBundle.getKey()); String stepName = evaluationContext.getStepName(application); InProcessStepContext stepContext = executionContext.getOrCreateStepContext(stepName, stepName, null); @@ -96,6 +82,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { runner.startBundle(); - return new ParDoInProcessEvaluator<>(runner, application, counters, outputBundles.values()); + return new ParDoInProcessEvaluator<>( + runner, application, counters, outputBundles.values(), stepContext); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java index 737d0e9..cd79c21 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java @@ -15,10 +15,9 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -26,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound; import com.google.cloud.dataflow.sdk.util.DoFnRunner; import com.google.cloud.dataflow.sdk.util.DoFnRunners; -import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.TupleTag; @@ -43,22 +41,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { final AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"unchecked", "rawtypes"}) - final 
ParDoInProcessEvaluator<T> evaluator = - createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); - TransformEvaluator<T> singleEvaluator = - new TransformEvaluator<T>() { - @Override - public void processElement(WindowedValue<T> value) { - evaluator.processElement(value); - } - - @Override - public InProcessTransformResult finishBundle() { - return evaluator.finishBundle(); - } - }; - return singleEvaluator; + return createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); } private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> createSingleEvaluator( @@ -69,7 +52,8 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { UncommittedBundle<OutputT> outputBundle = evaluationContext.createBundle(inputBundle, application.getOutput()); - InProcessExecutionContext executionContext = evaluationContext.getExecutionContext(application); + InProcessExecutionContext executionContext = + evaluationContext.getExecutionContext(application, inputBundle.getKey()); String stepName = evaluationContext.getStepName(application); InProcessStepContext stepContext = executionContext.getOrCreateStepContext(stepName, stepName, null); @@ -92,6 +76,10 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { runner.startBundle(); return new ParDoInProcessEvaluator<InputT>( - runner, application, counters, Collections.<UncommittedBundle<?>>singleton(outputBundle)); + runner, + application, + counters, + Collections.<UncommittedBundle<?>>singleton(outputBundle), + stepContext); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java index ea6e00a..4428e41 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java @@ -1244,6 +1244,23 @@ public class InMemoryWatermarkManager { Iterable<? extends TimerData> getDeletedTimers() { return deletedTimers; } + + @Override + public int hashCode() { + return Objects.hash(key, completedTimers, setTimers, deletedTimers); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof TimerUpdate)) { + return false; + } + TimerUpdate that = (TimerUpdate) other; + return Objects.equals(this.key, that.key) + && Objects.equals(this.completedTimers, that.completedTimers) + && Objects.equals(this.setTimers, that.setTimers) + && Objects.equals(this.deletedTimers, that.deletedTimers); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java index c55a9d5..80863b9 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java @@ -15,15 +15,19 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static 
org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate; import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -31,9 +35,20 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.BagState; +import com.google.cloud.dataflow.sdk.util.state.StateNamespace; +import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import com.google.cloud.dataflow.sdk.util.state.StateTag; +import com.google.cloud.dataflow.sdk.util.state.StateTags; +import 
com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionTuple; @@ -41,6 +56,7 @@ import com.google.cloud.dataflow.sdk.values.TupleTag; import com.google.cloud.dataflow.sdk.values.TupleTagList; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -81,20 +97,18 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { PCollection<Integer> lengthOutput = outputTuple.get(lengthTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - InProcessBundle.unkeyed(mainOutput); - UncommittedBundle<String> elementOutputBundle = - InProcessBundle.unkeyed(elementOutput); - UncommittedBundle<Integer> lengthOutputBundle = - InProcessBundle.unkeyed(lengthOutput); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + UncommittedBundle<Integer> lengthOutputBundle = InProcessBundle.unkeyed(lengthOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) .thenReturn(elementOutputBundle); when(evaluationContext.createBundle(inputBundle, lengthOutput)).thenReturn(lengthOutputBundle); - InProcessExecutionContext executionContext = new InProcessExecutionContext(); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal())) + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) .thenReturn(executionContext); 
CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -173,8 +187,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { .thenReturn(elementOutputBundle); InProcessExecutionContext executionContext = - new InProcessExecutionContext(); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal())) + new InProcessExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) .thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -211,5 +225,190 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); } -} + @Test + public void finishBundleWithStatePutsStateInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + + final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = + StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow()); + final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); + final StateNamespace windowNs = + StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals() + .stateInternals() + .state(StateNamespaces.global(), watermarkTag) + .add(new Instant(20202L + c.element().length())); + c.windowingInternals() + 
.stateInternals() + .state( + StateNamespaces.window( + GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), + bagTag) + .add(c.element()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(inputBundle, elementOutput)) + .thenReturn(elementOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, "myKey", null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = + new ParDoMultiEvaluatorFactory().forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + 
Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); + assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L))); + assertThat(result.getState(), not(nullValue())); + assertThat( + result.getState().state(StateNamespaces.global(), watermarkTag).read(), + equalTo(new Instant(20205L))); + assertThat( + result.getState().state(windowNs, bagTag).read(), + containsInAnyOrder("foo", "bara", "bazam")); + } + + @Test + public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + + final TimerData addedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME); + final TimerData deletedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals().stateInternals(); + c.windowingInternals() + .timerInternals() + .setTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME)); + c.windowingInternals() + .timerInternals() + 
.deleteTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0), + new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(inputBundle, elementOutput)) + .thenReturn(elementOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, "myKey", null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = + new ParDoMultiEvaluatorFactory().forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = 
evaluator.finishBundle(); + assertThat( + result.getTimerUpdate(), + equalTo( + TimerUpdate.builder("myKey") + .setTimer(addedTimer) + .setTimer(addedTimer) + .setTimer(addedTimer) + .deletedTimer(deletedTimer) + .deletedTimer(deletedTimer) + .deletedTimer(deletedTimer) + .build())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java index 4fc765c..919e69e 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java @@ -15,28 +15,45 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate; import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.BagState; +import com.google.cloud.dataflow.sdk.util.state.StateNamespace; +import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import com.google.cloud.dataflow.sdk.util.state.StateTag; +import com.google.cloud.dataflow.sdk.util.state.StateTags; +import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState; +import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.TupleTag; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -65,8 +82,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { UncommittedBundle<Integer> outputBundle = InProcessBundle.unkeyed(collection); when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); - InProcessExecutionContext executionContext = new InProcessExecutionContext(); - 
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal())) + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null)) .thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -112,8 +130,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { InProcessBundle.unkeyed(collection); when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); InProcessExecutionContext executionContext = - new InProcessExecutionContext(); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal())) + new InProcessExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null)) .thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -134,5 +152,161 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat(result.getCounters(), equalTo(counters)); } + + @Test + public void finishBundleWithStatePutsStateInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = + StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp()); + final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); + final StateNamespace windowNs = + StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); + ParDo.Bound<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, 
KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals() + .stateInternals() + .state(StateNamespaces.global(), watermarkTag) + .add(new Instant(124443L - c.element().length())); + c.windowingInternals() + .stateInternals() + .state( + StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), + bagTag) + .add(c.element()); + } + }); + PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); + + CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, "myKey", null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = + new ParDoSingleEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L))); + assertThat(result.getState(), not(nullValue())); + assertThat( + result.getState().state(StateNamespaces.global(), watermarkTag).read(), + 
equalTo(new Instant(124438L))); + assertThat( + result.getState().state(windowNs, bagTag).read(), + containsInAnyOrder("foo", "bara", "bazam")); + } + + @Test + public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + final TimerData addedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME); + final TimerData deletedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + ParDo.Bound<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals().stateInternals(); + c.windowingInternals() + .timerInternals() + .setTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME)); + c.windowingInternals() + .timerInternals() + .deleteTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0), + new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + } + }); + PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); + + CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + + InProcessEvaluationContext 
evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(null, "myKey", null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + TransformEvaluator<String> evaluator = + new ParDoSingleEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getTimerUpdate(), + equalTo( + TimerUpdate.builder("myKey") + .setTimer(addedTimer) + .deletedTimer(deletedTimer) + .build())); + } }
