This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit be90707464342634a3cd7b52a7206fb5be1d5598 Author: yinhan.yh <[email protected]> AuthorDate: Thu Sep 26 15:39:12 2024 +0800 [FLINK-30614][serializer] Remove deprecated GenericArraySerializerConfigSnapshot and EitherSerializerSnapshot. --- .../base/GenericArraySerializerConfigSnapshot.java | 126 --------------------- .../base/GenericArraySerializerSnapshot.java | 23 ---- .../runtime/EitherSerializerSnapshot.java | 118 ------------------- .../runtime/JavaEitherSerializerSnapshot.java | 11 -- .../CompositeTypeSerializerUpgradeTest.java | 17 --- 5 files changed, 295 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java deleted file mode 100644 index 1aa9df51d15..00000000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.typeutils.base; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.InstantiationUtil; - -import javax.annotation.Nullable; - -import java.io.IOException; - -import static org.apache.flink.util.Preconditions.checkState; - -/** - * Point-in-time configuration of a {@link GenericArraySerializer}. - * - * @param <C> The component type. - * @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}. It has - * been replaced by {@link GenericArraySerializerSnapshot}. - */ -@Internal -@Deprecated -public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> { - - private static final int CURRENT_VERSION = 2; - - /** The class of the components of the serializer's array type. */ - @Nullable private Class<C> componentClass; - - /** Snapshot handling for the component serializer snapshot. */ - @Nullable private NestedSerializersSnapshotDelegate nestedSnapshot; - - /** Constructor for read instantiation. */ - @SuppressWarnings("unused") - public GenericArraySerializerConfigSnapshot() {} - - /** Constructor to create the snapshot for writing. */ - public GenericArraySerializerConfigSnapshot(GenericArraySerializer<C> serializer) { - this.componentClass = serializer.getComponentClass(); - this.nestedSnapshot = - new NestedSerializersSnapshotDelegate(serializer.getComponentSerializer()); - } - - // ------------------------------------------------------------------------ - - @Override - public int getCurrentVersion() { - return CURRENT_VERSION; - } - - @Override - public void writeSnapshot(DataOutputView out) throws IOException { - checkState(componentClass != null && nestedSnapshot != null); - out.writeUTF(componentClass.getName()); - nestedSnapshot.writeNestedSerializerSnapshots(out); - } - - @Override - public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) - throws IOException { - switch (readVersion) { - case 1: - throw new UnsupportedOperationException( - String.format( - "No longer supported version [%d]. Please upgrade first to Flink 1.16. ", - readVersion)); - case 2: - readV2(in, classLoader); - break; - default: - throw new IllegalArgumentException("Unrecognized version: " + readVersion); - } - } - - private void readV2(DataInputView in, ClassLoader classLoader) throws IOException { - componentClass = InstantiationUtil.resolveClassByName(in, classLoader); - nestedSnapshot = - NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader); - } - - @Override - public GenericArraySerializer<C> restoreSerializer() { - checkState(componentClass != null && nestedSnapshot != null); - return new GenericArraySerializer<>( - componentClass, nestedSnapshot.getRestoredNestedSerializer(0)); - } - - @Override - public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility( - TypeSerializerSnapshot<C[]> oldSerializerSnapshot) { - throw new UnsupportedOperationException( - "Unexpected call to GenericArraySerializerConfigSnapshot#resolveSchemaCompatibility." - + " GenericArraySerializerSnapshot should be used instead."); - } - - @Nullable - public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() { - return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots(); - } - - @Nullable - public Class<C> getComponentClass() { - return componentClass; - } -} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java index 6777aba2972..6e24edeb13b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; @@ -28,7 +27,6 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; -import java.util.Objects; /** * Point-in-time configuration of a {@link GenericArraySerializer}. @@ -51,15 +49,6 @@ public final class GenericArraySerializerSnapshot<C> this.componentClass = genericArraySerializer.getComponentClass(); } - /** - * Constructor that the legacy {@link GenericArraySerializerConfigSnapshot} uses to delegate - * compatibility checks to this class. - */ - @SuppressWarnings("deprecation") - GenericArraySerializerSnapshot(Class<C> componentClass) { - this.componentClass = componentClass; - } - @Override protected int getCurrentOuterSnapshotVersion() { return CURRENT_VERSION; @@ -80,14 +69,6 @@ public final class GenericArraySerializerSnapshot<C> @Override public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility( TypeSerializerSnapshot<C[]> oldSerializerSnapshot) { - if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) { - return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( - oldSerializerSnapshot, - this, - Objects.requireNonNull( - ((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot) - .getNestedSerializerSnapshots())); - } return super.resolveSchemaCompatibility(oldSerializerSnapshot); } @@ -98,10 +79,6 @@ public final class GenericArraySerializerSnapshot<C> if (oldSerializerSnapshot instanceof GenericArraySerializerSnapshot) { componentClass = ((GenericArraySerializerSnapshot<C>) oldSerializerSnapshot).componentClass; - } else if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) { - componentClass = - ((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot) - .getComponentClass(); } else { return OuterSchemaCompatibility.INCOMPATIBLE; } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java deleted file mode 100644 index 338789d3d5e..00000000000 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Either; - -import javax.annotation.Nullable; - -import java.io.IOException; - -import static org.apache.flink.util.Preconditions.checkState; - -/** - * Configuration snapshot for the {@link EitherSerializer}. - * - * @deprecated this snapshot class is no longer used by any serializers. Instead, {@link - * JavaEitherSerializerSnapshot} is used. - */ -@Internal -@Deprecated -public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> { - - private static final int CURRENT_VERSION = 2; - - /** Snapshot handling for the component serializer snapshot. */ - @Nullable private NestedSerializersSnapshotDelegate nestedSnapshot; - - /** Constructor for read instantiation. */ - @SuppressWarnings("unused") - public EitherSerializerSnapshot() {} - - /** Constructor to create the snapshot for writing. */ - public EitherSerializerSnapshot( - TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) { - - this.nestedSnapshot = - new NestedSerializersSnapshotDelegate(leftSerializer, rightSerializer); - } - - // ------------------------------------------------------------------------ - - @Override - public int getCurrentVersion() { - return CURRENT_VERSION; - } - - @Override - public void writeSnapshot(DataOutputView out) throws IOException { - checkState(nestedSnapshot != null); - nestedSnapshot.writeNestedSerializerSnapshots(out); - } - - @Override - public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) - throws IOException { - switch (readVersion) { - case 1: - throw new UnsupportedOperationException( - String.format( - "No longer supported version [%d]. Please upgrade first to Flink 1.16. ", - readVersion)); - case 2: - readV2(in, classLoader); - break; - default: - throw new IllegalArgumentException("Unrecognized version: " + readVersion); - } - } - - private void readV2(DataInputView in, ClassLoader classLoader) throws IOException { - nestedSnapshot = - NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader); - } - - @Override - public EitherSerializer<L, R> restoreSerializer() { - checkState(nestedSnapshot != null); - return new EitherSerializer<>( - nestedSnapshot.getRestoredNestedSerializer(0), - nestedSnapshot.getRestoredNestedSerializer(1)); - } - - @Nullable - public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() { - return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots(); - } - - @Override - public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility( - TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) { - throw new UnsupportedOperationException( - "Unexpected call to EitherSerializerSnapshot#resolveSchemaCompatibility." - + " JavaEitherSerializerSnapshot should be used instead."); - } -} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java index e33ce8e6ac3..e0fc4877b17 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java @@ -19,14 +19,11 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.types.Either; -import java.util.Objects; - /** Snapshot class for the {@link EitherSerializer}. */ public class JavaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer<L, R>> { @@ -49,14 +46,6 @@ public class JavaEitherSerializerSnapshot<L, R> @Override public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility( TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) { - if (oldSerializerSnapshot instanceof EitherSerializerSnapshot) { - return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( - oldSerializerSnapshot, - this, - Objects.requireNonNull( - ((EitherSerializerSnapshot<L, R>) oldSerializerSnapshot) - .getNestedSerializerSnapshots())); - } return super.resolveSchemaCompatibility(oldSerializerSnapshot); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java index 5e9526fbed0..eeaece4b260 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java @@ -20,21 +20,17 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; -import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; import org.apache.flink.types.Either; import org.assertj.core.api.Condition; -import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import static org.assertj.core.api.Assertions.assertThat; - /** A {@link TypeSerializerUpgradeTestBase} for {@link GenericArraySerializer}. */ class CompositeTypeSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Object, Object> { @@ -148,17 +144,4 @@ class CompositeTypeSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<O return TypeSerializerConditions.isCompatibleAsIs(); } } - - @Test - void testUpgradeFromDeprecatedSnapshot() { - GenericArraySerializer<String> genericArraySerializer = - new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE); - GenericArraySerializerConfigSnapshot<String> oldSnapshot = - new GenericArraySerializerConfigSnapshot<>(genericArraySerializer); - TypeSerializerSchemaCompatibility<String[]> schemaCompatibility = - genericArraySerializer - .snapshotConfiguration() - .resolveSchemaCompatibility(oldSnapshot); - assertThat(schemaCompatibility.isCompatibleAsIs()).isTrue(); - } }
