Repository: beam
Updated Branches:
  refs/heads/master 474345f59 -> c84d3da38


Move StepContext to top level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98a75551
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98a75551
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98a75551

Branch: refs/heads/master
Commit: 98a75551064c742d108d8c5ec8fc0783db7761d2
Parents: 474345f
Author: Kenneth Knowles <[email protected]>
Authored: Mon May 22 15:28:44 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Tue May 23 11:16:26 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/NoOpStepContext.java |  6 +-
 .../beam/runners/core/BaseExecutionContext.java |  8 +--
 .../apache/beam/runners/core/DoFnRunners.java   |  1 -
 .../beam/runners/core/ExecutionContext.java     | 47 -------------
 .../beam/runners/core/SimpleDoFnRunner.java     |  1 -
 .../apache/beam/runners/core/StepContext.java   | 70 ++++++++++++++++++++
 .../functions/FlinkNoOpStepContext.java         |  2 +-
 .../wrappers/streaming/DoFnOperator.java        |  7 +-
 .../spark/translation/SparkProcessContext.java  |  2 +-
 .../beam/fn/harness/fake/FakeStepContext.java   |  2 +-
 10 files changed, 83 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 721eecd..241a985 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.apex.translation.utils;
 
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Serializable {@link ExecutionContext.StepContext} that does nothing.
+ * Serializable {@link StepContext} that does nothing.
  */
-public class NoOpStepContext implements ExecutionContext.StepContext, 
Serializable {
+public class NoOpStepContext implements StepContext, Serializable {
   private static final long serialVersionUID = 1L;
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index 23d61f8..ed37143 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.values.TupleTag;
  * {@link #getOrCreateStepContext(String, String)}, and {@link 
#getAllStepContexts()}
  * will be appropriately specialized.
  */
-public abstract class BaseExecutionContext<T extends 
ExecutionContext.StepContext>
+public abstract class BaseExecutionContext<T extends StepContext>
     implements ExecutionContext {
 
   private Map<String, T> cachedStepContexts = new LinkedHashMap<>();
@@ -81,7 +81,7 @@ public abstract class BaseExecutionContext<T extends 
ExecutionContext.StepContex
    * Factory method interface to create an execution context if none exists 
during
    * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
    */
-  protected interface CreateStepContextFunction<T extends 
ExecutionContext.StepContext> {
+  protected interface CreateStepContextFunction<T extends 
org.apache.beam.runners.core.StepContext> {
     T create();
   }
 
@@ -111,12 +111,12 @@ public abstract class BaseExecutionContext<T extends 
ExecutionContext.StepContex
   public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
 
   /**
-   * Base class for implementations of {@link ExecutionContext.StepContext}.
+   * Base class for implementations of {@link 
org.apache.beam.runners.core.StepContext}.
    *
    * <p>To complete a concrete subclass, implement {@link #timerInternals} and
    * {@link #stateInternals}.
    */
-  public abstract static class StepContext implements 
ExecutionContext.StepContext {
+  public abstract static class StepContext implements 
org.apache.beam.runners.core.StepContext {
     private final ExecutionContext executionContext;
     private final String stepName;
     private final String transformName;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 71dfd11..9d3e25d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core;
 
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index d2fdaac..f431c92 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -17,11 +17,8 @@
  */
 package org.apache.beam.runners.core;
 
-import java.io.IOException;
 import java.util.Collection;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -52,48 +49,4 @@ public interface ExecutionContext {
    */
   void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
 
-  /**
-   * Per-step, per-key context used for retrieving state.
-   */
-  public interface StepContext {
-
-    /**
-     * The name of the step.
-     */
-    String getStepName();
-
-    /**
-     * The name of the transform for the step.
-     */
-    String getTransformName();
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link WindowedContext#output}
-     * is called.
-     */
-    void noteOutput(WindowedValue<?> output);
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link WindowedContext#output}
-     * is called.
-     */
-    void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-    /**
-     * Writes the given {@code PCollectionView} data to a globally accessible 
location.
-     */
-    <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder)
-            throws IOException;
-
-    StateInternals stateInternals();
-
-    TimerInternals timerInternals();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 65384da..adbe62e 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
new file mode 100644
index 0000000..a414830
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Per-step, per-key context used for retrieving state.
+ */
+public interface StepContext {
+
+  /**
+   * The name of the step.
+   */
+  String getStepName();
+
+  /**
+   * The name of the transform for the step.
+   */
+  String getTransformName();
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@link WindowedContext#output}
+   * is called.
+   */
+  void noteOutput(WindowedValue<?> output);
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@link WindowedContext#output}
+   * is called.
+   */
+  void noteOutput(TupleTag<?> tag, WindowedValue<?> output);
+
+  /**
+   * Writes the given {@code PCollectionView} data to a globally accessible 
location.
+   */
+  <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder)
+          throws IOException;
+
+  StateInternals stateInternals();
+
+  TimerInternals timerInternals();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 8640801..c394ebd 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index f35ba7a..c9f106a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
@@ -184,7 +183,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
   }
 
-  private ExecutionContext.StepContext createStepContext() {
+  private org.apache.beam.runners.core.StepContext createStepContext() {
     return new StepContext();
   }
 
@@ -250,7 +249,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     doFnInvoker.invokeSetup();
 
-    ExecutionContext.StepContext stepContext = createStepContext();
+    org.apache.beam.runners.core.StepContext stepContext = createStepContext();
 
     doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(),
@@ -676,7 +675,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
    * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does 
not allow
    * accessing state or timer internals.
    */
-  protected class StepContext implements ExecutionContext.StepContext {
+  protected class StepContext implements 
org.apache.beam.runners.core.StepContext {
 
     @Override
     public String getStepName() {

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index ffe343b..9147422 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/98a75551/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 9b79d11..b206bc7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -19,7 +19,7 @@
 package org.apache.beam.fn.harness.fake;
 
 import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;

Reply via email to