Repository: beam
Updated Branches:
  refs/heads/master ef56ea495 -> 1866a0113


[BEAM-2401] Update Flink Runner to Flink 1.3.0


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fbc6cc59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fbc6cc59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fbc6cc59

Branch: refs/heads/master
Commit: fbc6cc59bff93dfcf8676f874870a43eeb228c15
Parents: ef56ea4
Author: JingsongLi <lzljs3620...@aliyun.com>
Authored: Fri Jun 2 11:31:44 2017 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sun Jun 4 08:18:20 2017 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  2 +-
 .../translation/types/CoderTypeSerializer.java  | 55 ++++++++++++++++++++
 .../types/EncodedValueSerializer.java           | 18 +------
 .../state/FlinkBroadcastStateInternals.java     | 29 +++++------
 .../streaming/UnboundedSourceWrapperTest.java   | 37 +++++++++++++
 5 files changed, 109 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index fb0a67c..92f95a0 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -31,7 +31,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <flink.version>1.2.1</flink.version>
+    <flink.version>1.3.0</flink.version>
   </properties>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index e003119..bea562e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -24,7 +24,9 @@ import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -129,4 +131,57 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
   public int hashCode() {
     return coder.hashCode();
   }
+
+  @Override
+  public TypeSerializerConfigSnapshot snapshotConfiguration() {
+    return new CoderTypeSerializerConfigSnapshot<>(coder);
+  }
+
+  @Override
+  public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+    if (configSnapshot instanceof CoderTypeSerializerConfigSnapshot) {
+      if (coder.equals(((CoderTypeSerializerConfigSnapshot<?>) configSnapshot).coder)) {
+        return CompatibilityResult.compatible();
+      }
+    }
+    return CompatibilityResult.requiresMigration();
+  }
+
+  /**
+   *  TypeSerializerConfigSnapshot of CoderTypeSerializer.
+   */
+  public static class CoderTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot {
+
+    private static final int VERSION = 1;
+    private Coder<T> coder;
+
+    public CoderTypeSerializerConfigSnapshot(Coder<T> coder) {
+      this.coder = coder;
+    }
+
+    @Override
+    public int getVersion() {
+      return VERSION;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      CoderTypeSerializerConfigSnapshot<?> that = (CoderTypeSerializerConfigSnapshot<?>) o;
+
+      return coder != null ? coder.equals(that.coder) : that.coder == null;
+    }
+
+    @Override
+    public int hashCode() {
+      return coder.hashCode();
+    }
+  }
+
 }
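
For context: in Flink 1.3 a TypeSerializer can publish a configuration snapshot with each
checkpoint and check it again on restore, and the two methods added above implement that
contract for CoderTypeSerializer. Below is a minimal sketch of how the check behaves. It is
not part of the patch: it assumes the runner's one-argument CoderTypeSerializer constructor,
uses Beam's StringUtf8Coder/VarIntCoder, and the class name is invented for illustration.

  import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.coders.VarIntCoder;
  import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;

  public class CompatibilitySketch {
    public static void main(String[] args) {
      // Snapshot written alongside a checkpoint/savepoint.
      CoderTypeSerializer<String> oldSerializer =
          new CoderTypeSerializer<>(StringUtf8Coder.of());
      TypeSerializerConfigSnapshot snapshot = oldSerializer.snapshotConfiguration();

      // On restore, a serializer built from an equal coder is accepted as-is ...
      CoderTypeSerializer<String> sameCoder =
          new CoderTypeSerializer<>(StringUtf8Coder.of());
      System.out.println(sameCoder.ensureCompatibility(snapshot).isRequiresMigration());  // false

      // ... while a serializer built from a different coder requires migration.
      CoderTypeSerializer<Integer> otherCoder =
          new CoderTypeSerializer<>(VarIntCoder.of());
      System.out.println(otherCoder.ensureCompatibility(snapshot).isRequiresMigration());  // true
    }
  }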

http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
index c3b9794..c40eb46 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
@@ -20,13 +20,14 @@ package org.apache.beam.runners.flink.translation.types;
 import java.io.IOException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * {@link TypeSerializer} for values that were encoded using a {@link Coder}.
  */
-public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
+public final class EncodedValueSerializer extends TypeSerializerSingleton<byte[]> {
 
   private static final long serialVersionUID = 1L;
 
@@ -57,7 +58,6 @@ public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
     return -1;
   }
 
-
   @Override
   public void serialize(byte[] record, DataOutputView target) throws IOException {
     if (record == null) {
@@ -94,18 +94,4 @@ public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
     return obj instanceof EncodedValueSerializer;
   }
 
-  @Override
-  public int hashCode() {
-    return this.getClass().hashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return obj instanceof EncodedValueSerializer;
-  }
-
-  @Override
-  public TypeSerializer<byte[]> duplicate() {
-    return this;
-  }
 }
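
The equals/hashCode/duplicate overrides removed above are not lost: they are inherited from
TypeSerializerSingleton, which in Flink 1.3 also supplies snapshotConfiguration() and
ensureCompatibility() for stateless serializers. A small sketch of the inherited behaviour
(not part of the patch, purely illustrative):

  import org.apache.beam.runners.flink.translation.types.EncodedValueSerializer;
  import org.apache.flink.api.common.typeutils.TypeSerializer;

  public class SingletonSerializerSketch {
    public static void main(String[] args) {
      EncodedValueSerializer a = new EncodedValueSerializer();
      EncodedValueSerializer b = new EncodedValueSerializer();

      // equals()/hashCode() now come from the TypeSerializerSingleton base class.
      System.out.println(a.equals(b) && a.hashCode() == b.hashCode());  // true

      // duplicate() is inherited as well; a stateless serializer can be reused freely.
      TypeSerializer<byte[]> copy = a.duplicate();
      System.out.println(copy.equals(a));  // true
    }
  }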

http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index f44e668..6cc2429 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -49,11 +49,11 @@ import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 
 /**
- * {@link StateInternals} that uses a Flink {@link DefaultOperatorStateBackend}
+ * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
  * to manage the broadcast state.
  * The state is the same on all parallel instances of the operator.
  * So we just need store state of operator-0 in OperatorStateBackend.
@@ -64,13 +64,12 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 public class FlinkBroadcastStateInternals<K> implements StateInternals {
 
   private int indexInSubtaskGroup;
-  private final DefaultOperatorStateBackend stateBackend;
+  private final OperatorStateBackend stateBackend;
   // stateName -> <namespace, state>
   private Map<String, Map<String, ?>> stateForNonZeroOperator;
 
   public FlinkBroadcastStateInternals(int indexInSubtaskGroup, OperatorStateBackend stateBackend) {
-    //TODO flink do not yet expose through public API
-    this.stateBackend = (DefaultOperatorStateBackend) stateBackend;
+    this.stateBackend = stateBackend;
     this.indexInSubtaskGroup = indexInSubtaskGroup;
     if (indexInSubtaskGroup != 0) {
       stateForNonZeroOperator = new HashMap<>();
@@ -178,10 +177,10 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     private String name;
     private final StateNamespace namespace;
     private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor;
-    private final DefaultOperatorStateBackend flinkStateBackend;
+    private final OperatorStateStore flinkStateBackend;
 
     AbstractBroadcastState(
-        DefaultOperatorStateBackend flinkStateBackend,
+        OperatorStateBackend flinkStateBackend,
         String name,
         StateNamespace namespace,
         Coder<T> coder) {
@@ -211,7 +210,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
           if (result != null) {
             stateForNonZeroOperator.put(name, result);
             // we don't need it anymore, must clear it.
-            flinkStateBackend.getBroadcastOperatorState(
+            flinkStateBackend.getUnionListState(
                 flinkStateDescriptor).clear();
           }
         }
@@ -220,7 +219,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     }
 
     Map<String, T> getMapFromBroadcastState() throws Exception {
-      ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
+      ListState<Map<String, T>> state = flinkStateBackend.getUnionListState(
           flinkStateDescriptor);
       Iterable<Map<String, T>> iterable = state.get();
       Map<String, T> ret = null;
@@ -239,7 +238,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
      */
     void updateMap(Map<String, T> map) throws Exception {
       if (indexInSubtaskGroup == 0) {
-        ListState<Map<String, T>> state = flinkStateBackend.getUnionListState(
+        ListState<Map<String, T>> state = flinkStateBackend.getUnionListState(
             flinkStateDescriptor);
         state.clear();
         if (map.size() > 0) {
@@ -304,7 +303,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     private final StateTag<ValueState<T>> address;
 
     FlinkBroadcastValueState(
-        DefaultOperatorStateBackend flinkStateBackend,
+        OperatorStateBackend flinkStateBackend,
         StateTag<ValueState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
@@ -365,7 +364,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     private final StateTag<BagState<T>> address;
 
     FlinkBroadcastBagState(
-        DefaultOperatorStateBackend flinkStateBackend,
+        OperatorStateBackend flinkStateBackend,
         StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
@@ -454,7 +453,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
 
     FlinkCombiningState(
-        DefaultOperatorStateBackend flinkStateBackend,
+        OperatorStateBackend flinkStateBackend,
         StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
@@ -572,7 +571,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
 
     FlinkKeyedCombiningState(
-        DefaultOperatorStateBackend flinkStateBackend,
+        OperatorStateBackend flinkStateBackend,
         StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
@@ -709,7 +708,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     private final CombineWithContext.Context context;
 
     FlinkCombiningStateWithContext(
-        DefaultOperatorStateBackend flinkStateBackend,
+        OperatorStateBackend flinkStateBackend,
         StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
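
For context: getUnionListState() is the public Flink 1.3 replacement for the cast to
DefaultOperatorStateBackend that the old code needed. With union redistribution every
subtask receives the complete list on restore, which is why the internals above only ever
write the broadcast map from subtask 0. A rough sketch of that pattern follows; it is not
part of the patch, and the method name, the stateStore handle, and the string coders are
made up for illustration.

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
  import org.apache.beam.sdk.coders.MapCoder;
  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.flink.api.common.state.ListState;
  import org.apache.flink.api.common.state.ListStateDescriptor;
  import org.apache.flink.api.common.state.OperatorStateStore;

  /** Sketch of the union-list-state pattern used by FlinkBroadcastStateInternals. */
  class UnionListStateSketch {

    /**
     * Writes the broadcast map from subtask 0 only; on restore, getUnionListState()
     * hands the full union of all entries back to every subtask.
     */
    static void writeBroadcastMap(
        OperatorStateStore stateStore, int indexInSubtaskGroup, Map<String, String> map)
        throws Exception {
      ListStateDescriptor<Map<String, String>> descriptor =
          new ListStateDescriptor<>(
              "beam-broadcast-sketch",
              new CoderTypeSerializer<>(
                  MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
      ListState<Map<String, String>> state = stateStore.getUnionListState(descriptor);
      if (indexInSubtaskGroup == 0) {
        state.clear();
        if (!map.isEmpty()) {
          state.add(new HashMap<>(map));
        }
      }
    }
  }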

http://git-wip-us.apache.org/repos/asf/beam/blob/fbc6cc59/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 500fa66..e3875bc 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -55,9 +55,12 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.OutputTag;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
@@ -127,6 +130,7 @@ public class UnboundedSourceWrapperTest {
       try {
         sourceOperator.open();
         sourceOperator.run(checkpointLock,
+            new TestStreamStatusMaintainer(),
            new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
               private int count = 0;
 
@@ -135,6 +139,11 @@ public class UnboundedSourceWrapperTest {
               }
 
               @Override
+              public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+                collect((StreamRecord) streamRecord);
+              }
+
+              @Override
               public void emitLatencyMarker(LatencyMarker latencyMarker) {
               }
 
@@ -215,6 +224,7 @@ public class UnboundedSourceWrapperTest {
       try {
         sourceOperator.open();
         sourceOperator.run(checkpointLock,
+            new TestStreamStatusMaintainer(),
            new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
               private int count = 0;
 
@@ -223,6 +233,11 @@ public class UnboundedSourceWrapperTest {
               }
 
               @Override
+              public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+                collect((StreamRecord) streamRecord);
+              }
+
+              @Override
               public void emitLatencyMarker(LatencyMarker latencyMarker) {
               }
 
@@ -293,6 +308,7 @@ public class UnboundedSourceWrapperTest {
       try {
         restoredSourceOperator.open();
         restoredSourceOperator.run(checkpointLock,
+            new TestStreamStatusMaintainer(),
            new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
               private int count = 0;
 
@@ -301,6 +317,11 @@ public class UnboundedSourceWrapperTest {
               }
 
               @Override
+              public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+                collect((StreamRecord) streamRecord);
+              }
+
+              @Override
               public void emitLatencyMarker(LatencyMarker latencyMarker) {
               }
 
@@ -462,4 +483,20 @@ public class UnboundedSourceWrapperTest {
 
   }
 
+  private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
+    StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+    @Override
+    public void toggleStreamStatus(StreamStatus streamStatus) {
+      if (!currentStreamStatus.equals(streamStatus)) {
+        currentStreamStatus = streamStatus;
+      }
+    }
+
+    @Override
+    public StreamStatus getStreamStatus() {
+      return currentStreamStatus;
+    }
+  }
+
 }
