This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 8fd792bdec78cccf935fee83d7b8fb463ce9b2a6
Author: binary-signal <[email protected]>
AuthorDate: Fri Dec 12 13:42:30 2025 +0100

    [common] Fix IndexOutOfBoundsException in ArrowArrayWriter when element count exceeds INITIAL_CAPACITY (#2165)
    
    Signed-off-by: binary-signal <[email protected]>
---
 .../fluss/row/arrow/writers/ArrowArrayWriter.java  |  6 +-
 .../fluss/row/arrow/ArrowReaderWriterTest.java     | 66 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
index 548c4e956..ee413e8ef 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
@@ -41,7 +41,11 @@ public class ArrowArrayWriter extends ArrowFieldWriter {
         listVector.startNewValue(rowIndex);
         for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) {
             int fieldIndex = offset + arrIndex;
-            elementWriter.write(fieldIndex, array, arrIndex, handleSafe);
+            // Always use safe writes for array elements because the element 
index (offset +
+            // arrIndex) can exceed INITIAL_CAPACITY even when the row count 
doesn't. The parent's
+            // handleSafe is based on row count, but array element indices 
grow based on the total
+            // number of elements across all arrays, which can be much larger.
+            elementWriter.write(fieldIndex, array, arrIndex, true);
         }
         offset += array.size();
         listVector.endValue(rowIndex, array.size());
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
index fa8d14b3d..001eb4e88 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
@@ -262,4 +262,70 @@ class ArrowReaderWriterTest {
                             "The arrow batch size is full and it shouldn't 
accept writing new rows, it's a bug.");
         }
     }
+
+    /**
+     * Tests that array columns work correctly when the total number of array 
elements exceeds
+     * INITIAL_CAPACITY (1024) while the row count stays below it. This 
reproduces a bug where
+     * ArrowArrayWriter used the parent's handleSafe flag (based on row count) 
for element writes,
+     * causing IndexOutOfBoundsException when element indices exceeded the 
vector's initial
+     * capacity.
+     */
+    @Test
+    void testArrayWriterWithManyElements() throws IOException {
+        // Schema with array column
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD("arr", 
DataTypes.ARRAY(DataTypes.INT())));
+
+        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+                VectorSchemaRoot root =
+                        
VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+                ArrowWriterPool provider = new ArrowWriterPool(allocator);
+                ArrowWriter writer =
+                        provider.getOrCreateWriter(
+                                1L, 1, Integer.MAX_VALUE, rowType, 
NO_COMPRESSION)) {
+
+            // Write 200 rows, each with a 10-element array.
+            // Total elements = 2000, exceeding INITIAL_CAPACITY (1024).
+            // But row count (200) < 1024, so handleSafe would be false 
without the fix.
+            int numRows = 200;
+            int arraySize = 10;
+            for (int i = 0; i < numRows; i++) {
+                Integer[] elements = new Integer[arraySize];
+                for (int j = 0; j < arraySize; j++) {
+                    elements[j] = i * arraySize + j;
+                }
+                writer.writeRow(GenericRow.of(i, GenericArray.of(elements)));
+            }
+
+            // Verify serialization works without IndexOutOfBoundsException
+            AbstractPagedOutputView pagedOutputView =
+                    new ManagedPagedOutputView(new TestingMemorySegmentPool(64 
* 1024));
+            int size =
+                    writer.serializeToOutputView(
+                            pagedOutputView, 
arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
+            assertThat(size).isGreaterThan(0);
+
+            // Verify the data can be read back correctly
+            int heapMemorySize = Math.max(size, writer.estimatedSizeInBytes());
+            MemorySegment segment = 
MemorySegment.allocateHeapMemory(heapMemorySize);
+            MemorySegment firstSegment = pagedOutputView.getCurrentSegment();
+            
firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, 
size);
+
+            ArrowReader reader =
+                    ArrowUtils.createArrowReader(segment, 0, size, root, 
allocator, rowType);
+            assertThat(reader.getRowCount()).isEqualTo(numRows);
+
+            for (int i = 0; i < numRows; i++) {
+                ColumnarRow row = reader.read(i);
+                row.setRowId(i);
+                assertThat(row.getInt(0)).isEqualTo(i);
+                assertThat(row.getArray(1).size()).isEqualTo(arraySize);
+                for (int j = 0; j < arraySize; j++) {
+                    assertThat(row.getArray(1).getInt(j)).isEqualTo(i * 
arraySize + j);
+                }
+            }
+        }
+    }
 }

Reply via email to