Move BaseStepContext to the top level
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59322d51 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59322d51 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59322d51 Branch: refs/heads/master Commit: 59322d51e80e7480710a296f51a4cb65303f5e06 Parents: 8b7a1f6 Author: Kenneth Knowles <[email protected]> Authored: Mon May 22 15:35:46 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 11:16:27 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/BaseExecutionContext.java | 46 -------------- .../beam/runners/core/BaseStepContext.java | 66 ++++++++++++++++++++ .../beam/runners/core/SimpleDoFnRunnerTest.java | 1 - .../runners/core/StatefulDoFnRunnerTest.java | 1 - .../runners/direct/DirectExecutionContext.java | 1 + 5 files changed, 67 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index 5667250..877fa0a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -17,15 +17,10 @@ */ package org.apache.beam.runners.core; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** * Base class for implementations of {@link ExecutionContext}. @@ -104,45 +99,4 @@ public abstract class BaseExecutionContext<T extends StepContext> return Collections.unmodifiableCollection(cachedStepContexts.values()); } - /** - * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}. - * - * <p>To complete a concrete subclass, implement {@link #timerInternals} and - * {@link #stateInternals}. - */ - public abstract static class BaseStepContext implements org.apache.beam.runners.core.StepContext { - private final ExecutionContext executionContext; - private final String stepName; - private final String transformName; - - public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) { - this.executionContext = executionContext; - this.stepName = stepName; - this.transformName = transformName; - } - - @Override - public String getStepName() { - return stepName; - } - - @Override - public String getTransformName() { - return transformName; - } - - @Override - public <T, W extends BoundedWindow> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder, - W window, Coder<W> windowCoder) throws IOException { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public abstract StateInternals stateInternals(); - - @Override - public abstract TimerInternals timerInternals(); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java new file mode 100644 index 0000000..f0436ac --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Base class for implementations of {@link StepContext}. + * + * <p>To complete a concrete subclass, implement {@link #timerInternals} and + * {@link #stateInternals}. + */ +public abstract class BaseStepContext implements StepContext { + private final ExecutionContext executionContext; + private final String stepName; + private final String transformName; + + public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) { + this.executionContext = executionContext; + this.stepName = stepName; + this.transformName = transformName; + } + + @Override + public String getStepName() { + return stepName; + } + + @Override + public String getTransformName() { + return transformName; + } + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, + Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder, + W window, Coder<W> windowCoder) throws IOException { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public abstract StateInternals stateInternals(); + + @Override + public abstract TimerInternals timerInternals(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 3750e6c..59e5857 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -29,7 +29,6 @@ import com.google.common.collect.ListMultimap; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index a335c3a..62a6578 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.when; import com.google.common.base.MoreObjects; import java.util.Collections; -import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/59322d51/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 6d2d02a..e5b88e5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.core.BaseExecutionContext; +import org.apache.beam.runners.core.BaseStepContext; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
