This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85de16f41a [common] Fix integer overflow in HeapBytesVector.reserveBytes() (#8158)
85de16f41a is described below

commit 85de16f41a202aa2aa12474da7872ac304eda1e5
Author: yugan <[email protected]>
AuthorDate: Mon Jun 8 13:25:28 2026 +0800

    [common] Fix integer overflow in HeapBytesVector.reserveBytes() (#8158)
    
    `HeapBytesVector.reserveBytes()` computes `newCapacity * 2` using plain
    `int` multiplication. When `newCapacity` exceeds ~1.07 billion bytes
    (about `Integer.MAX_VALUE / 2`), the product exceeds `Integer.MAX_VALUE`
    and wraps to a negative value, causing `NegativeArraySizeException` or
    silent data corruption during compaction reads of large records.
---
 .../paimon/data/columnar/heap/HeapBytesVector.java |  48 +++++--
 .../heap/HeapBytesVectorReserveBytesTest.java      | 157 +++++++++++++++++++++
 2 files changed, 190 insertions(+), 15 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
index be0c05eee6..c72633ebf1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
@@ -80,7 +80,8 @@ public class HeapBytesVector extends AbstractHeapVector implements WritableBytes
 
     @Override
     public void putByteArray(int elementNum, byte[] sourceBuf, int start, int 
length) {
-        reserveBytes(bytesAppended + length);
+        long requiredCapacity = (long) bytesAppended + length;
+        reserveBytes(requiredCapacity);
         System.arraycopy(sourceBuf, start, buffer, bytesAppended, length);
         this.start[elementNum] = bytesAppended;
         this.length[elementNum] = length;
@@ -96,7 +97,8 @@ public class HeapBytesVector extends AbstractHeapVector implements WritableBytes
 
     @Override
     public void fill(byte[] value) {
-        reserveBytes(start.length * value.length);
+        long requiredCapacity = (long) start.length * value.length;
+        reserveBytes(requiredCapacity);
         for (int i = 0; i < start.length; i++) {
             System.arraycopy(value, 0, buffer, i * value.length, value.length);
         }
@@ -106,19 +108,35 @@ public class HeapBytesVector extends AbstractHeapVector implements WritableBytes
         Arrays.fill(this.length, value.length);
     }
 
-    private void reserveBytes(int newCapacity) {
-        if (newCapacity > buffer.length) {
-            int newBytesCapacity = newCapacity * 2;
-            try {
-                buffer = Arrays.copyOf(buffer, newBytesCapacity);
-            } catch (NegativeArraySizeException e) {
-                throw new RuntimeException(
-                        String.format(
-                                "The new claimed capacity %s is too large, 
will overflow the INTEGER.MAX after multiply by 2. "
-                                        + "Try reduce `read.batch-size` to 
avoid this exception.",
-                                newCapacity),
-                        e);
-            }
+    /** The maximum size of array to allocate. Some VMs reserve header words in an array. */
+    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+    private void reserveBytes(long requiredCapacity) {
+        if (requiredCapacity > buffer.length) {
+            buffer = Arrays.copyOf(buffer, 
calculateNewBytesCapacity(requiredCapacity));
+        }
+    }
+
+    /**
+     * Returns the new buffer capacity for {@code requiredCapacity} bytes. Visible for testing.
+     *
+     * <p>Doubles the request when the doubled value fits in {@link #MAX_ARRAY_SIZE}. Above half
+     * of that limit it returns {@code requiredCapacity} exactly, so each later growth copies the
+     * whole buffer again. Throws if {@code requiredCapacity} exceeds {@link #MAX_ARRAY_SIZE}.
+     */
+    static int calculateNewBytesCapacity(long requiredCapacity) {
+        if (requiredCapacity > MAX_ARRAY_SIZE) {
+            throw new RuntimeException(
+                    String.format(
+                            "The required byte buffer capacity %d exceeds the 
maximum array size %d. "
+                                    + "Try reducing `read.batch-size` to avoid 
this exception.",
+                            requiredCapacity, MAX_ARRAY_SIZE));
+        }
+        int intCapacity = (int) requiredCapacity;
+        if (intCapacity <= (MAX_ARRAY_SIZE >> 1)) {
+            return intCapacity << 1;
+        } else {
+            return intCapacity;
         }
     }
 
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java
new file mode 100644
index 0000000000..c0bb3e1ae0
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar.heap;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link HeapBytesVector} buffer growth: {@code putByteArray}/{@code reset} round trips
+ * and the overflow-safe growth rule in {@link HeapBytesVector#calculateNewBytesCapacity}.
+ */
+class HeapBytesVectorReserveBytesTest {
+
+    @Test
+    void testNormalGrowthDoublesCapacity() {
+        HeapBytesVector vector = new HeapBytesVector(4);
+        int initialBufferSize = vector.buffer.length;
+
+        // One byte more than the initial buffer forces a single growth step
+        byte[] data = new byte[initialBufferSize + 1];
+        vector.putByteArray(0, data, 0, data.length);
+
+        // Growth doubles the required size (initialBufferSize + 1), not the old buffer length
+        assertThat(vector.buffer.length).isEqualTo((initialBufferSize + 1) * 
2);
+    }
+
+    @Test
+    void testPutByteArrayStoresDataCorrectly() {
+        HeapBytesVector vector = new HeapBytesVector(4);
+
+        byte[] data1 = new byte[] {1, 2, 3};
+        byte[] data2 = new byte[] {4, 5, 6, 7};
+        vector.putByteArray(0, data1, 0, data1.length);
+        vector.putByteArray(1, data2, 0, data2.length);
+
+        HeapBytesVector.Bytes bytes0 = vector.getBytes(0);
+        assertThat(bytes0.len).isEqualTo(3);
+        assertThat(vector.buffer[bytes0.offset]).isEqualTo((byte) 1);
+        assertThat(vector.buffer[bytes0.offset + 2]).isEqualTo((byte) 3);
+
+        HeapBytesVector.Bytes bytes1 = vector.getBytes(1);
+        assertThat(bytes1.len).isEqualTo(4);
+        assertThat(vector.buffer[bytes1.offset]).isEqualTo((byte) 4);
+    }
+
+    @Test
+    void testLargeCapacityDoesNotOverflow() {
+        HeapBytesVector vector = new HeapBytesVector(2);
+
+        // Simulate a scenario where the required capacity is large but still 
within
+        // MAX_ARRAY_SIZE. We can't actually allocate Integer.MAX_VALUE bytes 
in a test,
+        // so this only checks that a moderately large (64MB) 
allocation works.
+        int largeSize = 64 * 1024 * 1024; // 64MB
+        byte[] largeData = new byte[largeSize];
+        vector.putByteArray(0, largeData, 0, largeData.length);
+
+        assertThat(vector.buffer.length).isGreaterThanOrEqualTo(largeSize);
+        assertThat(vector.getBytes(0).len).isEqualTo(largeSize);
+    }
+
+    // ---- Tests for calculateNewBytesCapacity (no large allocation needed) 
----
+
+    @Test
+    void testCalculateNewBytesCapacityDoublesSmallValues() {
+        
assertThat(HeapBytesVector.calculateNewBytesCapacity(100)).isEqualTo(200);
+        assertThat(HeapBytesVector.calculateNewBytesCapacity(1)).isEqualTo(2);
+        assertThat(HeapBytesVector.calculateNewBytesCapacity(1024 * 1024))
+                .isEqualTo(2 * 1024 * 1024);
+    }
+
+    @Test
+    void testCalculateNewBytesCapacityAtHalfMaxReturnsDoubled() {
+        int halfMax = HeapBytesVector.MAX_ARRAY_SIZE >> 1;
+        // Largest doubled input: halfMax << 1 still fits within MAX_ARRAY_SIZE
+        
assertThat(HeapBytesVector.calculateNewBytesCapacity(halfMax)).isEqualTo(halfMax
 << 1);
+    }
+
+    @Test
+    void testCalculateNewBytesCapacityAboveHalfMaxReturnsExact() {
+        int halfMax = HeapBytesVector.MAX_ARRAY_SIZE >> 1;
+        int justAboveHalf = halfMax + 1;
+        // Above the doubling threshold: should return exact required 
capacity, not MAX_ARRAY_SIZE
+        assertThat(HeapBytesVector.calculateNewBytesCapacity(justAboveHalf))
+                .isEqualTo(justAboveHalf);
+    }
+
+    @Test
+    void testCalculateNewBytesCapacityAtMaxArraySizeReturnsExact() {
+        // Exactly at MAX_ARRAY_SIZE: should return exact value
+        
assertThat(HeapBytesVector.calculateNewBytesCapacity(HeapBytesVector.MAX_ARRAY_SIZE))
+                .isEqualTo(HeapBytesVector.MAX_ARRAY_SIZE);
+    }
+
+    @Test
+    void testCalculateNewBytesCapacityAboveMaxArraySizeThrows() {
+        long aboveMax = (long) HeapBytesVector.MAX_ARRAY_SIZE + 1;
+        assertThatThrownBy(() -> 
HeapBytesVector.calculateNewBytesCapacity(aboveMax))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("exceeds the maximum array size");
+    }
+
+    @Test
+    void testCalculateNewBytesCapacityLongOverflowThrows() {
+        // Simulate what would happen if int arithmetic overflowed:
+        // e.g. bytesAppended=Integer.MAX_VALUE, length=1 => long sum > 
MAX_ARRAY_SIZE
+        long overflowedCapacity = (long) Integer.MAX_VALUE + 1;
+        assertThatThrownBy(() -> 
HeapBytesVector.calculateNewBytesCapacity(overflowedCapacity))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("exceeds the maximum array size");
+    }
+
+    @Test
+    void testCalculateNewBytesCapacityNegativeLongThrows() {
+        // Positive as a long, but negative if narrowed to int: the int overflow 
scenario
+        long hugeCapacity = 3_000_000_000L;
+        assertThatThrownBy(() -> 
HeapBytesVector.calculateNewBytesCapacity(hugeCapacity))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("exceeds the maximum array size");
+    }
+
+    @Test
+    void testResetClearsBytesAppended() {
+        HeapBytesVector vector = new HeapBytesVector(4);
+
+        byte[] data = new byte[] {1, 2, 3};
+        vector.putByteArray(0, data, 0, data.length);
+
+        vector.reset();
+
+        // After reset, the next write is expected to start again at offset 0
+        byte[] data2 = new byte[] {10, 20};
+        vector.putByteArray(0, data2, 0, data2.length);
+
+        HeapBytesVector.Bytes bytes = vector.getBytes(0);
+        assertThat(bytes.offset).isEqualTo(0);
+        assertThat(bytes.len).isEqualTo(2);
+        assertThat(vector.buffer[0]).isEqualTo((byte) 10);
+    }
+}

Reply via email to