mr-runner: use InMemoryStateInternals in ParDoOperation, this fixed ParDoTest 
that uses state.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f312c56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f312c56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f312c56

Branch: refs/heads/mr-runner
Commit: 9f312c561a7a21c92072e91eebdca7fb6f72c9eb
Parents: 0b37187
Author: Pei He <p...@apache.org>
Authored: Thu Aug 31 21:01:59 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Fri Sep 1 17:13:40 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/translation/ParDoOperation.java   | 29 +++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9f312c56/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index 2c2fbde..ef83e72 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -28,7 +28,12 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -91,6 +96,18 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
     for (Graphs.Tag tag : sideInputTags) {
       tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder());
     }
+
+    final StateInternals stateInternals;
+    try {
+      stateInternals = 
InMemoryStateInternals.forKey(taskContext.getCurrentKey());
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(e);
+    }
+    final TimerInternals timerInternals = new InMemoryTimerInternals();
+
     fnRunner = DoFnRunners.simpleRunner(
         options.getPipelineOptions(),
         getDoFn(),
@@ -100,7 +117,17 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
         createOutputManager(),
         mainOutputTag,
         sideOutputTags,
-        null,
+        new StepContext() {
+          @Override
+          public StateInternals stateInternals() {
+            return stateInternals;
+          }
+
+          @Override
+          public TimerInternals timerInternals() {
+            return timerInternals;
+          }
+        },
         windowingStrategy);
 
     try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(

Reply via email to