This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 62d2422916 [common] Fix RowHelper internal buffer never shrinking for large records (#8159)
62d2422916 is described below

commit 62d24229169b54c5b41aa2afa943038fd303e0d4
Author: yugan <[email protected]>
AuthorDate: Thu Jun 11 13:09:24 2026 +0800

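    [common] Fix RowHelper internal buffer never shrinking for large records (#8159)
    
    `RowHelper.reuseWriter` grows its internal `MemorySegment` for large
    records (e.g. 100MB+), but `BinaryRowWriter.reset()` only resets the
    cursor without releasing the oversized segment. Additionally,
    `InternalRowSerializer.serialize()` can exit via `EOFException` — a
    normal signal when the sort buffer is full
    (`SimpleCollectingOutputView.nextSegment()` throws it, caught by
    `BinaryInMemorySortBuffer.write()`) — skipping any cleanup of the
    bloated buffer.
    
    With many buckets (e.g. 256), each bucket's writer independently retains
    an inflated buffer: 256 × 100MB+ = tens of GB, causing OOM.
    
    This change adds `RowHelper.resetIfTooLarge(BinaryRow)`, which drops the
    reuse row and writer when the backing segment is larger than 4MB and
    more than 4x the size of the row just written. `InternalRowSerializer`
    calls it from `serialize()` and `serializeToPages()` in a `finally`
    block, so the check also runs when serialization exits via
    `EOFException`.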
---
 .../java/org/apache/paimon/data/RowHelper.java     |  40 +++++
 .../data/serializer/InternalRowSerializer.java     |  14 +-
 .../java/org/apache/paimon/data/RowHelperTest.java | 169 +++++++++++++++++++++
 3 files changed, 221 insertions(+), 2 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
index a5df6cdf78..f18e2d219f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
@@ -36,6 +36,20 @@ public class RowHelper implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     * Maximum retained reuse buffer size in bytes. Buffers exceeding this cap are eligible for
+     * release when the shrink ratio condition is also met.
+     */
+    private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
+
+    /**
+     * Shrink ratio for hysteresis. The buffer is released only when its capacity exceeds {@link
+     * #MAX_RETAINED_REUSE_BUFFER_SIZE} AND is more than {@code SHRINK_RATIO} times the current
+     * row's size. This avoids thrashing for sustained medium-to-large records while still releasing
+     * after a spike (e.g. 100MB buffer with 5MB rows → 20x &gt; 4x → release).
+     */
+    private static final int SHRINK_RATIO = 4;
+
     private final FieldGetter[] fieldGetters;
     private final ValueSetter[] valueSetters;
     private final boolean[] writeNulls;
@@ -81,6 +95,32 @@ public class RowHelper implements Serializable {
         reuseWriter.complete();
     }
 
+    /**
+     * Release the internal reuse buffer if the given row is the reuse row produced by this helper,
+     * the backing segment exceeds the maximum retained size, and the buffer is clearly oversized
+     * relative to the current record. The identity check ({@code currentRow == reuseRow}) ensures
+     * we only act when the caller actually used this helper's buffer.
+     *
+     * <p>The release condition combines a fixed cap with a relative ratio check:
+     *
+     * <ul>
+     *   <li>bufferCapacity &gt; {@link #MAX_RETAINED_REUSE_BUFFER_SIZE} — the buffer was inflated
+     *       beyond the baseline
+     *   <li>bufferCapacity &gt; currentRow.getSizeInBytes() * {@link #SHRINK_RATIO} — the buffer is
+     *       significantly larger than the current workload needs
+     * </ul>
+     */
+    public void resetIfTooLarge(BinaryRow currentRow) {
+        if (currentRow == reuseRow && reuseWriter != null && reuseWriter.getSegments() != null) {
+            int bufferCapacity = reuseWriter.getSegments().size();
+            if (bufferCapacity > MAX_RETAINED_REUSE_BUFFER_SIZE
+                    && bufferCapacity > (long) currentRow.getSizeInBytes() * SHRINK_RATIO) {
+                reuseRow = null;
+                reuseWriter = null;
+            }
+        }
+    }
+
     public BinaryRow reuseRow() {
         return reuseRow;
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index c3abc2f4ce..b77f1ae39f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -59,7 +59,12 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
 
     @Override
     public void serialize(InternalRow row, DataOutputView target) throws IOException {
-        binarySerializer.serialize(toBinaryRow(row), target);
+        BinaryRow binaryRow = toBinaryRow(row);
+        try {
+            binarySerializer.serialize(binaryRow, target);
+        } finally {
+            rowHelper.resetIfTooLarge(binaryRow);
+        }
     }
 
     @Override
@@ -132,7 +137,12 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
     @Override
     public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
             throws IOException {
-        return binarySerializer.serializeToPages(toBinaryRow(row), target);
+        BinaryRow binaryRow = toBinaryRow(row);
+        try {
+            return binarySerializer.serializeToPages(binaryRow, target);
+        } finally {
+            rowHelper.resetIfTooLarge(binaryRow);
+        }
     }
 
     @Override
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java
new file mode 100644
index 0000000000..2fa2625139
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge(BinaryRow) behavior. */
+class RowHelperTest {
+
+    @Test
+    void testReleasesWhenSpikeFollowedBySmallRecords() {
+        RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+        // Write a large record (> 4MB) to inflate the internal buffer
+        byte[] largePayload = new byte[5 * 1024 * 1024];
+        GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
+        largeRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(largeRow);
+        BinaryRow reuseAfterLarge = helper.reuseRow();
+        assertThat(reuseAfterLarge).isNotNull();
+
+        // buffer > 4MB but sized for this ~5MB row (ratio < 4x) → should NOT release
+        helper.resetIfTooLarge(reuseAfterLarge);
+        assertThat(helper.reuseRow()).isNotNull();
+
+        // Write a small record — buffer still oversized
+        GenericRow smallRow = GenericRow.of(BinaryString.fromString("s"), new byte[10]);
+        smallRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(smallRow);
+
+        // buffer still sized for the ~5MB row, row is tiny → ratio far > 4x, buffer > 4MB → release
+        helper.resetIfTooLarge(helper.reuseRow());
+        assertThat(helper.reuseRow()).isNull();
+    }
+
+    @Test
+    void testReleasesWhenSpikeFollowedByMediumRecords() {
+        RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+        // Write a very large record (100MB) to inflate the buffer significantly
+        byte[] hugePayload = new byte[100 * 1024 * 1024];
+        GenericRow hugeRow = GenericRow.of(BinaryString.fromString("key"), hugePayload);
+        hugeRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(hugeRow);
+        assertThat(helper.reuseRow()).isNotNull();
+
+        // Write a medium record (5MB) — buffer still holds at least ~100MB, row is ~5MB
+        // ratio >= ~20x > 4x, buffer > 4MB → should release
+        byte[] mediumPayload = new byte[5 * 1024 * 1024];
+        GenericRow mediumRow = GenericRow.of(BinaryString.fromString("m"), mediumPayload);
+        mediumRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(mediumRow);
+
+        helper.resetIfTooLarge(helper.reuseRow());
+        assertThat(helper.reuseRow()).isNull();
+    }
+
+    @Test
+    void testRetainsWhenBufferProportionalToRecordSize() {
+        RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+        // Write a 5MB record — the internal buffer grows to hold it (> 4MB)
+        byte[] payload = new byte[5 * 1024 * 1024];
+        GenericRow row = GenericRow.of(BinaryString.fromString("key"), payload);
+        row.setRowKind(RowKind.INSERT);
+        helper.copyInto(row);
+
+        // buffer is sized for this ~5MB row (ratio < 4x) → should NOT release,
+        // even though buffer > 4MB
+        helper.resetIfTooLarge(helper.reuseRow());
+        assertThat(helper.reuseRow()).isNotNull();
+    }
+
+    @Test
+    void testKeepsSmallBuffer() {
+        RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT()));
+
+        GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42);
+        smallRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(smallRow);
+        BinaryRow reuse = helper.reuseRow();
+        assertThat(reuse).isNotNull();
+
+        // Small buffer (< 4MB) — should NOT be released regardless of ratio
+        helper.resetIfTooLarge(reuse);
+        assertThat(helper.reuseRow()).isNotNull();
+    }
+
+    @Test
+    void testSkipsWhenCurrentRowIsNotReuseRow() {
+        RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+        // Write a large record to inflate the buffer
+        byte[] largePayload = new byte[5 * 1024 * 1024];
+        GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
+        largeRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(largeRow);
+        assertThat(helper.reuseRow()).isNotNull();
+
+        // Pass a different BinaryRow — simulates toBinaryRow() returning input directly
+        BinaryRow externalRow = new BinaryRow(2);
+        externalRow.pointTo(MemorySegment.wrap(new byte[32]), 0, 32);
+
+        // Should NOT release because externalRow != reuseRow
+        helper.resetIfTooLarge(externalRow);
+        assertThat(helper.reuseRow()).isNotNull();
+    }
+
+    @Test
+    void testSafeToCallWithNullReuseRow() {
+        RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING()));
+        assertThat(helper.reuseRow()).isNull();
+
+        BinaryRow someRow = new BinaryRow(1);
+        someRow.pointTo(MemorySegment.wrap(new byte[16]), 0, 16);
+        helper.resetIfTooLarge(someRow);
+        assertThat(helper.reuseRow()).isNull();
+    }
+
+    @Test
+    void testReuseRecreatedAfterRelease() {
+        RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
+
+        // Inflate buffer, then transition to small
+        byte[] largePayload = new byte[5 * 1024 * 1024];
+        GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
+        largeRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(largeRow);
+
+        GenericRow smallRow = GenericRow.of(BinaryString.fromString("small"), new byte[10]);
+        smallRow.setRowKind(RowKind.INSERT);
+        helper.copyInto(smallRow);
+
+        helper.resetIfTooLarge(helper.reuseRow());
+        assertThat(helper.reuseRow()).isNull();
+
+        // Write another small record — reuseRow should be recreated
+        helper.copyInto(smallRow);
+        assertThat(helper.reuseRow()).isNotNull();
+
+        // Small buffer should survive
+        helper.resetIfTooLarge(helper.reuseRow());
+        assertThat(helper.reuseRow()).isNotNull();
+    }
+}

Reply via email to