This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be90707464342634a3cd7b52a7206fb5be1d5598
Author: yinhan.yh <[email protected]>
AuthorDate: Thu Sep 26 15:39:12 2024 +0800

    [FLINK-30614][serializer] Remove deprecated GenericArraySerializerConfigSnapshot and EitherSerializerSnapshot.
---
 .../base/GenericArraySerializerConfigSnapshot.java | 126 ---------------------
 .../base/GenericArraySerializerSnapshot.java       |  23 ----
 .../runtime/EitherSerializerSnapshot.java          | 118 -------------------
 .../runtime/JavaEitherSerializerSnapshot.java      |  11 --
 .../CompositeTypeSerializerUpgradeTest.java        |  17 ---
 5 files changed, 295 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
deleted file mode 100644
index 1aa9df51d15..00000000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils.base;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * Point-in-time configuration of a {@link GenericArraySerializer}.
- *
- * @param <C> The component type.
- * @deprecated this is deprecated and no longer used by the {@link 
GenericArraySerializer}. It has
- *     been replaced by {@link GenericArraySerializerSnapshot}.
- */
-@Internal
-@Deprecated
-public final class GenericArraySerializerConfigSnapshot<C> implements 
TypeSerializerSnapshot<C[]> {
-
-    private static final int CURRENT_VERSION = 2;
-
-    /** The class of the components of the serializer's array type. */
-    @Nullable private Class<C> componentClass;
-
-    /** Snapshot handling for the component serializer snapshot. */
-    @Nullable private NestedSerializersSnapshotDelegate nestedSnapshot;
-
-    /** Constructor for read instantiation. */
-    @SuppressWarnings("unused")
-    public GenericArraySerializerConfigSnapshot() {}
-
-    /** Constructor to create the snapshot for writing. */
-    public GenericArraySerializerConfigSnapshot(GenericArraySerializer<C> 
serializer) {
-        this.componentClass = serializer.getComponentClass();
-        this.nestedSnapshot =
-                new 
NestedSerializersSnapshotDelegate(serializer.getComponentSerializer());
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public int getCurrentVersion() {
-        return CURRENT_VERSION;
-    }
-
-    @Override
-    public void writeSnapshot(DataOutputView out) throws IOException {
-        checkState(componentClass != null && nestedSnapshot != null);
-        out.writeUTF(componentClass.getName());
-        nestedSnapshot.writeNestedSerializerSnapshots(out);
-    }
-
-    @Override
-    public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
classLoader)
-            throws IOException {
-        switch (readVersion) {
-            case 1:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "No longer supported version [%d]. Please 
upgrade first to Flink 1.16. ",
-                                readVersion));
-            case 2:
-                readV2(in, classLoader);
-                break;
-            default:
-                throw new IllegalArgumentException("Unrecognized version: " + 
readVersion);
-        }
-    }
-
-    private void readV2(DataInputView in, ClassLoader classLoader) throws 
IOException {
-        componentClass = InstantiationUtil.resolveClassByName(in, classLoader);
-        nestedSnapshot =
-                
NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, 
classLoader);
-    }
-
-    @Override
-    public GenericArraySerializer<C> restoreSerializer() {
-        checkState(componentClass != null && nestedSnapshot != null);
-        return new GenericArraySerializer<>(
-                componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
-    }
-
-    @Override
-    public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
-            TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
-        throw new UnsupportedOperationException(
-                "Unexpected call to 
GenericArraySerializerConfigSnapshot#resolveSchemaCompatibility."
-                        + " GenericArraySerializerSnapshot should be used 
instead.");
-    }
-
-    @Nullable
-    public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
-        return nestedSnapshot == null ? null : 
nestedSnapshot.getNestedSerializerSnapshots();
-    }
-
-    @Nullable
-    public Class<C> getComponentClass() {
-        return componentClass;
-    }
-}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index 6777aba2972..6e24edeb13b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -28,7 +27,6 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * Point-in-time configuration of a {@link GenericArraySerializer}.
@@ -51,15 +49,6 @@ public final class GenericArraySerializerSnapshot<C>
         this.componentClass = genericArraySerializer.getComponentClass();
     }
 
-    /**
-     * Constructor that the legacy {@link 
GenericArraySerializerConfigSnapshot} uses to delegate
-     * compatibility checks to this class.
-     */
-    @SuppressWarnings("deprecation")
-    GenericArraySerializerSnapshot(Class<C> componentClass) {
-        this.componentClass = componentClass;
-    }
-
     @Override
     protected int getCurrentOuterSnapshotVersion() {
         return CURRENT_VERSION;
@@ -80,14 +69,6 @@ public final class GenericArraySerializerSnapshot<C>
     @Override
     public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
             TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
-        if (oldSerializerSnapshot instanceof 
GenericArraySerializerConfigSnapshot) {
-            return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
-                    oldSerializerSnapshot,
-                    this,
-                    Objects.requireNonNull(
-                            ((GenericArraySerializerConfigSnapshot<C>) 
oldSerializerSnapshot)
-                                    .getNestedSerializerSnapshots()));
-        }
         return super.resolveSchemaCompatibility(oldSerializerSnapshot);
     }
 
@@ -98,10 +79,6 @@ public final class GenericArraySerializerSnapshot<C>
         if (oldSerializerSnapshot instanceof GenericArraySerializerSnapshot) {
             componentClass =
                     ((GenericArraySerializerSnapshot<C>) 
oldSerializerSnapshot).componentClass;
-        } else if (oldSerializerSnapshot instanceof 
GenericArraySerializerConfigSnapshot) {
-            componentClass =
-                    ((GenericArraySerializerConfigSnapshot<C>) 
oldSerializerSnapshot)
-                            .getComponentClass();
         } else {
             return OuterSchemaCompatibility.INCOMPATIBLE;
         }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
