This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a76eeee28eb56a1c2cb880cdad61f962c55c82a
Author: yinhan.yh <[email protected]>
AuthorDate: Tue Sep 24 18:49:18 2024 +0800

    [FLINK-30614][serializer] Remove old method of resolving schema compatibility.
---
 .../serialization/custom_serialization.md          |  6 +-
 .../serialization/custom_serialization.md          |  6 +-
 .../common/typeutils/TypeSerializerSnapshot.java   | 70 +----------------
 .../base/GenericArraySerializerConfigSnapshot.java |  9 +++
 .../runtime/EitherSerializerSnapshot.java          |  9 +++
 .../typeutils/TypeSerializerSnapshotTest.java      | 87 ++--------------------
 6 files changed, 34 insertions(+), 153 deletions(-)
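
Illustrative note, not part of the commit: the sketch below shows the
direction of the check that remains after this change, where the snapshot of
the new serializer judges the old (restored) snapshot. It is a minimal,
stand-alone sketch. Snapshot, Serializer, Compatibility and
FixedLengthSnapshot are simplified, hypothetical stand-ins and are not Flink
classes; only the method name resolveSchemaCompatibility and the call shape
newSerializer.snapshotConfiguration().resolveSchemaCompatibility(oldSnapshot)
are taken from the diff.

```java
final class SchemaCompatibilitySketch {

    /** Stand-in for TypeSerializerSchemaCompatibility, reduced to two outcomes (assumption). */
    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    /** Stand-in for TypeSerializerSnapshot: the single remaining method has no default body. */
    interface Snapshot {
        Compatibility resolveSchemaCompatibility(Snapshot oldSerializerSnapshot);
    }

    /** Stand-in for TypeSerializer, reduced to the one method used here (assumption). */
    interface Serializer {
        Snapshot snapshotConfiguration();
    }

    /** Hypothetical snapshot of a serializer whose only configuration is a fixed length. */
    static final class FixedLengthSnapshot implements Snapshot {
        private final int length;

        FixedLengthSnapshot(int length) {
            this.length = length;
        }

        @Override
        public Compatibility resolveSchemaCompatibility(Snapshot oldSerializerSnapshot) {
            // The new snapshot inspects the old one, so the new code decides the outcome.
            if (oldSerializerSnapshot instanceof FixedLengthSnapshot
                    && ((FixedLengthSnapshot) oldSerializerSnapshot).length == length) {
                return Compatibility.COMPATIBLE_AS_IS;
            }
            return Compatibility.INCOMPATIBLE;
        }
    }

    /** Caller side: snapshot the new serializer, then hand it the restored snapshot. */
    static Compatibility resolve(Serializer newSerializer, Snapshot oldSnapshot) {
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(oldSnapshot);
    }

    public static void main(String[] args) {
        Snapshot restored = new FixedLengthSnapshot(8);
        Serializer sameLength = () -> new FixedLengthSnapshot(8);
        Serializer changedLength = () -> new FixedLengthSnapshot(16);
        System.out.println(resolve(sameLength, restored));
        System.out.println(resolve(changedLength, restored));
    }
}
```

Because the interface method in this sketch has no default body, an
implementation that omits it fails at compile time. That mirrors why this
commit adds explicit overrides to GenericArraySerializerConfigSnapshot and
EitherSerializerSnapshot.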

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
index 08eb9d5d88b..f8e4840d28a 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
@@ -456,9 +456,9 @@ There are no ways to specify the compatibility with the old serializer in the ne
 not supported in some scenarios.
 
 So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
-`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated
-and will be removed in the future. it is highly recommended to migrate from the old one to
-`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows:
+`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` is now removed and needs to be replaced with
+`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`.
+To make this transition, follow these steps:
 
 1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic
    should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
index 5cfa9c67b69..ea87c8bfc1d 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
@@ -460,9 +460,9 @@ There are no ways to specify the compatibility with the old serializer in the ne
 not supported in some scenarios.
 
 So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method 
-`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated 
-and will be removed in the future. it is highly recommended to migrate from the old one to 
-`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows:
+`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` is now removed and needs to be replaced with 
+`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`.
+To make this transition, follow these steps:
 
 1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic 
    should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
index dea3d30a7b9..1fe4134ee51 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
@@ -112,51 +112,6 @@ public interface TypeSerializerSnapshot<T> {
      */
     TypeSerializer<T> restoreSerializer();
 
-    /**
-     * Checks a new serializer's compatibility to read data written by the prior serializer.
-     *
-     * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
-     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
-     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
-     * serialization format is compatible, that the program's serializer needs to reconfigure itself
-     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
-     * that the format is outright incompatible, or that a migration needed. In the latter case, the
-     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
-     * program's serializer re-serializes the data, thus converting the format during the restore
-     * operation.
-     *
-     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
-     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)} and will be removed in the future
-     *     release. It's strongly recommended to migrate from old method to the new one, see the doc
-     *     section "Migrating from deprecated
-     *     `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`" for
-     *     more details.
-     * @param newSerializer the new serializer to check.
-     * @return the serializer compatibility result.
-     */
-    @Deprecated
-    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
-            TypeSerializer<T> newSerializer) {
-        // A temporal check to ensure that at least one method is implemented to avoid infinite loop
-        TypeSerializerSnapshot<T> newSerializerSnapshot = newSerializer.snapshotConfiguration();
-        try {
-            Class<?> subClass =
-                    newSerializerSnapshot
-                            .getClass()
-                            .getMethod("resolveSchemaCompatibility", TypeSerializerSnapshot.class)
-                            .getDeclaringClass();
-            if (subClass == TypeSerializerSnapshot.class) {
-                throw new UnsupportedOperationException(
-                        "Must implement at least one method about 'resolveSchemaCompatibility', "
-                                + "Recommend strongly to implement TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot), see FLIP-263 for more details");
-            }
-        } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
-        }
-        // Call the new method to resolve the schema compatibility which must be implemented
-        return newSerializerSnapshot.resolveSchemaCompatibility(this);
-    }
-
     /**
      * Checks current serializer's compatibility to read data written by the prior serializer.
      *
@@ -176,29 +131,8 @@ public interface TypeSerializerSnapshot<T> {
      * @param oldSerializerSnapshot the old serializer snapshot to check.
      * @return the serializer compatibility result.
      */
-    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
-            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
-        // A temporal check to ensure that at least one method is implemented to avoid infinite
-        // loop,
-        // which will be removed after removing the deprecated method
-        // TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer).
-        try {
-            Class<?> subClass =
-                    oldSerializerSnapshot
-                            .getClass()
-                            .getMethod("resolveSchemaCompatibility", TypeSerializer.class)
-                            .getDeclaringClass();
-            if (subClass == TypeSerializerSnapshot.class) {
-                throw new UnsupportedOperationException(
-                        "Must implement at least one method about 'resolveSchemaCompatibility', "
-                                + "Recommend strongly to implement TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot), see FLIP-263 for more details");
-            }
-        } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
-        }
-        // Call the old method to resolve the schema compatibility which must be implemented
-        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
-    }
+    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<T> oldSerializerSnapshot);
 
     // ------------------------------------------------------------------------
     //  read / write utilities
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index cb9369cc3e7..1aa9df51d15 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -105,6 +106,14 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
                 componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
     }
 
+    @Override
+    public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
+        throw new UnsupportedOperationException(
+                "Unexpected call to GenericArraySerializerConfigSnapshot#resolveSchemaCompatibility."
+                        + " GenericArraySerializerSnapshot should be used instead.");
+    }
+
     @Nullable
     public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
         return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots();
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 5c463eab3af..338789d3d5e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -106,4 +107,12 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
     public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
         return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots();
     }
+
+    @Override
+    public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) {
+        throw new UnsupportedOperationException(
+                "Unexpected call to EitherSerializerSnapshot#resolveSchemaCompatibility."
+                        + " JavaEitherSerializerSnapshot should be used instead.");
+    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java
index d16eb886813..b176adc482b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java
@@ -24,86 +24,10 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link TypeSerializerSnapshot} */
 class TypeSerializerSnapshotTest {
 
-    @Test
-    void testIllegalSchemaCompatibility() {
-        TypeSerializerSnapshot<Integer> illegalSnapshot =
-                new NotCompletedTypeSerializerSnapshot() {};
-
-        // Should throw UnsupportedOperationException if both two methods are not implemented
-        assertThatThrownBy(
-                        () ->
-                                illegalSnapshot.resolveSchemaCompatibility(
-                                        new NotCompletedTypeSerializer()))
-                .isInstanceOf(UnsupportedOperationException.class);
-        assertThatThrownBy(
-                        () ->
-                                illegalSnapshot.resolveSchemaCompatibility(
-                                        new NotCompletedTypeSerializer().snapshotConfiguration()))
-                .isInstanceOf(UnsupportedOperationException.class);
-    }
-
-    @Test
-    void testNewSchemaCompatibility() {
-        TypeSerializerSnapshot<Integer> legalSnapshot =
-                new NotCompletedTypeSerializerSnapshot() {
-                    @Override
-                    public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
-                            TypeSerializerSnapshot<Integer> oldSerializerSnapshot) {
-                        return TypeSerializerSchemaCompatibility.compatibleAsIs();
-                    }
-                };
-
-        // The result of resolving schema compatibility should always be determined by legalSnapshot
-        assertThat(
-                        new NotCompletedTypeSerializerSnapshot()
-                                .resolveSchemaCompatibility(
-                                        new NotCompletedTypeSerializer() {
-                                            @Override
-                                            public TypeSerializerSnapshot<Integer>
-                                                    snapshotConfiguration() {
-                                                return legalSnapshot;
-                                            }
-                                        })
-                                .isCompatibleAsIs())
-                .isTrue();
-        assertThat(
-                        legalSnapshot
-                                .resolveSchemaCompatibility(
-                                        new NotCompletedTypeSerializerSnapshot() {})
-                                .isCompatibleAsIs())
-                .isTrue();
-    }
-
-    @Test
-    void testOldSchemaCompatibility() {
-        TypeSerializerSnapshot<Integer> legalSnapshot =
-                new NotCompletedTypeSerializerSnapshot() {
-
-                    @Override
-                    public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
-                            TypeSerializer<Integer> newSerializer) {
-                        return TypeSerializerSchemaCompatibility.compatibleAsIs();
-                    }
-                };
-
-        // The result of resolving schema compatibility should always be determined by legalSnapshot
-        assertThat(
-                        legalSnapshot
-                                .resolveSchemaCompatibility(new NotCompletedTypeSerializer())
-                                .isCompatibleAsIs())
-                .isTrue();
-        assertThat(
-                        new NotCompletedTypeSerializerSnapshot()
-                                .resolveSchemaCompatibility(legalSnapshot)
-                                .isCompatibleAsIs())
-                .isTrue();
-    }
-
     @Test
     void testNestedSchemaCompatibility() {
         TypeSerializerSnapshot<Integer> innerSnapshot =
@@ -119,9 +43,8 @@ class TypeSerializerSnapshotTest {
                 new NotCompletedTypeSerializerSnapshot() {
                     @Override
                     public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
-                            TypeSerializer<Integer> newSerializer) {
-                        return innerSnapshot.resolveSchemaCompatibility(
-                                innerSnapshot.restoreSerializer());
+                            TypeSerializerSnapshot<Integer> newSerializer) {
+                        return innerSnapshot.resolveSchemaCompatibility(innerSnapshot);
                     }
                 };
 
@@ -232,5 +155,11 @@ class TypeSerializerSnapshotTest {
                 }
             };
         }
+
+        @Override
+        public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
+                TypeSerializerSnapshot<Integer> oldSerializerSnapshot) {
+            return TypeSerializerSchemaCompatibility.incompatible();
+        }
     }
 }
