Use CoderTypeSerializer and remove unused code in FlinkStateInternals

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c365087
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c365087
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c365087

Branch: refs/heads/master
Commit: 4c36508733a69fafce0f7dfb86c71eee5eb6bc84
Parents: fe3d554
Author: JingsongLi <[email protected]>
Authored: Wed Jun 7 14:34:25 2017 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Jun 13 11:35:17 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/FlinkStateInternals.java    | 198 +------------------
 1 file changed, 10 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4c365087/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index f0d3278..d8771de 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -196,9 +195,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.address = address;
       this.flinkStateBackend = flinkStateBackend;
 
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(coder));
     }
 
     @Override
@@ -282,9 +280,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.address = address;
       this.flinkStateBackend = flinkStateBackend;
 
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
+      flinkStateDescriptor = new ListStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(coder));
     }
 
     @Override
@@ -398,9 +395,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.combineFn = combineFn;
       this.flinkStateBackend = flinkStateBackend;
 
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(accumCoder));
     }
 
     @Override
@@ -545,179 +541,6 @@ public class FlinkStateInternals<K> implements StateInternals {
     }
   }
 
-  private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-
-    FlinkKeyedCombiningState(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
-        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkStateInternals<K> flinkStateInternals) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator();
-        }
-        current = combineFn.addInput(current, value);
-        state.update(current);
-      } catch (RuntimeException re) {
-        throw re;
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        if (accum != null) {
-          return combineFn.extractOutput(accum);
-        } else {
-          return combineFn.extractOutput(combineFn.createAccumulator());
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkKeyedCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
  private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
       implements CombiningState<InputT, AccumT, OutputT> {
 
@@ -745,9 +568,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.flinkStateInternals = flinkStateInternals;
       this.context = context;
 
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(accumCoder));
     }
 
     @Override
@@ -913,8 +735,8 @@ public class FlinkStateInternals<K> implements StateInternals {
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateInternals = flinkStateInternals;
 
-      CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+      flinkStateDescriptor = new ValueStateDescriptor<>(
+          address.getId(), new CoderTypeSerializer<>(InstantCoder.of()));
     }
 
     @Override

Reply via email to