This is an automated email from the ASF dual-hosted git repository. rkhachatryan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f422510a8f3150ac67d40d30cca67065a8c54e5 Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Mar 13 13:10:28 2026 +0100 [hotfix][table/runtime] Fix RowDataKeySerializerSnapshot.resolveSchemaCompatibility() --- .../linked/RowDataKeySerializerSnapshot.java | 6 ++- .../linked/RowDataKeySerializerTest.java | 57 ++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java index f4a690dbf5b..99bb6a03f2f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java @@ -115,8 +115,10 @@ public class RowDataKeySerializerSnapshot implements TypeSerializerSnapshot<RowD RowDataKeySerializerSnapshot old = (RowDataKeySerializerSnapshot) oldSerializerSnapshot; TypeSerializerSchemaCompatibility<RowData> compatibility = - old.restoredRowDataSerializerSnapshot.resolveSchemaCompatibility( - old.serializer.serializer.snapshotConfiguration()); + serializer + .serializer + .snapshotConfiguration() + .resolveSchemaCompatibility(old.restoredRowDataSerializerSnapshot); return mapToOuterCompatibility( compatibility, diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java index f78faf9f343..b59e13a0ded 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java @@ -20,7 +20,12 @@ package org.apache.flink.table.runtime.sequencedmultisetstate.linked; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedHashFunction; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; @@ -29,9 +34,16 @@ import org.apache.flink.table.runtime.generated.RecordEqualiser; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.StreamRecordUtils; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.Objects; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for {@link RowDataKeySerializer}. */ public class RowDataKeySerializerTest extends SerializerTestBase<RowDataKey> { @@ -62,6 +74,51 @@ public class RowDataKeySerializerTest extends SerializerTestBase<RowDataKey> { return new RowDataKey[] {new RowDataKey(StreamRecordUtils.row(123), equaliser, equaliser)}; } + @Test + void testResolveSchemaCompatibilityWithDifferentSchema() throws Exception { + // Create a snapshot from the "old" serializer (IntType only) and serialize it + RowDataKeySerializer oldSerializer = + new RowDataKeySerializer( + new RowDataSerializer(new IntType()), + equaliser, + equaliser, + EQUALISER, + HASH_FUNCTION); + + byte[] serializedSnapshot; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot( + new DataOutputViewStreamWrapper(out), oldSerializer.snapshotConfiguration()); + serializedSnapshot = out.toByteArray(); + } + + // Restore the "old" snapshot + TypeSerializerSnapshot<RowDataKey> restoredOldSnapshot; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSnapshot)) { + restoredOldSnapshot = + TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot( + new DataInputViewStreamWrapper(in), + Thread.currentThread().getContextClassLoader()); + } + + // Create a "new" serializer with a different schema (IntType + VarCharType) + RowDataKeySerializer newSerializer = + new RowDataKeySerializer( + new RowDataSerializer(new IntType(), new VarCharType()), + equaliser, + equaliser, + EQUALISER, + HASH_FUNCTION); + + // The new snapshot should detect incompatibility with the old snapshot + TypeSerializerSchemaCompatibility<RowDataKey> compatibility = + newSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(restoredOldSnapshot); + + assertThat(compatibility.isIncompatible()).isTrue(); + } + static final GeneratedRecordEqualiser EQUALISER = new GeneratedRecordEqualiser("", "", new Object[0]) {
