Update Timers and State in the InProcess ParDoEvaluator

If the ParDo accessed state, put the committed value into the transform
result, and likewise with timers.

Add a #commitState method to InProcessStepContext to return the
committed state.

Implement stateInternals() and timerInternals() to provide actual
implementations of StateInternals and TimerInternals. Use concrete types
due to implementation requirements. stateInternals() and
timerInternals() construct response values the first time they are
called based on the underlying data structure; #commitState returns null
if and only if stateInternals was not used by the transform, and
likewise for #getTimerUpdate

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115504122


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca98da2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca98da2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca98da2a

Branch: refs/heads/master
Commit: ca98da2a372210325e1c8985292b4040d1ac8c62
Parents: 639e9d9
Author: tgroh <[email protected]>
Authored: Wed Feb 24 16:01:50 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:28 2016 -0800

----------------------------------------------------------------------
 .../inprocess/InProcessExecutionContext.java    | 108 +++++++++
 .../inprocess/InProcessPipelineRunner.java      |  47 +---
 .../inprocess/ParDoInProcessEvaluator.java      |  34 ++-
 .../inprocess/ParDoMultiEvaluatorFactory.java   |  25 +--
 .../inprocess/ParDoSingleEvaluatorFactory.java  |  30 +--
 .../util/InMemoryWatermarkManager.java          |  17 ++
 .../ParDoMultiEvaluatorFactoryTest.java         | 223 ++++++++++++++++++-
 .../ParDoSingleEvaluatorFactoryTest.java        | 184 ++++++++++++++-
 8 files changed, 556 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java
