This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 52e23d57133 [FLINK-36318] Fix deserializing from 1.18 savepoint with RAW<MAP...> types
52e23d57133 is described below

commit 52e23d57133ac1eb9dde14b9dc1504717c635b9b
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Sep 20 21:52:53 2024 +0200

    [FLINK-36318] Fix deserializing from 1.18 savepoint with RAW<MAP...> types

    The commit fixes restoring from savepoints created for SQL queries that use
    functions such as LAG/LEAD/ARRAY_AGG, which use a RAW type for their accumulators.
---
 .../org/apache/flink/util/InstantiationUtil.java | 211 ++++++--------
 .../nodes/exec/stream/OverWindowRestoreTest.java | 55 ++++
 .../nodes/exec/stream/OverWindowTestPrograms.java | 61 ++++
 .../plan/nodes/exec/testutils/RestoreTestBase.java | 60 +++-
 .../1.18/savepoint/OverWindowRestoreTest | 176 ++++++++++++
 .../over-aggregate-lag/1.18/savepoint/_metadata | Bin 0 -> 24310 bytes
 .../plan/over-aggregate-lag.json | 318 +++++++++++++++++++++
 .../over-aggregate-lag/savepoint/_metadata | Bin 0 -> 24310 bytes
 .../runtime/typeutils/ArrayDataSerializer.java | 6 +-
 .../table/runtime/typeutils/MapDataSerializer.java | 13 +-
 .../table/runtime/typeutils/RowDataSerializer.java | 3 +-
 11 files changed, 756 insertions(+), 147 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 8207e6379a2..334a30cde88 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -20,7 +20,6 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; @@ -46,9 +45,8 @@ import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Set; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; @@ -146,144 +144,93 @@ public final class InstantiationUtil { } /** - * This is maintained as a temporary workaround for FLINK-6869. - * - * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID. Although since 1.3 - * they are properly specified, we still have to ignore them for now as their previous - * serialVersionUIDs will vary depending on the Scala version. - * - * <p>This can be removed once 1.2 is no longer supported. + * Workaround for bugs such as FLINK-36318, where we serialize a class into a snapshot and then + * its serialVersionUID is changed in an uncontrolled way. This lets us deserialize the old + * snapshot assuming the binary representation of the faulty class has not changed.
*/ - private static final Set<String> scalaSerializerClassnames = new HashSet<>(); + private static final class VersionMismatchHandler { - static { - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); - } + private final Map<String, Map<Long, List<Long>>> supportedSerialVersionUidsPerClass = + new HashMap<>(); - /** - * The serialVersionUID might change between Scala versions and since those classes are part of - * the tuple serializer config snapshots we need to ignore them. - * - * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> - */ - private static final Set<String> scalaTypes = new HashSet<>(); + void addVersionsMatch( + String className, long localVersionUID, List<Long> streamVersionUIDs) { + supportedSerialVersionUidsPerClass + .computeIfAbsent(className, k -> new HashMap<>()) + .put(localVersionUID, streamVersionUIDs); + } - static { - scalaTypes.add("scala.Tuple1"); - scalaTypes.add("scala.Tuple2"); - scalaTypes.add("scala.Tuple3"); - scalaTypes.add("scala.Tuple4"); - scalaTypes.add("scala.Tuple5"); - scalaTypes.add("scala.Tuple6"); - scalaTypes.add("scala.Tuple7"); - scalaTypes.add("scala.Tuple8"); - scalaTypes.add("scala.Tuple9"); - scalaTypes.add("scala.Tuple10"); - scalaTypes.add("scala.Tuple11"); - scalaTypes.add("scala.Tuple12"); - scalaTypes.add("scala.Tuple13"); - scalaTypes.add("scala.Tuple14"); - scalaTypes.add("scala.Tuple15"); - scalaTypes.add("scala.Tuple16"); - scalaTypes.add("scala.Tuple17"); - scalaTypes.add("scala.Tuple18"); - scalaTypes.add("scala.Tuple19"); - scalaTypes.add("scala.Tuple20"); - scalaTypes.add("scala.Tuple21"); - scalaTypes.add("scala.Tuple22"); - scalaTypes.add("scala.Tuple1$mcJ$sp"); - scalaTypes.add("scala.Tuple1$mcI$sp"); - scalaTypes.add("scala.Tuple1$mcD$sp"); - scalaTypes.add("scala.Tuple2$mcJJ$sp"); - scalaTypes.add("scala.Tuple2$mcJI$sp"); - scalaTypes.add("scala.Tuple2$mcJD$sp"); - scalaTypes.add("scala.Tuple2$mcIJ$sp"); - scalaTypes.add("scala.Tuple2$mcII$sp"); - scalaTypes.add("scala.Tuple2$mcID$sp"); - scalaTypes.add("scala.Tuple2$mcDJ$sp"); - scalaTypes.add("scala.Tuple2$mcDI$sp"); - scalaTypes.add("scala.Tuple2$mcDD$sp"); - scalaTypes.add("scala.Enumeration$ValueSet"); - } - - private static boolean isAnonymousClass(Class clazz) { - final String name = clazz.getName(); - - // isAnonymousClass does not work for anonymous Scala classes; additionally check by class - // name - if (name.contains("$anon$") || name.contains("$anonfun") || name.contains("$macro$")) { - return true; - } - - // calling isAnonymousClass or getSimpleName can throw InternalError for certain Scala - // types, see https://issues.scala-lang.org/browse/SI-2034 - // until we move to JDK 9, this try-catch is necessary - try { - return clazz.isAnonymousClass(); - } catch (InternalError e) { - return false; + /** + * Checks if the local version of the given class can safely deserialize the class of a + * different version from the object stream. 
+ */ + boolean shouldTolerateSerialVersionMismatch( + String className, long localVersionUID, long streamVersionUID) { + return supportedSerialVersionUidsPerClass + .getOrDefault(className, Collections.emptyMap()) + .getOrDefault(localVersionUID, Collections.emptyList()) + .contains(streamVersionUID); + } + + /** + * Checks if there are any rules for the given class. This lets us decide early if we need + * to look up the local class. + */ + boolean haveRulesForClass(String className) { + return supportedSerialVersionUidsPerClass.containsKey(className); } } - private static boolean isOldAvroSerializer(String name, long serialVersionUID) { - // please see FLINK-11436 for details on why we need to ignore serial version UID here for - // the AvroSerializer - return (serialVersionUID == 1) - && "org.apache.flink.formats.avro.typeutils.AvroSerializer".equals(name); + private static final VersionMismatchHandler versionMismatchHandler = + new VersionMismatchHandler(); + + static { + // See FLINK-36318 + versionMismatchHandler.addVersionsMatch( + "org.apache.flink.table.runtime.typeutils.MapDataSerializer", + 4073842523628732956L, + Collections.singletonList(2533002123505507000L)); } /** - * A mapping between the full path of a deprecated serializer and its equivalent. These mappings - * are hardcoded and fixed. - * - * <p>IMPORTANT: mappings can be removed after 1 release as there will be a "migration path". As - * an example, a serializer is removed in 1.5-SNAPSHOT, then the mapping should be added for - * 1.5, and it can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 -> - * Flink-{>= 1.6}. + * An {@link ObjectInputStream} that ignores certain serialVersionUID mismatches. This is a + * workaround for uncontrolled serialVersionUIDs changes. */ - private enum MigrationUtil { - - // To add a new mapping just pick a name and add an entry as the following: - - HASH_MAP_SERIALIZER( - "org.apache.flink.runtime.state.HashMapSerializer", - ObjectStreamClass.lookup(MapSerializer.class)); // added in 1.5 - - /** - * An internal unmodifiable map containing the mappings between deprecated and new - * serializers. - */ - private static final Map<String, ObjectStreamClass> EQUIVALENCE_MAP = - Collections.unmodifiableMap(initMap()); - - /** The full name of the class of the old serializer. */ - private final String oldSerializerName; - - /** The serialization descriptor of the class of the new serializer. 
*/ - private final ObjectStreamClass newSerializerStreamClass; + public static class FailureTolerantObjectInputStream + extends InstantiationUtil.ClassLoaderObjectInputStream { - MigrationUtil(String oldSerializerName, ObjectStreamClass newSerializerStreamClass) { - this.oldSerializerName = oldSerializerName; - this.newSerializerStreamClass = newSerializerStreamClass; + public FailureTolerantObjectInputStream(InputStream in, ClassLoader cl) throws IOException { + super(in, cl); } - private static Map<String, ObjectStreamClass> initMap() { - final Map<String, ObjectStreamClass> init = - CollectionUtil.newHashMapWithExpectedSize(4); - for (MigrationUtil m : MigrationUtil.values()) { - init.put(m.oldSerializerName, m.newSerializerStreamClass); + @Override + protected ObjectStreamClass readClassDescriptor() + throws IOException, ClassNotFoundException { + ObjectStreamClass streamClassDescriptor = super.readClassDescriptor(); + + final Class localClass = resolveClass(streamClassDescriptor); + final String name = localClass.getName(); + if (versionMismatchHandler.haveRulesForClass(name)) { + final ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass); + if (localClassDescriptor != null + && localClassDescriptor.getSerialVersionUID() + != streamClassDescriptor.getSerialVersionUID()) { + if (versionMismatchHandler.shouldTolerateSerialVersionMismatch( + name, + localClassDescriptor.getSerialVersionUID(), + streamClassDescriptor.getSerialVersionUID())) { + LOG.warn( + "Ignoring serialVersionUID mismatch for class {}; was {}, now {}.", + streamClassDescriptor.getName(), + streamClassDescriptor.getSerialVersionUID(), + localClassDescriptor.getSerialVersionUID()); + + streamClassDescriptor = localClassDescriptor; + } + } } - return init; - } - private static ObjectStreamClass getEquivalentSerializer(String classDescriptorName) { - return EQUIVALENCE_MAP.get(classDescriptorName); + return streamClassDescriptor; } } @@ -515,20 +462,28 @@ public final class InstantiationUtil { return serializer.deserialize(reuse, inputViewWrapper); } - @SuppressWarnings("unchecked") public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException { return deserializeObject(new ByteArrayInputStream(bytes), cl); } - @SuppressWarnings("unchecked") public static <T> T deserializeObject(InputStream in, ClassLoader cl) throws IOException, ClassNotFoundException { + return deserializeObject(in, cl, false); + } + + @SuppressWarnings("unchecked") + public static <T> T deserializeObject( + InputStream in, ClassLoader cl, boolean tolerateKnownVersionMismatch) + throws IOException, ClassNotFoundException { final ClassLoader old = Thread.currentThread().getContextClassLoader(); // not using resource try to avoid AutoClosable's close() on the given stream try { - ObjectInputStream oois = new InstantiationUtil.ClassLoaderObjectInputStream(in, cl); + ObjectInputStream oois = + tolerateKnownVersionMismatch + ? 
new InstantiationUtil.FailureTolerantObjectInputStream(in, cl) + : new InstantiationUtil.ClassLoaderObjectInputStream(in, cl); Thread.currentThread().setContextClassLoader(cl); return (T) oois.readObject(); } finally { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java new file mode 100644 index 00000000000..cb681e4bff5 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** Tests for verifying {@link StreamExecOverAggregate}. */ +public class OverWindowRestoreTest extends RestoreTestBase { + public OverWindowRestoreTest() { + super(StreamExecOverAggregate.class); + } + + @Override + protected Stream<String> getSavepointPaths( + TableTestProgram program, ExecNodeMetadata metadata) { + if (metadata.version() == 1) { + return Stream.concat( + super.getSavepointPaths(program, metadata), + // See src/test/resources/restore-tests/stream-exec-over-aggregate_1/over + // -aggregate-lag/1.18/savepoint/OverWindowRestoreTest how the savepoint was + // generated + Stream.of(getSavepointPath(program, metadata, FlinkVersion.v1_18))); + } else { + return super.getSavepointPaths(program, metadata); + } + } + + @Override + public List<TableTestProgram> programs() { + return Collections.singletonList(OverWindowTestPrograms.LAG_OVER_FUNCTION); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java new file mode 100644 index 00000000000..e9fdbb03a42 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.util.Collections; + +/** Programs for verifying {@link StreamExecOverAggregate}. */ +public class OverWindowTestPrograms { + static final TableTestProgram LAG_OVER_FUNCTION = + TableTestProgram.of("over-aggregate-lag", "validates restoring a lag function") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "ts STRING", + "b MAP<DOUBLE, DOUBLE>", + "`r_time` AS TO_TIMESTAMP(`ts`)", + "WATERMARK for `r_time` AS `r_time`") + .producedBeforeRestore( + Row.of( + "2020-04-15 08:00:05", + Collections.singletonMap(42.0, 42.0))) + .producedAfterRestore( + Row.of( + "2020-04-15 08:00:06", + Collections.singletonMap(42.1, 42.1))) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("ts STRING", "b MAP<DOUBLE, DOUBLE>") + .consumedBeforeRestore(Row.of("2020-04-15 08:00:05", null)) + .consumedAfterRestore( + Row.of( + "2020-04-15 08:00:06", + Collections.singletonMap(42.0, 42.0))) + .build()) + .runSql( + "INSERT INTO sink_t SELECT ts, LAG(b, 1) over (order by r_time) AS " + + "bLag FROM t") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java index af80fd24759..96d9d71b47d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.testutils; +import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.core.execution.JobClient; @@ -58,6 +59,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import javax.annotation.Nullable; + import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -89,6 +92,9 @@ import static org.assertj.core.api.Assertions.assertThat; @TestMethodOrder(OrderAnnotation.class) public abstract class RestoreTestBase implements TableTestProgramRunner { + // This version can be set to generate savepoints for a particular Flink version. + // By default, the savepoint is generated for the current version in the default directory. + private static final FlinkVersion FLINK_VERSION = null; private final Class<? 
extends ExecNode<?>> execNodeUnderTest; private final List<Class<? extends ExecNode<?>>> childExecNodesUnderTest; private final AfterRestoreSource afterRestoreSource; @@ -173,7 +179,42 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { return getAllMetadata().stream() .flatMap( metadata -> - supportedPrograms().stream().map(p -> Arguments.of(p, metadata))); + supportedPrograms().stream() + .flatMap( + program -> + getSavepointPaths(program, metadata) + .map( + savepointPath -> + Arguments.of( + program, + getPlanPath( + program, + metadata), + savepointPath)))); + } + + /** + * The method can be overridden in a subclass to test multiple savepoint files for a given + * program and a node in a particular version. This can be useful e.g. to test a node against + * savepoint generated in different Flink versions. + */ + protected Stream<String> getSavepointPaths( + TableTestProgram program, ExecNodeMetadata metadata) { + return Stream.of(getSavepointPath(program, metadata, null)); + } + + protected final String getSavepointPath( + TableTestProgram program, + ExecNodeMetadata metadata, + @Nullable FlinkVersion flinkVersion) { + StringBuilder builder = new StringBuilder(); + builder.append(getTestResourceDirectory(program, metadata)); + if (flinkVersion != null) { + builder.append("/").append(flinkVersion); + } + builder.append("/savepoint/"); + + return builder.toString(); } private void registerSinkObserver( @@ -260,7 +301,8 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { .get(); CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED)); final Path savepointPath = Paths.get(new URI(savepoint)); - final Path savepointDirPath = getSavepointPath(program, getLatestMetadata()); + final Path savepointDirPath = + Paths.get(getSavepointPath(program, getLatestMetadata(), FLINK_VERSION)); Files.createDirectories(savepointDirPath); Files.move(savepointPath, savepointDirPath, StandardCopyOption.ATOMIC_MOVE); } @@ -268,7 +310,8 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { @ParameterizedTest @MethodSource("createSpecs") @Order(1) - void testRestore(TableTestProgram program, ExecNodeMetadata metadata) throws Exception { + void testRestore(TableTestProgram program, Path planPath, String savepointPath) + throws Exception { final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); final SavepointRestoreSettings restoreSettings; if (afterRestoreSource == AfterRestoreSource.NO_RESTORE) { @@ -276,9 +319,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { } else { restoreSettings = SavepointRestoreSettings.forPath( - getSavepointPath(program, metadata).toString(), - false, - RecoveryClaimMode.NO_CLAIM); + savepointPath, false, RecoveryClaimMode.NO_CLAIM); } SavepointRestoreSettings.toConfiguration(restoreSettings, settings.getConfiguration()); settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb"); @@ -322,8 +363,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv)); program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv)); - final CompiledPlan compiledPlan = - tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, metadata))); + final CompiledPlan compiledPlan = tEnv.loadPlan(PlanReference.fromFile(planPath)); if (afterRestoreSource == AfterRestoreSource.INFINITE) { final TableResult tableResult = compiledPlan.execute(); @@ 
-350,10 +390,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { getTestResourceDirectory(program, metadata) + "/plan/" + program.id + ".json"); } - private Path getSavepointPath(TableTestProgram program, ExecNodeMetadata metadata) { - return Paths.get(getTestResourceDirectory(program, metadata) + "/savepoint/"); - } - private String getTestResourceDirectory(TableTestProgram program, ExecNodeMetadata metadata) { return String.format( "%s/src/test/resources/restore-tests/%s_%d/%s", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/OverWindowRestoreTest b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/OverWindowRestoreTest new file mode 100644 index 00000000000..08e2541de98 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/OverWindowRestoreTest @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.commons.collections.CollectionUtils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.connector.source.enumerator.NoOpEnumState; +import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer; +import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator; +import org.apache.flink.connector.source.split.ValuesSourceSplit; +import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; + +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import javax.annotation.Nullable; + +import java.io.ByteArrayOutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; + +// Run this test to generate a savepoint in Flink 1.18. 
+// In order to run the test, you need to cherry-pick commits: +// - dcce3764a4500b2006cd260677169d14c553a3eb +// - e914eb7fc3f31286ed7e33cc93e7ffbca785b731 +// - a5b4e60e563bf145afede43dda8510833d2932e4 +@ExtendWith(MiniClusterExtension.class) +class OverWindowRestoreTest { + + private @TempDir Path tmpDir; + + private final Path savepointDirPath = Paths.get(String.format( + "%s/src/test/resources/restore-tests/%s_%d/%s/savepoint/", + System.getProperty("user.dir"), "stream-exec-over-aggregate", 1, + "lag-function")); + + @Test + void testOverWindowRestore() throws Exception { + final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); + settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb"); + final TableEnvironment tEnv = TableEnvironment.create(settings); + tEnv.getConfig() + .set( + TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, + TableConfigOptions.CatalogPlanCompilation.SCHEMA); + + final String id = TestValuesTableFactory.registerData( + Collections.singletonList( + Row.of("2020-04-15 08:00:05", Collections.singletonMap(42.0, 42.0)) + ) + ); + + tEnv.executeSql( + "CREATE TABLE t(\n" + + "ts STRING,\n" + + "b MAP<DOUBLE, DOUBLE>,\n" + + "r_time AS TO_TIMESTAMP(`ts`),\n" + + "WATERMARK for `r_time` AS `r_time`\n" + + ") WITH(\n" + + "'connector' = 'values',\n" + + String.format("'data-id' = '%s',\n", id) + + "'terminating' = 'false',\n" + + "'runtime-source' = 'NewSource'\n" + + ")\n" + ); + + CompletableFuture<?> future = new CompletableFuture<>(); + TestValuesTableFactory.registerLocalRawResultsObserver( + "sink_t", + (integer, strings) -> { + List<String> results = Collections.singletonList("+I[2020-04-15 08:00:05," + + " null]"); + List<String> currentResults = TestValuesTableFactory.getRawResultsAsStrings("sink_t"); + final boolean shouldComplete = + CollectionUtils.isEqualCollection(currentResults, results); + if (shouldComplete) { + future.complete(null); + } + } + ); + + tEnv.executeSql( + "CREATE TABLE sink_t(\n" + + "ts STRING,\n" + + "b MAP<DOUBLE, DOUBLE>\n" + + ") WITH(\n" + + "'connector' = 'values',\n" + + "'sink-insert-only' = 'false'\n" + + ")\n" + ); + + final CompiledPlan compiledPlan = tEnv.compilePlanSql( + "INSERT INTO sink_t SELECT ts, LAG(b, 1) over (order by r_time) AS bLag FROM t"); + + final TableResult tableResult = compiledPlan.execute(); + future.get(); + final JobClient jobClient = tableResult.getJobClient().get(); + final String savepoint = + jobClient + .stopWithSavepoint(false, tmpDir.toString(), SavepointFormatType.DEFAULT) + .get(); + CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED)); + final Path savepointPath = Paths.get(new URI(savepoint)); + Files.createDirectories(savepointDirPath); + Files.move(savepointPath, savepointDirPath, StandardCopyOption.ATOMIC_MOVE); + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/_metadata new file mode 100644 index 00000000000..91cb0c70428 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/1.18/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json new file mode 100644 index 00000000000..c09e4e1171c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json @@ -0,0 +1,318 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "MAP<DOUBLE, DOUBLE>" + }, { + "name" : "r_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "r_time", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`r_time`" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`ts` VARCHAR(2147483647), `b` MAP<DOUBLE, DOUBLE>>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, t]], fields=[ts, b])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "MAP<DOUBLE, DOUBLE>" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`ts` VARCHAR(2147483647), `b` MAP<DOUBLE, DOUBLE>, `r_time` TIMESTAMP(3)>", + "description" : "Calc(select=[ts, b, TO_TIMESTAMP(ts) AS r_time])" + }, { + "id" : 3, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "fieldType" : "MAP<DOUBLE, DOUBLE>" + }, { + "name" : "r_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[r_time], watermark=[r_time])" + }, { + "id" : 4, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "fieldType" : "MAP<DOUBLE, DOUBLE>" + }, { + "name" : "r_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : 
"Exchange(distribution=[single])" + }, { + "id" : 5, + "type" : "stream-exec-over-aggregate_1", + "overSpec" : { + "partition" : { + "fields" : [ ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 2, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "UNBOUNDED_PRECEDING" + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "internalName" : "$LAG$1", + "argList" : [ 1, 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "MAP<DOUBLE, DOUBLE>" + } ] + } ], + "constants" : [ { + "kind" : "LITERAL", + "value" : 1, + "type" : "INT NOT NULL" + } ], + "originalInputFields" : 3 + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "fieldType" : "MAP<DOUBLE, DOUBLE>" + }, { + "name" : "r_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "w0$o0", + "fieldType" : "MAP<DOUBLE, DOUBLE>" + } ] + }, + "description" : "OverAggregate(orderBy=[r_time ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, b, r_time, LAG(b, 1) AS w0$o0])" + }, { + "id" : 6, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "MAP<DOUBLE, DOUBLE>" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`ts` VARCHAR(2147483647), `$1` MAP<DOUBLE, DOUBLE>>", + "description" : "Calc(select=[ts, w0$o0 AS $1])" + }, { + "id" : 7, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "MAP<DOUBLE, DOUBLE>" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`ts` VARCHAR(2147483647), `$1` MAP<DOUBLE, DOUBLE>>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[ts, $1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + 
"source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/savepoint/_metadata new file mode 100644 index 00000000000..6743d9776ce Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/savepoint/_metadata differ diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java index 98c6bd8d297..bf9783789fe 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java @@ -310,8 +310,10 @@ public class ArrayDataSerializer extends TypeSerializer<ArrayData> { throws IOException { try { DataInputViewStream inStream = new DataInputViewStream(in); - this.eleType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); - this.eleSer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + this.eleType = + InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true); + this.eleSer = + InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true); } catch (ClassNotFoundException e) { throw new IOException(e); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java index c667777c896..cd58a4bc0a3 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java @@ -44,6 +44,9 @@ import java.io.IOException; @Internal public class MapDataSerializer extends TypeSerializer<MapData> { + // version used since 1.19 + private static final long serialVersionUID = 4073842523628732956L; + private final LogicalType keyType; private final LogicalType valueType; @@ -293,12 +296,14 @@ public class MapDataSerializer extends TypeSerializer<MapData> { throws IOException { try { DataInputViewStream inStream = new DataInputViewStream(in); - this.keyType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); - this.valueType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + this.keyType = + InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true); + this.valueType = + InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true); this.keySerializer = - InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true); this.valueSerializer = - InstantiationUtil.deserializeObject(inStream, userCodeClassLoader); + InstantiationUtil.deserializeObject(inStream, userCodeClassLoader, true); } catch (ClassNotFoundException e) { throw new IOException(e); } diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java index 178f4b0e8aa..3106d9e65d5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java @@ -314,7 +314,8 @@ public class RowDataSerializer extends AbstractRowDataSerializer<RowData> { types = new LogicalType[length]; for (int i = 0; i < length; i++) { try { - types[i] = InstantiationUtil.deserializeObject(stream, userCodeClassLoader); + types[i] = + InstantiationUtil.deserializeObject(stream, userCodeClassLoader, true); } catch (ClassNotFoundException e) { throw new IOException(e); }
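
For readers skimming the diff: the heart of the fix is the readClassDescriptor() override in FailureTolerantObjectInputStream. When the class descriptor read from the stream carries a serialVersionUID that is allow-listed for that class (here, the 1.18 UID of MapDataSerializer), the local descriptor is substituted so Java serialization accepts the object instead of failing with an InvalidClassException. The snippet below is a minimal, self-contained sketch of that technique using only the JDK; the LenientObjectInputStream class and its simple class-name-to-UID map are illustrative assumptions and not part of the commit.

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.Map;

/**
 * Simplified sketch of the FailureTolerantObjectInputStream idea: if the stream was
 * written with a serialVersionUID we explicitly tolerate for a class, substitute the
 * local class descriptor so deserialization does not fail on the UID mismatch.
 */
class LenientObjectInputStream extends ObjectInputStream {

    /** Class name -> serialVersionUID found in old streams that we tolerate (assumed shape). */
    private final Map<String, Long> toleratedStreamUids;

    LenientObjectInputStream(InputStream in, Map<String, Long> toleratedStreamUids)
            throws IOException {
        super(in);
        this.toleratedStreamUids = toleratedStreamUids;
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
        ObjectStreamClass streamDescriptor = super.readClassDescriptor();
        Long toleratedUid = toleratedStreamUids.get(streamDescriptor.getName());
        if (toleratedUid != null && toleratedUid == streamDescriptor.getSerialVersionUID()) {
            Class<?> localClass = Class.forName(streamDescriptor.getName());
            ObjectStreamClass localDescriptor = ObjectStreamClass.lookup(localClass);
            if (localDescriptor != null
                    && localDescriptor.getSerialVersionUID()
                            != streamDescriptor.getSerialVersionUID()) {
                // Pretend the bytes were written by the current version of the class;
                // this only works if the serialized field layout did not actually change.
                return localDescriptor;
            }
        }
        return streamDescriptor;
    }
}

In the actual change, the allow-list is the VersionMismatchHandler registered in InstantiationUtil (keyed by class name, local UID, and the tolerated stream UIDs), and the table serializers (ArrayDataSerializer, MapDataSerializer, RowDataSerializer) opt in by calling the new InstantiationUtil.deserializeObject(in, userCodeClassLoader, true) overload when reading their nested types and serializers from a snapshot.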