This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 64b0534786 [common] Fix BinaryRowSerializer reuse buffer never shrinking (#8160)
64b0534786 is described below

commit 64b0534786c8295eec2c0cfd5406a084fdbde85e
Author: yugan <[email protected]>
AuthorDate: Thu Jun 11 16:30:29 2026 +0800

    [common] Fix BinaryRowSerializer reuse buffer never shrinking (#8160)
---
 .../java/org/apache/paimon/data/RowHelper.java     |   4 +-
 .../data/serializer/BinaryRowSerializer.java       |   5 +
 .../serializer/BinaryRowSerializerShrinkTest.java  | 168 +++++++++++++++++++++
 3 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java 
b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
index f18e2d219f..5f5224465e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
@@ -40,7 +40,7 @@ public class RowHelper implements Serializable {
      * Maximum retained reuse buffer size in bytes. Buffers exceeding this cap 
are eligible for
      * release when the shrink ratio condition is also met.
      */
-    private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; 
// 4MB
+    public static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; 
// 4MB
 
     /**
      * Shrink ratio for hysteresis. The buffer is released only when its 
capacity exceeds {@link
@@ -48,7 +48,7 @@ public class RowHelper implements Serializable {
      * row's size. This avoids thrashing for sustained medium-to-large records 
while still releasing
      * after a spike (e.g. 100MB buffer with 5MB rows → 20x > 4x → release).
      */
-    private static final int SHRINK_RATIO = 4;
+    public static final int SHRINK_RATIO = 4;
 
     private final FieldGetter[] fieldGetters;
     private final ValueSetter[] valueSetters;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
index 49dcee73ef..dbf37f520f 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.data.serializer;
 import org.apache.paimon.data.AbstractPagedInputView;
 import org.apache.paimon.data.AbstractPagedOutputView;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.RowHelper;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.memory.MemorySegment;
@@ -88,6 +89,10 @@ public class BinaryRowSerializer extends 
AbstractRowDataSerializer<BinaryRow> {
 
         int length = source.readInt();
         if (segments == null || segments[0].size() < length) {
+            // Need a larger buffer
+            segments = new MemorySegment[] {MemorySegment.wrap(new 
byte[length])};
+        } else if (segments[0].size() > 
RowHelper.MAX_RETAINED_REUSE_BUFFER_SIZE
+                && segments[0].size() > (long) length * 
RowHelper.SHRINK_RATIO) {
             segments = new MemorySegment[] {MemorySegment.wrap(new 
byte[length])};
         }
         source.readFully(segments[0].getArray(), 0, length);
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java
new file mode 100644
index 0000000000..c1eac7e47a
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.serializer;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataOutputSerializer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.data.RowHelper.MAX_RETAINED_REUSE_BUFFER_SIZE;
+import static org.apache.paimon.data.RowHelper.SHRINK_RATIO;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link BinaryRowSerializer#deserialize(BinaryRow, org.apache.paimon.io.DataInputView)},
+ * focusing on the combined cap + ratio shrink behavior. Thresholds come from {@code RowHelper} so
+ * the tests cannot silently drift from the production values.
+ */
+class BinaryRowSerializerShrinkTest {
+
+    private final BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+    @Test
+    void testShrinksWhenSpikeFollowedBySmallRecord() throws Exception {
+        // Inflate buffer with a large record (> 4MB cap)
+        BinaryRow largeRow = createRowWithPayload(5 * 1024 * 1024);
+        byte[] largeBytes = serializeRow(largeRow);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes));
+        int largeBufferSize = reuse.getSegments()[0].size();
+        assertThat(largeBufferSize).isGreaterThanOrEqualTo(5 * 1024 * 1024);
+
+        // Deserialize a small record — buffer > cap and ratio far above SHRINK_RATIO → shrink
+        BinaryRow smallRow = createRowWithPayload(100);
+        byte[] smallBytes = serializeRow(smallRow);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes));
+        assertThat(reuse.getSegments()[0].size()).isLessThan(MAX_RETAINED_REUSE_BUFFER_SIZE);
+        // The replacement buffer must still hold exactly the deserialized row
+        assertThat(reuse).isEqualTo(smallRow);
+    }
+
+    @Test
+    void testShrinksWhenSpikeFollowedByMediumRecord() throws Exception {
+        // Inflate buffer with a ~25MB spike, just over SHRINK_RATIO times the 5MB record below.
+        // This covers the same path as a 100MB spike without straining the test JVM heap.
+        int mediumPayload = MAX_RETAINED_REUSE_BUFFER_SIZE + 1024 * 1024;
+        BinaryRow hugeRow = createRowWithPayload((SHRINK_RATIO + 1) * mediumPayload);
+        byte[] hugeBytes = serializeRow(hugeRow);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(hugeBytes));
+        int hugeBufferSize = reuse.getSegments()[0].size();
+        assertThat(hugeBufferSize).isGreaterThan(SHRINK_RATIO * mediumPayload);
+
+        // Deserialize the 5MB record — above the cap, but the buffer is ~5x its size → shrink
+        BinaryRow mediumRow = createRowWithPayload(mediumPayload);
+        byte[] mediumBytes = serializeRow(mediumRow);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(mediumBytes));
+        assertThat(reuse.getSegments()[0].size()).isLessThan(hugeBufferSize);
+        assertThat(reuse).isEqualTo(mediumRow);
+    }
+
+    @Test
+    void testRetainsWhenBufferProportionalToRecordSize() throws Exception {
+        // Inflate buffer with a 5MB record
+        BinaryRow row1 = createRowWithPayload(5 * 1024 * 1024);
+        byte[] bytes1 = serializeRow(row1);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1));
+        int bufferAfterFirst = reuse.getSegments()[0].size();
+        assertThat(bufferAfterFirst).isGreaterThan(MAX_RETAINED_REUSE_BUFFER_SIZE);
+
+        // Deserialize another record just above the cap — ratio ~1.25x < 4x → retain
+        BinaryRow row2 = createRowWithPayload(MAX_RETAINED_REUSE_BUFFER_SIZE + 100);
+        byte[] bytes2 = serializeRow(row2);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2));
+        assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst);
+        assertThat(reuse).isEqualTo(row2);
+    }
+
+    @Test
+    void testKeepsSmallBuffer() throws Exception {
+        BinaryRow row1 = createRowWithPayload(1024);
+        byte[] bytes1 = serializeRow(row1);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1));
+        int bufferSize1 = reuse.getSegments()[0].size();
+
+        // Smaller record — buffer < 4MB, should reuse without shrinking
+        BinaryRow row2 = createRowWithPayload(100);
+        byte[] bytes2 = serializeRow(row2);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2));
+        assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferSize1);
+    }
+
+    @Test
+    void testGrowsBufferWhenNeeded() throws Exception {
+        BinaryRow smallRow = createRowWithPayload(100);
+        byte[] smallBytes = serializeRow(smallRow);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes));
+
+        // Larger record arrives — buffer must grow
+        BinaryRow largerRow = createRowWithPayload(2048);
+        byte[] largerBytes = serializeRow(largerRow);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largerBytes));
+        assertThat(reuse.getSegments()[0].size()).isGreaterThanOrEqualTo(2048);
+        assertThat(reuse).isEqualTo(largerRow);
+    }
+
+    @Test
+    void testRetainsBufferForConsecutiveLargeRecords() throws Exception {
+        // Inflate buffer with 5MB record
+        BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024);
+        byte[] largeBytes1 = serializeRow(largeRow1);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes1));
+        int bufferAfterFirst = reuse.getSegments()[0].size();
+
+        // Another 5MB record — ratio ~1x < 4x → retain
+        BinaryRow largeRow2 = createRowWithPayload(5 * 1024 * 1024);
+        byte[] largeBytes2 = serializeRow(largeRow2);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes2));
+        assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst);
+        assertThat(reuse).isEqualTo(largeRow2);
+    }
+
+    /** Builds a one-field row holding a string of {@code payloadSize} zero-valued bytes. */
+    private static BinaryRow createRowWithPayload(int payloadSize) {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row, payloadSize + 32);
+        writer.writeString(0, BinaryString.fromBytes(new byte[payloadSize]));
+        writer.complete();
+        return row;
+    }
+
+    /** Serializes {@code row} with {@link #serializer} into a standalone byte array. */
+    private byte[] serializeRow(BinaryRow row) throws Exception {
+        // + 4 for the int length prefix; deserialize() reads it back before the row bytes
+        DataOutputSerializer output = new DataOutputSerializer(row.getSizeInBytes() + 4);
+        serializer.serialize(row, output);
+        return output.getCopyOfBuffer();
+    }
+}

Reply via email to