This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 7fad656f1f298dd89a6288147e3df57b69478596 Author: ocean.wy <[email protected]> AuthorDate: Wed Dec 31 09:25:49 2025 +0800 [common] Improve ArrowArrayWriter performance by replacing hardcoded handleSafe=true with dynamic check based on fieldIndex --- .../main/java/org/apache/fluss/row/arrow/ArrowWriter.java | 5 ++++- .../org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java | 12 +++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java index 98b7ed6e5..ebc0a4043 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java @@ -56,8 +56,11 @@ public class ArrowWriter implements AutoCloseable { /** * The initial capacity of the vectors which are used to store the rows. The capacity will be * expanded automatically if the rows exceed the initial capacity. + * + * <p>Public for use by nested writers (e.g., ArrowArrayWriter) to determine when to use safe + * write mode based on element indices. */ - private static final int INITIAL_CAPACITY = 1024; + public static final int INITIAL_CAPACITY = 1024; /** * The buffer usage ratio which is used to determine whether the writer is full.
The writer is diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java index ee413e8ef..2fc034f56 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java @@ -20,6 +20,7 @@ package org.apache.fluss.row.arrow.writers; import org.apache.fluss.row.DataGetters; import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.arrow.ArrowWriter; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector; @@ -41,11 +42,12 @@ public class ArrowArrayWriter extends ArrowFieldWriter { listVector.startNewValue(rowIndex); for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) { int fieldIndex = offset + arrIndex; - // Always use safe writes for array elements because the element index (offset + - // arrIndex) can exceed INITIAL_CAPACITY even when the row count doesn't. The parent's - // handleSafe is based on row count, but array element indices grow based on the total - // number of elements across all arrays, which can be much larger. - elementWriter.write(fieldIndex, array, arrIndex, true); + // Use element-based index to determine handleSafe, not parent row count. + // This fixes issue #2164: when row count < INITIAL_CAPACITY but total + // array elements > INITIAL_CAPACITY, we need to use safe mode for elements + // beyond the initial capacity. + boolean elementHandleSafe = fieldIndex >= ArrowWriter.INITIAL_CAPACITY; + elementWriter.write(fieldIndex, array, arrIndex, elementHandleSafe); } offset += array.size(); listVector.endValue(rowIndex, array.size());
