This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f3cb463  Replace deprecated StateTag.StateBinder in FlinkStateInternals  (#6754)
f3cb463 is described below

commit f3cb4630efe011f53fd0abe85c2f03836073faf6
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Tue Oct 23 17:21:45 2018 +0200

    Replace deprecated StateTag.StateBinder in FlinkStateInternals  (#6754)
    
    * Replace deprecated StateTag.StateBinder in FlinkStateInternals
    
    * Convert anonymous class / Pass only required dependencies
---
 .../streaming/state/FlinkStateInternals.java       | 235 +++++++++++----------
 1 file changed, 124 insertions(+), 111 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index a65e792..02a2ebe 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -38,7 +38,9 @@ import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.ReadableStates;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateBinder;
 import org.apache.beam.sdk.state.StateContext;
+import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.Combine;
@@ -99,93 +101,108 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
   @Override
   public <T extends State> T state(
-      final StateNamespace namespace, StateTag<T> address, final 
StateContext<?> context) {
-
-    return address.bind(
-        new StateTag.StateBinder() {
+      StateNamespace namespace, StateTag<T> address, StateContext<?> context) {
+    return address
+        .getSpec()
+        .bind(
+            address.getId(),
+            new FlinkStateBinder(namespace, context, flinkStateBackend, 
watermarkHolds));
+  }
 
-          @Override
-          public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> 
address, Coder<T2> coder) {
+  private static class FlinkStateBinder implements StateBinder {
 
-            return new FlinkValueState<>(flinkStateBackend, address, 
namespace, coder);
-          }
-
-          @Override
-          public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, 
Coder<T2> elemCoder) {
+    private final StateNamespace namespace;
+    private final StateContext<?> stateContext;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+    private final Map<String, Instant> watermarkHolds;
 
-            return new FlinkBagState<>(flinkStateBackend, address, namespace, 
elemCoder);
-          }
+    private FlinkStateBinder(
+        StateNamespace namespace,
+        StateContext<?> stateContext,
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        Map<String, Instant> watermarkHolds) {
+      this.namespace = namespace;
+      this.stateContext = stateContext;
+      this.flinkStateBackend = flinkStateBackend;
+      this.watermarkHolds = watermarkHolds;
+    }
 
-          @Override
-          public <T2> SetState<T2> bindSet(StateTag<SetState<T2>> address, 
Coder<T2> elemCoder) {
-            return new FlinkSetState<>(flinkStateBackend, address, namespace, 
elemCoder);
-          }
+    @Override
+    public <T2> ValueState<T2> bindValue(
+        String id, StateSpec<ValueState<T2>> spec, Coder<T2> coder) {
+      return new FlinkValueState<>(flinkStateBackend, id, namespace, coder);
+    }
 
-          @Override
-          public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<MapState<KeyT, ValueT>> address,
-              Coder<KeyT> mapKeyCoder,
-              Coder<ValueT> mapValueCoder) {
-            return new FlinkMapState<>(
-                flinkStateBackend, address, namespace, mapKeyCoder, 
mapValueCoder);
-          }
+    @Override
+    public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec, 
Coder<T2> elemCoder) {
+      return new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder);
+    }
 
-          @Override
-          public <InputT, AccumT, OutputT>
-              CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+    @Override
+    public <T2> SetState<T2> bindSet(String id, StateSpec<SetState<T2>> spec, 
Coder<T2> elemCoder) {
+      return new FlinkSetState<>(flinkStateBackend, id, namespace, elemCoder);
+    }
 
-            return new FlinkCombiningState<>(
-                flinkStateBackend, address, combineFn, namespace, accumCoder);
-          }
+    @Override
+    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+        String id,
+        StateSpec<MapState<KeyT, ValueT>> spec,
+        Coder<KeyT> mapKeyCoder,
+        Coder<ValueT> mapValueCoder) {
+      return new FlinkMapState<>(flinkStateBackend, id, namespace, 
mapKeyCoder, mapValueCoder);
+    }
 
-          @Override
-          public <InputT, AccumT, OutputT>
-              CombiningState<InputT, AccumT, OutputT> 
bindCombiningValueWithContext(
-                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  CombineWithContext.CombineFnWithContext<InputT, AccumT, 
OutputT> combineFn) {
-            return new FlinkCombiningStateWithContext<>(
-                flinkStateBackend,
-                address,
-                combineFn,
-                namespace,
-                accumCoder,
-                FlinkStateInternals.this,
-                CombineContextFactory.createFromStateContext(context));
-          }
+    @Override
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> 
bindCombining(
+        String id,
+        StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+        Coder<AccumT> accumCoder,
+        Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+      return new FlinkCombiningState<>(flinkStateBackend, id, combineFn, 
namespace, accumCoder);
+    }
 
