Use CoderTypeSerializer and remove unuse code in FlinkStateInternals
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c365087 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c365087 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c365087 Branch: refs/heads/master Commit: 4c36508733a69fafce0f7dfb86c71eee5eb6bc84 Parents: fe3d554 Author: JingsongLi <[email protected]> Authored: Wed Jun 7 14:34:25 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Jun 13 11:35:17 2017 +0200 ---------------------------------------------------------------------- .../streaming/state/FlinkStateInternals.java | 198 +------------------ 1 file changed, 10 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4c365087/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index f0d3278..d8771de 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -196,9 +195,8 @@ public class FlinkStateInternals<K> implements StateInternals { this.address = address; this.flinkStateBackend = flinkStateBackend; - CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + flinkStateDescriptor = new ValueStateDescriptor<>( + address.getId(), new CoderTypeSerializer<>(coder)); } @Override @@ -282,9 +280,8 @@ public class FlinkStateInternals<K> implements StateInternals { this.address = address; this.flinkStateBackend = flinkStateBackend; - CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder); - - flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo); + flinkStateDescriptor = new ListStateDescriptor<>( + address.getId(), new CoderTypeSerializer<>(coder)); } @Override @@ -398,9 +395,8 @@ public class FlinkStateInternals<K> implements StateInternals { this.combineFn = combineFn; this.flinkStateBackend = flinkStateBackend; - CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + flinkStateDescriptor = new ValueStateDescriptor<>( + address.getId(), new CoderTypeSerializer<>(accumCoder)); } @Override @@ -545,179 +541,6 @@ public class FlinkStateInternals<K> implements StateInternals { } } - private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT> - implements CombiningState<InputT, AccumT, OutputT> { - - private final StateNamespace namespace; - private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; - private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; - private final ValueStateDescriptor<AccumT> flinkStateDescriptor; - private final KeyedStateBackend<ByteBuffer> flinkStateBackend; - private final FlinkStateInternals<K> flinkStateInternals; - - FlinkKeyedCombiningState( - KeyedStateBackend<ByteBuffer> flinkStateBackend, - StateTag<CombiningState<InputT, AccumT, OutputT>> address, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn, - StateNamespace namespace, - Coder<AccumT> accumCoder, - FlinkStateInternals<K> flinkStateInternals) { - - this.namespace = namespace; - this.address = address; - this.combineFn = combineFn; - this.flinkStateBackend = flinkStateBackend; - this.flinkStateInternals = flinkStateInternals; - - CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); - } - - @Override - public CombiningState<InputT, AccumT, OutputT> readLater() { - return this; - } - - @Override - public void add(InputT value) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - current = combineFn.createAccumulator(); - } - current = combineFn.addInput(current, value); - state.update(current); - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new RuntimeException("Error adding to state." , e); - } - } - - @Override - public void addAccum(AccumT accum) { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT current = state.value(); - if (current == null) { - state.update(accum); - } else { - current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum)); - state.update(current); - } - } catch (Exception e) { - throw new RuntimeException("Error adding to state.", e); - } - } - - @Override - public AccumT getAccum() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(accumulators); - } - - @Override - public OutputT read() { - try { - org.apache.flink.api.common.state.ValueState<AccumT> state = - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor); - - AccumT accum = state.value(); - if (accum != null) { - return combineFn.extractOutput(accum); - } else { - return combineFn.extractOutput(combineFn.createAccumulator()); - } - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - try { - return flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).value() == null; - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - - } - - @Override - public ReadableState<Boolean> readLater() { - return this; - } - }; - } - - @Override - public void clear() { - try { - flinkStateBackend.getPartitionedState( - namespace.stringKey(), - StringSerializer.INSTANCE, - flinkStateDescriptor).clear(); - } catch (Exception e) { - throw new RuntimeException("Error clearing state.", e); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkKeyedCombiningState<?, ?, ?, ?> that = - (FlinkKeyedCombiningState<?, ?, ?, ?>) o; - - return namespace.equals(that.namespace) && address.equals(that.address); - - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT> implements CombiningState<InputT, AccumT, OutputT> { @@ -745,9 +568,8 @@ public class FlinkStateInternals<K> implements StateInternals { this.flinkStateInternals = flinkStateInternals; this.context = context; - CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder); - - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + flinkStateDescriptor = new ValueStateDescriptor<>( + address.getId(), new CoderTypeSerializer<>(accumCoder)); } @Override @@ -913,8 +735,8 @@ public class FlinkStateInternals<K> implements StateInternals { this.flinkStateBackend = flinkStateBackend; this.flinkStateInternals = flinkStateInternals; - CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of()); - flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null); + flinkStateDescriptor = new ValueStateDescriptor<>( + address.getId(), new CoderTypeSerializer<>(InstantCoder.of())); } @Override
