This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 64b0534786 [common] Fix BinaryRowSerializer reuse buffer never shrinking (#8160)
64b0534786 is described below
commit 64b0534786c8295eec2c0cfd5406a084fdbde85e
Author: yugan <[email protected]>
AuthorDate: Thu Jun 11 16:30:29 2026 +0800
[common] Fix BinaryRowSerializer reuse buffer never shrinking (#8160)
---
.../java/org/apache/paimon/data/RowHelper.java | 4 +-
.../data/serializer/BinaryRowSerializer.java | 5 +
.../serializer/BinaryRowSerializerShrinkTest.java | 168 +++++++++++++++++++++
3 files changed, 175 insertions(+), 2 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
index f18e2d219f..5f5224465e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
@@ -40,7 +40,7 @@ public class RowHelper implements Serializable {
* Maximum retained reuse buffer size in bytes. Buffers exceeding this cap
are eligible for
* release when the shrink ratio condition is also met.
*/
- private static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024;
// 4MB
+ public static final int MAX_RETAINED_REUSE_BUFFER_SIZE = 4 * 1024 * 1024;
// 4MB
/**
* Shrink ratio for hysteresis. The buffer is released only when its
capacity exceeds {@link
@@ -48,7 +48,7 @@ public class RowHelper implements Serializable {
* row's size. This avoids thrashing for sustained medium-to-large records
while still releasing
* after a spike (e.g. 100MB buffer with 5MB rows → 20x > 4x → release).
*/
- private static final int SHRINK_RATIO = 4;
+ public static final int SHRINK_RATIO = 4;
private final FieldGetter[] fieldGetters;
private final ValueSetter[] valueSetters;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
index 49dcee73ef..dbf37f520f 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.data.serializer;
import org.apache.paimon.data.AbstractPagedInputView;
import org.apache.paimon.data.AbstractPagedOutputView;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.RowHelper;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.MemorySegment;
@@ -88,6 +89,10 @@ public class BinaryRowSerializer extends
AbstractRowDataSerializer<BinaryRow> {
int length = source.readInt();
if (segments == null || segments[0].size() < length) {
+ // Need a larger buffer
+ segments = new MemorySegment[] {MemorySegment.wrap(new
byte[length])};
+ } else if (segments[0].size() >
RowHelper.MAX_RETAINED_REUSE_BUFFER_SIZE
+ && segments[0].size() > (long) length *
RowHelper.SHRINK_RATIO) {
segments = new MemorySegment[] {MemorySegment.wrap(new
byte[length])};
}
source.readFully(segments[0].getArray(), 0, length);
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java
new file mode 100644
index 0000000000..c1eac7e47a
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.serializer;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.RowHelper;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataOutputSerializer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link BinaryRowSerializer#deserialize(BinaryRow, org.apache.paimon.io.DataInputView)},
+ * focusing on the combined cap + ratio shrink behavior.
+ *
+ * <p>The reuse buffer is replaced by an exactly-sized one when it is too small for the incoming
+ * record. It is also replaced (released) when it is both larger than {@link
+ * RowHelper#MAX_RETAINED_REUSE_BUFFER_SIZE} and more than {@link RowHelper#SHRINK_RATIO} times the
+ * incoming record's length. In every other case the existing buffer is kept.
+ */
+class BinaryRowSerializerShrinkTest {
+
+    /** Read from production code so these tests follow any change to the retention cap. */
+    private static final int MAX_RETAINED = RowHelper.MAX_RETAINED_REUSE_BUFFER_SIZE;
+
+    @Test
+    void testShrinksWhenSpikeFollowedBySmallRecord() throws Exception {
+        BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+        // Inflate buffer with a large record (> 4MB)
+        BinaryRow largeRow = createRowWithPayload(5 * 1024 * 1024);
+        byte[] largeBytes = serializeRow(serializer, largeRow);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes));
+        int largeBufferSize = reuse.getSegments()[0].size();
+        assertThat(largeBufferSize).isGreaterThanOrEqualTo(5 * 1024 * 1024);
+
+        // Deserialize a small record — buffer > 4MB and ratio huge > 4x → shrink
+        BinaryRow smallRow = createRowWithPayload(100);
+        byte[] smallBytes = serializeRow(serializer, smallRow);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes));
+        assertThat(reuse.getSegments()[0].size()).isLessThan(MAX_RETAINED);
+        // The freshly allocated (shrunk) buffer must still carry the record's content.
+        assertThat(reuse.getString(0)).isEqualTo(smallRow.getString(0));
+    }
+
+    @Test
+    void testShrinksWhenSpikeFollowedByMediumRecord() throws Exception {
+        BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+        // Inflate buffer with a spike. 32MB is comfortably above SHRINK_RATIO (4x) times the 5MB
+        // record below, so the ratio branch is exercised without the heap cost of a 100MB spike
+        // (several copies of the payload - writer buffer, serialized bytes, reuse buffer - coexist).
+        int spikeSize = 32 * 1024 * 1024;
+        BinaryRow hugeRow = createRowWithPayload(spikeSize);
+        byte[] hugeBytes = serializeRow(serializer, hugeRow);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(hugeBytes));
+        int hugeBufferSize = reuse.getSegments()[0].size();
+        assertThat(hugeBufferSize).isGreaterThanOrEqualTo(spikeSize);
+
+        // Deserialize a 5MB record — buffer ~32MB, ratio ~6x > 4x → shrink
+        BinaryRow mediumRow = createRowWithPayload(5 * 1024 * 1024);
+        byte[] mediumBytes = serializeRow(serializer, mediumRow);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(mediumBytes));
+        assertThat(reuse.getSegments()[0].size()).isLessThan(hugeBufferSize);
+    }
+
+    @Test
+    void testRetainsWhenBufferProportionalToRecordSize() throws Exception {
+        BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+        // Inflate buffer with a 5MB record
+        BinaryRow row1 = createRowWithPayload(5 * 1024 * 1024);
+        byte[] bytes1 = serializeRow(serializer, row1);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1));
+        int bufferAfterFirst = reuse.getSegments()[0].size();
+        assertThat(bufferAfterFirst).isGreaterThan(MAX_RETAINED);
+
+        // Deserialize another record just above the cap — ratio ~1.2x < 4x → retain
+        BinaryRow row2 = createRowWithPayload(MAX_RETAINED + 100);
+        byte[] bytes2 = serializeRow(serializer, row2);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2));
+        assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst);
+    }
+
+    @Test
+    void testKeepsSmallBuffer() throws Exception {
+        BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+        BinaryRow row1 = createRowWithPayload(1024);
+        byte[] bytes1 = serializeRow(serializer, row1);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes1));
+        int bufferSize1 = reuse.getSegments()[0].size();
+
+        // Smaller record — buffer < 4MB (below the cap), so it is reused even though ratio > 4x
+        BinaryRow row2 = createRowWithPayload(100);
+        byte[] bytes2 = serializeRow(serializer, row2);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(bytes2));
+        assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferSize1);
+    }
+
+    @Test
+    void testGrowsBufferWhenNeeded() throws Exception {
+        BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+        BinaryRow smallRow = createRowWithPayload(100);
+        byte[] smallBytes = serializeRow(serializer, smallRow);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(smallBytes));
+
+        // Larger record arrives — buffer must grow
+        BinaryRow largerRow = createRowWithPayload(2048);
+        byte[] largerBytes = serializeRow(serializer, largerRow);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largerBytes));
+        assertThat(reuse.getSegments()[0].size()).isGreaterThanOrEqualTo(2048);
+    }
+
+    @Test
+    void testRetainsBufferForConsecutiveLargeRecords() throws Exception {
+        BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+        // Inflate buffer with 5MB record
+        BinaryRow largeRow1 = createRowWithPayload(5 * 1024 * 1024);
+        byte[] largeBytes1 = serializeRow(serializer, largeRow1);
+
+        BinaryRow reuse = serializer.createInstance();
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes1));
+        int bufferAfterFirst = reuse.getSegments()[0].size();
+
+        // Another 5MB record — ratio ~1x < 4x → retain
+        BinaryRow largeRow2 = createRowWithPayload(5 * 1024 * 1024);
+        byte[] largeBytes2 = serializeRow(serializer, largeRow2);
+        reuse = serializer.deserialize(reuse, new DataInputDeserializer(largeBytes2));
+        assertThat(reuse.getSegments()[0].size()).isEqualTo(bufferAfterFirst);
+    }
+
+    /** Builds a single-field row whose string field holds {@code payloadSize} zero bytes. */
+    private static BinaryRow createRowWithPayload(int payloadSize) {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row, payloadSize + 32);
+        byte[] payload = new byte[payloadSize];
+        writer.writeString(0, BinaryString.fromBytes(payload));
+        writer.complete();
+        return row;
+    }
+
+    /** Serializes {@code row} with {@code serializer} and returns a copy of the written bytes. */
+    private static byte[] serializeRow(BinaryRowSerializer serializer, BinaryRow row)
+            throws Exception {
+        // Presize for the row bytes plus the leading int length read back by deserialize().
+        DataOutputSerializer output = new DataOutputSerializer(row.getSizeInBytes() + 4);
+        serializer.serialize(row, output);
+        return output.getCopyOfBuffer();
+    }
+}