-          @Override
-          public WatermarkHoldState bindWatermark(
-              StateTag<WatermarkHoldState> address, TimestampCombiner 
timestampCombiner) {
+    @Override
+    public <InputT, AccumT, OutputT>
+        CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
+            String id,
+            StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+            Coder<AccumT> accumCoder,
+            CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> 
combineFn) {
+      return new FlinkCombiningStateWithContext<>(
+          flinkStateBackend,
+          id,
+          combineFn,
+          namespace,
+          accumCoder,
+          CombineContextFactory.createFromStateContext(stateContext));
+    }
 
-            return new FlinkWatermarkHoldState<>(
-                flinkStateBackend, FlinkStateInternals.this, address, 
namespace, timestampCombiner);
-          }
-        });
+    @Override
+    public WatermarkHoldState bindWatermark(
+        String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner 
timestampCombiner) {
+      return new FlinkWatermarkHoldState<>(
+          flinkStateBackend, watermarkHolds, id, namespace, timestampCombiner);
+    }
   }
 
-  private static class FlinkValueState<K, T> implements ValueState<T> {
+  private static class FlinkValueState<T> implements ValueState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<ValueState<T>> address;
+    private final String stateId;
     private final ValueStateDescriptor<T> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkValueState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<ValueState<T>> address,
+        String stateId,
         StateNamespace namespace,
         Coder<T> coder) {
 
       this.namespace = namespace;
-      this.address = address;
+      this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
 
-      flinkStateDescriptor =
-          new ValueStateDescriptor<>(address.getId(), new 
CoderTypeSerializer<>(coder));
+      flinkStateDescriptor = new ValueStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(coder));
     }
 
     @Override
@@ -238,15 +255,15 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         return false;
       }
 
-      FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o;
+      FlinkValueState<?> that = (FlinkValueState<?>) o;
 
-      return namespace.equals(that.namespace) && address.equals(that.address);
+      return namespace.equals(that.namespace) && stateId.equals(that.stateId);
     }
 
     @Override
     public int hashCode() {
       int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
+      result = 31 * result + stateId.hashCode();
       return result;
     }
   }
@@ -254,22 +271,21 @@ public class FlinkStateInternals<K> implements 
StateInternals {
   private static class FlinkBagState<K, T> implements BagState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<BagState<T>> address;
+    private final String stateId;
     private final ListStateDescriptor<T> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkBagState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<BagState<T>> address,
+        String stateId,
         StateNamespace namespace,
         Coder<T> coder) {
 
       this.namespace = namespace;
-      this.address = address;
+      this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
 
-      flinkStateDescriptor =
-          new ListStateDescriptor<>(address.getId(), new 
CoderTypeSerializer<>(coder));
+      flinkStateDescriptor = new ListStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(coder));
     }
 
     @Override
@@ -351,13 +367,13 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
       FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o;
 
-      return namespace.equals(that.namespace) && address.equals(that.address);
+      return namespace.equals(that.namespace) && stateId.equals(that.stateId);
     }
 
     @Override
     public int hashCode() {
       int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
+      result = 31 * result + stateId.hashCode();
       return result;
     }
   }
@@ -366,25 +382,25 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
+    private final String stateId;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
+        String stateId,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
 
       this.namespace = namespace;
-      this.address = address;
+      this.stateId = stateId;
       this.combineFn = combineFn;
       this.flinkStateBackend = flinkStateBackend;
 
       flinkStateDescriptor =
-          new ValueStateDescriptor<>(address.getId(), new 
CoderTypeSerializer<>(accumCoder));
+          new ValueStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(accumCoder));
     }
 
     @Override
@@ -510,13 +526,13 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
       FlinkCombiningState<?, ?, ?, ?> that = (FlinkCombiningState<?, ?, ?, ?>) 
o;
 
-      return namespace.equals(that.namespace) && address.equals(that.address);
+      return namespace.equals(that.namespace) && stateId.equals(that.stateId);
     }
 
     @Override
     public int hashCode() {
       int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
+      result = 31 * result + stateId.hashCode();
       return result;
     }
   }
@@ -525,31 +541,28 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
+    private final String stateId;
     private final CombineWithContext.CombineFnWithContext<InputT, AccumT, 
OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
     private final CombineWithContext.Context context;
 
     FlinkCombiningStateWithContext(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
+        String stateId,
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> 
combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
-        FlinkStateInternals<K> flinkStateInternals,
         CombineWithContext.Context context) {
 
       this.namespace = namespace;
-      this.address = address;
+      this.stateId = stateId;
       this.combineFn = combineFn;
       this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
       this.context = context;
 
       flinkStateDescriptor =
-          new ValueStateDescriptor<>(address.getId(), new 
CoderTypeSerializer<>(accumCoder));
+          new ValueStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(accumCoder));
     }
 
     @Override
