This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 85de16f41a [common] Fix integer overflow in
HeapBytesVector.reserveBytes() (#8158)
85de16f41a is described below
commit 85de16f41a202aa2aa12474da7872ac304eda1e5
Author: yugan <[email protected]>
AuthorDate: Mon Jun 8 13:25:28 2026 +0800
[common] Fix integer overflow in HeapBytesVector.reserveBytes() (#8158)
`HeapBytesVector.reserveBytes()` computes `newCapacity * 2` using plain
`int` multiplication. When `newCapacity` exceeds ~1.07 billion bytes,
this overflows `Integer.MAX_VALUE`, causing `NegativeArraySizeException`
or silent data corruption during compaction reads of large records.
---
.../paimon/data/columnar/heap/HeapBytesVector.java | 48 +++++--
.../heap/HeapBytesVectorReserveBytesTest.java | 157 +++++++++++++++++++++
2 files changed, 190 insertions(+), 15 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
index be0c05eee6..c72633ebf1 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
@@ -80,7 +80,8 @@ public class HeapBytesVector extends AbstractHeapVector
implements WritableBytes
@Override
public void putByteArray(int elementNum, byte[] sourceBuf, int start, int
length) {
- reserveBytes(bytesAppended + length);
+ long requiredCapacity = (long) bytesAppended + length;
+ reserveBytes(requiredCapacity);
System.arraycopy(sourceBuf, start, buffer, bytesAppended, length);
this.start[elementNum] = bytesAppended;
this.length[elementNum] = length;
@@ -96,7 +97,8 @@ public class HeapBytesVector extends AbstractHeapVector
implements WritableBytes
@Override
public void fill(byte[] value) {
- reserveBytes(start.length * value.length);
+ long requiredCapacity = (long) start.length * value.length;
+ reserveBytes(requiredCapacity);
for (int i = 0; i < start.length; i++) {
System.arraycopy(value, 0, buffer, i * value.length, value.length);
}
@@ -106,19 +108,35 @@ public class HeapBytesVector extends AbstractHeapVector
implements WritableBytes
Arrays.fill(this.length, value.length);
}
- private void reserveBytes(int newCapacity) {
- if (newCapacity > buffer.length) {
- int newBytesCapacity = newCapacity * 2;
- try {
- buffer = Arrays.copyOf(buffer, newBytesCapacity);
- } catch (NegativeArraySizeException e) {
- throw new RuntimeException(
- String.format(
- "The new claimed capacity %s is too large,
will overflow the INTEGER.MAX after multiply by 2. "
- + "Try reduce `read.batch-size` to
avoid this exception.",
- newCapacity),
- e);
- }
+ /** The maximum size of array to allocate. Some VMs reserve header words
in an array. */
+ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ private void reserveBytes(long requiredCapacity) {
+ if (requiredCapacity > buffer.length) {
+ buffer = Arrays.copyOf(buffer,
calculateNewBytesCapacity(requiredCapacity));
+ }
+ }
+
+ /**
+ * Calculate the new buffer capacity for the given required capacity.
Visible for testing.
+ *
+ * <p>The strategy is: double the required capacity for amortized growth
when safe. If doubling
+ * would exceed {@link #MAX_ARRAY_SIZE}, fall back to the exact required
capacity. Throws if the
+ * required capacity itself exceeds {@link #MAX_ARRAY_SIZE}.
+ */
+ static int calculateNewBytesCapacity(long requiredCapacity) {
+ if (requiredCapacity > MAX_ARRAY_SIZE) {
+ throw new RuntimeException(
+ String.format(
+ "The required byte buffer capacity %d exceeds the
maximum array size %d. "
+ + "Try reducing `read.batch-size` to avoid
this exception.",
+ requiredCapacity, MAX_ARRAY_SIZE));
+ }
+ int intCapacity = (int) requiredCapacity;
+ if (intCapacity <= (MAX_ARRAY_SIZE >> 1)) {
+ return intCapacity << 1;
+ } else {
+ return intCapacity;
}
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java
new file mode 100644
index 0000000000..c0bb3e1ae0
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar.heap;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link HeapBytesVector#putByteArray}, focusing on reserveBytes()
overflow safety and
+ * the capacity-growth calculation in {@link
HeapBytesVector#calculateNewBytesCapacity}.
+ */
+class HeapBytesVectorReserveBytesTest {
+
+ @Test
+ void testNormalGrowthDoublesCapacity() {
+ HeapBytesVector vector = new HeapBytesVector(4);
+ int initialBufferSize = vector.buffer.length;
+
+ // Write enough data to trigger growth
+ byte[] data = new byte[initialBufferSize + 1];
+ vector.putByteArray(0, data, 0, data.length);
+
+ // Buffer should have grown to twice the required capacity (not twice its old size)
+ assertThat(vector.buffer.length).isEqualTo((initialBufferSize + 1) *
2);
+ }
+
+ @Test
+ void testPutByteArrayStoresDataCorrectly() {
+ HeapBytesVector vector = new HeapBytesVector(4);
+
+ byte[] data1 = new byte[] {1, 2, 3};
+ byte[] data2 = new byte[] {4, 5, 6, 7};
+ vector.putByteArray(0, data1, 0, data1.length);
+ vector.putByteArray(1, data2, 0, data2.length);
+
+ HeapBytesVector.Bytes bytes0 = vector.getBytes(0);
+ assertThat(bytes0.len).isEqualTo(3);
+ assertThat(vector.buffer[bytes0.offset]).isEqualTo((byte) 1);
+ assertThat(vector.buffer[bytes0.offset + 2]).isEqualTo((byte) 3);
+
+ HeapBytesVector.Bytes bytes1 = vector.getBytes(1);
+ assertThat(bytes1.len).isEqualTo(4);
+ assertThat(vector.buffer[bytes1.offset]).isEqualTo((byte) 4);
+ }
+
+ @Test
+ void testLargeCapacityDoesNotOverflow() {
+ HeapBytesVector vector = new HeapBytesVector(2);
+
+ // Simulate a scenario where the required capacity is large but still
within
+ // MAX_ARRAY_SIZE. We can't actually allocate Integer.MAX_VALUE bytes
in a test,
+ // but we can verify the logic by checking that a moderately large
allocation works.
+ int largeSize = 64 * 1024 * 1024; // 64MB
+ byte[] largeData = new byte[largeSize];
+ vector.putByteArray(0, largeData, 0, largeData.length);
+
+ assertThat(vector.buffer.length).isGreaterThanOrEqualTo(largeSize);
+ assertThat(vector.getBytes(0).len).isEqualTo(largeSize);
+ }
+
+ // ---- Tests for calculateNewBytesCapacity (no large allocation needed)
----
+
+ @Test
+ void testCalculateNewBytesCapacityDoublesSmallValues() {
+
assertThat(HeapBytesVector.calculateNewBytesCapacity(100)).isEqualTo(200);
+ assertThat(HeapBytesVector.calculateNewBytesCapacity(1)).isEqualTo(2);
+ assertThat(HeapBytesVector.calculateNewBytesCapacity(1024 * 1024))
+ .isEqualTo(2 * 1024 * 1024);
+ }
+
+ @Test
+ void testCalculateNewBytesCapacityAtHalfMaxReturnsDoubled() {
+ int halfMax = HeapBytesVector.MAX_ARRAY_SIZE >> 1;
+ // Exactly at the boundary: should still double
+
assertThat(HeapBytesVector.calculateNewBytesCapacity(halfMax)).isEqualTo(halfMax
<< 1);
+ }
+
+ @Test
+ void testCalculateNewBytesCapacityAboveHalfMaxReturnsExact() {
+ int halfMax = HeapBytesVector.MAX_ARRAY_SIZE >> 1;
+ int justAboveHalf = halfMax + 1;
+ // Above the doubling threshold: should return exact required
capacity, not MAX_ARRAY_SIZE
+ assertThat(HeapBytesVector.calculateNewBytesCapacity(justAboveHalf))
+ .isEqualTo(justAboveHalf);
+ }
+
+ @Test
+ void testCalculateNewBytesCapacityAtMaxArraySizeReturnsExact() {
+ // Exactly at MAX_ARRAY_SIZE: should return exact value
+
assertThat(HeapBytesVector.calculateNewBytesCapacity(HeapBytesVector.MAX_ARRAY_SIZE))
+ .isEqualTo(HeapBytesVector.MAX_ARRAY_SIZE);
+ }
+
+ @Test
+ void testCalculateNewBytesCapacityAboveMaxArraySizeThrows() {
+ long aboveMax = (long) HeapBytesVector.MAX_ARRAY_SIZE + 1;
+ assertThatThrownBy(() ->
HeapBytesVector.calculateNewBytesCapacity(aboveMax))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("exceeds the maximum array size");
+ }
+
+ @Test
+ void testCalculateNewBytesCapacityLongOverflowThrows() {
+ // Simulate a sum that would overflow in int arithmetic but is exact as a long:
+ // e.g. bytesAppended=Integer.MAX_VALUE, length=1 => long sum > MAX_ARRAY_SIZE
+ long overflowedCapacity = (long) Integer.MAX_VALUE + 1;
+ assertThatThrownBy(() ->
HeapBytesVector.calculateNewBytesCapacity(overflowedCapacity))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("exceeds the maximum array size");
+ }
+
+ @Test
+ void testCalculateNewBytesCapacityNegativeLongThrows() {
+ // A long value too large for an int: narrowing 3_000_000_000L to int would yield a
+ // negative number, so the limit check must reject it while it is still a long.
+ long hugeCapacity = 3_000_000_000L;
+ assertThatThrownBy(() ->
HeapBytesVector.calculateNewBytesCapacity(hugeCapacity))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("exceeds the maximum array size");
+ }
+
+ @Test
+ void testResetClearsBytesAppended() {
+ HeapBytesVector vector = new HeapBytesVector(4);
+
+ byte[] data = new byte[] {1, 2, 3};
+ vector.putByteArray(0, data, 0, data.length);
+
+ vector.reset();
+
+ // After reset, we should be able to write again from the beginning
+ byte[] data2 = new byte[] {10, 20};
+ vector.putByteArray(0, data2, 0, data2.length);
+
+ HeapBytesVector.Bytes bytes = vector.getBytes(0);
+ assertThat(bytes.offset).isEqualTo(0);
+ assertThat(bytes.len).isEqualTo(2);
+ assertThat(vector.buffer[0]).isEqualTo((byte) 10);
+ }
+}