deleted file mode 100644
index 338789d3d5e..00000000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Either;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * Configuration snapshot for the {@link EitherSerializer}.
- *
- * @deprecated this snapshot class is no longer used by any serializers. 
Instead, {@link
- *     JavaEitherSerializerSnapshot} is used.
- */
-@Internal
-@Deprecated
-public final class EitherSerializerSnapshot<L, R> implements 
TypeSerializerSnapshot<Either<L, R>> {
-
-    private static final int CURRENT_VERSION = 2;
-
-    /** Snapshot handling for the component serializer snapshot. */
-    @Nullable private NestedSerializersSnapshotDelegate nestedSnapshot;
-
-    /** Constructor for read instantiation. */
-    @SuppressWarnings("unused")
-    public EitherSerializerSnapshot() {}
-
-    /** Constructor to create the snapshot for writing. */
-    public EitherSerializerSnapshot(
-            TypeSerializer<L> leftSerializer, TypeSerializer<R> 
rightSerializer) {
-
-        this.nestedSnapshot =
-                new NestedSerializersSnapshotDelegate(leftSerializer, 
rightSerializer);
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public int getCurrentVersion() {
-        return CURRENT_VERSION;
-    }
-
-    @Override
-    public void writeSnapshot(DataOutputView out) throws IOException {
-        checkState(nestedSnapshot != null);
-        nestedSnapshot.writeNestedSerializerSnapshots(out);
-    }
-
-    @Override
-    public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
classLoader)
-            throws IOException {
-        switch (readVersion) {
-            case 1:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "No longer supported version [%d]. Please 
upgrade first to Flink 1.16. ",
-                                readVersion));
-            case 2:
-                readV2(in, classLoader);
-                break;
-            default:
-                throw new IllegalArgumentException("Unrecognized version: " + 
readVersion);
-        }
-    }
-
-    private void readV2(DataInputView in, ClassLoader classLoader) throws 
IOException {
-        nestedSnapshot =
-                
NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, 
classLoader);
-    }
-
-    @Override
-    public EitherSerializer<L, R> restoreSerializer() {
-        checkState(nestedSnapshot != null);
-        return new EitherSerializer<>(
-                nestedSnapshot.getRestoredNestedSerializer(0),
-                nestedSnapshot.getRestoredNestedSerializer(1));
-    }
-
-    @Nullable
-    public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
-        return nestedSnapshot == null ? null : 
nestedSnapshot.getNestedSerializerSnapshots();
-    }
-
-    @Override
-    public TypeSerializerSchemaCompatibility<Either<L, R>> 
resolveSchemaCompatibility(
-            TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) {
-        throw new UnsupportedOperationException(
-                "Unexpected call to 
EitherSerializerSnapshot#resolveSchemaCompatibility."
-                        + " JavaEitherSerializerSnapshot should be used 
instead.");
-    }
-}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
index e33ce8e6ac3..e0fc4877b17 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
@@ -19,14 +19,11 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.types.Either;
 
-import java.util.Objects;
-
 /** Snapshot class for the {@link EitherSerializer}. */
 public class JavaEitherSerializerSnapshot<L, R>
         extends CompositeTypeSerializerSnapshot<Either<L, R>, 
EitherSerializer<L, R>> {
@@ -49,14 +46,6 @@ public class JavaEitherSerializerSnapshot<L, R>
     @Override
     public TypeSerializerSchemaCompatibility<Either<L, R>> 
resolveSchemaCompatibility(
             TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) {
-        if (oldSerializerSnapshot instanceof EitherSerializerSnapshot) {
-            return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
-                    oldSerializerSnapshot,
-                    this,
-                    Objects.requireNonNull(
-                            ((EitherSerializerSnapshot<L, R>) 
oldSerializerSnapshot)
-                                    .getNestedSerializerSnapshots()));
-        }
         return super.resolveSchemaCompatibility(oldSerializerSnapshot);
     }
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java
index 5e9526fbed0..eeaece4b260 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java
@@ -20,21 +20,17 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import 
org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 import org.apache.flink.types.Either;
 
 import org.assertj.core.api.Condition;
-import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
 /** A {@link TypeSerializerUpgradeTestBase} for {@link 
GenericArraySerializer}. */
 class CompositeTypeSerializerUpgradeTest extends 
TypeSerializerUpgradeTestBase<Object, Object> {
 
@@ -148,17 +144,4 @@ class CompositeTypeSerializerUpgradeTest extends 
TypeSerializerUpgradeTestBase<O
             return TypeSerializerConditions.isCompatibleAsIs();
         }
     }
-
-    @Test
-    void testUpgradeFromDeprecatedSnapshot() {
-        GenericArraySerializer<String> genericArraySerializer =
-                new GenericArraySerializer<>(String.class, 
StringSerializer.INSTANCE);
-        GenericArraySerializerConfigSnapshot<String> oldSnapshot =
-                new 
GenericArraySerializerConfigSnapshot<>(genericArraySerializer);
-        TypeSerializerSchemaCompatibility<String[]> schemaCompatibility =
-                genericArraySerializer
-                        .snapshotConfiguration()
-                        .resolveSchemaCompatibility(oldSnapshot);
-        assertThat(schemaCompatibility.isCompatibleAsIs()).isTrue();
-    }
 }

Reply via email to