Repository: beam Updated Branches: refs/heads/master 474345f59 -> c84d3da38
Move StepContext to top level Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98a75551 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98a75551 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98a75551 Branch: refs/heads/master Commit: 98a75551064c742d108d8c5ec8fc0783db7761d2 Parents: 474345f Author: Kenneth Knowles <[email protected]> Authored: Mon May 22 15:28:44 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 11:16:26 2017 -0700 ---------------------------------------------------------------------- .../apex/translation/utils/NoOpStepContext.java | 6 +- .../beam/runners/core/BaseExecutionContext.java | 8 +-- .../apache/beam/runners/core/DoFnRunners.java | 1 - .../beam/runners/core/ExecutionContext.java | 47 ------------- .../beam/runners/core/SimpleDoFnRunner.java | 1 - .../apache/beam/runners/core/StepContext.java | 70 ++++++++++++++++++++ .../functions/FlinkNoOpStepContext.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 7 +- .../spark/translation/SparkProcessContext.java | 2 +- .../beam/fn/harness/fake/FakeStepContext.java | 2 +- 10 files changed, 83 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index 721eecd..241a985 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -19,8 +19,8 @@ package org.apache.beam.runners.apex.translation.utils; import java.io.IOException; import java.io.Serializable; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -28,9 +28,9 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; /** - * Serializable {@link ExecutionContext.StepContext} that does nothing. + * Serializable {@link StepContext} that does nothing. */ -public class NoOpStepContext implements ExecutionContext.StepContext, Serializable { +public class NoOpStepContext implements StepContext, Serializable { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index 23d61f8..ed37143 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -49,7 +49,7 @@ import org.apache.beam.sdk.values.TupleTag; * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()} * will be appropriately specialized. */ -public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext> +public abstract class BaseExecutionContext<T extends StepContext> implements ExecutionContext { private Map<String, T> cachedStepContexts = new LinkedHashMap<>(); @@ -81,7 +81,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex * Factory method interface to create an execution context if none exists during * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}. */ - protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext> { + protected interface CreateStepContextFunction<T extends org.apache.beam.runners.core.StepContext> { T create(); } @@ -111,12 +111,12 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {} /** - * Base class for implementations of {@link ExecutionContext.StepContext}. + * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}. * * <p>To complete a concrete subclass, implement {@link #timerInternals} and * {@link #stateInternals}. */ - public abstract static class StepContext implements ExecutionContext.StepContext { + public abstract static class StepContext implements org.apache.beam.runners.core.StepContext { private final ExecutionContext executionContext; private final String stepName; private final String transformName; http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 71dfd11..9d3e25d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.core; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer; import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner; http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java index d2fdaac..f431c92 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -17,11 +17,8 @@ */ package org.apache.beam.runners.core; -import java.io.IOException; import java.util.Collection; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn.WindowedContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -52,48 +49,4 @@ public interface ExecutionContext { */ void noteOutput(TupleTag<?> tag, WindowedValue<?> output); - /** - * Per-step, per-key context used for retrieving state. - */ - public interface StepContext { - - /** - * The name of the step. - */ - String getStepName(); - - /** - * The name of the transform for the step. - */ - String getTransformName(); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output} - * is called. - */ - void noteOutput(WindowedValue<?> output); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output} - * is called. - */ - void noteOutput(TupleTag<?> tag, WindowedValue<?> output); - - /** - * Writes the given {@code PCollectionView} data to a globally accessible location. - */ - <T, W extends BoundedWindow> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, - Coder<Iterable<WindowedValue<T>>> dataCoder, - W window, - Coder<W> windowCoder) - throws IOException; - - StateInternals stateInternals(); - - TimerInternals timerInternals(); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 65384da..adbe62e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java new file mode 100644 index 0000000..a414830 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn.WindowedContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Per-step, per-key context used for retrieving state. + */ +public interface StepContext { + + /** + * The name of the step. + */ + String getStepName(); + + /** + * The name of the transform for the step. + */ + String getTransformName(); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link WindowedContext#output} + * is called. + */ + void noteOutput(WindowedValue<?> output); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link WindowedContext#output} + * is called. + */ + void noteOutput(TupleTag<?> tag, WindowedValue<?> output); + + /** + * Writes the given {@code PCollectionView} data to a globally accessible location. + */ + <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, + Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, + W window, + Coder<W> windowCoder) + throws IOException; + + StateInternals stateInternals(); + + TimerInternals timerInternals(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java index 8640801..c394ebd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.flink.translation.functions; import java.io.IOException; -import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index f35ba7a..c9f106a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -32,7 +32,6 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; @@ -184,7 +183,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); } - private ExecutionContext.StepContext createStepContext() { + private org.apache.beam.runners.core.StepContext createStepContext() { return new StepContext(); } @@ -250,7 +249,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> doFnInvoker.invokeSetup(); - ExecutionContext.StepContext stepContext = createStepContext(); + org.apache.beam.runners.core.StepContext stepContext = createStepContext(); doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), @@ -676,7 +675,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow * accessing state or timer internals. */ - protected class StepContext implements ExecutionContext.StepContext { + protected class StepContext implements org.apache.beam.runners.core.StepContext { @Override public String getStepName() { http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index ffe343b..9147422 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.util.Iterator; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index 9b79d11..b206bc7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -19,7 +19,7 @@ package org.apache.beam.fn.harness.fake; import java.io.IOException; -import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder;
