Remove unused pieces of DirectStepContext
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32a1c35 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32a1c35 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32a1c35 Branch: refs/heads/master Commit: b32a1c350398a91b1b1552d5257dab6ab7d1da3a Parents: d425b27 Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 11:13:19 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 11:16:42 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/DirectExecutionContext.java | 18 +++++------------- .../direct/GroupAlsoByWindowEvaluatorFactory.java | 2 +- .../runners/direct/ParDoEvaluatorFactory.java | 2 +- ...SplittableProcessElementsEvaluatorFactory.java | 2 +- .../direct/StatefulParDoEvaluatorFactory.java | 2 +- .../runners/direct/EvaluationContextTest.java | 16 ++++++++-------- .../beam/runners/direct/ParDoEvaluatorTest.java | 2 +- .../direct/StatefulParDoEvaluatorFactoryTest.java | 4 ++-- 8 files changed, 20 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 11c1b86..e8ad8d7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -48,19 +48,17 @@ class DirectExecutionContext { this.watermarks = watermarks; } - private DirectStepContext createStepContext(String stepName, String transformName) { - return new DirectStepContext(stepName, transformName); + private DirectStepContext createStepContext() { + return new DirectStepContext(); } /** * Returns the {@link StepContext} associated with the given step. */ - public DirectStepContext getStepContext(String stepName, String transformName) { - final String finalStepName = stepName; - final String finalTransformName = transformName; + public DirectStepContext getStepContext(String stepName) { DirectStepContext context = cachedStepContexts.get(stepName); if (context == null) { - context = createStepContext(finalStepName, finalTransformName); + context = createStepContext(); cachedStepContexts.put(stepName, context); } return context; @@ -72,14 +70,8 @@ class DirectExecutionContext { public class DirectStepContext implements StepContext { private CopyOnAccessInMemoryStateInternals<?> stateInternals; private DirectTimerInternals timerInternals; - private final String stepName; - private final String transformName; - public DirectStepContext( - String stepName, String transformName) { - this.stepName = stepName; - this.transformName = transformName; - } + public DirectStepContext() { } @Override public CopyOnAccessInMemoryStateInternals<?> stateInternals() { http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 49b7512..1a588ee 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -130,7 +130,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { stepContext = evaluationContext .getExecutionContext(application, inputBundle.getKey()) .getStepContext( - evaluationContext.getStepName(application), application.getTransform().getName()); + evaluationContext.getStepName(application)); windowingStrategy = (WindowingStrategy<?, BoundedWindow>) application.getTransform().getInputWindowingStrategy(); http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 12c6751..8aa75cf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -112,7 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator DirectStepContext stepContext = evaluationContext .getExecutionContext(application, inputBundleKey) - .getStepContext(stepName, stepName); + .getStepContext(stepName); DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn); http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 13d9345..b85f481c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -109,7 +109,7 @@ class SplittableProcessElementsEvaluatorFactory< final DirectExecutionContext.DirectStepContext stepContext = evaluationContext .getExecutionContext(application, inputBundle.getKey()) - .getStepContext(stepName, stepName); + .getStepContext(stepName); final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> parDoEvaluator = http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 70d0cf5..506c84c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -163,7 +163,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo evaluationContext .getExecutionContext( transformOutputWindow.getTransform(), transformOutputWindow.getKey()) - .getStepContext(stepName, stepName); + .getStepContext(stepName); final StateNamespace namespace = StateNamespaces.window( http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 0e2be8d..80b04f8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -160,7 +160,7 @@ public class EvaluationContextTest { StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); - DirectStepContext stepContext = fooContext.getStepContext("s1", "s1"); + DirectStepContext stepContext = fooContext.getStepContext("s1"); stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); context.handleResult( @@ -177,7 +177,7 @@ public class EvaluationContextTest { StructuralKey.of("foo", StringUtf8Coder.of())); assertThat( secondFooContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -194,7 +194,7 @@ public class EvaluationContextTest { StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -205,7 +205,7 @@ public class EvaluationContextTest { assertThat(barContext, not(equalTo(fooContext))); assertThat( barContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -221,7 +221,7 @@ public class EvaluationContextTest { StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -230,7 +230,7 @@ public class EvaluationContextTest { context.getExecutionContext(downstreamProducer, myKey); assertThat( barContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -246,7 +246,7 @@ public class EvaluationContextTest { StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); CopyOnAccessInMemoryStateInternals<?> state = - fooContext.getStepContext("s1", "s1").stateInternals(); + fooContext.getStepContext("s1").stateInternals(); BagState<Integer> bag = state.state(StateNamespaces.global(), intBag); bag.add(1); bag.add(2); @@ -266,7 +266,7 @@ public class EvaluationContextTest { context.getExecutionContext(downstreamProducer, myKey); CopyOnAccessInMemoryStateInternals<?> afterResultState = - afterResultContext.getStepContext("s1", "s1").stateInternals(); + afterResultContext.getStepContext("s1").stateInternals(); assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 22b3b7e..09a21ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -141,7 +141,7 @@ public class ParDoEvaluatorTest { DirectStepContext stepContext = mock(DirectStepContext.class); when( executionContext.getStepContext( - Mockito.any(String.class), Mockito.any(String.class))) + Mockito.any(String.class))) .thenReturn(stepContext); when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); when( http://git-wip-us.apache.org/repos/asf/beam/blob/b32a1c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index b233c1b..9366b7c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -153,7 +153,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.<StructuralKey>any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getStepContext(anyString(), anyString())) + when(mockExecutionContext.getStepContext(anyString())) .thenReturn(mockStepContext); IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9)); @@ -269,7 +269,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.<StructuralKey>any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getStepContext(anyString(), anyString())) + when(mockExecutionContext.getStepContext(anyString())) .thenReturn(mockStepContext); when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any())) .thenReturn(mockUncommittedBundle);
