[FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer

The separation of the TypeDeserializer interface from the TypeSerializer
base class is due to the fact that additionally implementing the
TypeDeserializer interface alters the generation order of anonymos
serializer classes for Scala case classes and collections.

Instead, the TypeDeserializer is now used as a mixin on the
TypeDeserializerAdapter utility, which now serves as a bridge for
both directions (i.e. TypeSerializer to TypeDeserializer, and vice
versa). No user interfaces are broken due to this change.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39c8270d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39c8270d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39c8270d

Branch: refs/heads/release-1.3
Commit: 39c8270d39684765484fa4b6b2711e5714b81b64
Parents: 57421f9
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 11 15:30:36 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 07:20:53 2017 +0200

----------------------------------------------------------------------
 .../common/typeutils/CompatibilityResult.java   | 24 +++++++++++-
 .../typeutils/TypeDeserializerAdapter.java      | 40 ++++++++++++++------
 .../api/common/typeutils/TypeSerializer.java    |  2 +-
 3 files changed, 51 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39c8270d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 4c83ded..1e05d57 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -60,6 +60,8 @@ public final class CompatibilityResult<T> {
         * @param convertDeserializer the convert deserializer to use, in the 
case that the preceding serializer
         *                            cannot be found.
         *
+        * @param <T> the type of the data being migrated.
+        *
         * @return a result that signals migration is necessary, also providing 
a convert deserializer.
         */
        public static <T> CompatibilityResult<T> requiresMigration(@Nonnull 
TypeDeserializer<T> convertDeserializer) {
@@ -69,11 +71,29 @@ public final class CompatibilityResult<T> {
        }
 
        /**
+        * Returns a result that signals migration to be performed, and in the 
case that the preceding serializer
+        * cannot be found or restored to read the previous data during 
migration, a provided convert serializer
+        * can be used. The provided serializer will only be used for 
deserialization.
+        *
+        * @param convertSerializer the convert serializer to use, in the case 
that the preceding serializer
+        *                          cannot be found. The provided serializer 
will only be used for deserialization.
+        *
+        * @param <T> the type of the data being migrated.
+        *
+        * @return a result that signals migration is necessary, also providing 
a convert serializer.
+        */
+       public static <T> CompatibilityResult<T> requiresMigration(@Nonnull 
TypeSerializer<T> convertSerializer) {
+               Preconditions.checkNotNull(convertSerializer, "Convert 
serializer cannot be null.");
+
+               return new CompatibilityResult<>(true, new 
TypeDeserializerAdapter<>(convertSerializer));
+       }
+
+       /**
         * Returns a result that signals migration to be performed. The 
migration will fail if the preceding
         * serializer for the previous data cannot be found.
         *
-        * <p>You can also provide a convert deserializer using {@link 
#requiresMigration(TypeDeserializer)},
-        * which will be used as a fallback resort in such cases.
+        * <p>You can also provide a convert deserializer using {@link 
#requiresMigration(TypeDeserializer)}
+        * or {@link #requiresMigration(TypeSerializer)}, which will be used as 
a fallback resort in such cases.
         *
         * @return a result that signals migration is necessary, without 
providing a convert deserializer.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/39c8270d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
index e02bed4..fb59602 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -25,27 +26,42 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * A utility class that wraps a {@link TypeDeserializer} as a {@link 
TypeSerializer}.
+ * A utility class that is used to bridge a {@link TypeSerializer} and {@link 
TypeDeserializer}.
+ * It either wraps a type deserializer or serializer, and can only ever be 
used for deserialization
+ * (i.e. only read-related methods is functional).
  *
- * <p>Methods related to deserialization are directly forwarded to the wrapped 
deserializer,
+ * <p>Methods related to deserialization are directly forwarded to the wrapped 
deserializer or serializer,
  * while serialization methods are masked and not intended for use.
  *
  * @param <T> The data type that the deserializer deserializes.
  */
-public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
+@Internal
+public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> 
implements TypeDeserializer<T> {
 
        private static final long serialVersionUID = 1L;
 
-       /** The actual wrapped deserializer instance */
+       /** The actual wrapped deserializer or serializer instance */
        private final TypeDeserializer<T> deserializer;
+       private final TypeSerializer<T> serializer;
 
        /**
-        * Creates a {@link TypeSerializer} that wraps a {@link 
TypeDeserializer}.
+        * Creates a {@link TypeDeserializerAdapter} that wraps a {@link 
TypeDeserializer}.
         *
         * @param deserializer the actual deserializer to wrap.
         */
        public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
                this.deserializer = Preconditions.checkNotNull(deserializer);
+               this.serializer = null;
+       }
+
+       /**
+        * Creates a {@link TypeDeserializerAdapter} that wraps a {@link 
TypeSerializer}.
+        *
+        * @param serializer the actual serializer to wrap.
+        */
+       public TypeDeserializerAdapter(TypeSerializer<T> serializer) {
+               this.deserializer = null;
+               this.serializer = Preconditions.checkNotNull(serializer);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -53,31 +69,31 @@ public final class TypeDeserializerAdapter<T> extends 
TypeSerializer<T> {
        // 
--------------------------------------------------------------------------------------------
 
        public T deserialize(DataInputView source) throws IOException {
-               return deserializer.deserialize(source);
+               return (deserializer != null) ? 
deserializer.deserialize(source) : serializer.deserialize(source);
        }
 
        public T deserialize(T reuse, DataInputView source) throws IOException {
-               return deserializer.deserialize(reuse, source);
+               return (deserializer != null) ? deserializer.deserialize(reuse, 
source) : serializer.deserialize(reuse, source);
        }
 
        public TypeSerializer<T> duplicate() {
-               return deserializer.duplicate();
+               return (deserializer != null) ? deserializer.duplicate() : 
serializer.duplicate();
        }
 
        public int getLength() {
-               return deserializer.getLength();
+               return (deserializer != null) ? deserializer.getLength() : 
serializer.getLength();
        }
 
        public boolean equals(Object obj) {
-               return deserializer.equals(obj);
+               return (deserializer != null) ? deserializer.equals(obj) : 
serializer.equals(obj);
        }
 
        public boolean canEqual(Object obj) {
-               return deserializer.canEqual(obj);
+               return (deserializer != null) ? deserializer.canEqual(obj) : 
serializer.canEqual(obj);
        }
 
        public int hashCode() {
-               return deserializer.hashCode();
+               return (deserializer != null) ? deserializer.hashCode() : 
serializer.hashCode();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/39c8270d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 85cbfdb..a606a18 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,7 +36,7 @@ import java.io.Serializable;
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializer<T> implements TypeDeserializer<T>, 
Serializable {
+public abstract class TypeSerializer<T> implements Serializable {
        
        private static final long serialVersionUID = 1L;
 

Reply via email to