This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 52e23d57133 [FLINK-36318] Fix deserializing from 1.18 savepoint with RAW<MAP...> types
52e23d57133 is described below

commit 52e23d57133ac1eb9dde14b9dc1504717c635b9b
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Sep 20 21:52:53 2024 +0200

    [FLINK-36318] Fix deserializing from 1.18 savepoint with RAW<MAP...> types
    
    The commit fixes restoring from savepoints that were created in Flink
    1.18 for SQL queries using functions such as LAG/LEAD/ARRAY_AGG, which
    use a RAW type for their accumulators.
---
 .../org/apache/flink/util/InstantiationUtil.java   | 211 ++++++--------
 .../nodes/exec/stream/OverWindowRestoreTest.java   |  55 ++++
 .../nodes/exec/stream/OverWindowTestPrograms.java  |  61 ++++
 .../plan/nodes/exec/testutils/RestoreTestBase.java |  60 +++-
 .../1.18/savepoint/OverWindowRestoreTest           | 176 ++++++++++++
 .../over-aggregate-lag/1.18/savepoint/_metadata    | Bin 0 -> 24310 bytes
 .../plan/over-aggregate-lag.json                   | 318 +++++++++++++++++++++
 .../over-aggregate-lag/savepoint/_metadata         | Bin 0 -> 24310 bytes
 .../runtime/typeutils/ArrayDataSerializer.java     |   6 +-
 .../table/runtime/typeutils/MapDataSerializer.java |  13 +-
 .../table/runtime/typeutils/RowDataSerializer.java |   3 +-
 11 files changed, 756 insertions(+), 147 deletions(-)
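
For context, the functional core of the change is a new InstantiationUtil.deserializeObject(InputStream, ClassLoader, boolean) overload: when the third argument is true, deserialization goes through the new FailureTolerantObjectInputStream, which ignores the serialVersionUID mismatch registered for org.apache.flink.table.runtime.typeutils.MapDataSerializer. The Java sketch below is illustrative only (the class and method names TolerantSnapshotReadSketch/readComponent are invented for the example); it shows how a serializer snapshot read path opts in to the tolerant behaviour:

    import org.apache.flink.util.InstantiationUtil;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class TolerantSnapshotReadSketch {

        public static Object readComponent(byte[] snapshotBytes, ClassLoader userCodeClassLoader)
                throws IOException, ClassNotFoundException {
            InputStream in = new ByteArrayInputStream(snapshotBytes);
            // Passing `true` selects FailureTolerantObjectInputStream, so the known
            // MapDataSerializer serialVersionUID mismatch (FLINK-36318) is tolerated
            // instead of failing the restore with an InvalidClassException.
            return InstantiationUtil.deserializeObject(in, userCodeClassLoader, true);
        }
    }

This is the pattern the snapshot readers touched by this commit (ArrayDataSerializer, MapDataSerializer, RowDataSerializer) use when reading their nested types and serializers from old snapshots.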

diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 8207e6379a2..334a30cde88 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -20,7 +20,6 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -46,9 +45,8 @@ import java.lang.reflect.Proxy;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
@@ -146,144 +144,93 @@ public final class InstantiationUtil {
     }
 
     /**
-     * This is maintained as a temporary workaround for FLINK-6869.
-     *
-     * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID. Although since 1.3
-     * they are properly specified, we still have to ignore them for now as their previous
-     * serialVersionUIDs will vary depending on the Scala version.
-     *
-     * <p>This can be removed once 1.2 is no longer supported.
+     * Workaround for bugs like e.g. FLINK-36318 where we serialize a class into a snapshot and then
+     * its serialVersionUID is changed in an uncontrolled way. This lets us deserialize the old
+     * snapshot assuming the binary representation of the faulty class has not changed.
      */
-    private static final Set<String> scalaSerializerClassnames = new HashSet<>();
+    private static final class VersionMismatchHandler {
 
-    static {
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
-    }
+        private final Map<String, Map<Long, List<Long>>> supportedSerialVersionUidsPerClass =
+                new HashMap<>();
 
-    /**
-     * The serialVersionUID might change between Scala versions and since those classes are part of
-     * the tuple serializer config snapshots we need to ignore them.
-     *
-     * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
-     */
-    private static final Set<String> scalaTypes = new HashSet<>();
+        void addVersionsMatch(
+                String className, long localVersionUID, List<Long> streamVersionUIDs) {
+            supportedSerialVersionUidsPerClass
+                    .computeIfAbsent(className, k -> new HashMap<>())
+                    .put(localVersionUID, streamVersionUIDs);
+        }
 
-    static {
-        scalaTypes.add("scala.Tuple1");
-        scalaTypes.add("scala.Tuple2");
-        scalaTypes.add("scala.Tuple3");
-        scalaTypes.add("scala.Tuple4");
-        scalaTypes.add("scala.Tuple5");
-        scalaTypes.add("scala.Tuple6");
-        scalaTypes.add("scala.Tuple7");
-        scalaTypes.add("scala.Tuple8");
-        scalaTypes.add("scala.Tuple9");
-        scalaTypes.add("scala.Tuple10");
-        scalaTypes.add("scala.Tuple11");
-        scalaTypes.add("scala.Tuple12");
-        scalaTypes.add("scala.Tuple13");
-        scalaTypes.add("scala.Tuple14");
-        scalaTypes.add("scala.Tuple15");
-        scalaTypes.add("scala.Tuple16");
-        scalaTypes.add("scala.Tuple17");
-        scalaTypes.add("scala.Tuple18");
-        scalaTypes.add("scala.Tuple19");
-        scalaTypes.add("scala.Tuple20");
-        scalaTypes.add("scala.Tuple21");
-        scalaTypes.add("scala.Tuple22");
-        scalaTypes.add("scala.Tuple1$mcJ$sp");
-        scalaTypes.add("scala.Tuple1$mcI$sp");
-        scalaTypes.add("scala.Tuple1$mcD$sp");
-        scalaTypes.add("scala.Tuple2$mcJJ$sp");
-        scalaTypes.add("scala.Tuple2$mcJI$sp");
-        scalaTypes.add("scala.Tuple2$mcJD$sp");
-        scalaTypes.add("scala.Tuple2$mcIJ$sp");
-        scalaTypes.add("scala.Tuple2$mcII$sp");
-        scalaTypes.add("scala.Tuple2$mcID$sp");
-        scalaTypes.add("scala.Tuple2$mcDJ$sp");
-        scalaTypes.add("scala.Tuple2$mcDI$sp");
-        scalaTypes.add("scala.Tuple2$mcDD$sp");
-        scalaTypes.add("scala.Enumeration$ValueSet");
-    }
-
-    private static boolean isAnonymousClass(Class clazz) {
-        final String name = clazz.getName();
-
-        // isAnonymousClass does not work for anonymous Scala classes; additionally check by class
-        // name
-        if (name.contains("$anon$") || name.contains("$anonfun") || name.contains("$macro$")) {
-            return true;
-        }
-
-        // calling isAnonymousClass or getSimpleName can throw InternalError for certain Scala
-        // types, see https://issues.scala-lang.org/browse/SI-2034
-        // until we move to JDK 9, this try-catch is necessary
-        try {
-            return clazz.isAnonymousClass();
-        } catch (InternalError e) {
-            return false;
+        /**
+         * Checks if the local version of the given class can safely deserialize the class of a
+         * different version from the object stream.
+         */
+        boolean shouldTolerateSerialVersionMismatch(
+                String className, long localVersionUID, long streamVersionUID) {
+            return supportedSerialVersionUidsPerClass
+                    .getOrDefault(className, Collections.emptyMap())
+                    .getOrDefault(localVersionUID, Collections.emptyList())
+                    .contains(streamVersionUID);
+        }
+
+        /**
+         * Checks if there are any rules for the given class. This lets us decide early if we need
+         * to look up the local class.
+         */
+        boolean haveRulesForClass(String className) {
+            return supportedSerialVersionUidsPerClass.containsKey(className);
         }
     }
 
-    private static boolean isOldAvroSerializer(String name, long serialVersionUID) {
-        // please see FLINK-11436 for details on why we need to ignore serial version UID here for
-        // the AvroSerializer
-        return (serialVersionUID == 1)
-                && "org.apache.flink.formats.avro.typeutils.AvroSerializer".equals(name);
+    private static final VersionMismatchHandler versionMismatchHandler =
+            new VersionMismatchHandler();
+
+    static {
+        // See FLINK-36318
+        versionMismatchHandler.addVersionsMatch(
+                "org.apache.flink.table.runtime.typeutils.MapDataSerializer",
+                4073842523628732956L,
+                Collections.singletonList(2533002123505507000L));
     }
 
     /**
-     * A mapping between the full path of a deprecated serializer and its equivalent. These mappings
-     * are hardcoded and fixed.
-     *
-     * <p>IMPORTANT: mappings can be removed after 1 release as there will be a "migration path". As
-     * an example, a serializer is removed in 1.5-SNAPSHOT, then the mapping should be added for
-     * 1.5, and it can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 ->
-     * Flink-{>= 1.6}.
+     * An {@link ObjectInputStream} that ignores certain serialVersionUID mismatches. This is a
+     * workaround for uncontrolled serialVersionUIDs changes.
+     * workaround for uncontrolled serialVersionUIDs changes.
      */
-    private enum MigrationUtil {
-
-        // To add a new mapping just pick a name and add an entry as the following:
-
-        HASH_MAP_SERIALIZER(
-                "org.apache.flink.runtime.state.HashMapSerializer",
-                ObjectStreamClass.lookup(MapSerializer.class)); // added in 1.5
-
-        /**
-         * An internal unmodifiable map containing the mappings between deprecated and new
-         * serializers.
-         */
-        private static final Map<String, ObjectStreamClass> EQUIVALENCE_MAP =
-                Collections.unmodifiableMap(initMap());
-
-        /** The full name of the class of the old serializer. */
-        private final String oldSerializerName;
-
-        /** The serialization descriptor of the class of the new serializer. */
-        private final ObjectStreamClass newSerializerStreamClass;
+    public static class FailureTolerantObjectInputStream
+            extends InstantiationUtil.ClassLoaderObjectInputStream {
 
-        MigrationUtil(String oldSerializerName, ObjectStreamClass newSerializerStreamClass) {
-            this.oldSerializerName = oldSerializerName;
-            this.newSerializerStreamClass = newSerializerStreamClass;
+        public FailureTolerantObjectInputStream(InputStream in, ClassLoader cl) throws IOException {
+            super(in, cl);
         }
 
-        private static Map<String, ObjectStreamClass> initMap() {
-            final Map<String, ObjectStreamClass> init =
-                    CollectionUtil.newHashMapWithExpectedSize(4);
-            for (MigrationUtil m : MigrationUtil.values()) {
-                init.put(m.oldSerializerName, m.newSerializerStreamClass);
+        @Override
+        protected ObjectStreamClass readClassDescriptor()
+                throws IOException, ClassNotFoundException {
+            ObjectStreamClass streamClassDescriptor = super.readClassDescriptor();
+
+            final Class localClass = resolveClass(streamClassDescriptor);
+            final String name = localClass.getName();
+            if (versionMismatchHandler.haveRulesForClass(name)) {
+                final ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass);
+                if (localClassDescriptor != null
+                        && localClassDescriptor.getSerialVersionUID()
+                                != streamClassDescriptor.getSerialVersionUID()) {
+                    if (versionMismatchHandler.shouldTolerateSerialVersionMismatch(
+                            name,
+                            localClassDescriptor.getSerialVersionUID(),
+                            streamClassDescriptor.getSerialVersionUID())) {
+                        LOG.warn(
+                                "Ignoring serialVersionUID mismatch for class {}; was {}, now {}.",
+                                streamClassDescriptor.getName(),
+                                streamClassDescriptor.getSerialVersionUID(),
+                                localClassDescriptor.getSerialVersionUID());
+
+                        streamClassDescriptor = localClassDescriptor;
+                    }
+                }
             }
-            return init;
-        }
 
-        private static ObjectStreamClass getEquivalentSerializer(String classDescriptorName) {
-            return EQUIVALENCE_MAP.get(classDescriptorName);
+            return streamClassDescriptor;
         }
     }
 
@@ -515,20 +462,28 @@ public final class InstantiationUtil {
         return serializer.deserialize(reuse, inputViewWrapper);
     }
 
-    @SuppressWarnings("unchecked")
     public static <T> T deserializeObject(byte[] bytes, ClassLoader cl)
             throws IOException, ClassNotFoundException {
         return deserializeObject(new ByteArrayInputStream(bytes), cl);
     }
 
-    @SuppressWarnings("unchecked")
     public static <T> T deserializeObject(InputStream in, ClassLoader cl)
             throws IOException, ClassNotFoundException {
+        return deserializeObject(in, cl, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> T deserializeObject(
+            InputStream in, ClassLoader cl, boolean tolerateKnownVersionMismatch)
+            throws IOException, ClassNotFoundException {
 
         final ClassLoader old = Thread.currentThread().getContextClassLoader();
         // not using resource try to avoid AutoClosable's close() on the given stream
         try {
-            ObjectInputStream oois = new InstantiationUtil.ClassLoaderObjectInputStream(in, cl);
+            ObjectInputStream oois =
+                    tolerateKnownVersionMismatch
+                            ? new InstantiationUtil.FailureTolerantObjectInputStream(in, cl)
+                            : new InstantiationUtil.ClassLoaderObjectInputStream(in, cl);
             Thread.currentThread().setContextClassLoader(cl);
             return (T) oois.readObject();
         } finally {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java
new file mode 100644
index 00000000000..cb681e4bff5
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/** Tests for verifying {@link StreamExecOverAggregate}. */
+public class OverWindowRestoreTest extends RestoreTestBase {
+    public OverWindowRestoreTest() {
+        super(StreamExecOverAggregate.class);
+    }
+
+    @Override
+    protected Stream<String> getSavepointPaths(
+            TableTestProgram program, ExecNodeMetadata metadata) {
+        if (metadata.version() == 1) {
+            return Stream.concat(
+                    super.getSavepointPaths(program, metadata),
+                    // See src/test/resources/restore-tests/stream-exec-over-aggregate_1/over
+                    // -aggregate-lag/1.18/savepoint/OverWindowRestoreTest how the savepoint was
+                    // generated
+                    Stream.of(getSavepointPath(program, metadata, FlinkVersion.v1_18)));
+        } else {
+            return super.getSavepointPaths(program, metadata);
+        }
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Collections.singletonList(OverWindowTestPrograms.LAG_OVER_FUNCTION);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java
new file mode 100644
index 00000000000..e9fdbb03a42
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/** Programs for verifying {@link StreamExecOverAggregate}. */
+public class OverWindowTestPrograms {
+    static final TableTestProgram LAG_OVER_FUNCTION =
+            TableTestProgram.of("over-aggregate-lag", "validates restoring a lag function")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "ts STRING",
+                                            "b MAP<DOUBLE, DOUBLE>",
+                                            "`r_time` AS TO_TIMESTAMP(`ts`)",
+                                            "WATERMARK for `r_time` AS `r_time`")
+                                    .producedBeforeRestore(
+                                            Row.of(
+                                                    "2020-04-15 08:00:05",
+                                                    Collections.singletonMap(42.0, 42.0)))
+                                    .producedAfterRestore(
+                                            Row.of(
+                                                    "2020-04-15 08:00:06",
+                                                    Collections.singletonMap(42.1, 42.1)))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("ts STRING", "b MAP<DOUBLE, DOUBLE>")
+                                    .consumedBeforeRestore(Row.of("2020-04-15 08:00:05", null))
+                                    .consumedAfterRestore(
+                                            Row.of(
+                                                    "2020-04-15 08:00:06",
+                                                    Collections.singletonMap(42.0, 42.0)))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT ts, LAG(b, 1) over (order by r_time) AS "
+                                    + "bLag FROM t")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index af80fd24759..96d9d71b47d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.testutils;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.core.execution.JobClient;
@@ -58,6 +59,8 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import javax.annotation.Nullable;
+
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -89,6 +92,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 @TestMethodOrder(OrderAnnotation.class)
 public abstract class RestoreTestBase implements TableTestProgramRunner {
 
+    // This version can be set to generate savepoints for a particular Flink version.
+    // By default, the savepoint is generated for the current version in the default directory.
+    private static final FlinkVersion FLINK_VERSION = null;
     private final Class<? extends ExecNode<?>> execNodeUnderTest;
     private final List<Class<? extends ExecNode<?>>> childExecNodesUnderTest;
     private final AfterRestoreSource afterRestoreSource;
@@ -173,7 +179,42 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
         return getAllMetadata().stream()
                 .flatMap(
                         metadata ->
-                                supportedPrograms().stream().map(p -> Arguments.of(p, metadata)));
+                                supportedPrograms().stream()
+                                        .flatMap(
+                                                program ->
+                                                        getSavepointPaths(program, metadata)
+                                                                .map(
+                                                                        savepointPath ->
+                                                                                Arguments.of(
+                                                                                        program,
+                                                                                        getPlanPath(
+                                                                                                program,
+                                                                                                metadata),
+                                                                                        savepointPath))));
+    }
+
+    /**
+     * The method can be overridden in a subclass to test multiple savepoint files for a given
+     * program and a node in a particular version. This can be useful e.g. to test a node against
+     * savepoint generated in different Flink versions.
+     */
+    protected Stream<String> getSavepointPaths(
+            TableTestProgram program, ExecNodeMetadata metadata) {
+        return Stream.of(getSavepointPath(program, metadata, null));
+    }
+
+    protected final String getSavepointPath(
+            TableTestProgram program,
+            ExecNodeMetadata metadata,
+            @Nullable FlinkVersion flinkVersion) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(getTestResourceDirectory(program, metadata));
+        if (flinkVersion != null) {
+            builder.append("/").append(flinkVersion);
+        }
+        builder.append("/savepoint/");
+
+        return builder.toString();
     }
 
     private void registerSinkObserver(
@@ -260,7 +301,8 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
                         .get();
         CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
         final Path savepointPath = Paths.get(new URI(savepoint));
-        final Path savepointDirPath = getSavepointPath(program, getLatestMetadata());
+        final Path savepointDirPath =
+                Paths.get(getSavepointPath(program, getLatestMetadata(), FLINK_VERSION));
         Files.createDirectories(savepointDirPath);
         Files.move(savepointPath, savepointDirPath, StandardCopyOption.ATOMIC_MOVE);
     }
@@ -268,7 +310,8 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
     @ParameterizedTest
     @MethodSource("createSpecs")
     @Order(1)
-    void testRestore(TableTestProgram program, ExecNodeMetadata metadata) throws Exception {
+    void testRestore(TableTestProgram program, Path planPath, String savepointPath)
+            throws Exception {
         final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
         final SavepointRestoreSettings restoreSettings;
         if (afterRestoreSource == AfterRestoreSource.NO_RESTORE) {
@@ -276,9 +319,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
         } else {
             restoreSettings =
                     SavepointRestoreSettings.forPath(
-                            getSavepointPath(program, metadata).toString(),
-                            false,
-                            RecoveryClaimMode.NO_CLAIM);
+                            savepointPath, false, RecoveryClaimMode.NO_CLAIM);
         }
         SavepointRestoreSettings.toConfiguration(restoreSettings, settings.getConfiguration());
         settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
@@ -322,8 +363,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
         program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
         program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv));
 
-        final CompiledPlan compiledPlan =
-                tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, metadata)));
+        final CompiledPlan compiledPlan = tEnv.loadPlan(PlanReference.fromFile(planPath));
 
         if (afterRestoreSource == AfterRestoreSource.INFINITE) {
             final TableResult tableResult = compiledPlan.execute();
@@ -350,10 +390,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
                 getTestResourceDirectory(program, metadata) + "/plan/" + program.id + ".json");
     }
 
-    private Path getSavepointPath(TableTestProgram program, ExecNodeMetadata metadata) {
-        return Paths.get(getTestResourceDirectory(program, metadata) + "/savepoint/");
-    }
-
     private String getTestResourceDirectory(TableTestProgram program, ExecNodeMetadata metadata) {
         return String.format(
                 "%s/src/test/resources/restore-tests/%s_%d/%s",
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/OverWindowRestoreTest b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/OverWindowRestoreTest
new file mode 100644
index 00000000000..08e2541de98
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/OverWindowRestoreTest
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+// Run this test to generate a savepoint in Flink 1.18.
+// In order to run the test, you need to cherry-pick commits:
+// - dcce3764a4500b2006cd260677169d14c553a3eb
+// - e914eb7fc3f31286ed7e33cc93e7ffbca785b731
+// - a5b4e60e563bf145afede43dda8510833d2932e4
+@ExtendWith(MiniClusterExtension.class)
+class OverWindowRestoreTest {
+
+    private @TempDir Path tmpDir;
+
+    private final Path savepointDirPath = Paths.get(String.format(
+            "%s/src/test/resources/restore-tests/%s_%d/%s/savepoint/",
+            System.getProperty("user.dir"), "stream-exec-over-aggregate", 1,
+            "lag-function"));
+
+    @Test
+    void testOverWindowRestore() throws Exception {
+        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
+        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        final TableEnvironment tEnv = TableEnvironment.create(settings);
+        tEnv.getConfig()
+                .set(
+                        TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS,
+                        TableConfigOptions.CatalogPlanCompilation.SCHEMA);
+
+            final String id = TestValuesTableFactory.registerData(
+                    Collections.singletonList(
+                            Row.of("2020-04-15 08:00:05", Collections.singletonMap(42.0, 42.0))
+                    )
+            );
+
+            tEnv.executeSql(
+                    "CREATE TABLE t(\n"
+                            + "ts STRING,\n"
+                            + "b MAP<DOUBLE, DOUBLE>,\n"
+                            + "r_time AS TO_TIMESTAMP(`ts`),\n"
+                            + "WATERMARK for `r_time` AS `r_time`\n"
+                            + ") WITH(\n"
+                            + "'connector' = 'values',\n"
+                            + String.format("'data-id' = '%s',\n", id)
+                            + "'terminating' = 'false',\n"
+                            + "'runtime-source' = 'NewSource'\n"
+                            + ")\n"
+            );
+
+        CompletableFuture<?> future = new CompletableFuture<>();
+        TestValuesTableFactory.registerLocalRawResultsObserver(
+            "sink_t",
+                (integer, strings) -> {
+                    List<String> results = Collections.singletonList("+I[2020-04-15 08:00:05,"
+                            + " null]");
+                    List<String> currentResults = TestValuesTableFactory.getRawResultsAsStrings("sink_t");
+                    final boolean shouldComplete =
+                            CollectionUtils.isEqualCollection(currentResults, results);
+                    if (shouldComplete) {
+                        future.complete(null);
+                    }
+                }
+        );
+
+        tEnv.executeSql(
+                "CREATE TABLE sink_t(\n"
+                        + "ts STRING,\n"
+                        + "b MAP<DOUBLE, DOUBLE>\n"
+                        + ") WITH(\n"
+                        + "'connector' = 'values',\n"
+                        + "'sink-insert-only' = 'false'\n"
+                        + ")\n"
+        );
+
+        final CompiledPlan compiledPlan = tEnv.compilePlanSql(
+                "INSERT INTO sink_t SELECT ts, LAG(b, 1) over (order by r_time) AS bLag FROM t");
+
+        final TableResult tableResult = compiledPlan.execute();
+        future.get();
+        final JobClient jobClient = tableResult.getJobClient().get();
+        final String savepoint =
+                jobClient
+                        .stopWithSavepoint(false, tmpDir.toString(), SavepointFormatType.DEFAULT)
+                        .get();
+        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
+        final Path savepointPath = Paths.get(new URI(savepoint));
+        Files.createDirectories(savepointDirPath);
+        Files.move(savepointPath, savepointDirPath, StandardCopyOption.ATOMIC_MOVE);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/_metadata
new file mode 100644
index 00000000000..91cb0c70428
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json
new file mode 100644
index 00000000000..c09e4e1171c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json
@@ -0,0 +1,318 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "MAP<DOUBLE, DOUBLE>"
+            }, {
+              "name" : "r_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "r_time",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 2,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`r_time`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`ts` VARCHAR(2147483647), `b` MAP<DOUBLE, DOUBLE>>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, t]], fields=[ts, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "MAP<DOUBLE, DOUBLE>"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`ts` VARCHAR(2147483647), `b` MAP<DOUBLE, DOUBLE>, `r_time` TIMESTAMP(3)>",
+    "description" : "Calc(select=[ts, b, TO_TIMESTAMP(ts) AS r_time])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "ts",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "b",
+        "fieldType" : "MAP<DOUBLE, DOUBLE>"
+      }, {
+        "name" : "r_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[r_time], watermark=[r_time])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "ts",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "b",
+        "fieldType" : "MAP<DOUBLE, DOUBLE>"
+      }, {
+        "name" : "r_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-over-aggregate_1",
+    "overSpec" : {
+      "partition" : {
+        "fields" : [ ]
+      },
+      "groups" : [ {
+        "orderBy" : {
+          "fields" : [ {
+            "index" : 2,
+            "isAscending" : true,
+            "nullIsLast" : false
+          } ]
+        },
+        "isRows" : false,
+        "lowerBound" : {
+          "kind" : "UNBOUNDED_PRECEDING"
+        },
+        "upperBound" : {
+          "kind" : "CURRENT_ROW"
+        },
+        "aggCalls" : [ {
+          "name" : "w0$o0",
+          "internalName" : "$LAG$1",
+          "argList" : [ 1, 3 ],
+          "filterArg" : -1,
+          "distinct" : false,
+          "approximate" : false,
+          "ignoreNulls" : false,
+          "type" : "MAP<DOUBLE, DOUBLE>"
+        } ]
+      } ],
+      "constants" : [ {
+        "kind" : "LITERAL",
+        "value" : 1,
+        "type" : "INT NOT NULL"
+      } ],
+      "originalInputFields" : 3
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "ts",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "b",
+        "fieldType" : "MAP<DOUBLE, DOUBLE>"
+      }, {
+        "name" : "r_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "w0$o0",
+        "fieldType" : "MAP<DOUBLE, DOUBLE>"
+      } ]
+    },
+    "description" : "OverAggregate(orderBy=[r_time ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, b, r_time, LAG(b, 1) AS w0$o0])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "MAP<DOUBLE, DOUBLE>"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`ts` VARCHAR(2147483647), `$1` MAP<DOUBLE, DOUBLE>>",
+    "description" : "Calc(select=[ts, w0$o0 AS $1])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "MAP<DOUBLE, DOUBLE>"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`ts` VARCHAR(2147483647), `$1` MAP<DOUBLE, DOUBLE>>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[ts, $1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/savepoint/_metadata
new file mode 100644
index 00000000000..6743d9776ce
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/savepoint/_metadata differ
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
index 98c6bd8d297..bf9783789fe 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
@@ -310,8 +310,10 @@ public class ArrayDataSerializer extends TypeSerializer<ArrayData> {
                 throws IOException {
             try {
                 DataInputViewStream inStream = new DataInputViewStream(in);
-                this.eleType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
-                this.eleSer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                this.eleType =
+                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true);
+                this.eleSer =
+                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true);
             } catch (ClassNotFoundException e) {
                 throw new IOException(e);
             }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
index c667777c896..cd58a4bc0a3 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
@@ -44,6 +44,9 @@ import java.io.IOException;
 @Internal
 public class MapDataSerializer extends TypeSerializer<MapData> {
 
+    // version used since 1.19
+    private static final long serialVersionUID = 4073842523628732956L;
+
     private final LogicalType keyType;
     private final LogicalType valueType;
 
@@ -293,12 +296,14 @@ public class MapDataSerializer extends TypeSerializer<MapData> {
                 throws IOException {
             try {
                 DataInputViewStream inStream = new DataInputViewStream(in);
-                this.keyType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
-                this.valueType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                this.keyType =
+                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true);
+                this.valueType =
+                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true);
                 this.keySerializer =
-                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true);
                 this.valueSerializer =
-                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+                        InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true);
             } catch (ClassNotFoundException e) {
                 throw new IOException(e);
             }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
index 178f4b0e8aa..3106d9e65d5 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java
@@ -314,7 +314,8 @@ public class RowDataSerializer extends AbstractRowDataSerializer<RowData> {
             types = new LogicalType[length];
             for (int i = 0; i < length; i++) {
                 try {
-                    types[i] = InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
+                    types[i] =
+                            InstantiationUtil.deserializeObject(stream, userCodeClassLoader, true);
                 } catch (ClassNotFoundException e) {
                     throw new IOException(e);
                 }