new file mode 100644
index 0000000..6342cd4
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.util.Clock;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TransformWatermarks;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessTimerInternals;
+import com.google.cloud.dataflow.sdk.util.BaseExecutionContext;
+import com.google.cloud.dataflow.sdk.util.ExecutionContext;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
+import 
com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+/**
+ * Execution Context for the {@link InProcessPipelineRunner}.
+ *
+ * This implementation is not thread safe. A new {@link 
InProcessExecutionContext} must be created
+ * for each thread that requires it.
+ */
+class InProcessExecutionContext
+    extends 
BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
+  private final Clock clock;
+  private final Object key;
+  private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+  private final TransformWatermarks watermarks;
+
+  public InProcessExecutionContext(Clock clock, Object key,
+      CopyOnAccessInMemoryStateInternals<Object> existingState, 
TransformWatermarks watermarks) {
+    this.clock = clock;
+    this.key = key;
+    this.existingState = existingState;
+    this.watermarks = watermarks;
+  }
+
+  @Override
+  protected InProcessStepContext createStepContext(
+      String stepName, String transformName, StateSampler stateSampler) {
+    return new InProcessStepContext(this, stepName, transformName);
+  }
+
+  /**
+   * Step Context for the {@link InProcessPipelineRunner}.
+   */
+  public class InProcessStepContext
+      extends 
com.google.cloud.dataflow.sdk.util.BaseExecutionContext.StepContext {
+    private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+    private InProcessTimerInternals timerInternals;
+
+    public InProcessStepContext(
+        ExecutionContext executionContext, String stepName, String 
transformName) {
+      super(executionContext, stepName, transformName);
+    }
+
+    @Override
+    public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+      if (stateInternals == null) {
+        stateInternals = 
CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
+      }
+      return stateInternals;
+    }
+
+    @Override
+    public InProcessTimerInternals timerInternals() {
+      if (timerInternals == null) {
+        timerInternals =
+            InProcessTimerInternals.create(clock, watermarks, 
TimerUpdate.builder(key));
+      }
+      return timerInternals;
+    }
+
+    /**
+     * Commits the state of this step, and returns the committed state. If the 
step has not
+     * accessed any state, return null.
+     */
+    public CopyOnAccessInMemoryStateInternals<?> commitState() {
+      if (stateInternals != null) {
+        return stateInternals.commit();
+      }
+      return null;
+    }
+
+    /**
+     * Gets the timer update of the {@link TimerInternals} of this {@link 
InProcessStepContext},
+     * which is empty if the {@link TimerInternals} were never accessed.
+     */
+    public TimerUpdate getTimerUpdate() {
+      if (timerInternals == null) {
+        return TimerUpdate.empty();
+      }
+      return timerInternals.getTimerUpdate();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 26c5061..124de46 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -28,16 +28,12 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.util.BaseExecutionContext;
 import com.google.cloud.dataflow.sdk.util.ExecutionContext;
 import com.google.cloud.dataflow.sdk.util.SideInputReader;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
 import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.PValue;
@@ -159,46 +155,6 @@ public class InProcessPipelineRunner {
   }
 
   /**
-   * Execution Context for the InMemoryPipelineRunner.
-   *
-   * This implementation is not thread safe. A new InMemoryExecutionContext 
must be created for each
-   * thread that requires it.
-   */
-  public static class InProcessExecutionContext
-      extends 
BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
-    @Override
-    protected InProcessStepContext createStepContext(
-        String stepName, String transformName, StateSampler stateSampler) {
-      return new InProcessStepContext(this, stepName, transformName);
-    }
-
-    /**
-     * Step Context for the InMemoryPipelineRunner.
-     */
-    public class InProcessStepContext
-        extends 
com.google.cloud.dataflow.sdk.util.BaseExecutionContext.StepContext {
-      public InProcessStepContext(
-          InProcessExecutionContext executionContext, String stepName, String 
transformName) {
-        super(executionContext, stepName, transformName);
-      }
-
-      @Override
-      public StateInternals stateInternals() {
-        // TODO get or create state for current key.
-        throw new UnsupportedOperationException("StateInternals not yet 
meaningfully supported");
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        // TODO: Have the executionContext/evaluationContext pass this in
-        throw new UnsupportedOperationException("TimerInternals not yet 
meaningfully supported");
-      }
-    }
-
-  }
-
-
-  /**
    * The evaluation context for the {@link InProcessPipelineRunner}. Contains 
state shared within
    * the current evaluation.
    */
@@ -235,7 +191,8 @@ public class InProcessPipelineRunner {
     /**
      * Get an {@link ExecutionContext} for the provided application.
      */
-    InProcessExecutionContext getExecutionContext(AppliedPTransform<?, ?, ?> 
application);
+    InProcessExecutionContext getExecutionContext(
+        AppliedPTransform<?, ?, ?> application, @Nullable Object key);
 
     /**
      * Get the Step Name for the provided application.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index f0b2ca2..2a21e8c 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -15,49 +15,63 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.DoFnRunner;
 import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
+import 
com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 
-import org.joda.time.Instant;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-class ParDoInProcessEvaluator<T> {
+class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
   private final DoFnRunner<T, ?> fnRunner;
   private final AppliedPTransform<PCollection<T>, ?, ?> transform;
   private final CounterSet counters;
   private final Collection<UncommittedBundle<?>> outputBundles;
+  private final InProcessStepContext stepContext;
 
-  public ParDoInProcessEvaluator(DoFnRunner<T, ?> fnRunner,
-      AppliedPTransform<PCollection<T>, ?, ?> transform, CounterSet counters,
-      Collection<UncommittedBundle<?>> outputBundles) {
+  public ParDoInProcessEvaluator(
+      DoFnRunner<T, ?> fnRunner,
+      AppliedPTransform<PCollection<T>, ?, ?> transform,
+      CounterSet counters,
+      Collection<UncommittedBundle<?>> outputBundles,
+      InProcessStepContext stepContext) {
     this.fnRunner = fnRunner;
     this.transform = transform;
     this.counters = counters;
     this.outputBundles = outputBundles;
+    this.stepContext = stepContext;
   }
 
+  @Override
   public void processElement(WindowedValue<T> element) {
     fnRunner.processElement(element);
   }
 
+  @Override
   public InProcessTransformResult finishBundle() {
     fnRunner.finishBundle();
-    // TODO Use a real value
-    Instant hold = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    return StepTransformResult.withHold(transform, hold)
+    StepTransformResult.Builder resultBuilder;
+    CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+    if (state != null) {
+      resultBuilder =
+          StepTransformResult.withHold(transform, 
state.getEarliestWatermarkHold())
+              .withState(state);
+    } else {
+      resultBuilder = StepTransformResult.withoutHold(transform);
+    }
+    return resultBuilder
         .addOutput(outputBundles)
+        .withTimerUpdate(stepContext.getTimerUpdate())
         .withCounters(counters)
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
index ad68a6b..e3ae1a0 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
@@ -15,10 +15,9 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext.InProcessStepContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
@@ -27,7 +26,6 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti;
 import com.google.cloud.dataflow.sdk.util.DoFnRunner;
 import com.google.cloud.dataflow.sdk.util.DoFnRunners;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
@@ -46,20 +44,7 @@ class ParDoMultiEvaluatorFactory implements 
TransformEvaluatorFactory {
       AppliedPTransform<?, ?, ?> application,
       CommittedBundle<?> inputBundle,
       InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    final ParDoInProcessEvaluator<T> multiEvaluator =
-        createMultiEvaluator((AppliedPTransform) application, inputBundle, 
evaluationContext);
-    return new TransformEvaluator<T>() {
-      @Override
-      public void processElement(WindowedValue<T> value) {
-        multiEvaluator.processElement(value);
-      }
-
-      @Override
-      public InProcessTransformResult finishBundle() {
-        return multiEvaluator.finishBundle();
-      }
-    };
+    return createMultiEvaluator((AppliedPTransform) application, inputBundle, 
evaluationContext);
   }
 
   private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator(
@@ -74,7 +59,8 @@ class ParDoMultiEvaluatorFactory implements 
TransformEvaluatorFactory {
           outputEntry.getKey(),
           evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
     }
-    InProcessExecutionContext executionContext = 
evaluationContext.getExecutionContext(application);
+    InProcessExecutionContext executionContext =
+        evaluationContext.getExecutionContext(application, 
inputBundle.getKey());
     String stepName = evaluationContext.getStepName(application);
     InProcessStepContext stepContext =
         executionContext.getOrCreateStepContext(stepName, stepName, null);
@@ -96,6 +82,7 @@ class ParDoMultiEvaluatorFactory implements 
TransformEvaluatorFactory {
 
     runner.startBundle();
 
-    return new ParDoInProcessEvaluator<>(runner, application, counters, 
outputBundles.values());
+    return new ParDoInProcessEvaluator<>(
+        runner, application, counters, outputBundles.values(), stepContext);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
index 737d0e9..cd79c21 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
@@ -15,10 +15,9 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext.InProcessStepContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
@@ -26,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound;
 import com.google.cloud.dataflow.sdk.util.DoFnRunner;
 import com.google.cloud.dataflow.sdk.util.DoFnRunners;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
@@ -43,22 +41,7 @@ class ParDoSingleEvaluatorFactory implements 
TransformEvaluatorFactory {
       final AppliedPTransform<?, ?, ?> application,
       CommittedBundle<?> inputBundle,
       InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    final ParDoInProcessEvaluator<T> evaluator =
-        createSingleEvaluator((AppliedPTransform) application, inputBundle, 
evaluationContext);
-    TransformEvaluator<T> singleEvaluator =
-        new TransformEvaluator<T>() {
-          @Override
-          public void processElement(WindowedValue<T> value) {
-            evaluator.processElement(value);
-          }
-
-          @Override
-          public InProcessTransformResult finishBundle() {
-            return evaluator.finishBundle();
-          }
-        };
-    return singleEvaluator;
+    return createSingleEvaluator((AppliedPTransform) application, inputBundle, 
evaluationContext);
   }
 
   private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> 
createSingleEvaluator(
@@ -69,7 +52,8 @@ class ParDoSingleEvaluatorFactory implements 
TransformEvaluatorFactory {
     UncommittedBundle<OutputT> outputBundle =
         evaluationContext.createBundle(inputBundle, application.getOutput());
 
-    InProcessExecutionContext executionContext = 
evaluationContext.getExecutionContext(application);
+    InProcessExecutionContext executionContext =
+        evaluationContext.getExecutionContext(application, 
inputBundle.getKey());
     String stepName = evaluationContext.getStepName(application);
     InProcessStepContext stepContext =
         executionContext.getOrCreateStepContext(stepName, stepName, null);
@@ -92,6 +76,10 @@ class ParDoSingleEvaluatorFactory implements 
TransformEvaluatorFactory {
 
     runner.startBundle();
     return new ParDoInProcessEvaluator<InputT>(
-        runner, application, counters, 
Collections.<UncommittedBundle<?>>singleton(outputBundle));
+        runner,
+        application,
+        counters,
+        Collections.<UncommittedBundle<?>>singleton(outputBundle),
+        stepContext);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java
index ea6e00a..4428e41 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManager.java
@@ -1244,6 +1244,23 @@ public class InMemoryWatermarkManager {
     Iterable<? extends TimerData> getDeletedTimers() {
       return deletedTimers;
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key, completedTimers, setTimers, deletedTimers);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof TimerUpdate)) {
+        return false;
+      }
+      TimerUpdate that = (TimerUpdate) other;
+      return Objects.equals(this.key, that.key)
+          && Objects.equals(this.completedTimers, that.completedTimers)
+          && Objects.equals(this.setTimers, that.setTimers)
+          && Objects.equals(this.deletedTimers, that.deletedTimers);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
index c55a9d5..80863b9 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
@@ -15,15 +15,19 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -31,9 +35,20 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
 import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
+import com.google.cloud.dataflow.sdk.util.state.BagState;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+import com.google.cloud.dataflow.sdk.util.state.StateTag;
+import com.google.cloud.dataflow.sdk.util.state.StateTags;
+import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
@@ -41,6 +56,7 @@ import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.cloud.dataflow.sdk.values.TupleTagList;
 
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -81,20 +97,18 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable {
     PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
 
     InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        InProcessBundle.unkeyed(mainOutput);
-    UncommittedBundle<String> elementOutputBundle =
-        InProcessBundle.unkeyed(elementOutput);
-    UncommittedBundle<Integer> lengthOutputBundle =
-        InProcessBundle.unkeyed(lengthOutput);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle = 
InProcessBundle.unkeyed(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = 
InProcessBundle.unkeyed(elementOutput);
+    UncommittedBundle<Integer> lengthOutputBundle = 
InProcessBundle.unkeyed(lengthOutput);
 
     when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
     when(evaluationContext.createBundle(inputBundle, elementOutput))
         .thenReturn(elementOutputBundle);
     when(evaluationContext.createBundle(inputBundle, 
lengthOutput)).thenReturn(lengthOutputBundle);
 
-    InProcessExecutionContext executionContext = new 
InProcessExecutionContext();
-    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal()))
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
         .thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -173,8 +187,8 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable {
         .thenReturn(elementOutputBundle);
 
     InProcessExecutionContext executionContext =
-        new InProcessExecutionContext();
-    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal()))
+        new InProcessExecutionContext(null, null, null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
         .thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -211,5 +225,190 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable {
             WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)),
             WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
   }
-}
 
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, 
Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+
+    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+        StateTags.watermarkStateInternal("myId", 
OutputTimeFns.outputAtEndOfWindow());
+    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", 
StringUtf8Coder.of());
+    final StateNamespace windowNs =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE);
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.windowingInternals()
+                        .stateInternals()
+                        .state(StateNamespaces.global(), watermarkTag)
+                        .add(new Instant(20202L + c.element().length()));
+                    c.windowingInternals()
+                        .stateInternals()
+                        .state(
+                            StateNamespaces.window(
+                                GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE),
+                            bagTag)
+                        .add(c.element());
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle = 
InProcessBundle.unkeyed(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = 
outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle = 
InProcessBundle.unkeyed(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = 
InProcessBundle.unkeyed(elementOutput);
+
+    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, elementOutput))
+        .thenReturn(elementOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> 
evaluator =
+        new ParDoMultiEvaluatorFactory().forApplication(
+            mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, 
elementOutputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
+        equalTo(new Instant(20205L)));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult() throws 
Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, 
Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)))),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(new Instant(0), new 
Instant(0).plus(Duration.standardHours(1)))),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.windowingInternals().stateInternals();
+                    c.windowingInternals()
+                        .timerInternals()
+                        .setTimer(
+                            TimerData.of(
+                                StateNamespaces.window(
+                                    IntervalWindow.getCoder(),
+                                    new IntervalWindow(
+                                        new 
Instant(0).plus(Duration.standardMinutes(5)),
+                                        new Instant(1)
+                                            .plus(Duration.standardMinutes(5))
+                                            .plus(Duration.standardHours(1)))),
+                                new Instant(54541L),
+                                TimeDomain.EVENT_TIME));
+                    c.windowingInternals()
+                        .timerInternals()
+                        .deleteTimer(
+                            TimerData.of(
+                                StateNamespaces.window(
+                                    IntervalWindow.getCoder(),
+                                    new IntervalWindow(
+                                        new Instant(0),
+                                        new 
Instant(0).plus(Duration.standardHours(1)))),
+                                new Instant(3400000),
+                                TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle = 
InProcessBundle.unkeyed(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = 
outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle = 
InProcessBundle.unkeyed(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = 
InProcessBundle.unkeyed(elementOutput);
+
+    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, elementOutput))
+        .thenReturn(elementOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> 
evaluator =
+        new ParDoMultiEvaluatorFactory().forApplication(
+            mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getTimerUpdate(),
+        equalTo(
+            TimerUpdate.builder("myKey")
+                .setTimer(addedTimer)
+                .setTimer(addedTimer)
+                .setTimer(addedTimer)
+                .deletedTimer(deletedTimer)
+                .deletedTimer(deletedTimer)
+                .deletedTimer(deletedTimer)
+                .build()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca98da2a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
index 4fc765c..919e69e 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
@@ -15,28 +15,45 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutionContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
 import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
+import com.google.cloud.dataflow.sdk.util.state.BagState;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+import com.google.cloud.dataflow.sdk.util.state.StateTag;
+import com.google.cloud.dataflow.sdk.util.state.StateTags;
+import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState;
+import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -65,8 +82,9 @@ public class ParDoSingleEvaluatorFactoryTest implements 
Serializable {
     UncommittedBundle<Integer> outputBundle =
         InProcessBundle.unkeyed(collection);
     when(evaluationContext.createBundle(inputBundle, 
collection)).thenReturn(outputBundle);
-    InProcessExecutionContext executionContext = new 
InProcessExecutionContext();
-    
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal()))
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
 null))
         .thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -112,8 +130,8 @@ public class ParDoSingleEvaluatorFactoryTest implements 
Serializable {
         InProcessBundle.unkeyed(collection);
     when(evaluationContext.createBundle(inputBundle, 
collection)).thenReturn(outputBundle);
     InProcessExecutionContext executionContext =
-        new InProcessExecutionContext();
-    
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal()))
+        new InProcessExecutionContext(null, null, null, null);
+    
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
 null))
         .thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -134,5 +152,161 @@ public class ParDoSingleEvaluatorFactoryTest implements 
Serializable {
     assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     assertThat(result.getCounters(), equalTo(counters));
   }
+
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+        StateTags.watermarkStateInternal("myId", 
OutputTimeFns.outputAtEarliestInputTimestamp());
+    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", 
StringUtf8Coder.of());
+    final StateNamespace windowNs =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE);
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(StateNamespaces.global(), watermarkTag)
+                    .add(new Instant(124443L - c.element().length()));
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(
+                        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE),
+                        bagTag)
+                    .add(c.element());
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle = 
InProcessBundle.unkeyed(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle = 
InProcessBundle.unkeyed(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> 
evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new 
Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
+        equalTo(new Instant(124438L)));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult() throws 
Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)))),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(new Instant(0), new 
Instant(0).plus(Duration.standardHours(1)))),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.windowingInternals().stateInternals();
+                    c.windowingInternals()
+                        .timerInternals()
+                        .setTimer(
+                            TimerData.of(
+                                StateNamespaces.window(
+                                    IntervalWindow.getCoder(),
+                                    new IntervalWindow(
+                                        new 
Instant(0).plus(Duration.standardMinutes(5)),
+                                        new Instant(1)
+                                            .plus(Duration.standardMinutes(5))
+                                            .plus(Duration.standardHours(1)))),
+                                new Instant(54541L),
+                                TimeDomain.EVENT_TIME));
+                    c.windowingInternals()
+                        .timerInternals()
+                        .deleteTimer(
+                            TimerData.of(
+                                StateNamespaces.window(
+                                    IntervalWindow.getCoder(),
+                                    new IntervalWindow(
+                                        new Instant(0),
+                                        new 
Instant(0).plus(Duration.standardHours(1)))),
+                                new Instant(3400000),
+                                TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+                  }
+                });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle = 
InProcessBundle.unkeyed(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle = 
InProcessBundle.unkeyed(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, 
mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
 null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getTimerUpdate(),
+        equalTo(
+            TimerUpdate.builder("myKey")
+                .setTimer(addedTimer)
+                .deletedTimer(deletedTimer)
+                .build()));
+  }
 }
 

Reply via email to