@@ -676,40 +689,40 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       FlinkCombiningStateWithContext<?, ?, ?, ?> that =
           (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
 
-      return namespace.equals(that.namespace) && address.equals(that.address);
+      return namespace.equals(that.namespace) && stateId.equals(that.stateId);
     }
 
     @Override
     public int hashCode() {
       int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
+      result = 31 * result + stateId.hashCode();
       return result;
     }
   }
 
   private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
       implements WatermarkHoldState {
-    private final StateTag<WatermarkHoldState> address;
+    private final String stateId;
     private final TimestampCombiner timestampCombiner;
     private final StateNamespace namespace;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
+    private final Map<String, Instant> watermarkHolds;
     private final ValueStateDescriptor<Instant> flinkStateDescriptor;
 
     public FlinkWatermarkHoldState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        FlinkStateInternals<K> flinkStateInternals,
-        StateTag<WatermarkHoldState> address,
+        Map<String, Instant> watermarkHolds,
+        String stateId,
         StateNamespace namespace,
         TimestampCombiner timestampCombiner) {
-      this.address = address;
+      this.stateId = stateId;
       this.timestampCombiner = timestampCombiner;
       this.namespace = namespace;
       this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
+      this.watermarkHolds = watermarkHolds;
 
       flinkStateDescriptor =
-          new ValueStateDescriptor<>(address.getId(), new 
CoderTypeSerializer<>(InstantCoder.of()));
+          new ValueStateDescriptor<>(stateId, new 
CoderTypeSerializer<>(InstantCoder.of()));
     }
 
     @Override
@@ -755,11 +768,11 @@ public class FlinkStateInternals<K> implements 
StateInternals {
         Instant current = state.value();
         if (current == null) {
           state.update(value);
-          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
+          watermarkHolds.put(namespace.stringKey(), value);
         } else {
           Instant combined = timestampCombiner.combine(current, value);
           state.update(combined);
-          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), 
combined);
+          watermarkHolds.put(namespace.stringKey(), combined);
         }
       } catch (Exception e) {
         throw new RuntimeException("Error updating state.", e);
@@ -780,7 +793,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
     @Override
     public void clear() {
-      flinkStateInternals.watermarkHolds.remove(namespace.stringKey());
+      watermarkHolds.remove(namespace.stringKey());
       try {
         org.apache.flink.api.common.state.ValueState<Instant> state =
             flinkStateBackend.getPartitionedState(
@@ -802,7 +815,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
       FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o;
 
-      if (!address.equals(that.address)) {
+      if (!stateId.equals(that.stateId)) {
         return false;
       }
       if (!timestampCombiner.equals(that.timestampCombiner)) {
@@ -813,7 +826,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
     @Override
     public int hashCode() {
-      int result = address.hashCode();
+      int result = stateId.hashCode();
       result = 31 * result + timestampCombiner.hashCode();
       result = 31 * result + namespace.hashCode();
       return result;
@@ -823,22 +836,22 @@ public class FlinkStateInternals<K> implements 
StateInternals {
   private static class FlinkMapState<KeyT, ValueT> implements MapState<KeyT, 
ValueT> {
 
     private final StateNamespace namespace;
-    private final StateTag<MapState<KeyT, ValueT>> address;
+    private final String stateId;
     private final MapStateDescriptor<KeyT, ValueT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkMapState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<MapState<KeyT, ValueT>> address,
+        String stateId,
         StateNamespace namespace,
         Coder<KeyT> mapKeyCoder,
         Coder<ValueT> mapValueCoder) {
       this.namespace = namespace;
-      this.address = address;
+      this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateDescriptor =
           new MapStateDescriptor<>(
-              address.getId(),
+              stateId,
               new CoderTypeSerializer<>(mapKeyCoder),
               new CoderTypeSerializer<>(mapValueCoder));
     }
@@ -996,13 +1009,13 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
       FlinkMapState<?, ?> that = (FlinkMapState<?, ?>) o;
 
-      return namespace.equals(that.namespace) && address.equals(that.address);
+      return namespace.equals(that.namespace) && stateId.equals(that.stateId);
     }
 
     @Override
     public int hashCode() {
       int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
+      result = 31 * result + stateId.hashCode();
       return result;
     }
   }
@@ -1010,21 +1023,21 @@ public class FlinkStateInternals<K> implements 
StateInternals {
   private static class FlinkSetState<T> implements SetState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<SetState<T>> address;
+    private final String stateId;
     private final MapStateDescriptor<T, Boolean> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkSetState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<SetState<T>> address,
+        String stateId,
         StateNamespace namespace,
         Coder<T> coder) {
       this.namespace = namespace;
-      this.address = address;
+      this.stateId = stateId;
       this.flinkStateBackend = flinkStateBackend;
       this.flinkStateDescriptor =
           new MapStateDescriptor<>(
-              address.getId(), new CoderTypeSerializer<>(coder), new 
BooleanSerializer());
+              stateId, new CoderTypeSerializer<>(coder), new 
BooleanSerializer());
     }
 
     @Override
@@ -1147,13 +1160,13 @@ public class FlinkStateInternals<K> implements 
StateInternals {
 
       FlinkSetState<?> that = (FlinkSetState<?>) o;
 
-      return namespace.equals(that.namespace) && address.equals(that.address);
+      return namespace.equals(that.namespace) && stateId.equals(that.stateId);
     }
 
     @Override
     public int hashCode() {
       int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
+      result = 31 * result + stateId.hashCode();
       return result;
     }
   }

Reply via email to