This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit edeeda31745e63ec80caff97286343de8d2d2c43
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Mon Feb 25 16:45:54 2019 +0800

    [FLINK-11741] [tests] Remove ensureCompatibility implementation in all 
test-related serializers
    
    This also fixes the snapshotConfiguration method of some test-related
    serializers, to return a proper snapshot of itself.
---
 .../TypeSerializerSerializationUtilTest.java       | 31 +++++++++++------
 .../typeutils/TypeSerializerSnapshotTest.java      |  5 ---
 .../api/java/io/CollectionInputFormatTest.java     |  6 ----
 .../testutils/types/IntListSerializer.java         |  6 ----
 .../testutils/types/IntPairSerializer.java         |  6 ----
 .../testutils/types/StringPairSerializer.java      | 10 ++----
 .../flink/runtime/query/KvStateRegistryTest.java   |  6 ----
 .../state/InternalPriorityQueueTestBase.java       | 39 +++++++++++++---------
 .../runtime/state/OperatorStateBackendTest.java    | 13 ++++----
 .../state/heap/TestDuplicateSerializer.java        | 10 ++----
 .../testutils/recordutils/RecordSerializer.java    | 10 ++----
 .../TypeSerializerSnapshotMigrationITCase.java     | 10 ------
 12 files changed, 57 insertions(+), 95 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
index 1a81c61..8e01b13 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -236,7 +236,12 @@ public class TypeSerializerSerializationUtilTest 
implements Serializable {
        @Test
        public void testAnonymousSerializerClassWithChangedSerialVersionUID() 
throws Exception {
 
-               TypeSerializer anonymousClassSerializer = new 
AbstractIntSerializer() {};
+               TypeSerializer anonymousClassSerializer = new 
AbstractIntSerializer() {
+                       @Override
+                       public TypeSerializerSnapshot<Integer> 
snapshotConfiguration() {
+                               return null;
+                       }
+               };
                // assert that our assumption holds
                
Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass());
 
@@ -405,16 +410,6 @@ public class TypeSerializerSerializationUtilTest 
implements Serializable {
                }
 
                @Override
-               public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
-                       return IntSerializer.INSTANCE.snapshotConfiguration();
-               }
-
-               @Override
-               public CompatibilityResult<Integer> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-                       return 
IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
-               }
-
-               @Override
                public int getLength() {
                        return IntSerializer.INSTANCE.getLength();
                }
@@ -433,5 +428,19 @@ public class TypeSerializerSerializationUtilTest 
implements Serializable {
        /** Just some serializer used for tests. */
        public static class TestIntSerializer extends AbstractIntSerializer {
                private static final long serialVersionUID = 
-3684467698271707216L;
+
+               @Override
+               public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
+                       return new TestIntSerializerSnapshot();
+               }
+       }
+
+       /**
+        * Test serializer snapshot.
+        */
+       public static class TestIntSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<Integer> {
+               public TestIntSerializerSnapshot() {
+                       super(TestIntSerializer::new);
+               }
        }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java
index ece8bc8..0e8305c 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java
@@ -180,11 +180,6 @@ public class TypeSerializerSnapshotTest {
                public TypeSerializerSnapshot<Object> snapshotConfiguration() {
                        return new TestSerializerConfigSnapshot();
                }
-
-               @Override
-               public CompatibilityResult<Object> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-                       return compatible ? CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
-               }
        }
 
        public static class TestSerializerConfigSnapshot extends 
