Inline and delete BaseExecutionContext
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0be3cf34 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0be3cf34 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0be3cf34 Branch: refs/heads/master Commit: 0be3cf3462c19f0b007b2329c95ea4865d22cad5 Parents: 32c6cb1 Author: Kenneth Knowles <[email protected]> Authored: Mon May 22 16:50:41 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 11:16:27 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/BaseExecutionContext.java | 102 ------------------- .../runners/direct/DirectExecutionContext.java | 39 +++++-- 2 files changed, 32 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java deleted file mode 100644 index 877fa0a..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; - -/** - * Base class for implementations of {@link ExecutionContext}. - * - * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate - * {@link BaseStepContext} implementation. Any {@code StepContext} created will - * be cached for the lifetime of this {@link ExecutionContext}. - * - * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass - * of {@link BaseStepContext} from {@link #getOrCreateStepContext(String, String)} and - * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g. - * <pre>{@code - * {@literal @}Override - * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) { - * return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...); - * } - * }</pre> - * - * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of - * {@link #createStepContext(String, String)}, - * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()} - * will be appropriately specialized. - */ -public abstract class BaseExecutionContext<T extends StepContext> - implements ExecutionContext { - - private Map<String, T> cachedStepContexts = new LinkedHashMap<>(); - - /** - * Implementations should override this to create the specific type - * of {@link BaseStepContext} they need. - */ - protected abstract T createStepContext(String stepName, String transformName); - - /** - * Returns the {@link BaseStepContext} associated with the given step. - */ - @Override - public T getOrCreateStepContext(String stepName, String transformName) { - final String finalStepName = stepName; - final String finalTransformName = transformName; - return getOrCreateStepContext( - stepName, - new CreateStepContextFunction<T>() { - @Override - public T create() { - return createStepContext(finalStepName, finalTransformName); - } - }); - } - - /** - * Factory method interface to create an execution context if none exists during - * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}. - */ - protected interface CreateStepContextFunction<T extends org.apache.beam.runners.core.StepContext> { - T create(); - } - - protected final T getOrCreateStepContext(String stepName, - CreateStepContextFunction<T> createContextFunc) { - T context = cachedStepContexts.get(stepName); - if (context == null) { - context = createContextFunc.create(); - cachedStepContexts.put(stepName, context); - } - - return context; - } - - /** - * Returns a collection view of all of the {@link BaseStepContext}s. - */ - @Override - public Collection<? extends T> getAllStepContexts() { - return Collections.unmodifiableCollection(cachedStepContexts.values()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/0be3cf34/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 39174d6..9b68662 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -17,11 +17,14 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.core.BaseExecutionContext; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.beam.runners.core.BaseStepContext; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; @@ -31,12 +34,12 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; * <p>This implementation is not thread safe. A new {@link DirectExecutionContext} must be created * for each thread that requires it. */ -class DirectExecutionContext - extends BaseExecutionContext<DirectStepContext> { +class DirectExecutionContext implements ExecutionContext { private final Clock clock; private final StructuralKey<?> key; private final CopyOnAccessInMemoryStateInternals existingState; private final TransformWatermarks watermarks; + private Map<String, DirectStepContext> cachedStepContexts = new LinkedHashMap<>(); public DirectExecutionContext( Clock clock, @@ -49,9 +52,31 @@ class DirectExecutionContext this.watermarks = watermarks; } + private DirectStepContext createStepContext(String stepName, String transformName) { + return new DirectStepContext(stepName, transformName); + } + + /** + * Returns the {@link BaseStepContext} associated with the given step. + */ + @Override + public DirectStepContext getOrCreateStepContext(String stepName, String transformName) { + final String finalStepName = stepName; + final String finalTransformName = transformName; + DirectStepContext context = cachedStepContexts.get(stepName); + if (context == null) { + context = createStepContext(finalStepName, finalTransformName); + cachedStepContexts.put(stepName, context); + } + return context; + } + + /** + * Returns a collection view of all of the {@link BaseStepContext}s. + */ @Override - protected DirectStepContext createStepContext(String stepName, String transformName) { - return new DirectStepContext(this, stepName, transformName); + public Collection<? extends DirectStepContext> getAllStepContexts() { + return Collections.unmodifiableCollection(cachedStepContexts.values()); } /** @@ -64,7 +89,7 @@ class DirectExecutionContext private final String transformName; public DirectStepContext( - ExecutionContext executionContext, String stepName, String transformName) { + String stepName, String transformName) { this.stepName = stepName; this.transformName = transformName; }
