Simplify type parameters of StateSpec and related Before this change, almost all uses of state had a type variable that existed only to support the esoteric use of a KeyedCombineFn in a state cell. KeyedCombineFn is now gone, so the key is no longer required.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fe59c35 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fe59c35 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fe59c35 Branch: refs/heads/master Commit: 8fe59c3555e65c21c0a0bbaa5a0ba9eac39316dd Parents: eec903f Author: Kenneth Knowles <[email protected]> Authored: Thu Apr 20 20:46:37 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 2 11:55:18 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexParDoOperator.java | 4 +- .../translation/utils/ApexStateInternals.java | 42 +++-- .../apex/translation/utils/NoOpStepContext.java | 2 +- .../translation/utils/StateInternalsProxy.java | 6 +- .../utils/ApexStateInternalsTest.java | 12 +- .../construction/PTransformMatchersTest.java | 2 +- .../beam/runners/core/BaseExecutionContext.java | 2 +- .../beam/runners/core/ExecutionContext.java | 2 +- .../GroupAlsoByWindowViaOutputBufferDoFn.java | 2 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 2 +- .../runners/core/InMemoryStateInternals.java | 36 ++-- .../runners/core/MergingActiveWindowSet.java | 4 +- .../beam/runners/core/MergingStateAccessor.java | 2 +- .../apache/beam/runners/core/NonEmptyPanes.java | 2 +- .../beam/runners/core/PaneInfoTracker.java | 2 +- .../runners/core/ReduceFnContextFactory.java | 37 ++-- .../beam/runners/core/ReduceFnRunner.java | 4 +- .../beam/runners/core/SideInputHandler.java | 14 +- .../beam/runners/core/SimpleDoFnRunner.java | 8 +- .../beam/runners/core/SimpleOldDoFnRunner.java | 2 +- .../beam/runners/core/SplittableParDo.java | 8 +- .../apache/beam/runners/core/StateAccessor.java | 2 +- .../beam/runners/core/StateInternals.java | 8 +- .../runners/core/StateInternalsFactory.java | 2 +- .../apache/beam/runners/core/StateMerging.java | 16 +- .../apache/beam/runners/core/StateTable.java | 
10 +- .../org/apache/beam/runners/core/StateTag.java | 28 ++- .../org/apache/beam/runners/core/StateTags.java | 70 ++++---- .../beam/runners/core/StatefulDoFnRunner.java | 6 +- .../beam/runners/core/SystemReduceFn.java | 8 +- .../core/TestInMemoryStateInternals.java | 6 +- .../apache/beam/runners/core/WatermarkHold.java | 8 +- .../beam/runners/core/WindowingInternals.java | 2 +- .../AfterDelayFromFirstElementStateMachine.java | 2 +- .../core/triggers/AfterPaneStateMachine.java | 2 +- .../TriggerStateMachineContextFactory.java | 12 +- .../triggers/TriggerStateMachineRunner.java | 2 +- .../core/GroupAlsoByWindowsProperties.java | 10 +- .../core/InMemoryStateInternalsTest.java | 16 +- .../core/MergingActiveWindowSetTest.java | 2 +- .../beam/runners/core/ReduceFnTester.java | 18 +- .../beam/runners/core/SplittableParDoTest.java | 2 +- .../apache/beam/runners/core/StateTagTest.java | 62 +++---- .../runners/core/StatefulDoFnRunnerTest.java | 4 +- .../CopyOnAccessInMemoryStateInternals.java | 118 ++++++------- .../runners/direct/DirectExecutionContext.java | 15 +- .../beam/runners/direct/EvaluationContext.java | 8 +- .../GroupAlsoByWindowEvaluatorFactory.java | 6 +- .../beam/runners/direct/ParDoEvaluator.java | 2 +- ...littableProcessElementsEvaluatorFactory.java | 2 +- .../direct/StatefulParDoEvaluatorFactory.java | 5 +- .../runners/direct/StepTransformResult.java | 4 +- .../beam/runners/direct/TransformResult.java | 2 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 106 +++++------ .../beam/runners/direct/DirectRunnerTest.java | 1 + .../runners/direct/EvaluationContextTest.java | 12 +- .../StatefulParDoEvaluatorFactoryTest.java | 10 +- .../functions/FlinkNoOpStepContext.java | 2 +- .../functions/FlinkStatefulDoFnFunction.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 6 +- .../streaming/SplittableDoFnOperator.java | 4 +- .../wrappers/streaming/WindowDoFnOperator.java | 4 +- .../state/FlinkBroadcastStateInternals.java | 45 +++-- 
.../state/FlinkKeyGroupStateInternals.java | 29 ++- .../state/FlinkSplitStateInternals.java | 29 ++- .../streaming/state/FlinkStateInternals.java | 48 ++--- .../flink/streaming/DoFnOperatorTest.java | 4 +- .../FlinkBroadcastStateInternalsTest.java | 6 +- .../FlinkKeyGroupStateInternalsTest.java | 2 +- .../streaming/FlinkSplitStateInternalsTest.java | 2 +- .../streaming/FlinkStateInternalsTest.java | 12 +- .../BatchStatefulParDoOverridesTest.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../spark/stateful/SparkStateInternals.java | 44 +++-- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 2 +- .../spark/translation/SparkProcessContext.java | 2 +- .../spark/translation/TranslationUtils.java | 2 +- .../org/apache/beam/sdk/transforms/DoFn.java | 8 +- .../beam/sdk/transforms/GroupIntoBatches.java | 6 +- .../org/apache/beam/sdk/transforms/ParDo.java | 2 +- .../apache/beam/sdk/util/state/StateBinder.java | 28 +-- .../apache/beam/sdk/util/state/StateSpec.java | 15 +- .../apache/beam/sdk/util/state/StateSpecs.java | 92 +++++----- .../apache/beam/sdk/transforms/ParDoTest.java | 177 +++++++++---------- .../transforms/reflect/DoFnInvokersTest.java | 2 +- .../transforms/reflect/DoFnSignaturesTest.java | 26 +-- .../beam/fn/harness/fake/FakeStepContext.java | 2 +- 88 files changed, 692 insertions(+), 701 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index b66d818..d5dd0dd 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -94,7 +94,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements private StateInternalsProxy<?> currentKeyStateInternals; private final ApexTimerInternals<Object> currentKeyTimerInternals; - private final StateInternals<Void> sideInputStateInternals; + private final StateInternals sideInputStateInternals; private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack; private LongMin pushedBackWatermark = new LongMin(); private long currentInputWatermark = Long.MIN_VALUE; @@ -327,7 +327,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements NoOpStepContext stepContext = new NoOpStepContext() { @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { return currentKeyStateInternals; } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index e682894..4300567 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -65,7 +65,7 @@ import org.joda.time.Instant; * <p>For fields that need to be serialized, use {@link ApexStateInternalsFactory} * or {@link StateInternalsProxy} */ -public class ApexStateInternals<K> implements StateInternals<K> { +public class ApexStateInternals<K> implements StateInternals { private final K key; private final Table<String, String, byte[]> stateTable; @@ -80,46 +80,44 @@ public 
class ApexStateInternals<K> implements StateInternals<K> { } @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + public <T extends State> T state(StateNamespace namespace, StateTag<T> address) { return state(namespace, address, StateContexts.nullContext()); } @Override public <T extends State> T state( - StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) { - return address.bind(new ApexStateBinder(key, namespace, address, c)); + StateNamespace namespace, StateTag<T> address, final StateContext<?> c) { + return address.bind(new ApexStateBinder(namespace, address, c)); } /** * A {@link StateBinder} that returns {@link State} wrappers for serialized state. */ - private class ApexStateBinder implements StateBinder<K> { - private final K key; + private class ApexStateBinder implements StateBinder { private final StateNamespace namespace; private final StateContext<?> c; - private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address, + private ApexStateBinder(StateNamespace namespace, StateTag<?> address, StateContext<?> c) { - this.key = key; this.namespace = namespace; this.c = c; } @Override public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { return new ApexValueState<>(namespace, address, coder); } @Override public <T> BagState<T> bindBag( - final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + final StateTag<BagState<T>> address, Coder<T> elemCoder) { return new ApexBagState<>(namespace, address, elemCoder); } @Override public <T> SetState<T> bindSet( - StateTag<? 
super K, SetState<T>> address, + StateTag<SetState<T>> address, Coder<T> elemCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", SetState.class.getSimpleName())); @@ -127,7 +125,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", MapState.class.getSimpleName())); @@ -136,7 +134,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final CombineFn<InputT, AccumT, OutputT> combineFn) { return new ApexCombiningState<>( @@ -149,8 +147,8 @@ public class ApexStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner); } @@ -158,7 +156,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? 
super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); @@ -167,12 +165,12 @@ public class ApexStateInternals<K> implements StateInternals<K> { private class AbstractState<T> { protected final StateNamespace namespace; - protected final StateTag<?, ? extends State> address; + protected final StateTag<? extends State> address; protected final Coder<T> coder; private AbstractState( StateNamespace namespace, - StateTag<?, ? extends State> address, + StateTag<? extends State> address, Coder<T> coder) { this.namespace = namespace; this.address = address; @@ -233,7 +231,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { private ApexValueState( StateNamespace namespace, - StateTag<?, ValueState<T>> address, + StateTag<ValueState<T>> address, Coder<T> coder) { super(namespace, address, coder); } @@ -261,7 +259,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { public ApexWatermarkHoldState( StateNamespace namespace, - StateTag<?, WatermarkHoldState> address, + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { super(namespace, address, InstantCoder.of()); this.timestampCombiner = timestampCombiner; @@ -312,7 +310,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { private final CombineFn<InputT, AccumT, OutputT> combineFn; private ApexCombiningState(StateNamespace namespace, - StateTag<? 
super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> coder, K key, CombineFn<InputT, AccumT, OutputT> combineFn) { super(namespace, address, coder); @@ -376,7 +374,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> { private ApexBagState( StateNamespace namespace, - StateTag<?, BagState<T>> address, + StateTag<BagState<T>> address, Coder<T> coder) { super(namespace, address, ListCoder.of(coder)); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index cc64c7c..721eecd 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -60,7 +60,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java index 1f28364..746be2f 100644 --- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.util.state.StateContext; * @param <K> */ @DefaultSerializer(JavaSerializer.class) -public class StateInternalsProxy<K> implements StateInternals<K>, Serializable { +public class StateInternalsProxy<K> implements StateInternals, Serializable { private final StateInternalsFactory<K> factory; private transient K currentKey; @@ -55,12 +55,12 @@ public class StateInternalsProxy<K> implements StateInternals<K>, Serializable { } @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + public <T extends State> T state(StateNamespace namespace, StateTag<T> address) { return factory.stateInternalsForKey(currentKey).state(namespace, address); } @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? 
super K, T> address, + public <T extends State> T state(StateNamespace namespace, StateTag<T> address, StateContext<?> c) { return factory.stateInternalsForKey(currentKey).state(namespace, address, c); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index 225b654..8b48a74 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -58,19 +58,19 @@ public class ApexStateInternalsTest { private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, CombiningState<Integer, int[], Integer>> + private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState> + private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); private ApexStateInternals<String> underTest; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 9754bb3..33ba80c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -169,7 +169,7 @@ public class PTransformMatchersTest implements Serializable { private final String stateId = "mystate"; @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> intState = + private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of()); @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index cc7b574..aa1474c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -165,7 +165,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex } @Override - public abstract StateInternals<?> stateInternals(); + public abstract StateInternals stateInternals(); @Override public abstract TimerInternals timerInternals(); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java index ecd30c0..c0d01ae 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -92,7 +92,7 @@ public interface ExecutionContext { Coder<W> windowCoder) throws IOException; - StateInternals<?> stateInternals(); + StateInternals stateInternals(); TimerInternals timerInternals(); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index 9e66f07..311ed1c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -57,7 +57,7 @@ public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); timerInternals.advanceProcessingTime(Instant.now()); timerInternals.advanceSynchronizedProcessingTime(Instant.now()); - StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); + StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = new ReduceFnRunner<>( http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 651458f..34afa5d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -63,7 +63,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn< K key = keyedWorkItem.key(); TimerInternals timerInternals = c.windowingInternals().timerInternals(); - StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); + StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = new ReduceFnRunner<>( http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index de4ac29..c553dbf 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -114,7 +114,7 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< KeyedWorkItem<K, InputT> keyedWorkItem = c.element(); K key = keyedWorkItem.key(); - StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); + StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key); ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 2c02282..199ce41 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -53,7 +53,7 @@ import org.joda.time.Instant; * and for running tests that need state. 
*/ @Experimental(Kind.STATE) -public class InMemoryStateInternals<K> implements StateInternals<K> { +public class InMemoryStateInternals<K> implements StateInternals { public static <K> InMemoryStateInternals<K> forKey(K key) { return new InMemoryStateInternals<>(key); @@ -79,10 +79,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { T copy(); } - protected final StateTable<K> inMemoryState = new StateTable<K>() { + protected final StateTable inMemoryState = new StateTable() { @Override - protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) { - return new InMemoryStateBinder<K>(key, c); + protected StateBinder binderForNamespace(StateNamespace namespace, StateContext<?> c) { + return new InMemoryStateBinder(c); } }; @@ -99,48 +99,46 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + public <T extends State> T state(StateNamespace namespace, StateTag<T> address) { return inMemoryState.get(namespace, address, StateContexts.nullContext()); } @Override public <T extends State> T state( - StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) { + StateNamespace namespace, StateTag<T> address, final StateContext<?> c) { return inMemoryState.get(namespace, address, c); } /** * A {@link StateBinder} that returns In Memory {@link State} objects. */ - public static class InMemoryStateBinder<K> implements StateBinder<K> { - private final K key; + public static class InMemoryStateBinder implements StateBinder { private final StateContext<?> c; - public InMemoryStateBinder(K key, StateContext<?> c) { - this.key = key; + public InMemoryStateBinder(StateContext<?> c) { this.c = c; } @Override public <T> ValueState<T> bindValue( - StateTag<? 
super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { return new InMemoryValue<T>(); } @Override public <T> BagState<T> bindBag( - final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + final StateTag<BagState<T>> address, Coder<T> elemCoder) { return new InMemoryBag<T>(); } @Override - public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) { + public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) { return new InMemorySet<>(); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { return new InMemoryMap<>(); } @@ -148,23 +146,23 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final CombineFn<InputT, AccumT, OutputT> combineFn) { return new InMemoryCombiningState<>(combineFn); } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { - return new InMemoryWatermarkHold<W>(timestampCombiner); + return new InMemoryWatermarkHold(timestampCombiner); } @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? 
super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java index b4e864c..2faedbb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java @@ -67,10 +67,10 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi */ private final ValueState<Map<W, Set<W>>> valueState; - public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) { + public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals state) { this.windowFn = windowFn; - StateTag<Object, ValueState<Map<W, Set<W>>>> tag = + StateTag<ValueState<Map<W, Set<W>>>> tag = StateTags.makeSystemTagInternal(StateTags.value( "tree", MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder())))); valueState = state.state(StateNamespaces.global(), tag); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java index e948650..5ffb9a2 
100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java @@ -37,5 +37,5 @@ public interface MergingStateAccessor<K, W extends BoundedWindow> * are known to have state. */ <StateT extends State> Map<W, StateT> accessInEachMergingWindow( - StateTag<? super K, StateT> address); + StateTag<StateT> address); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java index 3e875c2..06dcc9c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java @@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> { private static class GeneralNonEmptyPanes<K, W extends BoundedWindow> extends NonEmptyPanes<K, W> { - private static final StateTag<Object, CombiningState<Long, long[], Long>> + private static final StateTag<CombiningState<Long, long[], Long>> PANE_ADDITIONS_TAG = StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "count", VarLongCoder.of(), Sum.ofLongs())); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java index 4cf4d67..66b3960 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java @@ -43,7 +43,7 @@ public class PaneInfoTracker { } @VisibleForTesting - static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG = + static final StateTag<ValueState<PaneInfo>> PANE_INFO_TAG = StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE)); public void clear(StateAccessor<?> state) { http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 8493474..3031ebf 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -51,7 +51,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private final K key; private final ReduceFn<K, InputT, OutputT, W> reduceFn; private final WindowingStrategy<?, W> windowingStrategy; - private final StateInternals<K> stateInternals; + private final StateInternals stateInternals; private final ActiveWindowSet<W> activeWindows; private final TimerInternals timerInternals; private final SideInputReader sideInputReader; @@ -61,7 +61,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { K key, ReduceFn<K, InputT, OutputT, W> reduceFn, WindowingStrategy<?, W> windowingStrategy, - StateInternals<K> stateInternals, + StateInternals stateInternals, ActiveWindowSet<W> activeWindows, TimerInternals timerInternals, SideInputReader sideInputReader, @@ -165,11 +165,15 @@ class 
ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { protected final StateContext<W> context; protected final StateNamespace windowNamespace; protected final Coder<W> windowCoder; - protected final StateInternals<K> stateInternals; + protected final StateInternals stateInternals; protected final StateStyle style; - public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, - StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) { + public StateAccessorImpl( + ActiveWindowSet<W> activeWindows, + Coder<W> windowCoder, + StateInternals stateInternals, + StateContext<W> context, + StateStyle style) { this.activeWindows = activeWindows; this.windowCoder = windowCoder; @@ -196,7 +200,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { } @Override - public <StateT extends State> StateT access(StateTag<? super K, StateT> address) { + public <StateT extends State> StateT access(StateTag<StateT> address) { switch (style) { case DIRECT: return stateInternals.state(windowNamespace(), address, context); @@ -212,8 +216,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> { private final Collection<W> activeToBeMerged; - public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, - StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged, + public MergingStateAccessorImpl( + ActiveWindowSet<W> activeWindows, + Coder<W> windowCoder, + StateInternals stateInternals, + StateStyle style, + Collection<W> activeToBeMerged, W mergeResult) { super(activeWindows, windowCoder, stateInternals, stateContextForWindowOnly(mergeResult), style); @@ -221,7 +229,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { } @Override - public <StateT extends State> StateT access(StateTag<? 
super K, StateT> address) { + public <StateT extends State> StateT access(StateTag<StateT> address) { switch (style) { case DIRECT: return stateInternals.state(windowNamespace(), address, context); @@ -237,7 +245,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { @Override public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( - StateTag<? super K, StateT> address) { + StateTag<StateT> address) { ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); for (W mergingWindow : activeToBeMerged) { StateNamespace namespace = null; @@ -258,8 +266,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { static class PremergingStateAccessorImpl<K, W extends BoundedWindow> extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> { - public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, - StateInternals<K> stateInternals, W window) { + public PremergingStateAccessorImpl( + ActiveWindowSet<W> activeWindows, + Coder<W> windowCoder, + StateInternals stateInternals, + W window) { super(activeWindows, windowCoder, stateInternals, stateContextForWindowOnly(window), StateStyle.RENAMED); } @@ -270,7 +281,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { @Override public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( - StateTag<? 
super K, StateT> address) { + StateTag<StateT> address) { ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) { StateT stateForWindow = http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 08014ec..d3dc067 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -108,7 +108,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { private final OutputWindowedValue<KV<K, OutputT>> outputter; - private final StateInternals<K> stateInternals; + private final StateInternals stateInternals; private final Counter droppedDueToClosedWindow; @@ -214,7 +214,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { K key, WindowingStrategy<?, W> windowingStrategy, ExecutableTriggerStateMachine triggerStateMachine, - StateInternals<K> stateInternals, + StateInternals stateInternals, TimerInternals timerInternals, OutputWindowedValue<KV<K, OutputT>> outputter, SideInputReader sideInputReader, http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index 26e920a..5c67148 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -61,7 +61,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { * be keep locally but if side inputs are broadcast to all parallel operators then all will * have the same view of the state. */ - private final StateInternals<Void> stateInternals; + private final StateInternals stateInternals; /** * A state tag for each side input that we handle. The state is used to track @@ -70,7 +70,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { private final Map< PCollectionView<?>, StateTag< - Object, CombiningState< BoundedWindow, Set<BoundedWindow>, @@ -81,7 +80,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { */ private final Map< PCollectionView<?>, - StateTag<Object, ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags; + StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags; /** * Creates a new {@code SideInputHandler} for the given side inputs that uses @@ -89,7 +88,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { */ public SideInputHandler( Collection<PCollectionView<?>> sideInputs, - StateInternals<Void> stateInternals) { + StateInternals stateInternals) { this.sideInputs = sideInputs; this.stateInternals = stateInternals; this.availableWindowsTags = new HashMap<>(); @@ -105,7 +104,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { .windowCoder(); StateTag< - Object, CombiningState< BoundedWindow, Set<BoundedWindow>, @@ -117,7 +115,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { availableWindowsTags.put(sideInput, availableTag); Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal(); - StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag = + 
StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder); sideInputContentsTags.put(sideInput, stateTag); } @@ -145,7 +143,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { inputWithReifiedWindows.add(value.withValue(e)); } - StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag = + StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = sideInputContentsTags.get(sideInput); for (BoundedWindow window: value.getWindows()) { @@ -175,7 +173,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { .getWindowFn() .windowCoder(); - StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag = + StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = sideInputContentsTags.get(sideInput); ValueState<Iterable<WindowedValue<?>>> state = http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 1865d54..edce9a2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -614,8 +614,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public State state(String stateId) { try { - StateSpec<?, ?> spec = - (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn); + StateSpec<?> spec = + (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn); return stepContext .stateInternals() .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); @@ -726,8 +726,8 @@ public class 
SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public State state(String stateId) { try { - StateSpec<?, ?> spec = - (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn); + StateSpec<?> spec = + (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn); return stepContext .stateInternals() .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index b8db491..b5f8f45 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -489,7 +489,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { return context.stepContext.stateInternals(); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 0fa6f76..94f5f35 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -353,7 +353,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> * the input watermark 
when the first {@link DoFn.ProcessElement} call for this element * completes. */ - private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag = + private static final StateTag<WatermarkHoldState> watermarkHoldTag = StateTags.makeSystemTagInternal( StateTags.<GlobalWindow>watermarkStateInternal( "hold", TimestampCombiner.LATEST)); @@ -363,13 +363,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> * DoFn.ProcessElement} call and read during subsequent calls in response to timer firings, when * the original element is no longer available. */ - private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag; + private final StateTag<ValueState<WindowedValue<InputT>>> elementTag; /** * The state cell containing a restriction representing the unprocessed part of work for this * element. */ - private StateTag<Object, ValueState<RestrictionT>> restrictionTag; + private StateTag<ValueState<RestrictionT>> restrictionTag; private final DoFn<InputT, OutputT> fn; private final Coder<InputT> elementCoder; @@ -453,7 +453,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> @ProcessElement public void processElement(final ProcessContext c) { String key = c.element().key(); - StateInternals<String> stateInternals = + StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java index 87353f2..eda896b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java @@ -34,5 +34,5 @@ public interface StateAccessor<K> { * <p>Never accounts for merged windows. When windows are merged, any state accessed via * this method must be eagerly combined and written into the result window. */ - <StateT extends State> StateT access(StateTag<? super K, StateT> address); + <StateT extends State> StateT access(StateTag<StateT> address); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java index e6440bf..c2e9412 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java @@ -40,20 +40,20 @@ import org.apache.beam.sdk.util.state.StateContext; * used directly, and is highly likely to change. */ @Experimental(Kind.STATE) -public interface StateInternals<K> { +public interface StateInternals { /** The key for this {@link StateInternals}. */ - K getKey(); + Object getKey(); /** * Return the state associated with {@code address} in the specified {@code namespace}. */ - <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address); + <T extends State> T state(StateNamespace namespace, StateTag<T> address); /** * Return the state associated with {@code address} in the specified {@code namespace} * with the {@link StateContext}. */ <T extends State> T state( - StateNamespace namespace, StateTag<? 
super K, T> address, StateContext<?> c); + StateNamespace namespace, StateTag<T> address, StateContext<?> c); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java index ea7d742..c756364 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java @@ -31,5 +31,5 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; public interface StateInternalsFactory<K> { /** Returns {@link StateInternals} for the provided key. */ - StateInternals<K> stateInternalsForKey(K key); + StateInternals stateInternalsForKey(K key); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java index ce37fd3..f6b9103 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java @@ -42,7 +42,7 @@ public class StateMerging { * in {@code context}. */ public static <K, StateT extends State, W extends BoundedWindow> void clear( - MergingStateAccessor<K, W> context, StateTag<? 
super K, StateT> address) { + MergingStateAccessor<K, W> context, StateTag<StateT> address) { for (StateT state : context.accessInEachMergingWindow(address).values()) { state.clear(); } @@ -54,7 +54,7 @@ public class StateMerging { * blindly append to. */ public static <K, T, W extends BoundedWindow> void prefetchBags( - MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) { + MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) { Map<W, BagState<T>> map = context.accessInEachMergingWindow(address); if (map.isEmpty()) { // Nothing to prefetch. @@ -73,7 +73,7 @@ public class StateMerging { * Merge all bag state in {@code address} across all windows under merge. */ public static <K, T, W extends BoundedWindow> void mergeBags( - MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) { + MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) { mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address)); } @@ -116,7 +116,7 @@ public class StateMerging { * Merge all set state in {@code address} across all windows under merge. */ public static <K, T, W extends BoundedWindow> void mergeSets( - MergingStateAccessor<K, W> context, StateTag<? super K, SetState<T>> address) { + MergingStateAccessor<K, W> context, StateTag<SetState<T>> address) { mergeSets(context.accessInEachMergingWindow(address).values(), context.access(address)); } @@ -161,7 +161,7 @@ public class StateMerging { */ public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void prefetchCombiningValues(MergingStateAccessor<K, W> context, - StateTag<? 
super K, StateT> address) { + StateTag<StateT> address) { for (StateT state : context.accessInEachMergingWindow(address).values()) { prefetchRead(state); } @@ -172,7 +172,7 @@ public class StateMerging { */ public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues( MergingStateAccessor<K, W> context, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) { + StateTag<CombiningState<InputT, AccumT, OutputT>> address) { mergeCombiningValues( context.accessInEachMergingWindow(address).values(), context.access(address)); } @@ -218,7 +218,7 @@ public class StateMerging { */ public static <K, W extends BoundedWindow> void prefetchWatermarks( MergingStateAccessor<K, W> context, - StateTag<? super K, WatermarkHoldState> address) { + StateTag<WatermarkHoldState> address) { Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address); WatermarkHoldState result = context.access(address); if (map.isEmpty()) { @@ -250,7 +250,7 @@ public class StateMerging { */ public static <K, W extends BoundedWindow> void mergeWatermarks( MergingStateAccessor<K, W> context, - StateTag<? 
super K, WatermarkHoldState> address, + StateTag<WatermarkHoldState> address, W mergeResult) { mergeWatermarks( context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java index d2511c9..1bf4ff5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java @@ -28,9 +28,9 @@ import org.apache.beam.sdk.util.state.StateContext; /** * Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code State} instance. */ -public abstract class StateTable<K> { +public abstract class StateTable { - private final Table<StateNamespace, StateTag<? super K, ?>, State> stateTable = + private final Table<StateNamespace, StateTag<?>, State> stateTable = HashBasedTable.create(); /** @@ -39,7 +39,7 @@ public abstract class StateTable<K> { * already present in this {@link StateTable}. */ public <StateT extends State> StateT get( - StateNamespace namespace, StateTag<? super K, StateT> tag, StateContext<?> c) { + StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) { State storage = stateTable.get(namespace, tag); if (storage != null) { @SuppressWarnings("unchecked") @@ -68,7 +68,7 @@ public abstract class StateTable<K> { return stateTable.containsRow(namespace); } - public Map<StateTag<? 
super K, ?>, State> getTagsInUse(StateNamespace namespace) { + public Map<StateTag<?>, State> getTagsInUse(StateNamespace namespace) { return stateTable.row(namespace); } @@ -80,5 +80,5 @@ public abstract class StateTable<K> { * Provide the {@code StateBinder} to use for creating {@code Storage} instances * in the specified {@code namespace}. */ - protected abstract StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c); + protected abstract StateBinder binderForNamespace(StateNamespace namespace, StateContext<?> c); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java index aaeecf0..38e9dea 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -46,12 +45,10 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState; * * <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}. * - * @param <K> The type of key that must be used with the state tag. Contravariant: methods should - * accept values of type {@code KeyedStateTag<? super K, StateT>}. * @param <StateT> The type of state being tagged. 
*/ @Experimental(Kind.STATE) -public interface StateTag<K, StateT extends State> extends Serializable { +public interface StateTag<StateT extends State> extends Serializable { /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */ void appendTo(Appendable sb) throws IOException; @@ -64,7 +61,7 @@ public interface StateTag<K, StateT extends State> extends Serializable { /** * The specification for the state stored in the referenced cell. */ - StateSpec<K, StateT> getSpec(); + StateSpec<StateT> getSpec(); /** * Bind this state tag. See {@link StateSpec#bind}. @@ -72,35 +69,34 @@ public interface StateTag<K, StateT extends State> extends Serializable { * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now. */ @Deprecated - StateT bind(StateBinder<? extends K> binder); + StateT bind(StateBinder binder); /** * Visitor for binding a {@link StateSpec} and to the associated {@link State}. * - * @param <K> the type of key this binder embodies. * @deprecated for migration only; runners should reference the top level {@link StateBinder} * and move towards {@link StateSpec} rather than {@link StateTag}. */ @Deprecated - public interface StateBinder<K> { - <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder); + public interface StateBinder { + <T> ValueState<T> bindValue(StateTag<ValueState<T>> spec, Coder<T> coder); - <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder); + <T> BagState<T> bindBag(StateTag<BagState<T>> spec, Coder<T> elemCoder); - <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder); + <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder); <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? 
super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder); <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + StateTag<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + StateTag<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn); @@ -110,7 +106,7 @@ public interface StateTag<K, StateT extends State> extends Serializable { * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps * added to the returned {@link WatermarkHoldState} are to be combined. */ - <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? 
super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner); + WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> spec, TimestampCombiner timestampCombiner); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index fe99f27..ca8b238 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -50,29 +50,29 @@ public class StateTags { /** @deprecated for migration purposes only */ @Deprecated - private static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) { - return new StateBinder<K>() { + private static StateBinder adaptTagBinder(final StateTag.StateBinder binder) { + return new StateBinder() { @Override public <T> ValueState<T> bindValue( - String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) { + String id, StateSpec<ValueState<T>> spec, Coder<T> coder) { return binder.bindValue(tagForSpec(id, spec), coder); } @Override public <T> BagState<T> bindBag( - String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) { + String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) { return binder.bindBag(tagForSpec(id, spec), elemCoder); } @Override public <T> SetState<T> bindSet( - String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder) { + String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) { return binder.bindSet(tagForSpec(id, spec), elemCoder); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - String id, StateSpec<? 
super K, MapState<KeyT, ValueT>> spec, + String id, StateSpec<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { return binder.bindMap(tagForSpec(id, spec), mapKeyCoder, mapValueCoder); } @@ -81,7 +81,7 @@ public class StateTags { public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining( String id, - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); @@ -91,7 +91,7 @@ public class StateTags { public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext( String id, - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return binder.bindCombiningValueWithContext( @@ -99,9 +99,9 @@ public class StateTags { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + public WatermarkHoldState bindWatermark( String id, - StateSpec<? super K, WatermarkHoldState> spec, + StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) { return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner); } @@ -121,20 +121,20 @@ public class StateTags { private StateTags() { } - private interface SystemStateTag<K, StateT extends State> { - StateTag<K, StateT> asKind(StateKind kind); + private interface SystemStateTag<StateT extends State> { + StateTag<StateT> asKind(StateKind kind); } /** Create a state tag for the given id and spec. 
*/ - public static <K, StateT extends State> StateTag<K, StateT> tagForSpec( - String id, StateSpec<K, StateT> spec) { + public static <StateT extends State> StateTag<StateT> tagForSpec( + String id, StateSpec<StateT> spec) { return new SimpleStateTag<>(new StructuredId(id), spec); } /** * Create a simple state tag for values of type {@code T}. */ - public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) { + public static <T> StateTag<ValueState<T>> value(String id, Coder<T> valueCoder) { return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder)); } @@ -143,7 +143,7 @@ public class StateTags { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateTag<Object, CombiningState<InputT, AccumT, OutputT>> + StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValue( String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( @@ -155,7 +155,7 @@ public class StateTags { * merge multiple {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateTag<Object, CombiningState<InputT, AccumT, OutputT>> + StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValueWithContext( String id, Coder<AccumT> accumCoder, @@ -172,7 +172,7 @@ public class StateTags { * should only be used to initialize static values. */ public static <InputT, AccumT, OutputT> - StateTag<Object, CombiningState<InputT, AccumT, OutputT>> + StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValueFromInputInternal( String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( @@ -183,21 +183,21 @@ public class StateTags { * Create a state tag that is optimized for adding values frequently, and * occasionally retrieving all the values that have been added. 
*/ - public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) { + public static <T> StateTag<BagState<T>> bag(String id, Coder<T> elemCoder) { return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder)); } /** * Create a state spec that supporting for {@link java.util.Set} like access patterns. */ - public static <T> StateTag<Object, SetState<T>> set(String id, Coder<T> elemCoder) { + public static <T> StateTag<SetState<T>> set(String id, Coder<T> elemCoder) { return new SimpleStateTag<>(new StructuredId(id), StateSpecs.set(elemCoder)); } /** * Create a state spec that supporting for {@link java.util.Map} like access patterns. */ - public static <K, V> StateTag<Object, MapState<K, V>> map( + public static <K, V> StateTag<MapState<K, V>> map( String id, Coder<K> keyCoder, Coder<V> valueCoder) { return new SimpleStateTag<>(new StructuredId(id), StateSpecs.map(keyCoder, valueCoder)); } @@ -205,7 +205,7 @@ public class StateTags { /** * Create a state tag for holding the watermark. */ - public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState> + public static <W extends BoundedWindow> StateTag<WatermarkHoldState> watermarkStateInternal(String id, TimestampCombiner timestampCombiner) { return new SimpleStateTag<>( new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner)); @@ -215,20 +215,20 @@ public class StateTags { * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to * collide with any user tags. 
*/ - public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal( - StateTag<K, StateT> tag) { + public static <StateT extends State> StateTag<StateT> makeSystemTagInternal( + StateTag<StateT> tag) { if (!(tag instanceof SystemStateTag)) { throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag); } // Checked above @SuppressWarnings("unchecked") - SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag; + SystemStateTag<StateT> typedTag = (SystemStateTag<StateT>) tag; return typedTag.asKind(StateKind.SYSTEM); } - public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>> + public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>> convertToBagTagInternal( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> combiningTag) { + StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) { return new SimpleStateTag<>( new StructuredId(combiningTag.getId()), StateSpecs.convertToBagSpecInternal(combiningTag.getSpec())); @@ -291,20 +291,20 @@ public class StateTags { /** * A basic {@link StateTag} implementation that manages the structured ids. */ - private static class SimpleStateTag<K, StateT extends State> - implements StateTag<K, StateT>, SystemStateTag<K, StateT> { + private static class SimpleStateTag<StateT extends State> + implements StateTag<StateT>, SystemStateTag<StateT> { - private final StateSpec<K, StateT> spec; + private final StateSpec<StateT> spec; private final StructuredId id; - public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) { + public SimpleStateTag(StructuredId id, StateSpec<StateT> spec) { this.id = id; this.spec = spec; } @Override @Deprecated - public StateT bind(StateTag.StateBinder<? 
extends K> binder) { + public StateT bind(StateTag.StateBinder binder) { return spec.bind( this.id.getRawId(), adaptTagBinder(binder)); } @@ -315,7 +315,7 @@ public class StateTags { } @Override - public StateSpec<K, StateT> getSpec() { + public StateSpec<StateT> getSpec() { return spec; } @@ -332,7 +332,7 @@ public class StateTags { } @Override - public StateTag<K, StateT> asKind(StateKind kind) { + public StateTag<StateT> asKind(StateKind kind) { return new SimpleStateTag<>(id.asKind(kind), spec); } @@ -342,7 +342,7 @@ public class StateTags { return false; } - SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other; + SimpleStateTag<?> otherTag = (SimpleStateTag<?>) other; return Objects.equals(this.getId(), otherTag.getId()) && Objects.equals(this.getSpec(), otherTag.getSpec()); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 7a20590..e3717a8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -236,12 +236,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> private final DoFn<?, ?> fn; private final DoFnSignature signature; - private final StateInternals<?> stateInternals; + private final StateInternals stateInternals; private final Coder<W> windowCoder; public StateInternalsStateCleaner( DoFn<?, ?> fn, - StateInternals<?> stateInternals, + StateInternals stateInternals, Coder<W> windowCoder) { this.fn = fn; this.signature = DoFnSignatures.getSignature(fn.getClass()); @@ -254,7 +254,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W 
extends BoundedWindow> for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : signature.stateDeclarations().entrySet()) { try { - StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); + StateSpec<?> spec = (StateSpec<?>) entry.getValue().field().get(fn); State state = stateInternals.state(StateNamespaces.window(windowCoder, window), StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); state.clear(); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index 86a7fd7..f18460a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -47,7 +47,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound */ public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W> buffering(final Coder<T> inputCoder) { - final StateTag<Object, BagState<T>> bufferTag = + final StateTag<BagState<T>> bufferTag = StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder)); return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) { @Override @@ -70,7 +70,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound AccumT, OutputT, W> combining( final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag; + final StateTag<CombiningState<InputT, AccumT, OutputT>> bufferTag; if (combineFn.getFn() instanceof CombineFnWithContext) { bufferTag = StateTags.makeSystemTagInternal( StateTags.<InputT, 
AccumT, OutputT>combiningValueWithContext( @@ -96,10 +96,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound }; } - private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag; + private StateTag<? extends GroupingState<InputT, OutputT>> bufferTag; public SystemReduceFn( - StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) { + StateTag<? extends GroupingState<InputT, OutputT>> bufferTag) { this.bufferTag = bufferTag; } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java index 1dfb85f..18b50db 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java @@ -32,9 +32,9 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> { super(key); } - public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) { - Set<StateTag<? super K, ?>> inUse = new HashSet<>(); - for (Map.Entry<StateTag<? 
super K, ?>, State> entry : + public Set<StateTag<?>> getTagsInUse(StateNamespace namespace) { + Set<StateTag<?>> inUse = new HashSet<>(); + for (Map.Entry<StateTag<?>, State> entry : inMemoryState.getTagsInUse(namespace).entrySet()) { if (!isEmptyForTesting(entry.getValue())) { inUse.add(entry.getKey()); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 9bb9c62..e6e4ffb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -54,9 +54,9 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * used for elements. */ public static <W extends BoundedWindow> - StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner( + StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner( TimestampCombiner timestampCombiner) { - return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal( + return StateTags.<WatermarkHoldState>makeSystemTagInternal( StateTags.<W>watermarkStateInternal("hold", timestampCombiner)); } @@ -67,13 +67,13 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * would take the end-of-window time as its element time.) 
*/ @VisibleForTesting - public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG = + public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal( "extra", TimestampCombiner.EARLIEST)); private final TimerInternals timerInternals; private final WindowingStrategy<?, W> windowingStrategy; - private final StateTag<Object, WatermarkHoldState> elementHoldTag; + private final StateTag<WatermarkHoldState> elementHoldTag; public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) { this.timerInternals = timerInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java index 5005065..a824a7b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java @@ -40,7 +40,7 @@ public interface WindowingInternals<InputT, OutputT> { * Unsupported state internals. The key type is unknown. It is up to the user to use the * correct type of key. */ - StateInternals<?> stateInternals(); + StateInternals stateInternals(); /** * Output the value at the specified timestamp in the listed windows. 
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index b416788..ed2c26f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -55,7 +55,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger protected static final List<SerializableFunction<Instant, Instant>> IDENTITY = ImmutableList.<SerializableFunction<Instant, Instant>>of(); - protected static final StateTag<Object, CombiningState<Instant, + protected static final StateTag<CombiningState<Instant, Holder<Instant>, Instant>> DELAYED_UNTIL_TAG = StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "delayed", InstantCoder.of(), Min.<Instant>naturalOrder())); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index 11323cc..52fb5ff 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -35,7 +35,7 @@ import 
org.apache.beam.sdk.util.state.CombiningState; @Experimental(Experimental.Kind.TRIGGER) public class AfterPaneStateMachine extends OnceTriggerStateMachine { -private static final StateTag<Object, CombiningState<Long, long[], Long>> +private static final StateTag<CombiningState<Long, long[], Long>> ELEMENTS_IN_PANE_TAG = StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "count", VarLongCoder.of(), Sum.ofLongs()));
