[BEAM-2407] Fix Flink CoderTypeSerializer ConfigSnapshot. Before, the config snapshot was not deserializable because there was no default constructor and read()/write() were not implemented.
This also changes the compatibility-check logic to compare the class name of the Coder to avoid serializing the coder using Java Serialization. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62b942a0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62b942a0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62b942a0 Branch: refs/heads/DSL_SQL Commit: 62b942a02ec633c172d543946be9cfe0648825ea Parents: b2de3db Author: Aljoscha Krettek <[email protected]> Authored: Mon Jun 5 09:10:59 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Jun 7 19:43:11 2017 +0200 ---------------------------------------------------------------------- .../translation/types/CoderTypeSerializer.java | 41 +++++++++++++++----- 1 file changed, 32 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/62b942a0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index bea562e..ecfd3fb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.types; import java.io.EOFException; import java.io.IOException; +import java.util.Objects; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; import org.apache.beam.sdk.coders.Coder; @@ -139,24 +140,28 
@@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { @Override public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof CoderTypeSerializerConfigSnapshot) { - if (coder.equals(((CoderTypeSerializerConfigSnapshot<?>) configSnapshot).coder)) { - return CompatibilityResult.compatible(); - } + if (snapshotConfiguration().equals(configSnapshot)) { + return CompatibilityResult.compatible(); } return CompatibilityResult.requiresMigration(); } /** - * TypeSerializerConfigSnapshot of CoderTypeSerializer. + * TypeSerializerConfigSnapshot of CoderTypeSerializer. This uses the class name of the + * {@link Coder} to determine compatibility. This is a bit crude but better than using + * Java Serialization to (de)serialize the {@link Coder}. */ public static class CoderTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot { private static final int VERSION = 1; - private Coder<T> coder; + private String coderName; + + public CoderTypeSerializerConfigSnapshot() { + // empty constructor for satisfying IOReadableWritable which is used for deserialization + } public CoderTypeSerializerConfigSnapshot(Coder<T> coder) { - this.coder = coder; + this.coderName = coder.getClass().getCanonicalName(); } @Override @@ -175,13 +180,31 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { CoderTypeSerializerConfigSnapshot<?> that = (CoderTypeSerializerConfigSnapshot<?>) o; - return coder != null ? coder.equals(that.coder) : that.coder == null; + return coderName != null ? 
coderName.equals(that.coderName) : that.coderName == null; + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + out.writeUTF(coderName); + } + + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + this.coderName = in.readUTF(); } @Override public int hashCode() { - return coder.hashCode(); + return Objects.hash(coderName); } } + @Override + public String toString() { + return "CoderTypeSerializer{" + + "coder=" + coder + + '}'; + } }
