This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-array-flink in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 209d6eda6fd3cf47702e9be4d927a165da604ae8 Author: Jark Wu <[email protected]> AuthorDate: Tue Dec 2 12:13:44 2025 +0800 [common] Deep copy ColumnarArray in CompletedFetch#fetchRecords() to fix Arrow IndexOutOfBoundsException This fixes exception: Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0)) at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319) --- .../scanner/log/DefaultCompletedFetchTest.java | 78 ++++++++++++++++++++++ .../apache/fluss/record/LogRecordReadContext.java | 3 +- .../java/org/apache/fluss/row/GenericArray.java | 47 +++++++------ .../java/org/apache/fluss/row/InternalArray.java | 51 ++++++++++++++ .../java/org/apache/fluss/row/InternalRow.java | 56 ++++++++++++++++ .../org/apache/fluss/testutils/DataTestUtils.java | 28 ++++++-- 6 files changed, 236 insertions(+), 27 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java index 3502dcc3c..11e4d7c99 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java @@ -28,6 +28,7 @@ import org.apache.fluss.record.FileLogProjection; import org.apache.fluss.record.FileLogRecords; import org.apache.fluss.record.LogRecordReadContext; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; import org.apache.fluss.types.DataTypes; @@ -212,6 +213,83 @@ public class DefaultCompletedFetchTest { } } + @Test + void testComplexTypeFetch() throws Exception { + List<Object[]> complexData = + Arrays.asList( + new Object[] { + 1, + new String[] {"a", "b"}, + new Object[] 
{new int[] {1, 2}, new int[] {3, 4}} + }, + new Object[] { + 2, new String[] {"c", null}, new Object[] {null, new int[] {3, 4}} + }, + new Object[] { + 3, + new String[] {"e", "f"}, + new Object[] {new int[] {5, 6, 7}, new int[] {8}} + }); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.ARRAY(DataTypes.STRING())) + .column("c", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))) + .build(); + TableInfo tableInfo = + TableInfo.of( + DATA2_TABLE_PATH, + DATA2_TABLE_ID, + DEFAULT_SCHEMA_ID, + TableDescriptor.builder() + .schema(schema) + .distributedBy(3) + .logFormat(LogFormat.ARROW) + .build(), + System.currentTimeMillis(), + System.currentTimeMillis()); + long fetchOffset = 0L; + int bucketId = 0; + TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); + FetchLogResultForBucket resultForBucket = + new FetchLogResultForBucket( + tb, + createRecordsWithoutBaseLogOffset( + schema.getRowType(), + DEFAULT_SCHEMA_ID, + 0L, + 1000L, + LOG_MAGIC_VALUE_V0, + complexData, + LogFormat.ARROW), + 3L); + DefaultCompletedFetch defaultCompletedFetch = + new DefaultCompletedFetch( + tb, + resultForBucket, + LogRecordReadContext.createReadContext(tableInfo, false, null), + logScannerStatus, + true, + fetchOffset); + List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(3); + // close the read context to release arrow root resource, + // this is important to test complex types + defaultCompletedFetch.readContext.close(); + assertThat(scanRecords.size()).isEqualTo(3); + for (int i = 0; i < scanRecords.size(); i++) { + ScanRecord record = scanRecords.get(i); + assertThat(record.logOffset()).isEqualTo(i); + InternalRow row = record.getRow(); + assertThat(row.getInt(0)).isEqualTo(complexData.get(i)[0]); + assertThat(row.getArray(1)).isInstanceOf(GenericArray.class); + GenericArray array = (GenericArray) row.getArray(1); + assertThat(array.toString()) + .isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1])); + 
assertThat(row.getArray(2).toString()) + .isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2])); + } + } + private DefaultCompletedFetch makeCompletedFetch( TableBucket tableBucket, FetchLogResultForBucket resultForBucket, long offset) { return makeCompletedFetch(tableBucket, resultForBucket, offset, null); diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 1158dbc35..0a9329113 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -228,8 +228,9 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo List<DataType> dataTypeList = rowType.getChildren(); FieldGetter[] fieldGetters = new FieldGetter[selectedFields.length]; for (int i = 0; i < fieldGetters.length; i++) { + // build deep field getter to support nested types fieldGetters[i] = - InternalRow.createFieldGetter( + InternalRow.createDeepFieldGetter( dataTypeList.get(selectedFields[i]), selectedFields[i]); } return fieldGetters; diff --git a/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java b/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java index a9e905ccb..d97accd62 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java @@ -139,27 +139,6 @@ public final class GenericArray implements InternalArray, Serializable { return !isPrimitiveArray && ((Object[]) array)[pos] == null; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - GenericArray that = (GenericArray) o; - return size == that.size - && isPrimitiveArray == that.isPrimitiveArray - && Objects.deepEquals(array, that.array); - } - - @Override - 
public int hashCode() { - int result = Objects.hash(size, isPrimitiveArray); - result = 31 * result + Arrays.deepHashCode(new Object[] {array}); - return result; - } - // ------------------------------------------------------------------------------------------ // Read-only accessor methods // ------------------------------------------------------------------------------------------ @@ -328,6 +307,32 @@ public final class GenericArray implements InternalArray, Serializable { return ArrayUtils.toPrimitiveDouble((Object[]) array); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericArray that = (GenericArray) o; + return size == that.size + && isPrimitiveArray == that.isPrimitiveArray + && Objects.deepEquals(array, that.array); + } + + @Override + public int hashCode() { + int result = Objects.hash(size, isPrimitiveArray); + result = 31 * result + Arrays.deepHashCode(new Object[] {array}); + return result; + } + + @Override + public String toString() { + return Arrays.toString((Object[]) array); + } + // ---------------------------------------------------------------------------------------- // Utilities // ---------------------------------------------------------------------------------------- diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java index 019ee4078..1d16b738e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java @@ -19,6 +19,8 @@ package org.apache.fluss.row; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.row.columnar.ColumnarRow; +import org.apache.fluss.row.columnar.VectorizedColumnBatch; import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; @@ -152,6 +154,55 @@ public interface InternalArray 
extends DataGetters { }; } + /** + * Creates a deep accessor for getting elements in an internal array data structure at the given + * position. It returns new objects (GenericArray/GenericMap/GenericRow) for nested + * array/map/row types. + * + * <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow, so that + * elements can still be accessed after the Arrow buffer is released. It doesn't deep copy STRING and + * BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link + * VectorizedColumnBatch#getString(int, int)}. This can be removed once we support object reuse + * for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}. + */ + static ElementGetter createDeepElementGetter(DataType fieldType) { + final ElementGetter elementGetter; + switch (fieldType.getTypeRoot()) { + case ARRAY: + DataType nestedType = ((ArrayType) fieldType).getElementType(); + ElementGetter nestedGetter = createDeepElementGetter(nestedType); + elementGetter = + (array, pos) -> { + InternalArray inner = array.getArray(pos); + Object[] objs = new Object[inner.size()]; + for (int i = 0; i < inner.size(); i++) { + objs[i] = nestedGetter.getElementOrNull(inner, i); + } + return new GenericArray(objs); + }; + break; + case MAP: + case ROW: + String msg = + String.format( + "type %s not support in %s", + fieldType.getTypeRoot().toString(), InternalArray.class.getName()); + throw new IllegalArgumentException(msg); + default: + // for primitive types, we can directly return the element getter + elementGetter = createElementGetter(fieldType); + } + if (!fieldType.isNullable()) { + return elementGetter; + } + return (array, pos) -> { + if (array.isNullAt(pos)) { + return null; + } + return elementGetter.getElementOrNull(array, pos); + }; + } + /** Accessor for getting the elements of an array during runtime.
*/ interface ElementGetter extends Serializable { @Nullable diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java index 35ad867b6..38755aeef 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java @@ -19,6 +19,9 @@ package org.apache.fluss.row; import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.columnar.ColumnarRow; +import org.apache.fluss.row.columnar.VectorizedColumnBatch; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -26,6 +29,7 @@ import javax.annotation.Nullable; import java.io.Serializable; +import static org.apache.fluss.row.InternalArray.createDeepElementGetter; import static org.apache.fluss.types.DataTypeChecks.getLength; import static org.apache.fluss.types.DataTypeChecks.getPrecision; import static org.apache.fluss.types.DataTypeChecks.getScale; @@ -208,6 +212,7 @@ public interface InternalRow extends DataGetters { fieldGetter = row -> row.getTimestampLtz(fieldPos, timestampLtzPrecision); break; case ARRAY: + // TODO deep copy fieldGetter = row -> row.getArray(fieldPos); break; // TODO: MAP support will be added in Issue #1973 @@ -228,6 +233,57 @@ public interface InternalRow extends DataGetters { }; } + /** + * Creates a deep accessor for getting the field of a row at the given + * position. It returns new objects (GenericArray/GenericMap/GenericRow) for nested + * array/map/row types. + * + * <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow, so that + * fields can still be accessed after the Arrow buffer is released.
It doesn't deep copy STRING and + * BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link + * VectorizedColumnBatch#getString(int, int)}. This can be removed once we support object reuse + * for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}. + */ + static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) { + final FieldGetter fieldGetter; + switch (fieldType.getTypeRoot()) { + case ARRAY: + DataType elementType = ((ArrayType) fieldType).getElementType(); + InternalArray.ElementGetter nestedGetter = createDeepElementGetter(elementType); + fieldGetter = + row -> { + InternalArray array = row.getArray(fieldPos); + Object[] objs = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + objs[i] = nestedGetter.getElementOrNull(array, i); + } + return new GenericArray(objs); + }; + break; + case MAP: + case ROW: + String msg = + String.format( + "type %s not support in %s", + fieldType.getTypeRoot().toString(), InternalArray.class.getName()); + throw new IllegalArgumentException(msg); + default: + // for primitive types, use the normal field getter + fieldGetter = createFieldGetter(fieldType, fieldPos); + break; + } + + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); + }; + } + /** Accessor for getting the field of a row during runtime.
*/ interface FieldGetter extends Serializable { @Nullable diff --git a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java index 2236ebe21..2c3bc68fe 100644 --- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java @@ -43,6 +43,7 @@ import org.apache.fluss.record.MemoryLogRecordsArrowBuilder; import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder; import org.apache.fluss.remote.RemoteLogSegment; import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.arrow.ArrowWriter; @@ -95,15 +96,32 @@ public class DataTestUtils { public static GenericRow row(Object... objects) { GenericRow row = new GenericRow(objects.length); for (int i = 0; i < objects.length; i++) { - if (objects[i] instanceof String) { - row.setField(i, BinaryString.fromString((String) objects[i])); - } else { - row.setField(i, objects[i]); - } + Object value = toInternalObject(objects[i]); + row.setField(i, value); } return row; } + private static Object toInternalObject(Object obj) { + if (obj == null) { + return null; + } + if (obj instanceof String) { + return BinaryString.fromString((String) obj); + } else if (obj instanceof Object[]) { + Object[] array = (Object[]) obj; + Object[] internalArray = new Object[array.length]; + for (int j = 0; j < array.length; j++) { + internalArray[j] = toInternalObject(array[j]); + } + return new GenericArray(internalArray); + } else if (obj instanceof int[]) { + return new GenericArray((int[]) obj); + } else { + return obj; + } + } + public static CompactedRow compactedRow(RowType rowType, Object[] objects) { return genCompacted(rowType, objects); }
