This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 360aa9709bb [FLINK-39248][runtime] Fix RowDataKeySerializerSnapshot restore
360aa9709bb is described below
commit 360aa9709bbe25e2673e04e17a374d90480ce967
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Mar 12 15:49:52 2026 +0100
[FLINK-39248][runtime] Fix RowDataKeySerializerSnapshot restore
---
.../linked/RowDataKeySerializerSnapshot.java | 2 +-
.../linked/RowDataKeySerializerSnapshotTest.java | 106 +++++++++++++++++++++
2 files changed, 107 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
index 08d9888e18b..f4a690dbf5b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
@@ -92,7 +92,7 @@ public class RowDataKeySerializerSnapshot implements TypeSerializerSnapshot<RowD
private <T> T restore(DataInputView in, ClassLoader classLoader) throws IOException {
int len = in.readInt();
byte[] bytes = new byte[len];
- in.read(bytes);
+ in.readFully(bytes);
try {
return InstantiationUtil.deserializeObject(bytes, classLoader); // here
} catch (ClassNotFoundException e) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshotTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshotTest.java
new file mode 100644
index 00000000000..e800dfe080d
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshotTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordUtils;
+import org.apache.flink.table.types.logical.IntType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerTest.EQUALISER;
+import static org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerTest.HASH_FUNCTION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowDataKeySerializerSnapshot}, specifically partial-read scenarios. */
+class RowDataKeySerializerSnapshotTest {
+
+ /**
+ * Verifies that the snapshot can be restored when the underlying stream returns fewer bytes
+ * than requested from {@code read(byte[], int, int)} (e.g., when data spans multiple memory
+ * segments). This is a regression test for a bug where {@code DataInputView.read()} was used
+ * instead of {@code readFully()}.
+ */
+ @Test
+ void testSnapshotRestoreWithPartialReads() throws Exception {
+ RowDataKeySerializer serializer = createSerializer();
+ RowDataKeySerializerSnapshot snapshot = new RowDataKeySerializerSnapshot(serializer);
+
+ // Write snapshot to bytes.
+ byte[] snapshotBytes;
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+ snapshot.writeSnapshot(out);
+ snapshotBytes = baos.toByteArray();
+ }
+
+ // Read snapshot back through a stream that returns at most 1 byte per read() call.
+ RowDataKeySerializerSnapshot restored = new RowDataKeySerializerSnapshot();
+ try (InputStream bais =
+ new OneByteAtATimeInputStream(new ByteArrayInputStream(snapshotBytes))) {
+ DataInputView in = new DataInputViewStreamWrapper(bais);
+ restored.readSnapshot(0, in, Thread.currentThread().getContextClassLoader());
+ }
+
+ // Verify the restored serializer works.
+ RowDataKeySerializer restoredSerializer =
+ (RowDataKeySerializer) restored.restoreSerializer();
+ RowDataKey original =
+ RowDataKey.toKeyNotProjected(
+ StreamRecordUtils.row(42),
+ serializer.equalizerInstance,
+ serializer.hashFunctionInstance);
+ assertThat(restoredSerializer.copy(original)).isEqualTo(original);
+ }
+
+ private static RowDataKeySerializer createSerializer() {
+ return new RowDataKeySerializer(
+ new RowDataSerializer(new IntType()),
+ EQUALISER.newInstance(RowDataKeySerializerSnapshotTest.class.getClassLoader()),
+ HASH_FUNCTION.newInstance(RowDataKeySerializerSnapshotTest.class.getClassLoader()),
+ EQUALISER,
+ HASH_FUNCTION);
+ }
+
+ /**
+ * An {@link InputStream} wrapper that returns at most 1 byte per {@code read(byte[], int, int)}
+ * call, simulating the behavior of streams that don't fulfill the full read request (e.g.,
+ * multi-segment memory views or network streams).
+ */
+ private static class OneByteAtATimeInputStream extends FilterInputStream {
+
+ OneByteAtATimeInputStream(InputStream in) {
+ super(in);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return super.read(b, off, Math.min(1, len));
+ }
+ }
+}