http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index 31e931c..cfe3f9b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineContextFactory; import org.apache.beam.sdk.util.state.BagState; @@ -62,7 +61,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; * <p>Note: Ignore index of key. * Mainly for SideInputs. */ -public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { +public class FlinkBroadcastStateInternals<K> implements StateInternals { private int indexInSubtaskGroup; private final DefaultOperatorStateBackend stateBackend; @@ -86,7 +85,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address) { + StateTag<T> address) { return state(namespace, address, StateContexts.nullContext()); } @@ -94,36 +93,36 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address, + StateTag<T> address, final StateContext<?> context) { return address.bind( - new StateTag.StateBinder<K>() { + new StateTag.StateBinder() { @Override public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder); } @Override public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + StateTag<BagState<T>> address, Coder<T> elemCoder) { return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder); } @Override public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) { + StateTag<SetState<T>> address, Coder<T> elemCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", SetState.class.getSimpleName())); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { throw new UnsupportedOperationException( @@ -133,7 +132,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { @@ -144,7 +143,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return new FlinkCombiningStateWithContext<>( @@ -158,8 +157,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { throw new UnsupportedOperationException( String.format("%s is not supported", WatermarkHoldState.class.getSimpleName())); @@ -302,11 +301,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { extends AbstractBroadcastState<T> implements ValueState<T> { private final StateNamespace namespace; - private final StateTag<? super K, ValueState<T>> address; + private final StateTag<ValueState<T>> address; FlinkBroadcastValueState( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, ValueState<T>> address, + StateTag<ValueState<T>> address, StateNamespace namespace, Coder<T> coder) { super(flinkStateBackend, address.getId(), namespace, coder); @@ -363,11 +362,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { implements BagState<T> { private final StateNamespace namespace; - private final StateTag<? super K, BagState<T>> address; + private final StateTag<BagState<T>> address; FlinkBroadcastBagState( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, BagState<T>> address, + StateTag<BagState<T>> address, StateNamespace namespace, Coder<T> coder) { super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder)); @@ -451,12 +450,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; FlinkCombiningState( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder) { @@ -568,13 +567,13 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; private final FlinkBroadcastStateInternals<K> flinkStateInternals; FlinkKeyedCombiningState( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, @@ -704,14 +703,14 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn; private final FlinkBroadcastStateInternals<K> flinkStateInternals; private final CombineWithContext.Context context; FlinkCombiningStateWithContext( DefaultOperatorStateBackend flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder,
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java index 67d7966..c9b7797 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.state.BagState; @@ -67,7 +66,7 @@ import org.apache.flink.util.Preconditions; * * <p>Reference from {@link HeapInternalTimerService} to the local key-group range. */ -public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { +public class FlinkKeyGroupStateInternals<K> implements StateInternals { private final Coder<K> keyCoder; private final KeyGroupsList localKeyGroupRange; @@ -109,7 +108,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address) { + StateTag<T> address) { return state(namespace, address, StateContexts.nullContext()); } @@ -117,36 +116,36 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address, + StateTag<T> address, final StateContext<?> context) { return address.bind( - new StateTag.StateBinder<K>() { + new StateTag.StateBinder() { @Override public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { throw new UnsupportedOperationException( String.format("%s is not supported", ValueState.class.getSimpleName())); } @Override public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + StateTag<BagState<T>> address, Coder<T> elemCoder) { return new FlinkKeyGroupBagState<>(address, namespace, elemCoder); } @Override public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) { + StateTag<SetState<T>> address, Coder<T> elemCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", SetState.class.getSimpleName())); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { throw new UnsupportedOperationException( @@ -156,7 +155,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException("bindCombiningValue is not supported."); @@ -165,7 +164,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException( @@ -173,8 +172,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { throw new UnsupportedOperationException( String.format("%s is not supported", CombiningState.class.getSimpleName())); @@ -334,10 +333,10 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { implements BagState<T> { private final StateNamespace namespace; - private final StateTag<? super K, BagState<T>> address; + private final StateTag<BagState<T>> address; FlinkKeyGroupBagState( - StateTag<? super K, BagState<T>> address, + StateTag<BagState<T>> address, StateNamespace namespace, Coder<T> coder) { super(address.getId(), namespace.stringKey(), ListCoder.of(coder), http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java index ef6c3b2..3d38f88 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java @@ -26,7 +26,6 @@ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -53,7 +52,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; * Ignore index of key and namespace. * Just implement BagState. */ -public class FlinkSplitStateInternals<K> implements StateInternals<K> { +public class FlinkSplitStateInternals<K> implements StateInternals { private final OperatorStateBackend stateBackend; @@ -69,7 +68,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address) { + StateTag<T> address) { return state(namespace, address, StateContexts.nullContext()); } @@ -77,36 +76,36 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address, + StateTag<T> address, final StateContext<?> context) { return address.bind( - new StateTag.StateBinder<K>() { + new StateTag.StateBinder() { @Override public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { throw new UnsupportedOperationException( String.format("%s is not supported", ValueState.class.getSimpleName())); } @Override public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + StateTag<BagState<T>> address, Coder<T> elemCoder) { return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder); } @Override public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) { + StateTag<SetState<T>> address, Coder<T> elemCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", SetState.class.getSimpleName())); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { throw new UnsupportedOperationException( @@ -116,7 +115,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException("bindCombiningValue is not supported."); @@ -125,7 +124,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { throw new UnsupportedOperationException( @@ -133,8 +132,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { throw new UnsupportedOperationException( String.format("%s is not supported", CombiningState.class.getSimpleName())); @@ -147,11 +146,11 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { private final ListStateDescriptor<T> descriptor; private OperatorStateBackend flinkStateBackend; private final StateNamespace namespace; - private final StateTag<? super K, BagState<T>> address; + private final StateTag<BagState<T>> address; FlinkSplitBagState( OperatorStateBackend flinkStateBackend, - StateTag<? super K, BagState<T>> address, + StateTag<BagState<T>> address, StateNamespace namespace, Coder<T> coder) { this.flinkStateBackend = flinkStateBackend; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index c99d085..c033be6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -57,7 +57,7 @@ import org.joda.time.Instant; * <p>Note: In the Flink streaming runner the key is always encoded * using an {@link Coder} and stored in a {@link ByteBuffer}. */ -public class FlinkStateInternals<K> implements StateInternals<K> { +public class FlinkStateInternals<K> implements StateInternals { private final KeyedStateBackend<ByteBuffer> flinkStateBackend; private Coder<K> keyCoder; @@ -95,7 +95,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address) { + StateTag<T> address) { return state(namespace, address, StateContexts.nullContext()); } @@ -103,36 +103,36 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public <T extends State> T state( final StateNamespace namespace, - StateTag<? super K, T> address, + StateTag<T> address, final StateContext<?> context) { return address.bind( - new StateTag.StateBinder<K>() { + new StateTag.StateBinder() { @Override public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + StateTag<ValueState<T>> address, Coder<T> coder) { return new FlinkValueState<>(flinkStateBackend, address, namespace, coder); } @Override public <T> BagState<T> bindBag( - StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + StateTag<BagState<T>> address, Coder<T> elemCoder) { return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder); } @Override public <T> SetState<T> bindSet( - StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) { + StateTag<SetState<T>> address, Coder<T> elemCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", SetState.class.getSimpleName())); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { throw new UnsupportedOperationException( @@ -142,7 +142,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { @@ -153,7 +153,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return new FlinkCombiningStateWithContext<>( @@ -167,8 +167,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { return new FlinkWatermarkHoldState<>( @@ -180,13 +180,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> { private static class FlinkValueState<K, T> implements ValueState<T> { private final StateNamespace namespace; - private final StateTag<? super K, ValueState<T>> address; + private final StateTag<ValueState<T>> address; private final ValueStateDescriptor<T> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkValueState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, ValueState<T>> address, + StateTag<ValueState<T>> address, StateNamespace namespace, Coder<T> coder) { @@ -266,13 +266,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> { private static class FlinkBagState<K, T> implements BagState<T> { private final StateNamespace namespace; - private final StateTag<? super K, BagState<T>> address; + private final StateTag<BagState<T>> address; private final ListStateDescriptor<T> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkBagState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, BagState<T>> address, + StateTag<BagState<T>> address, StateNamespace namespace, Coder<T> coder) { @@ -379,14 +379,14 @@ public class FlinkStateInternals<K> implements StateInternals<K> { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkCombiningState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder) { @@ -547,7 +547,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; @@ -555,7 +555,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { FlinkKeyedCombiningState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, @@ -718,7 +718,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address; + private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; @@ -727,7 +727,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { FlinkCombiningStateWithContext( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, @@ -886,7 +886,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { private static class FlinkWatermarkHoldState<K, W extends BoundedWindow> implements WatermarkHoldState { - private final StateTag<? super K, WatermarkHoldState> address; + private final StateTag<WatermarkHoldState> address; private final TimestampCombiner timestampCombiner; private final StateNamespace namespace; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; @@ -896,7 +896,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { public FlinkWatermarkHoldState( KeyedStateBackend<ByteBuffer> flinkStateBackend, FlinkStateInternals<K> flinkStateInternals, - StateTag<? super K, WatermarkHoldState> address, + StateTag<WatermarkHoldState> address, StateNamespace namespace, TimestampCombiner timestampCombiner) { this.address = address; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 4e18ac2..bda30e4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -196,7 +196,7 @@ public class DoFnOperatorTest { DoFn<Integer, String> fn = new DoFn<Integer, String>() { @StateId("state") - private final StateSpec<Object, ValueState<String>> stateSpec = + private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement @@ -296,7 +296,7 @@ public class DoFnOperatorTest { private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); @StateId(stateId) - private final StateSpec<Object, ValueState<String>> stateSpec = + private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java index 7e7d1e1..eb2c05f 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java @@ -56,12 +56,12 @@ public class FlinkBroadcastStateInternalsTest { private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, CombiningState<Integer, int[], Integer>> + private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); FlinkBroadcastStateInternals<String> underTest; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java index 5433d07..0e0267b 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java @@ -64,7 +64,7 @@ public class FlinkKeyGroupStateInternalsTest { private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); FlinkKeyGroupStateInternals<String> underTest; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java index 08ae0c4..8033a9d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java @@ -47,7 +47,7 @@ public class FlinkSplitStateInternalsTest { private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); FlinkSplitStateInternals<String> underTest; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 17c43bf..cd00d9e 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -70,18 +70,18 @@ public class FlinkStateInternalsTest { private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, CombiningState<Integer, int[], Integer>> + private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); FlinkStateInternals<String> underTest; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index ce7f678..38129ab 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -103,7 +103,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable { private static class DummyStatefulDoFn extends DoFn<KV<Integer, Integer>, Integer> { @StateId("foo") - private final StateSpec<Object, ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of()); + private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of()); @ProcessElement public void processElem(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 343d51b..63e1166 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -890,7 +890,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { ParDo.of( new DoFn<KV<Integer, Integer>, Integer>() { @StateId("unused") - final StateSpec<Object, ValueState<Integer>> stateSpec = + final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value(VarIntCoder.of()); @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java index cdc23ff..afaba3a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.BagState; @@ -51,7 +50,7 @@ import org.joda.time.Instant; /** * An implementation of {@link StateInternals} for the SparkRunner. */ -class SparkStateInternals<K> implements StateInternals<K> { +class SparkStateInternals<K> implements StateInternals { private final K key; //Serializable state for internals (namespace to state tag to coded value). @@ -86,50 +85,47 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + public <T extends State> T state(StateNamespace namespace, StateTag<T> address) { return state(namespace, address, StateContexts.nullContext()); } @Override public <T extends State> T state( StateNamespace namespace, - StateTag<? super K, T> address, + StateTag<T> address, StateContext<?> c) { - return address.bind(new SparkStateBinder(key, namespace, c)); + return address.bind(new SparkStateBinder(namespace, c)); } - private class SparkStateBinder implements StateBinder<K> { - private final K key; + private class SparkStateBinder implements StateBinder { private final StateNamespace namespace; private final StateContext<?> c; - private SparkStateBinder(K key, - StateNamespace namespace, + private SparkStateBinder(StateNamespace namespace, StateContext<?> c) { - this.key = key; this.namespace = namespace; this.c = c; } @Override - public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) { return new SparkValueState<>(namespace, address, coder); } @Override - public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) { return new SparkBagState<>(namespace, address, elemCoder); } @Override - public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) { + public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", SetState.class.getSimpleName())); } @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<? super K, MapState<KeyT, ValueT>> spec, + StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { throw new UnsupportedOperationException( String.format("%s is not supported", MapState.class.getSimpleName())); @@ -138,7 +134,7 @@ class SparkStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new SparkCombiningState<>(namespace, address, accumCoder, combineFn); @@ -147,7 +143,7 @@ class SparkStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return new SparkCombiningState<>( @@ -155,8 +151,8 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, + public WatermarkHoldState bindWatermark( + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { return new SparkWatermarkHoldState(namespace, address, timestampCombiner); } @@ -164,12 +160,12 @@ class SparkStateInternals<K> implements StateInternals<K> { private class AbstractState<T> { final StateNamespace namespace; - final StateTag<?, ? extends State> address; + final StateTag<? extends State> address; final Coder<T> coder; private AbstractState( StateNamespace namespace, - StateTag<?, ? extends State> address, + StateTag<? extends State> address, Coder<T> coder) { this.namespace = namespace; this.address = address; @@ -218,7 +214,7 @@ class SparkStateInternals<K> implements StateInternals<K> { private SparkValueState( StateNamespace namespace, - StateTag<?, ValueState<T>> address, + StateTag<ValueState<T>> address, Coder<T> coder) { super(namespace, address, coder); } @@ -246,7 +242,7 @@ class SparkStateInternals<K> implements StateInternals<K> { public SparkWatermarkHoldState( StateNamespace namespace, - StateTag<?, WatermarkHoldState> address, + StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { super(namespace, address, InstantCoder.of()); this.timestampCombiner = timestampCombiner; @@ -300,7 +296,7 @@ class SparkStateInternals<K> implements StateInternals<K> { private SparkCombiningState( StateNamespace namespace, - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, + StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> coder, CombineFn<InputT, AccumT, OutputT> combineFn) { super(namespace, address, coder); @@ -363,7 +359,7 @@ class SparkStateInternals<K> implements StateInternals<K> { private final class SparkBagState<T> extends AbstractState<List<T>> implements BagState<T> { private SparkBagState( StateNamespace namespace, - StateTag<?, BagState<T>> address, + StateTag<BagState<T>> address, Coder<T> coder) { super(namespace, address, ListCoder.of(coder)); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 0a00c45..063feef 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -93,7 +93,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); timerInternals.advanceProcessingTime(Instant.now()); timerInternals.advanceSynchronizedProcessingTime(Instant.now()); - StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); + StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>(); ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner = http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 3e8dde5..ffe343b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -124,7 +124,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> { Coder<W> windowCoder) throws IOException { } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index ef1ff9f..7b6f9ed 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -65,7 +65,7 @@ public final class TranslationUtils { */ static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable { @Override - public StateInternals<K> stateInternalsForKey(K key) { + public StateInternals stateInternalsForKey(K key) { return InMemoryStateInternals.forKey(key); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 9b99ca4..0368476 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -331,10 +331,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD /** * Annotation for declaring and dereferencing state cells. * - * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link - * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State} - * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and - * annotate it with {@link StateId}. See the following code for an example: + * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a + * {@link StateId}. To use the cell during processing, add a parameter of the appropriate {@link + * State} subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} + * method, and annotate it with {@link StateId}. See the following code for an example: * * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} { * http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index b5547e3..02f3a85 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -112,14 +112,14 @@ public class GroupIntoBatches<K, InputT> private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); @StateId(BATCH_ID) - private final StateSpec<Object, BagState<InputT>> batchSpec; + private final StateSpec<BagState<InputT>> batchSpec; @StateId(NUM_ELEMENTS_IN_BATCH_ID) - private final StateSpec<Object, CombiningState<Long, Long, Long>> + private final StateSpec<CombiningState<Long, Long, Long>> numElementsInBatchSpec; @StateId(KEY_ID) - private final StateSpec<Object, ValueState<K>> keySpec; + private final StateSpec<ValueState<K>> keySpec; private final long prefetchFrequency; http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 1f6afbf..6137a7b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -446,7 +446,7 @@ public class ParDo { Map<String, DoFnSignature.StateDeclaration> stateDeclarations = signature.stateDeclarations(); for (DoFnSignature.StateDeclaration stateDeclaration : stateDeclarations.values()) { try { - StateSpec<?, ?> stateSpec = (StateSpec<?, ?>) stateDeclaration.field().get(fn); + StateSpec<?> stateSpec = (StateSpec<?>) stateDeclaration.field().get(fn); stateSpec.offerCoders(codersForStateSpecTypes(stateDeclaration, coderRegistry, inputCoder)); stateSpec.finishSpecifying(); } catch (IllegalAccessException e) { http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java index 6fe37a1..48fa742 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -20,34 +20,36 @@ package org.apache.beam.sdk.util.state; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; /** * Visitor for binding a {@link StateSpec} and to the associated {@link State}. - * - * @param <K> the type of key this binder embodies. */ -public interface StateBinder<K> { - <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder); +public interface StateBinder { + <T> ValueState<T> bindValue( + String id, StateSpec<ValueState<T>> spec, Coder<T> coder); - <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder); + <T> BagState<T> bindBag( + String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder); - <T> SetState<T> bindSet(String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder); + <T> SetState<T> bindSet( + String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder); <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec, - Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder); + String id, + StateSpec<MapState<KeyT, ValueT>> spec, + Coder<KeyT> mapKeyCoder, + Coder<ValueT> mapValueCoder); <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining( String id, - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext( String id, - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn); @@ -57,8 +59,8 @@ public interface StateBinder<K> { * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added * to the returned {@link WatermarkHoldState} are to be combined. */ - <W extends BoundedWindow> WatermarkHoldState bindWatermark( + WatermarkHoldState bindWatermark( String id, - StateSpec<? super K, WatermarkHoldState> spec, + StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner); } http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java index 6b94c40..8eda218 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java @@ -26,23 +26,22 @@ import org.apache.beam.sdk.coders.Coder; * A specification of a persistent state cell. This includes information necessary to encode the * value and details about the intended access pattern. * - * @param <K> The type of key that must be used with the state tag. Contravariant: methods should - * accept values of type {@code StateSpec<? super K, StateT>}. * @param <StateT> The type of state being described. */ @Experimental(Kind.STATE) -public interface StateSpec<K, StateT extends State> extends Serializable { +public interface StateSpec<StateT extends State> extends Serializable { /** * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. */ - StateT bind(String id, StateBinder<? extends K> binder); + StateT bind(String id, StateBinder binder); /** - * Given {code coders} are inferred from type arguments defined for this class. - * Coders which are already set should take precedence over offered coders. - * @param coders Array of coders indexed by the type arguments order. Entries might be null if - * the coder could not be inferred. + * Given {code coders} are inferred from type arguments defined for this class. Coders which are + * already set should take precedence over offered coders. + * + * @param coders Array of coders indexed by the type arguments order. Entries might be null if the + * coder could not be inferred. */ void offerCoders(Coder[] coders); http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index a057a0b..49d5722 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; /** @@ -42,12 +41,12 @@ public class StateSpecs { private StateSpecs() {} /** Create a simple state spec for values of type {@code T}. */ - public static <T> StateSpec<Object, ValueState<T>> value() { + public static <T> StateSpec<ValueState<T>> value() { return new ValueStateSpec<>(null); } /** Create a simple state spec for values of type {@code T}. */ - public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) { + public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) { checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead"); return new ValueStateSpec<>(valueCoder); } @@ -57,17 +56,27 @@ public class StateSpecs { * {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining( + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( CombineFn<InputT, AccumT, OutputT> combineFn) { return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn); } /** + * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. + */ + public static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn); + } + + /** * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple * {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining( + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { checkArgument(accumCoder != null, "accumCoder should not be null. " @@ -80,11 +89,8 @@ public class StateSpecs { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining( + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { - checkArgument(accumCoder != null, - "accumCoder should not be null. " - + "Consider using combining(CombineFn<> combineFn) instead."); return combiningInternal(accumCoder, combineFn); } @@ -96,7 +102,7 @@ public class StateSpecs { * only be used to initialize static values. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningFromInputInternal( Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { try { @@ -113,13 +119,13 @@ public class StateSpecs { } private static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal( + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); } private static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal( + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); } @@ -128,7 +134,7 @@ public class StateSpecs { * Create a state spec that is optimized for adding values frequently, and occasionally retrieving * all the values that have been added. */ - public static <T> StateSpec<Object, BagState<T>> bag() { + public static <T> StateSpec<BagState<T>> bag() { return bag(null); } @@ -136,49 +142,46 @@ public class StateSpecs { * Create a state spec that is optimized for adding values frequently, and occasionally retrieving * all the values that have been added. */ - public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) { + public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) { return new BagStateSpec<>(elemCoder); } /** * Create a state spec that supporting for {@link java.util.Set} like access patterns. */ - public static <T> StateSpec<Object, SetState<T>> set() { + public static <T> StateSpec<SetState<T>> set() { return set(null); } /** * Create a state spec that supporting for {@link java.util.Set} like access patterns. */ - public static <T> StateSpec<Object, SetState<T>> set(Coder<T> elemCoder) { + public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) { return new SetStateSpec<>(elemCoder); } /** * Create a state spec that supporting for {@link java.util.Map} like access patterns. */ - public static <K, V> StateSpec<Object, MapState<K, V>> map() { + public static <K, V> StateSpec<MapState<K, V>> map() { return new MapStateSpec<>(null, null); } - /** - * Create a state spec that supporting for {@link java.util.Map} like access patterns. - */ - public static <K, V> StateSpec<Object, MapState<K, V>> map(Coder<K> keyCoder, - Coder<V> valueCoder) { + /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */ + public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) { return new MapStateSpec<>(keyCoder, valueCoder); } /** Create a state spec for holding the watermark. */ - public static <W extends BoundedWindow> - StateSpec<Object, WatermarkHoldState> watermarkStateInternal( + public static + StateSpec<WatermarkHoldState> watermarkStateInternal( TimestampCombiner timestampCombiner) { - return new WatermarkStateSpecInternal<W>(timestampCombiner); + return new WatermarkStateSpecInternal(timestampCombiner); } - public static <K, InputT, AccumT, OutputT> - StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal( - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) { + public static <InputT, AccumT, OutputT> + StateSpec<BagState<AccumT>> convertToBagSpecInternal( + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) { if (combiningSpec instanceof CombiningStateSpec) { // Checked above; conversion to a bag spec depends on the provided spec being one of those // created via the factory methods in this class. @@ -201,7 +204,7 @@ public class StateSpecs { * * <p>Includes the coder for {@code T}. */ - private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> { + private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> { @Nullable private Coder<T> coder; @@ -211,7 +214,7 @@ public class StateSpecs { } @Override - public ValueState<T> bind(String id, StateBinder<?> visitor) { + public ValueState<T> bind(String id, StateBinder visitor) { return visitor.bindValue(id, this, coder); } @@ -260,7 +263,7 @@ public class StateSpecs { * <p>Includes the {@link CombineFn} and the coder for the accumulator type. */ private static class CombiningStateSpec<InputT, AccumT, OutputT> - implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> { + implements StateSpec<CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; @@ -275,7 +278,7 @@ public class StateSpecs { @Override public CombiningState<InputT, AccumT, OutputT> bind( - String id, StateBinder<? extends Object> visitor) { + String id, StateBinder visitor) { return visitor.bindCombining(id, this, accumCoder, combineFn); } @@ -320,7 +323,7 @@ public class StateSpecs { return Objects.hash(getClass(), accumCoder); } - private StateSpec<Object, BagState<AccumT>> asBagSpec() { + private StateSpec<BagState<AccumT>> asBagSpec() { return new BagStateSpec<AccumT>(accumCoder); } } @@ -332,7 +335,7 @@ public class StateSpecs { * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type. */ private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT> - implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> { + implements StateSpec<CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn; @@ -346,7 +349,7 @@ public class StateSpecs { @Override public CombiningState<InputT, AccumT, OutputT> bind( - String id, StateBinder<? extends Object> visitor) { + String id, StateBinder visitor) { return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn); } @@ -392,7 +395,7 @@ public class StateSpecs { return Objects.hash(getClass(), accumCoder); } - private StateSpec<Object, BagState<AccumT>> asBagSpec() { + private StateSpec<BagState<AccumT>> asBagSpec() { return new BagStateSpec<AccumT>(accumCoder); } } @@ -403,7 +406,7 @@ public class StateSpecs { * * <p>Includes the coder for the element type {@code T}</p> */ - private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> { + private static class BagStateSpec<T> implements StateSpec<BagState<T>> { @Nullable private Coder<T> elemCoder; @@ -413,7 +416,7 @@ public class StateSpecs { } @Override - public BagState<T> bind(String id, StateBinder<?> visitor) { + public BagState<T> bind(String id, StateBinder visitor) { return visitor.bindBag(id, this, elemCoder); } @@ -456,7 +459,7 @@ public class StateSpecs { } } - private static class MapStateSpec<K, V> implements StateSpec<Object, MapState<K, V>> { + private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> { @Nullable private Coder<K> keyCoder; @@ -469,7 +472,7 @@ public class StateSpecs { } @Override - public MapState<K, V> bind(String id, StateBinder<?> visitor) { + public MapState<K, V> bind(String id, StateBinder visitor) { return visitor.bindMap(id, this, keyCoder, valueCoder); } @@ -523,7 +526,7 @@ public class StateSpecs { * * <p>Includes the coder for the element type {@code T}</p> */ - private static class SetStateSpec<T> implements StateSpec<Object, SetState<T>> { + private static class SetStateSpec<T> implements StateSpec<SetState<T>> { @Nullable private Coder<T> elemCoder; @@ -533,7 +536,7 @@ public class StateSpecs { } @Override - public SetState<T> bind(String id, StateBinder<?> visitor) { + public SetState<T> bind(String id, StateBinder visitor) { return visitor.bindSet(id, this, elemCoder); } @@ -582,8 +585,7 @@ public class StateSpecs { * <p>Includes the {@link TimestampCombiner} according to which the output times * are combined. */ - private static class WatermarkStateSpecInternal<W extends BoundedWindow> - implements StateSpec<Object, WatermarkHoldState> { + private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> { /** * When multiple output times are added to hold the watermark, this determines how they are @@ -597,7 +599,7 @@ public class StateSpecs { } @Override - public WatermarkHoldState bind(String id, StateBinder<?> visitor) { + public WatermarkHoldState bind(String id, StateBinder visitor) { return visitor.bindWatermark(id, this, timestampCombiner); }
