This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f3cb463 Replace deprecated StateTag.StateBinder in FlinkStateInternals (#6754) f3cb463 is described below commit f3cb4630efe011f53fd0abe85c2f03836073faf6 Author: Maximilian Michels <m...@apache.org> AuthorDate: Tue Oct 23 17:21:45 2018 +0200 Replace deprecated StateTag.StateBinder in FlinkStateInternals (#6754) * Replace deprecated StateTag.StateBinder in FlinkStateInternals * Convert anonymous class / Pass only required dependencies --- .../streaming/state/FlinkStateInternals.java | 235 +++++++++++---------- 1 file changed, 124 insertions(+), 111 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index a65e792..02a2ebe 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -38,7 +38,9 @@ import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.state.ReadableStates; import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateBinder; import org.apache.beam.sdk.state.StateContext; +import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.Combine; @@ -99,93 +101,108 @@ public class FlinkStateInternals<K> implements StateInternals { @Override public <T extends State> T state( - final StateNamespace namespace, StateTag<T> address, final StateContext<?> context) { - - return address.bind( - new StateTag.StateBinder() { + StateNamespace namespace, StateTag<T> address, StateContext<?> 
context) { + return address + .getSpec() + .bind( + address.getId(), + new FlinkStateBinder(namespace, context, flinkStateBackend, watermarkHolds)); + } - @Override - public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> address, Coder<T2> coder) { + private static class FlinkStateBinder implements StateBinder { - return new FlinkValueState<>(flinkStateBackend, address, namespace, coder); - } - - @Override - public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, Coder<T2> elemCoder) { + private final StateNamespace namespace; + private final StateContext<?> stateContext; + private final KeyedStateBackend<ByteBuffer> flinkStateBackend; + private final Map<String, Instant> watermarkHolds; - return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder); - } + private FlinkStateBinder( + StateNamespace namespace, + StateContext<?> stateContext, + KeyedStateBackend<ByteBuffer> flinkStateBackend, + Map<String, Instant> watermarkHolds) { + this.namespace = namespace; + this.stateContext = stateContext; + this.flinkStateBackend = flinkStateBackend; + this.watermarkHolds = watermarkHolds; + } - @Override - public <T2> SetState<T2> bindSet(StateTag<SetState<T2>> address, Coder<T2> elemCoder) { - return new FlinkSetState<>(flinkStateBackend, address, namespace, elemCoder); - } + @Override + public <T2> ValueState<T2> bindValue( + String id, StateSpec<ValueState<T2>> spec, Coder<T2> coder) { + return new FlinkValueState<>(flinkStateBackend, id, namespace, coder); + } - @Override - public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - StateTag<MapState<KeyT, ValueT>> address, - Coder<KeyT> mapKeyCoder, - Coder<ValueT> mapValueCoder) { - return new FlinkMapState<>( - flinkStateBackend, address, namespace, mapKeyCoder, mapValueCoder); - } + @Override + public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec, Coder<T2> elemCoder) { + return new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder); + } - @Override - public 
<InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + @Override + public <T2> SetState<T2> bindSet(String id, StateSpec<SetState<T2>> spec, Coder<T2> elemCoder) { + return new FlinkSetState<>(flinkStateBackend, id, namespace, elemCoder); + } - return new FlinkCombiningState<>( - flinkStateBackend, address, combineFn, namespace, accumCoder); - } + @Override + public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( + String id, + StateSpec<MapState<KeyT, ValueT>> spec, + Coder<KeyT> mapKeyCoder, + Coder<ValueT> mapValueCoder) { + return new FlinkMapState<>(flinkStateBackend, id, namespace, mapKeyCoder, mapValueCoder); + } - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( - StateTag<CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { - return new FlinkCombiningStateWithContext<>( - flinkStateBackend, - address, - combineFn, - namespace, - accumCoder, - FlinkStateInternals.this, - CombineContextFactory.createFromStateContext(context)); - } + @Override + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + return new FlinkCombiningState<>(flinkStateBackend, id, combineFn, namespace, accumCoder); + } - @Override - public WatermarkHoldState bindWatermark( - StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) { + @Override + public <InputT, AccumT, OutputT> + CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + 
Coder<AccumT> accumCoder, + CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return new FlinkCombiningStateWithContext<>( + flinkStateBackend, + id, + combineFn, + namespace, + accumCoder, + CombineContextFactory.createFromStateContext(stateContext)); + } - return new FlinkWatermarkHoldState<>( - flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner); - } - }); + @Override + public WatermarkHoldState bindWatermark( + String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) { + return new FlinkWatermarkHoldState<>( + flinkStateBackend, watermarkHolds, id, namespace, timestampCombiner); + } } - private static class FlinkValueState<K, T> implements ValueState<T> { + private static class FlinkValueState<T> implements ValueState<T> { private final StateNamespace namespace; - private final StateTag<ValueState<T>> address; + private final String stateId; private final ValueStateDescriptor<T> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkValueState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<ValueState<T>> address, + String stateId, StateNamespace namespace, Coder<T> coder) { this.namespace = namespace; - this.address = address; + this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; - flinkStateDescriptor = - new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(coder)); + flinkStateDescriptor = new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder)); } @Override @@ -238,15 +255,15 @@ public class FlinkStateInternals<K> implements StateInternals { return false; } - FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o; + FlinkValueState<?> that = (FlinkValueState<?>) o; - return namespace.equals(that.namespace) && address.equals(that.address); + return namespace.equals(that.namespace) && stateId.equals(that.stateId); } @Override public int hashCode() { int result = 
namespace.hashCode(); - result = 31 * result + address.hashCode(); + result = 31 * result + stateId.hashCode(); return result; } } @@ -254,22 +271,21 @@ public class FlinkStateInternals<K> implements StateInternals { private static class FlinkBagState<K, T> implements BagState<T> { private final StateNamespace namespace; - private final StateTag<BagState<T>> address; + private final String stateId; private final ListStateDescriptor<T> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkBagState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<BagState<T>> address, + String stateId, StateNamespace namespace, Coder<T> coder) { this.namespace = namespace; - this.address = address; + this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; - flinkStateDescriptor = - new ListStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(coder)); + flinkStateDescriptor = new ListStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder)); } @Override @@ -351,13 +367,13 @@ public class FlinkStateInternals<K> implements StateInternals { FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o; - return namespace.equals(that.namespace) && address.equals(that.address); + return namespace.equals(that.namespace) && stateId.equals(that.stateId); } @Override public int hashCode() { int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); + result = 31 * result + stateId.hashCode(); return result; } } @@ -366,25 +382,25 @@ public class FlinkStateInternals<K> implements StateInternals { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; + private final String stateId; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkCombiningState( 
KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<CombiningState<InputT, AccumT, OutputT>> address, + String stateId, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder) { this.namespace = namespace; - this.address = address; + this.stateId = stateId; this.combineFn = combineFn; this.flinkStateBackend = flinkStateBackend; flinkStateDescriptor = - new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(accumCoder)); + new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(accumCoder)); } @Override @@ -510,13 +526,13 @@ public class FlinkStateInternals<K> implements StateInternals { FlinkCombiningState<?, ?, ?, ?> that = (FlinkCombiningState<?, ?, ?, ?>) o; - return namespace.equals(that.namespace) && address.equals(that.address); + return namespace.equals(that.namespace) && stateId.equals(that.stateId); } @Override public int hashCode() { int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); + result = 31 * result + stateId.hashCode(); return result; } } @@ -525,31 +541,28 @@ public class FlinkStateInternals<K> implements StateInternals { implements CombiningState<InputT, AccumT, OutputT> { private final StateNamespace namespace; - private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; + private final String stateId; private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn; private final ValueStateDescriptor<AccumT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - private final FlinkStateInternals<K> flinkStateInternals; private final CombineWithContext.Context context; FlinkCombiningStateWithContext( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<CombiningState<InputT, AccumT, OutputT>> address, + String stateId, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, StateNamespace namespace, Coder<AccumT> accumCoder, - 
FlinkStateInternals<K> flinkStateInternals, CombineWithContext.Context context) { this.namespace = namespace; - this.address = address; + this.stateId = stateId; this.combineFn = combineFn; this.flinkStateBackend = flinkStateBackend; - this.flinkStateInternals = flinkStateInternals; this.context = context; flinkStateDescriptor = - new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(accumCoder)); + new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(accumCoder)); } @Override @@ -676,40 +689,40 @@ public class FlinkStateInternals<K> implements StateInternals { FlinkCombiningStateWithContext<?, ?, ?, ?> that = (FlinkCombiningStateWithContext<?, ?, ?, ?>) o; - return namespace.equals(that.namespace) && address.equals(that.address); + return namespace.equals(that.namespace) && stateId.equals(that.stateId); } @Override public int hashCode() { int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); + result = 31 * result + stateId.hashCode(); return result; } } private static class FlinkWatermarkHoldState<K, W extends BoundedWindow> implements WatermarkHoldState { - private final StateTag<WatermarkHoldState> address; + private final String stateId; private final TimestampCombiner timestampCombiner; private final StateNamespace namespace; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - private final FlinkStateInternals<K> flinkStateInternals; + private final Map<String, Instant> watermarkHolds; private final ValueStateDescriptor<Instant> flinkStateDescriptor; public FlinkWatermarkHoldState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - FlinkStateInternals<K> flinkStateInternals, - StateTag<WatermarkHoldState> address, + Map<String, Instant> watermarkHolds, + String stateId, StateNamespace namespace, TimestampCombiner timestampCombiner) { - this.address = address; + this.stateId = stateId; this.timestampCombiner = timestampCombiner; this.namespace = namespace; this.flinkStateBackend = 
flinkStateBackend; - this.flinkStateInternals = flinkStateInternals; + this.watermarkHolds = watermarkHolds; flinkStateDescriptor = - new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(InstantCoder.of())); + new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(InstantCoder.of())); } @Override @@ -755,11 +768,11 @@ public class FlinkStateInternals<K> implements StateInternals { Instant current = state.value(); if (current == null) { state.update(value); - flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value); + watermarkHolds.put(namespace.stringKey(), value); } else { Instant combined = timestampCombiner.combine(current, value); state.update(combined); - flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined); + watermarkHolds.put(namespace.stringKey(), combined); } } catch (Exception e) { throw new RuntimeException("Error updating state.", e); @@ -780,7 +793,7 @@ public class FlinkStateInternals<K> implements StateInternals { @Override public void clear() { - flinkStateInternals.watermarkHolds.remove(namespace.stringKey()); + watermarkHolds.remove(namespace.stringKey()); try { org.apache.flink.api.common.state.ValueState<Instant> state = flinkStateBackend.getPartitionedState( @@ -802,7 +815,7 @@ public class FlinkStateInternals<K> implements StateInternals { FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o; - if (!address.equals(that.address)) { + if (!stateId.equals(that.stateId)) { return false; } if (!timestampCombiner.equals(that.timestampCombiner)) { @@ -813,7 +826,7 @@ public class FlinkStateInternals<K> implements StateInternals { @Override public int hashCode() { - int result = address.hashCode(); + int result = stateId.hashCode(); result = 31 * result + timestampCombiner.hashCode(); result = 31 * result + namespace.hashCode(); return result; @@ -823,22 +836,22 @@ public class FlinkStateInternals<K> implements StateInternals { private static class FlinkMapState<KeyT, ValueT> 
implements MapState<KeyT, ValueT> { private final StateNamespace namespace; - private final StateTag<MapState<KeyT, ValueT>> address; + private final String stateId; private final MapStateDescriptor<KeyT, ValueT> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkMapState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<MapState<KeyT, ValueT>> address, + String stateId, StateNamespace namespace, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { this.namespace = namespace; - this.address = address; + this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; this.flinkStateDescriptor = new MapStateDescriptor<>( - address.getId(), + stateId, new CoderTypeSerializer<>(mapKeyCoder), new CoderTypeSerializer<>(mapValueCoder)); } @@ -996,13 +1009,13 @@ public class FlinkStateInternals<K> implements StateInternals { FlinkMapState<?, ?> that = (FlinkMapState<?, ?>) o; - return namespace.equals(that.namespace) && address.equals(that.address); + return namespace.equals(that.namespace) && stateId.equals(that.stateId); } @Override public int hashCode() { int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); + result = 31 * result + stateId.hashCode(); return result; } } @@ -1010,21 +1023,21 @@ public class FlinkStateInternals<K> implements StateInternals { private static class FlinkSetState<T> implements SetState<T> { private final StateNamespace namespace; - private final StateTag<SetState<T>> address; + private final String stateId; private final MapStateDescriptor<T, Boolean> flinkStateDescriptor; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; FlinkSetState( KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<SetState<T>> address, + String stateId, StateNamespace namespace, Coder<T> coder) { this.namespace = namespace; - this.address = address; + this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; this.flinkStateDescriptor = new 
MapStateDescriptor<>( - address.getId(), new CoderTypeSerializer<>(coder), new BooleanSerializer()); + stateId, new CoderTypeSerializer<>(coder), new BooleanSerializer()); } @Override @@ -1147,13 +1160,13 @@ public class FlinkStateInternals<K> implements StateInternals { FlinkSetState<?> that = (FlinkSetState<?>) o; - return namespace.equals(that.namespace) && address.equals(that.address); + return namespace.equals(that.namespace) && stateId.equals(that.stateId); } @Override public int hashCode() { int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); + result = 31 * result + stateId.hashCode(); return result; } }