[FLINK-6554] [core] Make CompatibilityResult options more explicitly defined

Previously, if a serializer determines that state migration needs to be
performed but could not provide a fallback convert deserializer, it
would use CompatibilityResult.requiresMigration(null).

This commit makes this option more explicit by having a
CompatibilityResult.requiresMigration() option that takes no parameters.
This improves how the user perceives the API without having to rely on
the Javadoc that it is allowed to have no fallback convert deserializer.

Consequently, when using
CompatibilityResult.requiresMigration(TypeDeserializer), the provided
argument cannot be null.

This closes #3886.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/947c44e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/947c44e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/947c44e8

Branch: refs/heads/master
Commit: 947c44e862396baa95e74cbdc50a4c7cd3befe9b
Parents: 347100d
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Fri May 12 21:00:51 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Sat May 13 15:00:33 2017 +0800

----------------------------------------------------------------------
 .../typeutils/runtime/WritableSerializer.java   |  2 +-
 .../state/RocksDBKeyedStateBackend.java         |  2 +-
 .../common/typeutils/CompatibilityResult.java   | 36 ++++++++++++++------
 .../api/common/typeutils/TypeSerializer.java    |  9 +++--
 .../common/typeutils/base/EnumSerializer.java   |  2 +-
 .../typeutils/base/GenericArraySerializer.java  |  6 ++--
 .../common/typeutils/base/ListSerializer.java   |  4 +--
 .../common/typeutils/base/MapSerializer.java    |  4 +--
 .../typeutils/base/TypeSerializerSingleton.java |  2 +-
 .../java/typeutils/runtime/AvroSerializer.java  |  4 +--
 .../runtime/CopyableValueSerializer.java        |  2 +-
 .../typeutils/runtime/EitherSerializer.java     |  4 +--
 .../java/typeutils/runtime/PojoSerializer.java  | 16 ++++-----
 .../java/typeutils/runtime/RowSerializer.java   |  6 ++--
 .../typeutils/runtime/TupleSerializerBase.java  |  6 ++--
 .../java/typeutils/runtime/ValueSerializer.java |  2 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |  4 +--
 .../common/typeutils/SerializerTestBase.java    |  4 +--
 .../typeutils/base/EnumSerializerTest.java      |  6 ++--
 .../typeutils/runtime/PojoSerializerTest.java   | 10 +++---
 .../kryo/KryoSerializerCompatibilityTest.java   |  4 +--
 .../AbstractKeyedCEPPatternOperator.java        |  4 +--
 .../table/runtime/types/CRowSerializer.scala    |  6 ++--
 .../runtime/state/ArrayListSerializer.java      |  4 +--
 .../flink/runtime/state/HashMapSerializer.java  |  4 +--
 .../flink/runtime/state/StateMigrationUtil.java |  4 +--
 .../api/scala/typeutils/EitherSerializer.scala  |  8 +++--
 .../scala/typeutils/EnumValueSerializer.scala   |  6 ++--
 .../api/scala/typeutils/OptionSerializer.scala  |  6 ++--
 .../api/scala/typeutils/TrySerializer.scala     |  6 ++--
 .../MultiplexingStreamRecordSerializer.java     |  4 +--
 .../streamrecord/StreamRecordSerializer.java    |  6 ++--
 .../streamrecord/StreamElementSerializer.java   |  4 +--
 33 files changed, 109 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 1a02e7b..421d7a3 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -171,7 +171,7 @@ public final class WritableSerializer<T extends Writable> 
extends TypeSerializer
 
                        return CompatibilityResult.compatible();
                } else {
-                       return CompatibilityResult.requiresMigration(null);
+                       return CompatibilityResult.requiresMigration();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f5dddd6..6af53c3 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1520,7 +1520,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        
restoredMetaInfo.getStateSerializerConfigSnapshot(),
                                        newMetaInfo.getStateSerializer());
 
