This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-array-flink
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 209d6eda6fd3cf47702e9be4d927a165da604ae8
Author: Jark Wu <[email protected]>
AuthorDate: Tue Dec 2 12:13:44 2025 +0800

    [common] Deep copy ColumnarArray in CompletedFetch#fetchRecords() to fix 
Arrow IndexOutOfBoundsException
    
    This fixes exception:
    Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 
(expected: range(0, 0))
            at 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
---
 .../scanner/log/DefaultCompletedFetchTest.java     | 78 ++++++++++++++++++++++
 .../apache/fluss/record/LogRecordReadContext.java  |  3 +-
 .../java/org/apache/fluss/row/GenericArray.java    | 47 +++++++------
 .../java/org/apache/fluss/row/InternalArray.java   | 51 ++++++++++++++
 .../java/org/apache/fluss/row/InternalRow.java     | 56 ++++++++++++++++
 .../org/apache/fluss/testutils/DataTestUtils.java  | 28 ++++++--
 6 files changed, 236 insertions(+), 27 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index 3502dcc3c..11e4d7c99 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -28,6 +28,7 @@ import org.apache.fluss.record.FileLogProjection;
 import org.apache.fluss.record.FileLogRecords;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
 import org.apache.fluss.types.DataTypes;
@@ -212,6 +213,83 @@ public class DefaultCompletedFetchTest {
         }
     }
 
