This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 360aa9709bb [FLINK-39248][runtime] Fix RowDataKeySerializerSnapshot restore
360aa9709bb is described below

commit 360aa9709bbe25e2673e04e17a374d90480ce967
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Mar 12 15:49:52 2026 +0100

    [FLINK-39248][runtime] Fix RowDataKeySerializerSnapshot restore
---
 .../linked/RowDataKeySerializerSnapshot.java       |   2 +-
 .../linked/RowDataKeySerializerSnapshotTest.java   | 106 +++++++++++++++++++++
 2 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
index 08d9888e18b..f4a690dbf5b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
@@ -92,7 +92,7 @@ public class RowDataKeySerializerSnapshot implements TypeSerializerSnapshot<RowD
     private <T> T restore(DataInputView in, ClassLoader classLoader) throws IOException {
         int len = in.readInt();
         byte[] bytes = new byte[len];
-        in.read(bytes);
+        in.readFully(bytes);
         try {
             return InstantiationUtil.deserializeObject(bytes, classLoader); // here
         } catch (ClassNotFoundException e) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshotTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshotTest.java
new file mode 100644
index 00000000000..e800dfe080d
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshotTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordUtils;
+import org.apache.flink.table.types.logical.IntType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerTest.EQUALISER;
+import static org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializerTest.HASH_FUNCTION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowDataKeySerializerSnapshot}, specifically partial-read scenarios. */
+class RowDataKeySerializerSnapshotTest {
+
+    /**
+     * Verifies that the snapshot can be restored when the underlying stream returns fewer bytes
+     * than requested from {@code read(byte[], int, int)} (e.g., when data spans multiple memory
+     * segments). This is a regression test for a bug where {@code DataInputView.read()} was used
+     * instead of {@code readFully()}.
+     */
+    @Test
+    void testSnapshotRestoreWithPartialReads() throws Exception {
+        RowDataKeySerializer serializer = createSerializer();
+        RowDataKeySerializerSnapshot snapshot = new RowDataKeySerializerSnapshot(serializer);
+
+        // Write snapshot to bytes.
+        byte[] snapshotBytes;
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+            snapshot.writeSnapshot(out);
+            snapshotBytes = baos.toByteArray();
+        }
+
+        // Read snapshot back through a stream that returns at most 1 byte per read() call.
+        RowDataKeySerializerSnapshot restored = new RowDataKeySerializerSnapshot();
+        try (InputStream bais =
+                new OneByteAtATimeInputStream(new ByteArrayInputStream(snapshotBytes))) {
+            DataInputView in = new DataInputViewStreamWrapper(bais);
+            restored.readSnapshot(0, in, Thread.currentThread().getContextClassLoader());
+        }
+
+        // Verify the restored serializer works.
+        RowDataKeySerializer restoredSerializer =
+                (RowDataKeySerializer) restored.restoreSerializer();
+        RowDataKey original =
+                RowDataKey.toKeyNotProjected(
+                        StreamRecordUtils.row(42),
+                        serializer.equalizerInstance,
+                        serializer.hashFunctionInstance);
+        assertThat(restoredSerializer.copy(original)).isEqualTo(original);
+    }
+
+    private static RowDataKeySerializer createSerializer() {
+        return new RowDataKeySerializer(
+                new RowDataSerializer(new IntType()),
+                EQUALISER.newInstance(RowDataKeySerializerSnapshotTest.class.getClassLoader()),
+                HASH_FUNCTION.newInstance(RowDataKeySerializerSnapshotTest.class.getClassLoader()),
+                EQUALISER,
+                HASH_FUNCTION);
+    }
+
+    /**
+     * An {@link InputStream} wrapper that returns at most 1 byte per {@code read(byte[], int, int)}
+     * call, simulating the behavior of streams that don't fulfill the full read request (e.g.,
+     * multi-segment memory views or network streams).
+     */
+    private static class OneByteAtATimeInputStream extends FilterInputStream {
+
+        OneByteAtATimeInputStream(InputStream in) {
+            super(in);
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return super.read(b, off, Math.min(1, len));
+        }
+    }
+}

Reply via email to