mr-runner: use InMemoryStateInternals in ParDoOperation, this fixed ParDoTest that uses state.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f312c56 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f312c56 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f312c56 Branch: refs/heads/mr-runner Commit: 9f312c561a7a21c92072e91eebdca7fb6f72c9eb Parents: 0b37187 Author: Pei He <p...@apache.org> Authored: Thu Aug 31 21:01:59 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 1 17:13:40 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/translation/ParDoOperation.java | 29 +++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9f312c56/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java index 2c2fbde..ef83e72 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java @@ -28,7 +28,12 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.InMemoryStateInternals; +import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; @@ -91,6 +96,18 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> for (Graphs.Tag tag : sideInputTags) { tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder()); } + + final StateInternals stateInternals; + try { + stateInternals = InMemoryStateInternals.forKey(taskContext.getCurrentKey()); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException(e); + } + final TimerInternals timerInternals = new InMemoryTimerInternals(); + fnRunner = DoFnRunners.simpleRunner( options.getPipelineOptions(), getDoFn(), @@ -100,7 +117,17 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> createOutputManager(), mainOutputTag, sideOutputTags, - null, + new StepContext() { + @Override + public StateInternals stateInternals() { + return stateInternals; + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + }, windowingStrategy); try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(