[BEAM-2407] Fix Flink CoderTyperSerializer ConfigSnapshot

Before, the config snapshot was not deserializable because there was no
default constructor and read()/write() where not implemented.

This also changes the compatibility-check logic to compare the class
name of the Coder to avoid serializing the coder using Java
Serialization.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62b942a0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62b942a0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62b942a0

Branch: refs/heads/DSL_SQL
Commit: 62b942a02ec633c172d543946be9cfe0648825ea
Parents: b2de3db
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Jun 5 09:10:59 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Jun 7 19:43:11 2017 +0200

----------------------------------------------------------------------
 .../translation/types/CoderTypeSerializer.java  | 41 +++++++++++++++-----
 1 file changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62b942a0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index bea562e..ecfd3fb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.types;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Objects;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import 
org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -139,24 +140,28 @@ public class CoderTypeSerializer<T> extends 
TypeSerializer<T> {
 
   @Override
   public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-    if (configSnapshot instanceof CoderTypeSerializerConfigSnapshot) {
-      if (coder.equals(((CoderTypeSerializerConfigSnapshot<?>) 
configSnapshot).coder)) {
-        return CompatibilityResult.compatible();
-      }
+    if (snapshotConfiguration().equals(configSnapshot)) {
+      return CompatibilityResult.compatible();
     }
     return CompatibilityResult.requiresMigration();
   }
 
   /**
-   *  TypeSerializerConfigSnapshot of CoderTypeSerializer.
+   *  TypeSerializerConfigSnapshot of CoderTypeSerializer. This uses the class 
name of the
+   *  {@link Coder} to determine compatibility. This is a bit crude but better 
than using
+   *  Java Serialization to (de)serialize the {@link Coder}.
    */
   public static class CoderTypeSerializerConfigSnapshot<T> extends 
TypeSerializerConfigSnapshot {
 
     private static final int VERSION = 1;
-    private Coder<T> coder;
+    private String coderName;
+
+    public CoderTypeSerializerConfigSnapshot() {
+      // empty constructor for satisfying IOReadableWritable which is used for 
deserialization
+    }
 
     public CoderTypeSerializerConfigSnapshot(Coder<T> coder) {
-      this.coder = coder;
+      this.coderName = coder.getClass().getCanonicalName();
     }
 
     @Override
@@ -175,13 +180,31 @@ public class CoderTypeSerializer<T> extends 
TypeSerializer<T> {
 
       CoderTypeSerializerConfigSnapshot<?> that = 
(CoderTypeSerializerConfigSnapshot<?>) o;
 
-      return coder != null ? coder.equals(that.coder) : that.coder == null;
+      return coderName != null ? coderName.equals(that.coderName) : 
that.coderName == null;
+    }
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+      super.write(out);
+      out.writeUTF(coderName);
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+      super.read(in);
+      this.coderName = in.readUTF();
     }
 
     @Override
     public int hashCode() {
-      return coder.hashCode();
+      return Objects.hash(coderName);
     }
   }
 
+  @Override
+  public String toString() {
+    return "CoderTypeSerializer{"
+        + "coder=" + coder
+        + '}';
+  }
 }

Reply via email to