Implement StepContext directly in the DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5ac24e0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5ac24e0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5ac24e0a

Branch: refs/heads/master
Commit: 5ac24e0a89b95feafccbe381bdde9c11fdf82a88
Parents: 248c808
Author: Kenneth Knowles <[email protected]>
Authored: Mon May 22 15:44:17 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/DirectExecutionContext.java | 33 +++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/5ac24e0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index d676f24..2a75ef5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,13 +17,18 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.io.IOException;
 import org.apache.beam.runners.core.BaseExecutionContext;
-import org.apache.beam.runners.core.BaseStepContext;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Execution Context for the {@link DirectRunner}.
@@ -57,14 +62,16 @@ class DirectExecutionContext
   /**
    * Step Context for the {@link DirectRunner}.
    */
-  public class DirectStepContext
-      extends BaseStepContext {
+  public class DirectStepContext implements StepContext {
     private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
+    private final String stepName;
+    private final String transformName;
 
     public DirectStepContext(
         ExecutionContext executionContext, String stepName, String transformName) {
-      super(stepName, transformName);
+      this.stepName = stepName;
+      this.transformName = transformName;
     }
 
     @Override
@@ -95,6 +102,24 @@ class DirectExecutionContext
       return null;
     }
 
+    @Override
+    public String getStepName() {
+      return stepName;
+    }
+
+    @Override
+    public String getTransformName() {
+      return transformName;
+    }
+
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(
+        TupleTag<?> tag,
+        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
+        W window, Coder<W> windowCoder) throws IOException {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
     /**
      * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext},
      * which is empty if the {@link TimerInternals} were never accessed.