-                       if (!namespaceCompatibility.requiresMigration() && 
!stateCompatibility.requiresMigration()) {
+                       if (!namespaceCompatibility.isRequiresMigration() && 
!stateCompatibility.isRequiresMigration()) {
                                stateInfo.f1 = newMetaInfo;
                                return stateInfo.f0;
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 891cfe0..5ad0b5e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A {@code CompatibilityResult} contains information about whether or not 
data migration
@@ -41,7 +42,7 @@ public final class CompatibilityResult<T> {
        private final TypeDeserializer<T> convertDeserializer;
 
        /**
-        * Returns a strategy that signals that the new serializer is 
compatible and no migration is required.
+        * Returns a result that signals that the new serializer is compatible 
and no migration is required.
         *
         * @return a result that signals migration is not required for the new 
serializer
         */
@@ -50,19 +51,32 @@ public final class CompatibilityResult<T> {
        }
 
        /**
-        * Returns a strategy that signals migration to be performed.
+        * Returns a result that signals migration to be performed, and in the 
case that the preceding serializer
+        * cannot be found or restored to read the previous data during 
migration, a provided convert deserializer
+        * can be used.
         *
-        * <p>Furthermore, in the case that the preceding serializer cannot be 
found or restored to read the
-        * previous data during migration, a provided convert deserializer can 
be used (may be {@code null}
-        * if one cannot be provided).
+        * @param convertDeserializer the convert deserializer to use, in the 
case that the preceding serializer
+        *                            cannot be found.
         *
-        * <p>In the case that the preceding serializer cannot be found and a 
convert deserializer is not
-        * provided, the migration will fail due to the incapability of reading 
previous data.
-        *
-        * @return a result that signals migration is necessary, possibly 
providing a convert deserializer.
+        * @return a result that signals migration is necessary, also providing 
a convert deserializer.
         */
        public static <T> CompatibilityResult<T> 
requiresMigration(TypeDeserializer<T> convertDeserializer) {
-               return new CompatibilityResult<>(true, convertDeserializer);
+               Preconditions.checkNotNull(convertDeserializer, "Convert 
deserializer cannot be null.");
+
+               return new CompatibilityResult<>(true, 
Preconditions.checkNotNull(convertDeserializer));
+       }
+
+       /**
+        * Returns a result that signals migration to be performed. The 
migration will fail if the preceding
+        * serializer for the previous data cannot be found.
+        *
+        * <p>You can also provide a convert deserializer using {@link 
#requiresMigration(TypeDeserializer)},
+        * which will be used as a fallback resort in such cases.
+        *
+        * @return a result that signals migration is necessary, without 
providing a convert deserializer.
+        */
+       public static <T> CompatibilityResult<T> requiresMigration() {
+               return new CompatibilityResult<>(true, null);
        }
 
        private CompatibilityResult(boolean requiresMigration, 
TypeDeserializer<T> convertDeserializer) {
@@ -74,7 +88,7 @@ public final class CompatibilityResult<T> {
                return convertDeserializer;
        }
 
-       public boolean requiresMigration() {
+       public boolean isRequiresMigration() {
                return requiresMigration;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 0b5a08a..85cbfdb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -201,14 +201,19 @@ public abstract class TypeSerializer<T> implements 
TypeDeserializer<T>, Serializ
         *     migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
         *     compatible, for previous data. Furthermore, in the case that the 
preceding serializer cannot be found or
         *     restored to read the previous data to perform the migration, the 
provided convert deserializer can be
-        *     used (may be {@code null} if one cannot be provided).</li>
+        *     used as a fallback resort.</li>
+        *
+        *     <li>{@link CompatibilityResult#requiresMigration()}: this 
signals Flink that migration needs to be
+        *     performed, because this serializer is not compatible, or cannot 
be reconfigured to be compatible, for
+        *     previous data. If the preceding serializer cannot be found 
(either its implementation changed or it was
+        *     removed from the classpath) then the migration will fail due to 
incapability to read previous data.</li>
         * </ul>
         *
         * @see CompatibilityResult
         *
         * @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
         *
-        * @return the determined compatibility result.
+        * @return the determined compatibility result (cannot be {@code null}).
         */
        public abstract CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index 2f74d84..d9246ae 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -205,7 +205,7 @@ public final class EnumSerializer<T extends Enum<T>> 
extends TypeSerializer<T> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 3e592b4..54c604c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -208,8 +208,8 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
                                CompatibilityResult<C> compatResult = 
componentSerializer.ensureCompatibility(
                                        
config.getSingleNestedSerializerConfigSnapshot());
 
-                               if (!compatResult.requiresMigration()) {
-                                       return 
CompatibilityResult.requiresMigration(null);
+                               if (!compatResult.isRequiresMigration()) {
+                                       return CompatibilityResult.compatible();
                                } else if 
(compatResult.getConvertDeserializer() != null) {
                                        return 
CompatibilityResult.requiresMigration(
                                                new GenericArraySerializer<>(
@@ -219,6 +219,6 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 1b6540c..aa9808e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -185,7 +185,7 @@ public final class ListSerializer<T> extends 
TypeSerializer<List<T>> {
                        CompatibilityResult<T> compatResult = 
elementSerializer.ensureCompatibility(
                                ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-                       if (!compatResult.requiresMigration()) {
+                       if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != 
null) {
                                return CompatibilityResult.requiresMigration(
@@ -193,6 +193,6 @@ public final class ListSerializer<T> extends 
TypeSerializer<List<T>> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 182fff6..d5d6ec8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -216,7 +216,7 @@ public final class MapSerializer<K, V> extends 
TypeSerializer<Map<K, V>> {
                        CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
                        CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
 
-                       if (!keyCompatResult.requiresMigration() && 
!valueCompatResult.requiresMigration()) {
+                       if (!keyCompatResult.isRequiresMigration() && 
!valueCompatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
                        } else if (keyCompatResult.getConvertDeserializer() != 
null && valueCompatResult.getConvertDeserializer() != null) {
                                return CompatibilityResult.requiresMigration(
@@ -226,6 +226,6 @@ public final class MapSerializer<K, V> extends 
TypeSerializer<Map<K, V>> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index c5decc5..9354af0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -66,7 +66,7 @@ public abstract class TypeSerializerSingleton<T> extends 
TypeSerializer<T>{
 
                        return CompatibilityResult.compatible();
                } else {
-                       return CompatibilityResult.requiresMigration(null);
+                       return CompatibilityResult.requiresMigration();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index c9eeb34..565bd4d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -238,7 +238,7 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
 
                                for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
                                        if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
-                                               return 
CompatibilityResult.requiresMigration(null);
+                                               return 
CompatibilityResult.requiresMigration();
                                        }
                                }
 
@@ -249,7 +249,7 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
 
                // ends up here if the preceding serializer is not
                // the ValueSerializer, or serialized data type has changed
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 46b93c2..b903969 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -147,7 +147,7 @@ public final class CopyableValueSerializer<T extends 
CopyableValue<T>> extends T
                                && 
valueClass.equals(((CopyableValueSerializerConfigSnapshot) 
configSnapshot).getTypeClass())) {
                        return CompatibilityResult.compatible();
                } else {
-                       return CompatibilityResult.requiresMigration(null);
+                       return CompatibilityResult.requiresMigration();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 461dd87..4373ee0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -207,7 +207,7 @@ public class EitherSerializer<L, R> extends 
TypeSerializer<Either<L, R>> {
                        CompatibilityResult<L> leftCompatResult = 
leftSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[0]);
                        CompatibilityResult<R> rightCompatResult = 
rightSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[1]);
 
-                       if (!leftCompatResult.requiresMigration() && 
!rightCompatResult.requiresMigration()) {
+                       if (!leftCompatResult.isRequiresMigration() && 
!rightCompatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
                        } else {
                                if (leftCompatResult.getConvertDeserializer() 
!= null && rightCompatResult.getConvertDeserializer() != null) {
@@ -219,6 +219,6 @@ public class EitherSerializer<L, R> extends 
TypeSerializer<Either<L, R>> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 08da49e..a8368c4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -580,13 +580,13 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                                        reorderedFields[i] = 
fieldToConfigSnapshotEntry.getKey();
 
                                                        compatResult = 
fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
-                                                       if 
(compatResult.requiresMigration()) {
-                                                               return 
CompatibilityResult.requiresMigration(null);
+                                                       if 
(compatResult.isRequiresMigration()) {
+                                                               return 
CompatibilityResult.requiresMigration();
                                                        } else {
                                                                
reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
                                                        }
                                                } else {
-                                                       return 
CompatibilityResult.requiresMigration(null);
+                                                       return 
CompatibilityResult.requiresMigration();
                                                }
 
                                                i++;
@@ -618,8 +618,8 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        for (TypeSerializerConfigSnapshot 
previousRegisteredSerializerConfig : previousRegistrations.values()) {
                                                // check compatibility of 
subclass serializer
                                                compatResult = 
reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
-                                               if 
(compatResult.requiresMigration()) {
-                                                       return 
CompatibilityResult.requiresMigration(null);
+                                               if 
(compatResult.isRequiresMigration()) {
+                                                       return 
CompatibilityResult.requiresMigration();
                                                }
 
                                                i++;
@@ -638,8 +638,8 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                                                // check compatibility of 
cached subclass serializer
                                                compatResult = 
cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
-                                               if 
(compatResult.requiresMigration()) {
-                                                       return 
CompatibilityResult.requiresMigration(null);
+                                               if 
(compatResult.isRequiresMigration()) {
+                                                       return 
CompatibilityResult.requiresMigration();
                                                } else {
                                                        
rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
                                                }
@@ -661,7 +661,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        public static final class PojoSerializerConfigSnapshot<T> extends 
GenericTypeSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index 075c9d3..ba41d4b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -270,12 +270,12 @@ public final class RowSerializer extends 
TypeSerializer<Row> {
                                CompatibilityResult<?> compatResult;
                                for (int i = 0; i < fieldSerializers.length; 
i++) {
                                        compatResult = 
fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
-                                       if (compatResult.requiresMigration()) {
+                                       if (compatResult.isRequiresMigration()) 
{
                                                requireMigration = true;
 
                                                if 
(compatResult.getConvertDeserializer() == null) {
                                                        // one of the field 
serializers cannot provide a fallback deserializer
-                                                       return 
CompatibilityResult.requiresMigration(null);
+                                                       return 
CompatibilityResult.requiresMigration();
                                                } else {
                                                        convertDeserializers[i] 
=
                                                                new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
@@ -291,7 +291,7 @@ public final class RowSerializer extends 
TypeSerializer<Row> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        public static final class RowSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 68d5aa8..032c3f1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -145,8 +145,8 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
                                        CompatibilityResult compatResult;
                                        for (int i = 0; i < 
fieldSerializers.length; i++) {
                                                compatResult = 
fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
-                                               if 
(compatResult.requiresMigration()) {
-                                                       return 
CompatibilityResult.requiresMigration(null);
+                                               if 
(compatResult.isRequiresMigration()) {
+                                                       return 
CompatibilityResult.requiresMigration();
                                                }
                                        }
 
@@ -155,6 +155,6 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 10e2330..0a028eb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -193,7 +193,7 @@ public final class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        public static class ValueSerializerConfigSnapshot<T extends Value> 
extends KryoRegistrationSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index a172b72..655de76 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -399,7 +399,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                                                        "proper serializer, 
because its previous serializer cannot be loaded or is no " +
                                                        "longer valid but a new 
serializer is not available", reconfiguredRegistrationEntry.getKey());
 
-                                               return 
CompatibilityResult.requiresMigration(null);
+                                               return 
CompatibilityResult.requiresMigration();
                                        }
                                }
 
@@ -410,7 +410,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        public static final class KryoSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index f2879ac..73c4379 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -121,11 +121,11 @@ public abstract class SerializerTestBase<T> extends 
TestLogger {
                }
 
                CompatibilityResult strategy = 
getSerializer().ensureCompatibility(restoredConfig);
-               assertFalse(strategy.requiresMigration());
+               assertFalse(strategy.isRequiresMigration());
 
                // also verify that the serializer's reconfigure implementation 
detects incompatibility
                strategy = getSerializer().ensureCompatibility(new 
TestIncompatibleSerializerConfigSnapshot());
-               assertTrue(strategy.requiresMigration());
+               assertTrue(strategy.isRequiresMigration());
        }
        
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 5c615de..16ea945 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -74,7 +74,7 @@ public class EnumSerializerTest extends TestLogger {
                // reconfigure and verify compatibility
                CompatibilityResult<PublicEnum> compatResult = 
serializer.ensureCompatibility(
                        new 
EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, 
mockPreviousOrder));
-               assertFalse(compatResult.requiresMigration());
+               assertFalse(compatResult.isRequiresMigration());
 
                // after reconfiguration, the order should be first the 
original BAR, PAULA, NATHANIEL,
                // followed by the "new enum constants" FOO, PETER, EMMA
@@ -107,7 +107,7 @@ public class EnumSerializerTest extends TestLogger {
                }
 
                CompatibilityResult<PublicEnum> compatResult = 
serializer.ensureCompatibility(restoredConfig);
-               assertFalse(compatResult.requiresMigration());
+               assertFalse(compatResult.isRequiresMigration());
 
                assertEquals(PublicEnum.FOO.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
                assertEquals(PublicEnum.BAR.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
@@ -163,7 +163,7 @@ public class EnumSerializerTest extends TestLogger {
                // reconfigure and verify compatibility
                CompatibilityResult<PublicEnum> compatResult = 
serializer.ensureCompatibility(
                        new 
EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, 
mockPreviousOrder));
-               assertFalse(compatResult.requiresMigration());
+               assertFalse(compatResult.isRequiresMigration());
 
                // serialize and deserialize again the serializer
                byte[] serializedSerializer = 
InstantiationUtil.serializeObject(serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 5459d53..c77ffcc 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -300,7 +300,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                }
 
                CompatibilityResult<SubTestUserClassA> compatResult = 
pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertTrue(compatResult.requiresMigration());
+               assertTrue(compatResult.isRequiresMigration());
        }
 
        /**
@@ -340,7 +340,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                }
 
                CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertTrue(!compatResult.requiresMigration());
+               assertTrue(!compatResult.isRequiresMigration());
 
                // reconfigure - check reconfiguration result and that 
registration ids remains the same
                //assertEquals(ReconfigureResult.COMPATIBLE, 
pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
@@ -384,7 +384,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
 
                // reconfigure - check reconfiguration result and that subclass 
serializer cache is repopulated
                CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertFalse(compatResult.requiresMigration());
+               assertFalse(compatResult.isRequiresMigration());
                assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -446,7 +446,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                // 1) subclass serializer cache is repopulated
                // 2) registrations also contain the now registered subclasses
                CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertFalse(compatResult.requiresMigration());
+               assertFalse(compatResult.isRequiresMigration());
                assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -501,7 +501,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(
 
                        mockPreviousConfigSnapshot);
-               assertFalse(compatResult.requiresMigration());
+               assertFalse(compatResult.isRequiresMigration());
                int i = 0;
                for (Field field : mockOriginalFieldOrder) {
                        assertEquals(field, pojoSerializer.getFields()[i]);

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 60c4dc4..860c560 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -66,7 +66,7 @@ public class KryoSerializerCompatibilityTest {
                }
 
                CompatibilityResult<TestClassB> compatResult = 
kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
-               assertTrue(compatResult.requiresMigration());
+               assertTrue(compatResult.isRequiresMigration());
        }
 
        /**
@@ -110,7 +110,7 @@ public class KryoSerializerCompatibilityTest {
 
                // reconfigure - check reconfiguration result and that 
registration id remains the same
                CompatibilityResult<TestClass> compatResult = 
kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
-               assertFalse(compatResult.requiresMigration());
+               assertFalse(compatResult.isRequiresMigration());
                assertEquals(testClassId, 
kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
                assertEquals(testClassAId, 
kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
                assertEquals(testClassBId, 
kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 140e091..3afe397 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -513,7 +513,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                CompatibilityResult<T> compatResult = 
elementSerializer.ensureCompatibility(
                                                
((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-                               if (!compatResult.requiresMigration()) {
+                               if (!compatResult.isRequiresMigration()) {
                                        return CompatibilityResult.compatible();
                                } else if 
(compatResult.getConvertDeserializer() != null) {
                                        return 
CompatibilityResult.requiresMigration(
@@ -522,7 +522,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                }
                        }
 
-                       return CompatibilityResult.requiresMigration(null);
+                       return CompatibilityResult.requiresMigration();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 122f4fb..0fd3680 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -93,20 +93,20 @@ class CRowSerializer(val rowSerializer: 
TypeSerializer[Row]) extends TypeSeriali
         val compatResult = rowSerializer.ensureCompatibility(
             
crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
 
-        if (compatResult.requiresMigration()) {
+        if (compatResult.isRequiresMigration) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
               new CRowSerializer(
                 new 
TypeDeserializerAdapter(compatResult.getConvertDeserializer))
             )
           } else {
-            CompatibilityResult.requiresMigration(null)
+            CompatibilityResult.requiresMigration()
           }
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index c39cb9b..56eb7ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -152,7 +152,7 @@ final public class ArrayListSerializer<T> extends 
TypeSerializer<ArrayList<T>> {
                        CompatibilityResult<T> compatResult = 
elementSerializer.ensureCompatibility(
                                ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-                       if (!compatResult.requiresMigration()) {
+                       if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != 
null) {
                                return CompatibilityResult.requiresMigration(
@@ -160,6 +160,6 @@ final public class ArrayListSerializer<T> extends 
TypeSerializer<ArrayList<T>> {
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index 925fe78..b93c9e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -217,7 +217,7 @@ public final class HashMapSerializer<K, V> extends 
TypeSerializer<HashMap<K, V>>
                        CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
                        CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
 
-                       if (!keyCompatResult.requiresMigration() && 
!valueCompatResult.requiresMigration()) {
+                       if (!keyCompatResult.isRequiresMigration() && 
!valueCompatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
                        } else if (keyCompatResult.getConvertDeserializer() != 
null && valueCompatResult.getConvertDeserializer() != null) {
                                return CompatibilityResult.requiresMigration(
@@ -227,6 +227,6 @@ public final class HashMapSerializer<K, V> extends 
TypeSerializer<HashMap<K, V>>
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
index 978f28d..39bb743 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
@@ -48,7 +48,7 @@ public class StateMigrationUtil {
         *
         * @param <T> Type of the data handled by the serializers
         *
-        * @return the final resolved compatiblity result
+        * @return the final resolved compatibility result
         */
        public static <T> CompatibilityResult<T> resolveCompatibilityResult(
                        TypeSerializer<T> precedingSerializer,
@@ -59,7 +59,7 @@ public class StateMigrationUtil {
                if (precedingSerializerConfigSnapshot != null) {
                        CompatibilityResult<T> initialResult = 
newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
 
-                       if (!initialResult.requiresMigration()) {
+                       if (!initialResult.isRequiresMigration()) {
                                return initialResult;
                        } else {
                                if (precedingSerializer != null && 
!(precedingSerializer.getClass().equals(dummySerializerClassTag))) {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 88b2041..6c4378a 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -127,7 +127,9 @@ class EitherSerializer[A, B, T <: Either[A, B]](
         val leftCompatResult = 
leftSerializer.ensureCompatibility(leftRightConfigs(0))
         val rightCompatResult = 
rightSerializer.ensureCompatibility(leftRightConfigs(1))
 
-        if (leftCompatResult.requiresMigration || 
rightCompatResult.requiresMigration) {
+        if (leftCompatResult.isRequiresMigration
+            || rightCompatResult.isRequiresMigration) {
+
           if (leftCompatResult.getConvertDeserializer != null
               && rightCompatResult.getConvertDeserializer != null) {
 
@@ -139,13 +141,13 @@ class EitherSerializer[A, B, T <: Either[A, B]](
             )
 
           } else {
-            CompatibilityResult.requiresMigration(null)
+            CompatibilityResult.requiresMigration()
           }
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index dc96c98..843079a 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -95,16 +95,16 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) 
extends TypeSerializer[
             // and original constants must be in the exact same order
 
             if (currentEnumConstants(i) != 
enumSerializerConfigSnapshot.getEnumConstants(i)) {
-              CompatibilityResult.requiresMigration(null)
+              return CompatibilityResult.requiresMigration()
             }
           }
 
           CompatibilityResult.compatible()
         } else {
-          CompatibilityResult.requiresMigration(null)
+          CompatibilityResult.requiresMigration()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 81b3bcc..4b56059 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -111,19 +111,19 @@ class OptionSerializer[A](val elemSerializer: 
TypeSerializer[A])
         val compatResult = elemSerializer.ensureCompatibility(
           
optionSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
 
-        if (compatResult.requiresMigration()) {
+        if (compatResult.isRequiresMigration) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
               new OptionSerializer[A](
                 new 
TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
           } else {
-            CompatibilityResult.requiresMigration(null)
+            CompatibilityResult.requiresMigration()
           }
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index c864dc7..5de76ca 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -123,13 +123,13 @@ class TrySerializer[A](
         val throwableCompatRes =
           throwableSerializer.ensureCompatibility(serializerConfigSnapshots(1))
 
-        if (elemCompatRes.requiresMigration() || 
throwableCompatRes.requiresMigration()) {
-          CompatibilityResult.requiresMigration(null)
+        if (elemCompatRes.isRequiresMigration || 
throwableCompatRes.isRequiresMigration) {
+          CompatibilityResult.requiresMigration()
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 552ffd0..dc23b8d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -225,7 +225,7 @@ public class MultiplexingStreamRecordSerializer<T> extends 
TypeSerializer<Stream
                        CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
                                
((MultiplexingStreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-                       if (!compatResult.requiresMigration()) {
+                       if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != 
null) {
                                return CompatibilityResult.requiresMigration(
@@ -234,7 +234,7 @@ public class MultiplexingStreamRecordSerializer<T> extends 
TypeSerializer<Stream
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
index f7a661e..7b0390d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -164,8 +164,8 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
                        CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
                                ((StreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-                       if (!compatResult.requiresMigration()) {
-                               return 
CompatibilityResult.requiresMigration(null);
+                       if (!compatResult.isRequiresMigration()) {
+                               return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != 
null) {
                                return CompatibilityResult.requiresMigration(
                                        new StreamRecordSerializer<>(
@@ -173,7 +173,7 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index e444ced..ba69fed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -286,7 +286,7 @@ public final class StreamElementSerializer<T> extends 
TypeSerializer<StreamEleme
                        CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
                                ((StreamElementSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-                       if (!compatResult.requiresMigration()) {
+                       if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != 
null) {
                                return CompatibilityResult.requiresMigration(
@@ -295,7 +295,7 @@ public final class StreamElementSerializer<T> extends 
TypeSerializer<StreamEleme
                        }
                }
 
-               return CompatibilityResult.requiresMigration(null);
+               return CompatibilityResult.requiresMigration();
        }
 
        /**

Reply via email to