+    @Test
+    void testComplexTypeFetch() throws Exception {
+        List<Object[]> complexData =
+                Arrays.asList(
+                        new Object[] {
+                            1,
+                            new String[] {"a", "b"},
+                            new Object[] {new int[] {1, 2}, new int[] {3, 4}}
+                        },
+                        new Object[] {
+                            2, new String[] {"c", null}, new Object[] {null, 
new int[] {3, 4}}
+                        },
+                        new Object[] {
+                            3,
+                            new String[] {"e", "f"},
+                            new Object[] {new int[] {5, 6, 7}, new int[] {8}}
+                        });
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("c", 
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        .build();
+        TableInfo tableInfo =
+                TableInfo.of(
+                        DATA2_TABLE_PATH,
+                        DATA2_TABLE_ID,
+                        DEFAULT_SCHEMA_ID,
+                        TableDescriptor.builder()
+                                .schema(schema)
+                                .distributedBy(3)
+                                .logFormat(LogFormat.ARROW)
+                                .build(),
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis());
+        long fetchOffset = 0L;
+        int bucketId = 0;
+        TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
+        FetchLogResultForBucket resultForBucket =
+                new FetchLogResultForBucket(
+                        tb,
+                        createRecordsWithoutBaseLogOffset(
+                                schema.getRowType(),
+                                DEFAULT_SCHEMA_ID,
+                                0L,
+                                1000L,
+                                LOG_MAGIC_VALUE_V0,
+                                complexData,
+                                LogFormat.ARROW),
+                        3L);
+        DefaultCompletedFetch defaultCompletedFetch =
+                new DefaultCompletedFetch(
+                        tb,
+                        resultForBucket,
+                        LogRecordReadContext.createReadContext(tableInfo, 
false, null),
+                        logScannerStatus,
+                        true,
+                        fetchOffset);
+        List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(3);
+        // close the read context to release arrow root resource,
+        // this is important to test complex types
+        defaultCompletedFetch.readContext.close();
+        assertThat(scanRecords.size()).isEqualTo(3);
+        for (int i = 0; i < scanRecords.size(); i++) {
+            ScanRecord record = scanRecords.get(i);
+            assertThat(record.logOffset()).isEqualTo(i);
+            InternalRow row = record.getRow();
+            assertThat(row.getInt(0)).isEqualTo(complexData.get(i)[0]);
+            assertThat(row.getArray(1)).isInstanceOf(GenericArray.class);
+            GenericArray array = (GenericArray) row.getArray(1);
+            assertThat(array.toString())
+                    .isEqualTo(Arrays.deepToString((Object[]) 
complexData.get(i)[1]));
+            assertThat(row.getArray(2).toString())
+                    .isEqualTo(Arrays.deepToString((Object[]) 
complexData.get(i)[2]));
+        }
+    }
+
     private DefaultCompletedFetch makeCompletedFetch(
             TableBucket tableBucket, FetchLogResultForBucket resultForBucket, 
long offset) {
         return makeCompletedFetch(tableBucket, resultForBucket, offset, null);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 1158dbc35..0a9329113 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -228,8 +228,9 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         List<DataType> dataTypeList = rowType.getChildren();
         FieldGetter[] fieldGetters = new FieldGetter[selectedFields.length];
         for (int i = 0; i < fieldGetters.length; i++) {
+            // build deep field getter to support nested types
             fieldGetters[i] =
-                    InternalRow.createFieldGetter(
+                    InternalRow.createDeepFieldGetter(
                             dataTypeList.get(selectedFields[i]), 
selectedFields[i]);
         }
         return fieldGetters;
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java 
b/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java
index a9e905ccb..d97accd62 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java
@@ -139,27 +139,6 @@ public final class GenericArray implements InternalArray, 
Serializable {
         return !isPrimitiveArray && ((Object[]) array)[pos] == null;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        GenericArray that = (GenericArray) o;
-        return size == that.size
-                && isPrimitiveArray == that.isPrimitiveArray
-                && Objects.deepEquals(array, that.array);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = Objects.hash(size, isPrimitiveArray);
-        result = 31 * result + Arrays.deepHashCode(new Object[] {array});
-        return result;
-    }
-
     // 
------------------------------------------------------------------------------------------
     // Read-only accessor methods
     // 
------------------------------------------------------------------------------------------
@@ -328,6 +307,32 @@ public final class GenericArray implements InternalArray, 
Serializable {
         return ArrayUtils.toPrimitiveDouble((Object[]) array);
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        GenericArray that = (GenericArray) o;
+        return size == that.size
+                && isPrimitiveArray == that.isPrimitiveArray
+                && Objects.deepEquals(array, that.array);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(size, isPrimitiveArray);
+        result = 31 * result + Arrays.deepHashCode(new Object[] {array});
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return Arrays.toString((Object[]) array);
+    }
+
     // 
----------------------------------------------------------------------------------------
     // Utilities
     // 
----------------------------------------------------------------------------------------
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java 
b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java
index 019ee4078..1d16b738e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java
@@ -19,6 +19,8 @@
 package org.apache.fluss.row;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.row.columnar.ColumnarRow;
+import org.apache.fluss.row.columnar.VectorizedColumnBatch;
 import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
 
@@ -152,6 +154,55 @@ public interface InternalArray extends DataGetters {
         };
     }
 
+    /**
+     * Creates a deep accessor for getting elements in an internal array data structure at the given
+     * position. It returns new {@code GenericArray} objects for nested array types; MAP and ROW
+     * element types are not supported yet and are rejected with an {@link IllegalArgumentException}.
+     *
+     * <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow, which
+     * keeps the elements readable after the Arrow buffer has been released. It doesn't deep copy
+     * STRING and BYTES types, because {@link ColumnarRow} already deep copies the bytes, see
+     * {@link VectorizedColumnBatch#getString(int, int)}. This can be removed once we support
+     * object reuse for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
+     */
+    static ElementGetter createDeepElementGetter(DataType fieldType) {
+        final ElementGetter elementGetter;
+        switch (fieldType.getTypeRoot()) {
+            case ARRAY:
+                DataType nestedType = ((ArrayType) fieldType).getElementType();
+                ElementGetter nestedGetter = 
createDeepElementGetter(nestedType);
+                elementGetter =
+                        (array, pos) -> {
+                            InternalArray inner = array.getArray(pos);
+                            Object[] objs = new Object[inner.size()];
+                            for (int i = 0; i < inner.size(); i++) {
+                                objs[i] = nestedGetter.getElementOrNull(inner, 
i);
+                            }
+                            return new GenericArray(objs);
+                        };
+                break;
+            case MAP:
+            case ROW:
+                String msg =
+                        String.format(
+                                "type %s not support in %s",
+                                fieldType.getTypeRoot().toString(), 
InternalArray.class.getName());
+                throw new IllegalArgumentException(msg);
+            default:
+                // for primitive types, we can directly return the element 
getter
+                elementGetter = createElementGetter(fieldType);
+        }
+        if (!fieldType.isNullable()) {
+            return elementGetter;
+        }
+        return (array, pos) -> {
+            if (array.isNullAt(pos)) {
+                return null;
+            }
+            return elementGetter.getElementOrNull(array, pos);
+        };
+    }
+
     /** Accessor for getting the elements of an array during runtime. */
     interface ElementGetter extends Serializable {
         @Nullable
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java
index 35ad867b6..38755aeef 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java
@@ -19,6 +19,9 @@ package org.apache.fluss.row;
 
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.columnar.ColumnarRow;
+import org.apache.fluss.row.columnar.VectorizedColumnBatch;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
@@ -26,6 +29,7 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
+import static org.apache.fluss.row.InternalArray.createDeepElementGetter;
 import static org.apache.fluss.types.DataTypeChecks.getLength;
 import static org.apache.fluss.types.DataTypeChecks.getPrecision;
 import static org.apache.fluss.types.DataTypeChecks.getScale;
@@ -208,6 +212,7 @@ public interface InternalRow extends DataGetters {
                 fieldGetter = row -> row.getTimestampLtz(fieldPos, 
timestampLtzPrecision);
                 break;
             case ARRAY:
+                // TODO deep copy
                 fieldGetter = row -> row.getArray(fieldPos);
                 break;
                 // TODO: MAP support will be added in Issue #1973
@@ -228,6 +233,57 @@ public interface InternalRow extends DataGetters {
         };
     }
 
+    /**
+     * Creates a deep accessor for getting the field of an internal row data structure at the given
+     * position. It returns new {@code GenericArray} objects for array fields (including nested
+     * arrays); MAP and ROW fields are not supported yet and are rejected with an {@link IllegalArgumentException}.
+     *
+     * <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow, which
+     * keeps the fields readable after the Arrow buffer has been released. It doesn't deep copy
+     * STRING and BYTES types, because {@link ColumnarRow} already deep copies the bytes, see
+     * {@link VectorizedColumnBatch#getString(int, int)}. This can be removed once we support
+     * object reuse for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
+     */
+    static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) 
{
+        final FieldGetter fieldGetter;
+        switch (fieldType.getTypeRoot()) {
+            case ARRAY:
+                DataType elementType = ((ArrayType) 
fieldType).getElementType();
+                InternalArray.ElementGetter nestedGetter = 
createDeepElementGetter(elementType);
+                fieldGetter =
+                        row -> {
+                            InternalArray array = row.getArray(fieldPos);
+                            Object[] objs = new Object[array.size()];
+                            for (int i = 0; i < array.size(); i++) {
+                                objs[i] = nestedGetter.getElementOrNull(array, 
i);
+                            }
+                            return new GenericArray(objs);
+                        };
+                break;
+            case MAP:
+            case ROW:
+                String msg =
+                        String.format(
+                                "type %s not support in %s",
+                                fieldType.getTypeRoot().toString(), 
InternalArray.class.getName());
+                throw new IllegalArgumentException(msg);
+            default:
+                // for primitive types, use the normal field getter
+                fieldGetter = createFieldGetter(fieldType, fieldPos);
+                break;
+        }
+
+        if (!fieldType.isNullable()) {
+            return fieldGetter;
+        }
+        return row -> {
+            if (row.isNullAt(fieldPos)) {
+                return null;
+            }
+            return fieldGetter.getFieldOrNull(row);
+        };
+    }
+
     /** Accessor for getting the field of a row during runtime. */
     interface FieldGetter extends Serializable {
         @Nullable
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java 
b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
index 2236ebe21..2c3bc68fe 100644
--- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
+++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
@@ -43,6 +43,7 @@ import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
 import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
 import org.apache.fluss.remote.RemoteLogSegment;
 import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.arrow.ArrowWriter;
@@ -95,15 +96,32 @@ public class DataTestUtils {
     public static GenericRow row(Object... objects) {
         GenericRow row = new GenericRow(objects.length);
         for (int i = 0; i < objects.length; i++) {
-            if (objects[i] instanceof String) {
-                row.setField(i, BinaryString.fromString((String) objects[i]));
-            } else {
-                row.setField(i, objects[i]);
-            }
+            Object value = toInternalObject(objects[i]);
+            row.setField(i, value);
         }
         return row;
     }
 
+    private static Object toInternalObject(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof String) {
+            return BinaryString.fromString((String) obj);
+        } else if (obj instanceof Object[]) {
+            Object[] array = (Object[]) obj;
+            Object[] internalArray = new Object[array.length];
+            for (int j = 0; j < array.length; j++) {
+                internalArray[j] = toInternalObject(array[j]);
+            }
+            return new GenericArray(internalArray);
+        } else if (obj instanceof int[]) {
+            return new GenericArray((int[]) obj);
+        } else {
+            return obj;
+        }
+    }
+
     public static CompactedRow compactedRow(RowType rowType, Object[] objects) 
{
         return genCompacted(rowType, objects);
     }

Reply via email to