This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a76eeee28eb56a1c2cb880cdad61f962c55c82a Author: yinhan.yh <[email protected]> AuthorDate: Tue Sep 24 18:49:18 2024 +0800 [FLINK-30614][serializer] Remove old method of resolving schema compatibility. --- .../serialization/custom_serialization.md | 6 +- .../serialization/custom_serialization.md | 6 +- .../common/typeutils/TypeSerializerSnapshot.java | 70 +---------------- .../base/GenericArraySerializerConfigSnapshot.java | 9 +++ .../runtime/EitherSerializerSnapshot.java | 9 +++ .../typeutils/TypeSerializerSnapshotTest.java | 87 ++-------------------- 6 files changed, 34 insertions(+), 153 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md index 08eb9d5d88b..f8e4840d28a 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md @@ -456,9 +456,9 @@ There are no ways to specify the compatibility with the old serializer in the ne not supported in some scenarios. So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method -`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated -and will be removed in the future. it is highly recommended to migrate from the old one to -`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows: +`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` is now removed and needs to be replaced with +`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. +To make this transition, follow these steps: 1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`. diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md index 5cfa9c67b69..ea87c8bfc1d 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md @@ -460,9 +460,9 @@ There are no ways to specify the compatibility with the old serializer in the ne not supported in some scenarios. So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method -`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated -and will be removed in the future. it is highly recommended to migrate from the old one to -`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows: +`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` is now removed and needs to be replaced with +`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. +To make this transition, follow these steps: 1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`. diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java index dea3d30a7b9..1fe4134ee51 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java @@ -112,51 +112,6 @@ public interface TypeSerializerSnapshot<T> { */ TypeSerializer<T> restoreSerializer(); - /** - * Checks a new serializer's compatibility to read data written by the prior serializer. - * - * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization - * format of the data in the checkpoint/savepoint is compatible for the format of the serializer - * used by the program that restores the checkpoint/savepoint. The outcome can be that the - * serialization format is compatible, that the program's serializer needs to reconfigure itself - * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible), - * that the format is outright incompatible, or that a migration needed. In the latter case, the - * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring - * program's serializer re-serializes the data, thus converting the format during the restore - * operation. - * - * @deprecated This method has been replaced by {@link TypeSerializerSnapshot - * #resolveSchemaCompatibility(TypeSerializerSnapshot)} and will be removed in the future - * release. It's strongly recommended to migrate from old method to the new one, see the doc - * section "Migrating from deprecated - * `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`" for - * more details. - * @param newSerializer the new serializer to check. - * @return the serializer compatibility result. - */ - @Deprecated - default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility( - TypeSerializer<T> newSerializer) { - // A temporal check to ensure that at least one method is implemented to avoid infinite loop - TypeSerializerSnapshot<T> newSerializerSnapshot = newSerializer.snapshotConfiguration(); - try { - Class<?> subClass = - newSerializerSnapshot - .getClass() - .getMethod("resolveSchemaCompatibility", TypeSerializerSnapshot.class) - .getDeclaringClass(); - if (subClass == TypeSerializerSnapshot.class) { - throw new UnsupportedOperationException( - "Must implement at least one method about 'resolveSchemaCompatibility', " - + "Recommend strongly to implement TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot), see FLIP-263 for more details"); - } - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } - // Call the new method to resolve the schema compatibility which must be implemented - return newSerializerSnapshot.resolveSchemaCompatibility(this); - } - /** * Checks current serializer's compatibility to read data written by the prior serializer. * @@ -176,29 +131,8 @@ public interface TypeSerializerSnapshot<T> { * @param oldSerializerSnapshot the old serializer snapshot to check. * @return the serializer compatibility result. */ - default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility( - TypeSerializerSnapshot<T> oldSerializerSnapshot) { - // A temporal check to ensure that at least one method is implemented to avoid infinite - // loop, - // which will be removed after removing the deprecated method - // TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer). - try { - Class<?> subClass = - oldSerializerSnapshot - .getClass() - .getMethod("resolveSchemaCompatibility", TypeSerializer.class) - .getDeclaringClass(); - if (subClass == TypeSerializerSnapshot.class) { - throw new UnsupportedOperationException( - "Must implement at least one method about 'resolveSchemaCompatibility', " - + "Recommend strongly to implement TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot), see FLIP-263 for more details"); - } - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } - // Call the old method to resolve the schema compatibility which must be implemented - return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer()); - } + TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility( + TypeSerializerSnapshot<T> oldSerializerSnapshot); // ------------------------------------------------------------------------ // read / write utilities diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java index cb9369cc3e7..1aa9df51d15 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -105,6 +106,14 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial componentClass, nestedSnapshot.getRestoredNestedSerializer(0)); } + @Override + public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility( + TypeSerializerSnapshot<C[]> oldSerializerSnapshot) { + throw new UnsupportedOperationException( + "Unexpected call to GenericArraySerializerConfigSnapshot#resolveSchemaCompatibility." + + " GenericArraySerializerSnapshot should be used instead."); + } + @Nullable public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() { return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots(); diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java index 5c463eab3af..338789d3d5e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java @@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -106,4 +107,12 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() { return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots(); } + + @Override + public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility( + TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) { + throw new UnsupportedOperationException( + "Unexpected call to EitherSerializerSnapshot#resolveSchemaCompatibility." + + " JavaEitherSerializerSnapshot should be used instead."); + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java index d16eb886813..b176adc482b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java @@ -24,86 +24,10 @@ import org.apache.flink.core.memory.DataOutputView; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link TypeSerializerSnapshot} */ class TypeSerializerSnapshotTest { - @Test - void testIllegalSchemaCompatibility() { - TypeSerializerSnapshot<Integer> illegalSnapshot = - new NotCompletedTypeSerializerSnapshot() {}; - - // Should throw UnsupportedOperationException if both two methods are not implemented - assertThatThrownBy( - () -> - illegalSnapshot.resolveSchemaCompatibility( - new NotCompletedTypeSerializer())) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy( - () -> - illegalSnapshot.resolveSchemaCompatibility( - new NotCompletedTypeSerializer().snapshotConfiguration())) - .isInstanceOf(UnsupportedOperationException.class); - } - - @Test - void testNewSchemaCompatibility() { - TypeSerializerSnapshot<Integer> legalSnapshot = - new NotCompletedTypeSerializerSnapshot() { - @Override - public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility( - TypeSerializerSnapshot<Integer> oldSerializerSnapshot) { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); - } - }; - - // The result of resolving schema compatibility should always be determined by legalSnapshot - assertThat( - new NotCompletedTypeSerializerSnapshot() - .resolveSchemaCompatibility( - new NotCompletedTypeSerializer() { - @Override - public TypeSerializerSnapshot<Integer> - snapshotConfiguration() { - return legalSnapshot; - } - }) - .isCompatibleAsIs()) - .isTrue(); - assertThat( - legalSnapshot - .resolveSchemaCompatibility( - new NotCompletedTypeSerializerSnapshot() {}) - .isCompatibleAsIs()) - .isTrue(); - } - - @Test - void testOldSchemaCompatibility() { - TypeSerializerSnapshot<Integer> legalSnapshot = - new NotCompletedTypeSerializerSnapshot() { - - @Override - public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility( - TypeSerializer<Integer> newSerializer) { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); - } - }; - - // The result of resolving schema compatibility should always be determined by legalSnapshot - assertThat( - legalSnapshot - .resolveSchemaCompatibility(new NotCompletedTypeSerializer()) - .isCompatibleAsIs()) - .isTrue(); - assertThat( - new NotCompletedTypeSerializerSnapshot() - .resolveSchemaCompatibility(legalSnapshot) - .isCompatibleAsIs()) - .isTrue(); - } - @Test void testNestedSchemaCompatibility() { TypeSerializerSnapshot<Integer> innerSnapshot = @@ -119,9 +43,8 @@ class TypeSerializerSnapshotTest { new NotCompletedTypeSerializerSnapshot() { @Override public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility( - TypeSerializer<Integer> newSerializer) { - return innerSnapshot.resolveSchemaCompatibility( - innerSnapshot.restoreSerializer()); + TypeSerializerSnapshot<Integer> newSerializer) { + return innerSnapshot.resolveSchemaCompatibility(innerSnapshot); } }; @@ -232,5 +155,11 @@ class TypeSerializerSnapshotTest { } }; } + + @Override + public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility( + TypeSerializerSnapshot<Integer> oldSerializerSnapshot) { + return TypeSerializerSchemaCompatibility.incompatible(); + } } }