TypeSerializerConfigSnapshot<Object> {
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 95b4e3b..87df0f3 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.io;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -393,10 +392,5 @@ public class CollectionInputFormatTest {
                public TypeSerializerConfigSnapshot<ElementType> 
snapshotConfiguration() {
                        throw new UnsupportedOperationException();
                }
-
-               @Override
-               public CompatibilityResult<ElementType> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-                       throw new UnsupportedOperationException();
-               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index e810337..7d06230 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.testutils.types;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -123,9 +122,4 @@ public class IntListSerializer extends 
TypeSerializer<IntList> {
        public TypeSerializerConfigSnapshot<IntList> snapshotConfiguration() {
                throw new UnsupportedOperationException();
        }
-
-       @Override
-       public CompatibilityResult<IntList> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               throw new UnsupportedOperationException();
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index 2e40a0e..1392f9f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -132,9 +131,4 @@ public class IntPairSerializer extends 
TypeSerializer<IntPair> {
        public TypeSerializerConfigSnapshot<IntPair> snapshotConfiguration() {
                throw new UnsupportedOperationException();
        }
-
-       @Override
-       public CompatibilityResult<IntPair> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               throw new UnsupportedOperationException();
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index af64a59..2d28b54 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
@@ -97,12 +96,7 @@ public class StringPairSerializer extends 
TypeSerializer<StringPair> {
        }
 
        @Override
-       public TypeSerializerConfigSnapshot<StringPair> snapshotConfiguration() 
{
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public CompatibilityResult<StringPair> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
+       public TypeSerializerSnapshot<StringPair> snapshotConfiguration() {
                throw new UnsupportedOperationException();
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index bd94639..dc5e5d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -404,10 +403,5 @@ public class KvStateRegistryTest extends TestLogger {
                public TypeSerializerConfigSnapshot<String> 
snapshotConfiguration() {
                        return null;
                }
-
-               @Override
-               public CompatibilityResult<String> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-                       return null;
-               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
index 2f4a33e..5f613f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -496,17 +496,11 @@ public abstract class InternalPriorityQueueTestBase 
extends TestLogger {
                }
 
                @Override
-               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               public Snapshot snapshotConfiguration() {
                        return new Snapshot(getRevision());
                }
 
-               @Override
-               public CompatibilityResult<TestElement> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       return (configSnapshot instanceof Snapshot) && 
((Snapshot) configSnapshot).revision <= getRevision() ?
-                               CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
-               }
-
-               public static class Snapshot extends 
TypeSerializerConfigSnapshot {
+               public static class Snapshot implements 
TypeSerializerSnapshot<TestElement> {
 
                        private int revision;
 
@@ -528,7 +522,7 @@ public abstract class InternalPriorityQueueTestBase extends 
TestLogger {
                        }
 
                        @Override
-                       public int getVersion() {
+                       public int getCurrentVersion() {
                                return 0;
                        }
 
@@ -537,16 +531,31 @@ public abstract class InternalPriorityQueueTestBase 
extends TestLogger {
                        }
 
                        @Override
-                       public void write(DataOutputView out) throws 
IOException {
-                               super.write(out);
+                       public void writeSnapshot(DataOutputView out) throws 
IOException {
                                out.writeInt(revision);
                        }
 
                        @Override
-                       public void read(DataInputView in) throws IOException {
-                               super.read(in);
+                       public void readSnapshot(int readVersion, DataInputView 
in, ClassLoader userCodeClassLoader) throws IOException {
                                this.revision = in.readInt();
                        }
+
+                       @Override
+                       public TypeSerializer<TestElement> restoreSerializer() {
+                               return new TestElementSerializer();
+                       }
+
+                       @Override
+                       public TypeSerializerSchemaCompatibility<TestElement> 
resolveSchemaCompatibility(TypeSerializer<TestElement> newSerializer) {
+                               if (!(newSerializer instanceof 
TestElementSerializer)) {
+                                       return 
TypeSerializerSchemaCompatibility.incompatible();
+                               }
+
+                               TestElementSerializer testElementSerializer = 
(TestElementSerializer) newSerializer;
+                               return (revision <= 
testElementSerializer.getRevision())
+                                       ? 
TypeSerializerSchemaCompatibility.compatibleAsIs()
+                                       : 
TypeSerializerSchemaCompatibility.incompatible();
+                       }
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 78e6f4e..f842fb2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -23,9 +23,8 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@@ -340,12 +339,14 @@ public class OperatorStateBackendTest {
 
                @Override
                public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
-                       return IntSerializer.INSTANCE.snapshotConfiguration();
+                       return new VerifyingIntSerializerSnapshot();
                }
+       }
 
-               @Override
-               public CompatibilityResult<Integer> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-                       return 
IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
+       @SuppressWarnings("WeakerAccess")
+       public static class VerifyingIntSerializerSnapshot extends 
SimpleTypeSerializerSnapshot<Integer> {
+               public VerifyingIntSerializerSnapshot() {
+                       super(() -> new 
VerifyingIntSerializer(Thread.currentThread().getContextClassLoader(), new 
AtomicInteger()));
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java
index a7e4ac8..da7fef8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.runtime.state.heap;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -113,12 +112,7 @@ public class TestDuplicateSerializer extends 
TypeSerializer<Integer> {
        }
 
        @Override
-       public TypeSerializerConfigSnapshot<Integer> snapshotConfiguration() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public CompatibilityResult<Integer> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
+       public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
                throw new UnsupportedOperationException();
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
index a892bf4..ce060f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
@@ -21,9 +21,8 @@ package org.apache.flink.runtime.testutils.recordutils;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Record;
@@ -135,12 +134,7 @@ public final class RecordSerializer extends 
TypeSerializer<Record> {
        }
 
        @Override
-       public TypeSerializerConfigSnapshot<Record> snapshotConfiguration() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public CompatibilityResult<Record> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
+       public TypeSerializerSnapshot<Record> snapshotConfiguration() {
                throw new UnsupportedOperationException();
        }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index 3aa3963..a4d26de 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -179,16 +179,6 @@ public class TypeSerializerSnapshotMigrationITCase extends 
SavepointMigrationTes
                        return new TestSerializerSnapshot(configPayload);
                }
 
-               /*
-               @Override
-               public CompatibilityResult<Long> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       return (configSnapshot instanceof TestSerializerSnapshot
-                               && ((TestSerializerSnapshot) 
configSnapshot).configPayload.equals(configPayload))
-                               ? CompatibilityResult.compatible()
-                               : CompatibilityResult.requiresMigration();
-               }
-               */
-
                @Override
                public TypeSerializer<Long> duplicate() {
                        return this;

Reply via email to