[FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer

The separation of the TypeDeserializer interface from the TypeSerializer
base class is due to the fact that additionally implementing the
TypeDeserializer interface alters the generation order of anonymos
serializer classes for Scala case classes and collections.

Instead, the TypeDeserializer is now used as a mixin on the
TypeDeserializerAdapter utility, which now serves as a bridge for
both directions (i.e. TypeSerializer to TypeDeserializer, and vice
versa). No user interfaces are broken due to this change.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69fada3d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69fada3d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69fada3d

Branch: refs/heads/master
Commit: 69fada3d0b4c686f29c356f00eb49039f416879f
Parents: 8d0c4c0
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 11 15:30:36 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:37:46 2017 +0200

----------------------------------------------------------------------
 .../common/typeutils/CompatibilityResult.java   | 24 +++++++++++-
 .../typeutils/TypeDeserializerAdapter.java      | 40 ++++++++++++++------
 .../api/common/typeutils/TypeSerializer.java    |  2 +-
 3 files changed, 51 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 4c83ded..1e05d57 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -60,6 +60,8 @@ public final class CompatibilityResult<T> {
         * @param convertDeserializer the convert deserializer to use, in the 
case that the preceding serializer
         *                            cannot be found.
         *
+        * @param <T> the type of the data being migrated.
+        *
         * @return a result that signals migration is necessary, also providing 
a convert deserializer.
         */
        public static <T> CompatibilityResult<T> requiresMigration(@Nonnull 
TypeDeserializer<T> convertDeserializer) {
@@ -69,11 +71,29 @@ public final class CompatibilityResult<T> {
        }
 
        /**
+        * Returns a result that signals migration to be performed, and in the 
case that the preceding serializer
+        * cannot be found or restored to read the previous data during 
migration, a provided convert serializer
+        * can be used. The provided serializer will only be used for 
deserialization.
+        *
+        * @param convertSerializer the convert serializer to use, in the case 
that the preceding serializer
+        *                          cannot be found. The provided serializer 
will only be used for deserialization.
+        *
+        * @param <T> the type of the data being migrated.
+        *
+        * @return a result that signals migration is necessary, also providing 
a convert serializer.
+        */
+       public static <T> CompatibilityResult<T> requiresMigration(@Nonnull 
TypeSerializer<T> convertSerializer) {
+               Preconditions.checkNotNull(convertSerializer, "Convert 
serializer cannot be null.");
+
+               return new CompatibilityResult<>(true, new 
TypeDeserializerAdapter<>(convertSerializer));
+       }
+
+       /**
         * Returns a result that signals migration to be performed. The 
migration will fail if the preceding
         * serializer for the previous data cannot be found.
         *
-        * <p>You can also provide a convert deserializer using {@link 
#requiresMigration(TypeDeserializer)},
-        * which will be used as a fallback resort in such cases.
+        * <p>You can also provide a convert deserializer using {@link 
#requiresMigration(TypeDeserializer)}
+        * or {@link #requiresMigration(TypeSerializer)}, which will be used as 
a fallback resort in such cases.
         *
         * @return a result that signals migration is necessary, without 
providing a convert deserializer.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
index e02bed4..fb59602 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -25,27 +26,42 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * A utility class that wraps a {@link TypeDeserializer} as a {@link 
TypeSerializer}.
+ * A utility class that is used to bridge a {@link TypeSerializer} and {@link 
TypeDeserializer}.
+ * It either wraps a type deserializer or serializer, and can only ever be 
used for deserialization
+ * (i.e. only read-related methods is functional).
  *
- * <p>Methods related to deserialization are directly forwarded to the wrapped 
deserializer,
+ * <p>Methods related to deserialization are directly forwarded to the wrapped 
deserializer or serializer,
  * while serialization methods are masked and not intended for use.
  *
  * @param <T> The data type that the deserializer deserializes.
  */
-public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
+@Internal
+public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> 
implements TypeDeserializer<T> {
 
        private static final long serialVersionUID = 1L;
 
-       /** The actual wrapped deserializer instance */
+       /** The actual wrapped deserializer or serializer instance */
        private final TypeDeserializer<T> deserializer;
+       private final TypeSerializer<T> serializer;
 
        /**
-        * Creates a {@link TypeSerializer} that wraps a {@link 
TypeDeserializer}.
+        * Creates a {@link TypeDeserializerAdapter} that wraps a {@link 
TypeDeserializer}.
         *
         * @param deserializer the actual deserializer to wrap.
         */
        public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
                this.deserializer = Preconditions.checkNotNull(deserializer);
+               this.serializer = null;
+       }
+
+       /**
+        * Creates a {@link TypeDeserializerAdapter} that wraps a {@link 
TypeSerializer}.
+        *
+        * @param serializer the actual serializer to wrap.
+        */
+       public TypeDeserializerAdapter(TypeSerializer<T> serializer) {
+               this.deserializer = null;
+               this.serializer = Preconditions.checkNotNull(serializer);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -53,31 +69,31 @@ public final class TypeDeserializerAdapter<T> extends 
TypeSerializer<T> {
        // 
--------------------------------------------------------------------------------------------
 
        public T deserialize(DataInputView source) throws IOException {
-               return deserializer.deserialize(source);
+               return (deserializer != null) ? 
deserializer.deserialize(source) : serializer.deserialize(source);
        }
 
        public T deserialize(T reuse, DataInputView source) throws IOException {
-               return deserializer.deserialize(reuse, source);
+               return (deserializer != null) ? deserializer.deserialize(reuse, 
source) : serializer.deserialize(reuse, source);
        }
 
        public TypeSerializer<T> duplicate() {
-               return deserializer.duplicate();
+               return (deserializer != null) ? deserializer.duplicate() : 
serializer.duplicate();
        }
 
        public int getLength() {
-               return deserializer.getLength();
+               return (deserializer != null) ? deserializer.getLength() : 
serializer.getLength();
        }
 
        public boolean equals(Object obj) {
-               return deserializer.equals(obj);
+               return (deserializer != null) ? deserializer.equals(obj) : 
serializer.equals(obj);
        }
 
        public boolean canEqual(Object obj) {
-               return deserializer.canEqual(obj);
+               return (deserializer != null) ? deserializer.canEqual(obj) : 
serializer.canEqual(obj);
        }
 
        public int hashCode() {
-               return deserializer.hashCode();
+               return (deserializer != null) ? deserializer.hashCode() : 
serializer.hashCode();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 85cbfdb..a606a18 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,7 +36,7 @@ import java.io.Serializable;
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializer<T> implements TypeDeserializer<T>, 
Serializable {
+public abstract class TypeSerializer<T> implements Serializable {
        
        private static final long serialVersionUID = 1L;
 

Reply via email to