This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ad037cfba [lance] Fix Lance writer to emit Arrow FixedSizeList for
array columns to enable native vector search (#2707)
ad037cfba is described below
commit ad037cfbad862a909894a3627de45443fee09e88
Author: Keith Lee <[email protected]>
AuthorDate: Sat Feb 28 07:34:26 2026 +0000
[lance] Fix Lance writer to emit Arrow FixedSizeList for array columns to
enable native vector search (#2707)
---
.../apache/fluss/lake/lance/LanceLakeCatalog.java | 4 +-
.../fluss/lake/lance/utils/ArrowDataConverter.java | 170 ++++++++++++++++--
.../fluss/lake/lance/utils/LanceArrowUtils.java | 63 ++++++-
.../fluss/lake/lance/tiering/LanceTieringTest.java | 71 +++++---
.../lake/lance/utils/ArrowDataConverterTest.java | 197 +++++++++++++++++++++
.../lake/lance/utils/LanceArrowUtilsTest.java | 134 ++++++++++++++
6 files changed, 594 insertions(+), 45 deletions(-)
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
index 600dcbd0d..ad8f7687a 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
@@ -61,7 +61,9 @@ public class LanceLakeCatalog implements LakeCatalog {
List<Field> fields = new ArrayList<>();
// set schema
fields.addAll(
-
LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType())
+ LanceArrowUtils.toArrowSchema(
+ tableDescriptor.getSchema().getRowType(),
+ tableDescriptor.getCustomProperties())
.getFields());
try {
LanceDatasetAdapter.createDataset(config.getDatasetUri(), new
Schema(fields), params);
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
index ed822e8b2..0694b7fcb 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
@@ -21,6 +21,7 @@ import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -54,16 +55,21 @@ public class ArrowDataConverter {
VectorSchemaRoot.create(nonShadedSchema, nonShadedAllocator);
nonShadedRoot.allocateNew();
-
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector>
shadedVectors =
- shadedRoot.getFieldVectors();
- List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();
+ try {
+
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector>
shadedVectors =
+ shadedRoot.getFieldVectors();
+ List<FieldVector> nonShadedVectors =
nonShadedRoot.getFieldVectors();
- for (int i = 0; i < shadedVectors.size(); i++) {
- copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
- }
+ for (int i = 0; i < shadedVectors.size(); i++) {
+ copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
+ }
- nonShadedRoot.setRowCount(shadedRoot.getRowCount());
- return nonShadedRoot;
+ nonShadedRoot.setRowCount(shadedRoot.getRowCount());
+ return nonShadedRoot;
+ } catch (Exception e) {
+ nonShadedRoot.close();
+ throw e;
+ }
}
private static void copyVectorData(
@@ -71,14 +77,21 @@ public class ArrowDataConverter {
FieldVector nonShadedVector) {
if (shadedVector
- instanceof
-
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
- && nonShadedVector instanceof ListVector) {
- copyListVectorData(
-
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
- shadedVector,
- (ListVector) nonShadedVector);
- return;
+ instanceof
+
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector) {
+ if (nonShadedVector instanceof FixedSizeListVector) {
+ copyListToFixedSizeListVectorData(
+
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
+ shadedVector,
+ (FixedSizeListVector) nonShadedVector);
+ return;
+ } else if (nonShadedVector instanceof ListVector) {
+ copyListVectorData(
+
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
+ shadedVector,
+ (ListVector) nonShadedVector);
+ return;
+ }
}
List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>
shadedBuffers =
@@ -143,4 +156,129 @@ public class ArrowDataConverter {
// For ListVector, we need to manually set lastSet to avoid offset
buffer recalculation
nonShadedListVector.setLastSet(valueCount - 1);
}
+
+ private static void copyListToFixedSizeListVectorData(
+
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
+ shadedListVector,
+ FixedSizeListVector nonShadedFixedSizeListVector) {
+
+ int valueCount = shadedListVector.getValueCount();
+ int expectedListSize = nonShadedFixedSizeListVector.getListSize();
+ int nonNullCount = valueCount - shadedListVector.getNullCount();
+ int expectedTotalChildCount = nonNullCount * expectedListSize;
+
+ // Validate that backing data vector element count matches expected
fixed-size layout.
+ int totalChildCount = shadedListVector.getDataVector().getValueCount();
+ if (totalChildCount != expectedTotalChildCount) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Total child elements (%d) does not match expected
%d for FixedSizeList conversion.",
+ totalChildCount, expectedTotalChildCount));
+ }
+
+ // Copy child data from the source ListVector to the target
FixedSizeListVector.
+ //
+ // In a ListVector, child elements for non-null rows are packed
contiguously
+ // (null rows contribute 0 children). In a FixedSizeListVector, child
data is
+ // stride-based: row i's data starts at index i * listSize, so null
rows still
+ // occupy child slots. When null rows exist, a simple bulk buffer copy
won't
+ // work because the layouts differ — we must remap per-row using the
source
+ // offset buffer.
+ org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector
shadedDataVector =
+ shadedListVector.getDataVector();
+ FieldVector nonShadedDataVector =
nonShadedFixedSizeListVector.getDataVector();
+
+ if (shadedDataVector != null && nonShadedDataVector != null) {
+ if (nonNullCount == valueCount) {
+ // No null rows — child layouts are identical, use fast bulk
copy.
+ copyVectorData(shadedDataVector, nonShadedDataVector);
+ } else if (totalChildCount > 0) {
+ // Null rows present — copy child data row-by-row with offset
remapping.
+ copyChildDataWithOffsetRemapping(
+ shadedListVector, nonShadedDataVector, valueCount,
expectedListSize);
+ nonShadedDataVector.setValueCount(valueCount *
expectedListSize);
+ }
+ }
+
+ // FixedSizeListVector only has a validity buffer (no offset buffer).
+ // Copy the validity buffer from the shaded ListVector.
+ List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>
shadedBuffers =
+ shadedListVector.getFieldBuffers();
+ List<ArrowBuf> nonShadedBuffers =
nonShadedFixedSizeListVector.getFieldBuffers();
+
+ // Both ListVector and FixedSizeListVector have validity as their
first buffer
+ if (!shadedBuffers.isEmpty() && !nonShadedBuffers.isEmpty()) {
+ org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf
shadedValidityBuf =
+ shadedBuffers.get(0);
+ ArrowBuf nonShadedValidityBuf = nonShadedBuffers.get(0);
+
+ long size = Math.min(shadedValidityBuf.capacity(),
nonShadedValidityBuf.capacity());
+ if (size > 0) {
+ ByteBuffer srcBuffer = shadedValidityBuf.nioBuffer(0, (int)
size);
+ srcBuffer.position(0);
+ srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
+ nonShadedValidityBuf.setBytes(0, srcBuffer);
+ }
+ }
+
+ nonShadedFixedSizeListVector.setValueCount(valueCount);
+ }
+
+ /**
+ * Copies child element data from a shaded ListVector to a non-shaded
child vector, remapping
+ * offsets so that row i's data lands at index {@code i * listSize} in the
target (the layout
+ * required by FixedSizeListVector). Null rows in the source are skipped.
+ */
+ private static void copyChildDataWithOffsetRemapping(
+
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
+ shadedListVector,
+ FieldVector nonShadedChildVector,
+ int valueCount,
+ int listSize) {
+
+ // Source child data buffer (index 1 for fixed-width vectors:
[validity, data])
+ org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf
srcDataBuf =
+ shadedListVector.getDataVector().getFieldBuffers().get(1);
+ // Source offset buffer: offset[i] is the start element index for row i
+ org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf
srcOffsetBuf =
+ shadedListVector.getOffsetBuffer();
+
+ // Target child buffers: [0] = validity, [1] = data
+ ArrowBuf tgtValidityBuf =
nonShadedChildVector.getFieldBuffers().get(0);
+ ArrowBuf tgtDataBuf = nonShadedChildVector.getFieldBuffers().get(1);
+
+ // Get element byte width from the non-shaded child vector type.
+ // Buffer capacity cannot be used because Arrow pads/aligns buffers.
+ if (!(nonShadedChildVector instanceof
org.apache.arrow.vector.BaseFixedWidthVector)) {
+ throw new UnsupportedOperationException(
+ "FixedSizeList conversion with null rows only supports
fixed-width child vectors, got "
+ + nonShadedChildVector.getClass().getSimpleName());
+ }
+ int elementByteWidth =
+ ((org.apache.arrow.vector.BaseFixedWidthVector)
nonShadedChildVector)
+ .getTypeWidth();
+ int rowByteWidth = listSize * elementByteWidth;
+
+ for (int i = 0; i < valueCount; i++) {
+ if (!shadedListVector.isNull(i)) {
+ int srcElementStart = srcOffsetBuf.getInt((long) i *
Integer.BYTES);
+ int srcByteOffset = srcElementStart * elementByteWidth;
+ int tgtElementStart = i * listSize;
+ int tgtByteOffset = tgtElementStart * elementByteWidth;
+
+ // Copy the data bytes for this row's child elements
+ ByteBuffer srcSlice = srcDataBuf.nioBuffer(srcByteOffset,
rowByteWidth);
+ tgtDataBuf.setBytes(tgtByteOffset, srcSlice);
+
+ // Set validity bits for each child element in this row
+ for (int j = 0; j < listSize; j++) {
+ int bitIndex = tgtElementStart + j;
+ int byteIndex = bitIndex / 8;
+ int bitOffset = bitIndex % 8;
+ byte currentByte = tgtValidityBuf.getByte(byteIndex);
+ tgtValidityBuf.setByte(byteIndex, currentByte | (1 <<
bitOffset));
+ }
+ }
+ }
+ }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
index 681367a32..ed85b1a8c 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -47,31 +47,84 @@ import org.apache.arrow.vector.types.pojo.Schema;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
/**
* Utilities for converting Fluss RowType to non-shaded Arrow Schema. This is
needed because Lance
* requires non-shaded Arrow API.
*/
public class LanceArrowUtils {
+ /** Property suffix for configuring a fixed-size list Arrow type on array
columns. */
+ public static final String FIXED_SIZE_LIST_SIZE_SUFFIX =
".arrow.fixed-size-list.size";
+
/** Returns the non-shaded Arrow schema of the specified Fluss RowType. */
public static Schema toArrowSchema(RowType rowType) {
+ return toArrowSchema(rowType, Collections.emptyMap());
+ }
+
+ /**
+ * Returns the non-shaded Arrow schema of the specified Fluss RowType,
using table properties to
+ * determine whether array columns should use FixedSizeList instead of
List.
+ *
+ * <p>When a table property {@code <column>.arrow.fixed-size-list.size} is
set, the
+ * corresponding ARRAY column will be emitted as {@code
FixedSizeList<element>(size)} instead of
+ * {@code List<element>}.
+ */
+ public static Schema toArrowSchema(RowType rowType, Map<String, String>
tableProperties) {
List<Field> fields =
rowType.getFields().stream()
- .map(f -> toArrowField(f.getName(), f.getType()))
+ .map(f -> toArrowField(f.getName(), f.getType(),
tableProperties))
.collect(Collectors.toList());
return new Schema(fields);
}
- private static Field toArrowField(String fieldName, DataType logicalType) {
- FieldType fieldType =
- new FieldType(logicalType.isNullable(),
toArrowType(logicalType), null);
+ private static Field toArrowField(
+ String fieldName, DataType logicalType, Map<String, String>
tableProperties) {
+ checkArgument(
+ !fieldName.contains("."),
+ "Column name '%s' must not contain periods. "
+ + "Lance does not support field names with periods.",
+ fieldName);
+ ArrowType arrowType;
+ if (logicalType instanceof ArrayType && tableProperties != null) {
+ String sizeStr = tableProperties.get(fieldName +
FIXED_SIZE_LIST_SIZE_SUFFIX);
+ if (sizeStr != null) {
+ int listSize;
+ try {
+ listSize = Integer.parseInt(sizeStr);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid value '%s' for property '%s',
expected a positive integer.",
+ sizeStr, fieldName +
FIXED_SIZE_LIST_SIZE_SUFFIX),
+ e);
+ }
+
+ checkArgument(
+ listSize > 0,
+ "Invalid value '%s' for property '%s'. Expected a
positive integer.",
+ sizeStr,
+ fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX);
+ arrowType = new ArrowType.FixedSizeList(listSize);
+ } else {
+ arrowType = toArrowType(logicalType);
+ }
+ } else {
+ arrowType = toArrowType(logicalType);
+ }
+ FieldType fieldType = new FieldType(logicalType.isNullable(),
arrowType, null);
List<Field> children = null;
if (logicalType instanceof ArrayType) {
children =
Collections.singletonList(
- toArrowField("element", ((ArrayType)
logicalType).getElementType()));
+ toArrowField(
+ "element",
+ ((ArrayType) logicalType).getElementType(),
+ tableProperties));
}
return new Field(fieldName, fieldType, children);
}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index 59ca5b517..a7a13ff5e 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -37,6 +37,7 @@ import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.GenericRecord;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.types.Tuple2;
@@ -46,7 +47,9 @@ import com.lancedb.lance.WriteParams;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -69,6 +72,8 @@ import static org.assertj.core.api.Assertions.assertThat;
/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
class LanceTieringTest {
+ private static final int EMBEDDING_LIST_SIZE = 4;
+
private @TempDir File tempWarehouseDir;
private LanceLakeTieringFactory lanceLakeTieringFactory;
private Configuration configuration;
@@ -91,13 +96,16 @@ class LanceTieringTest {
TablePath tablePath = TablePath.of("lance", "logTable");
Map<String, String> customProperties = new HashMap<>();
customProperties.put("lance.batch_size", "256");
+ customProperties.put(
+ "embedding" + LanceArrowUtils.FIXED_SIZE_LIST_SIZE_SUFFIX,
+ String.valueOf(EMBEDDING_LIST_SIZE));
LanceConfig config =
LanceConfig.from(
configuration.toMap(),
customProperties,
tablePath.getDatabaseName(),
tablePath.getTableName());
- Schema schema = createTable(config);
+ Schema schema = createTable(config, customProperties);
TableDescriptor descriptor =
TableDescriptor.builder()
@@ -180,6 +188,13 @@ class LanceTieringTest {
new RootAllocator(),
config.getDatasetUri(),
LanceConfig.genReadOptionFromConfig(config))) {
+ // verify the embedding column uses FixedSizeList in the Lance
schema
+ org.apache.arrow.vector.types.pojo.Field embeddingField =
+ dataset.getSchema().findField("embedding");
+
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.FixedSizeList.class);
+ assertThat(((ArrowType.FixedSizeList)
embeddingField.getType()).getListSize())
+ .isEqualTo(EMBEDDING_LIST_SIZE);
+
ArrowReader reader = dataset.newScan().scanBatches();
VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
@@ -189,8 +204,7 @@ class LanceTieringTest {
reader.loadNextBatch();
Tuple2<String, Integer> partitionBucket =
Tuple2.of(partition, bucket);
List<LogRecord> expectRecords =
recordsByBucket.get(partitionBucket);
- verifyLogTableRecords(
- readerRoot, expectRecords, bucket, isPartitioned,
partition);
+ verifyLogTableRecords(readerRoot, expectRecords);
}
}
assertThat(reader.loadNextBatch()).isFalse();
@@ -216,14 +230,13 @@ class LanceTieringTest {
}
}
- private void verifyLogTableRecords(
- VectorSchemaRoot root,
- List<LogRecord> expectRecords,
- int expectBucket,
- boolean isPartitioned,
- @Nullable String partition)
- throws Exception {
+ private void verifyLogTableRecords(VectorSchemaRoot root, List<LogRecord>
expectRecords) {
assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
+
+ // verify the embedding vector is a FixedSizeListVector
+
assertThat(root.getVector("embedding")).isInstanceOf(FixedSizeListVector.class);
+ FixedSizeListVector embeddingVector = (FixedSizeListVector)
root.getVector("embedding");
+
for (int i = 0; i < expectRecords.size(); i++) {
LogRecord expectRecord = expectRecords.get(i);
// check business columns:
@@ -233,6 +246,13 @@ class LanceTieringTest {
.isEqualTo(expectRecord.getRow().getString(1).toString());
assertThat(((VarCharVector)
root.getVector(2)).getObject(i).toString())
.isEqualTo(expectRecord.getRow().getString(2).toString());
+ // check embedding column
+ java.util.List<?> embeddingValues = embeddingVector.getObject(i);
+ assertThat(embeddingValues).hasSize(EMBEDDING_LIST_SIZE);
+ org.apache.fluss.row.InternalArray expectedArray =
expectRecord.getRow().getArray(3);
+ for (int j = 0; j < EMBEDDING_LIST_SIZE; j++) {
+ assertThat((Float)
embeddingValues.get(j)).isEqualTo(expectedArray.getFloat(j));
+ }
}
}
@@ -296,19 +316,21 @@ class LanceTieringTest {
List<LogRecord> logRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
GenericRow genericRow;
- if (partition != null) {
- // Partitioned table: include partition field in data
- genericRow = new GenericRow(3); // c1, c2, c3(partition)
- genericRow.setField(0, i);
- genericRow.setField(1, BinaryString.fromString("bucket" +
bucket + "_" + i));
- genericRow.setField(2, BinaryString.fromString(partition)); //
partition field
- } else {
- // Non-partitioned table
- genericRow = new GenericRow(3);
- genericRow.setField(0, i);
- genericRow.setField(1, BinaryString.fromString("bucket" +
bucket + "_" + i));
+
+ // Same 4-column row for partitioned and non-partitioned tables; only c3 differs
+ genericRow = new GenericRow(4); // c1, c2, c3(partition), embedding
+ genericRow.setField(0, i);
+ genericRow.setField(1, BinaryString.fromString("bucket" + bucket +
"_" + i));
+
+ if (partition == null) {
genericRow.setField(2, BinaryString.fromString("bucket" +
bucket));
+ } else {
+ genericRow.setField(2, BinaryString.fromString(partition));
}
+
+ genericRow.setField(
+ 3, new GenericArray(new float[] {0.1f * i, 0.2f * i, 0.3f
* i, 0.4f * i}));
+
LogRecord logRecord =
new GenericRecord(
i, System.currentTimeMillis(),
ChangeType.APPEND_ONLY, genericRow);
@@ -317,16 +339,19 @@ class LanceTieringTest {
return Tuple2.of(logRecords, logRecords);
}
- private Schema createTable(LanceConfig config) {
+ private Schema createTable(LanceConfig config, Map<String, String>
customProperties) {
List<Schema.Column> columns = new ArrayList<>();
columns.add(new Schema.Column("c1", DataTypes.INT()));
columns.add(new Schema.Column("c2", DataTypes.STRING()));
columns.add(new Schema.Column("c3", DataTypes.STRING()));
+ columns.add(new Schema.Column("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
Schema.Builder schemaBuilder =
Schema.newBuilder().fromColumns(columns);
Schema schema = schemaBuilder.build();
WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
LanceDatasetAdapter.createDataset(
- config.getDatasetUri(),
LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+ config.getDatasetUri(),
+ LanceArrowUtils.toArrowSchema(schema.getRowType(),
customProperties),
+ params);
return schema;
}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/ArrowDataConverterTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/ArrowDataConverterTest.java
new file mode 100644
index 000000000..a5face19a
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/ArrowDataConverterTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.utils;
+
+import org.apache.fluss.lake.lance.tiering.ShadedArrowBatchWriter;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ArrowDataConverter#convertToNonShaded}. */
+class ArrowDataConverterTest {
+
+ private
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator
shadedAllocator;
+ private BufferAllocator nonShadedAllocator;
+
+ @BeforeEach
+ void setUp() {
+ shadedAllocator =
+ new
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator(
+ Long.MAX_VALUE);
+ nonShadedAllocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @AfterEach
+ void tearDown() {
+ shadedAllocator.close();
+ nonShadedAllocator.close();
+ }
+
+ @Test
+ void testConvertListToFixedSizeList() {
+ int listSize = 3;
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("embedding.arrow.fixed-size-list.size",
String.valueOf(listSize));
+ Schema nonShadedSchema = LanceArrowUtils.toArrowSchema(rowType,
properties);
+
+ // Populate shaded root with 3 rows, each having a list of exactly 3
floats
+ float[][] data = {
+ {1.0f, 2.0f, 3.0f},
+ {4.0f, 5.0f, 6.0f},
+ {7.0f, 8.0f, 9.0f}
+ };
+
+ try (ShadedArrowBatchWriter writer = new
ShadedArrowBatchWriter(shadedAllocator, rowType)) {
+ for (float[] floats : data) {
+ GenericRow row = new GenericRow(1);
+ row.setField(0, new GenericArray(floats));
+ writer.writeRow(row);
+ }
+ writer.finish();
+
+ try (VectorSchemaRoot nonShadedRoot =
+ ArrowDataConverter.convertToNonShaded(
+ writer.getShadedRoot(), nonShadedAllocator,
nonShadedSchema)) {
+ assertThat(nonShadedRoot.getRowCount()).isEqualTo(3);
+ assertThat(nonShadedRoot.getVector("embedding"))
+ .isInstanceOf(FixedSizeListVector.class);
+
+ FixedSizeListVector fixedSizeListVector =
+ (FixedSizeListVector)
nonShadedRoot.getVector("embedding");
+
assertThat(fixedSizeListVector.getListSize()).isEqualTo(listSize);
+
+ for (int i = 0; i < data.length; i++) {
+ List<?> values = fixedSizeListVector.getObject(i);
+ assertThat(values).hasSize(listSize);
+ for (int j = 0; j < listSize; j++) {
+ assertThat((Float)
values.get(j)).isEqualTo(data[i][j]);
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ void testConvertListToFixedSizeListMismatchedCount() {
+ int listSize = 3;
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("embedding.arrow.fixed-size-list.size",
String.valueOf(listSize));
+ Schema nonShadedSchema = LanceArrowUtils.toArrowSchema(rowType,
properties);
+
+ // Write 2 rows with 2 elements each: total child elements = 4,
expected = 2*3 = 6
+ try (ShadedArrowBatchWriter writer = new
ShadedArrowBatchWriter(shadedAllocator, rowType)) {
+ GenericRow row1 = new GenericRow(1);
+ row1.setField(0, new GenericArray(new float[] {1.0f, 2.0f}));
+ writer.writeRow(row1);
+
+ GenericRow row2 = new GenericRow(1);
+ row2.setField(0, new GenericArray(new float[] {3.0f, 4.0f}));
+ writer.writeRow(row2);
+ writer.finish();
+
+ assertThatThrownBy(
+ () ->
+ ArrowDataConverter.convertToNonShaded(
+ writer.getShadedRoot(),
+ nonShadedAllocator,
+ nonShadedSchema))
+ .isInstanceOf(IllegalArgumentException.class);
+
+ // Verify no memory leaked from the failed conversion
+ assertThat(nonShadedAllocator.getAllocatedMemory()).isZero();
+ }
+ }
+
+ @Test
+ void testConvertListToFixedSizeListWithNulls() {
+ int listSize = 3;
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("embedding.arrow.fixed-size-list.size",
String.valueOf(listSize));
+ Schema nonShadedSchema = LanceArrowUtils.toArrowSchema(rowType,
properties);
+
+ // 3 rows: non-null, null, non-null
+ try (ShadedArrowBatchWriter writer = new
ShadedArrowBatchWriter(shadedAllocator, rowType)) {
+ GenericRow row1 = new GenericRow(1);
+ row1.setField(0, new GenericArray(new float[] {1.0f, 2.0f, 3.0f}));
+ writer.writeRow(row1);
+
+ GenericRow row2 = new GenericRow(1);
+ row2.setField(0, null); // null embedding
+ writer.writeRow(row2);
+
+ GenericRow row3 = new GenericRow(1);
+ row3.setField(0, new GenericArray(new float[] {7.0f, 8.0f, 9.0f}));
+ writer.writeRow(row3);
+ writer.finish();
+
+ try (VectorSchemaRoot nonShadedRoot =
+ ArrowDataConverter.convertToNonShaded(
+ writer.getShadedRoot(), nonShadedAllocator,
nonShadedSchema)) {
+ assertThat(nonShadedRoot.getRowCount()).isEqualTo(3);
+
+ FixedSizeListVector fixedSizeListVector =
+ (FixedSizeListVector)
nonShadedRoot.getVector("embedding");
+
+ // Row 0: non-null
+ assertThat(fixedSizeListVector.isNull(0)).isFalse();
+ List<?> values0 = fixedSizeListVector.getObject(0);
+ assertThat(values0).hasSize(3);
+ assertThat((Float) values0.get(0)).isEqualTo(1.0f);
+ assertThat((Float) values0.get(1)).isEqualTo(2.0f);
+ assertThat((Float) values0.get(2)).isEqualTo(3.0f);
+
+ // Row 1: null
+ assertThat(fixedSizeListVector.isNull(1)).isTrue();
+
+ // Row 2: non-null
+ assertThat(fixedSizeListVector.isNull(2)).isFalse();
+ List<?> values2 = fixedSizeListVector.getObject(2);
+ assertThat(values2).hasSize(3);
+ assertThat((Float) values2.get(0)).isEqualTo(7.0f);
+ assertThat((Float) values2.get(1)).isEqualTo(8.0f);
+ assertThat((Float) values2.get(2)).isEqualTo(9.0f);
+ }
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
new file mode 100644
index 000000000..d682b0d88
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.utils;
+
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link LanceArrowUtils#toArrowSchema(RowType, Map)}. */
+class LanceArrowUtilsTest {
+
+ @Test
+ void testArrayColumnWithoutProperty() {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Schema schema = LanceArrowUtils.toArrowSchema(rowType,
Collections.emptyMap());
+
+ Field embeddingField = schema.findField("embedding");
+
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.List.class);
+ }
+
+ @Test
+ void testArrayColumnWithFixedSizeListProperty() {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("embedding.arrow.fixed-size-list.size", "4");
+
+ Schema schema = LanceArrowUtils.toArrowSchema(rowType, properties);
+
+ Field embeddingField = schema.findField("embedding");
+
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.FixedSizeList.class);
+ assertThat(((ArrowType.FixedSizeList)
embeddingField.getType()).getListSize()).isEqualTo(4);
+
+ // Child should still be a float element
+ assertThat(embeddingField.getChildren()).hasSize(1);
+
assertThat(embeddingField.getChildren().get(0).getName()).isEqualTo("element");
+ }
+
+ @Test
+ void testArrayColumnWithZeroSize() {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("embedding.arrow.fixed-size-list.size", "0");
+
+ assertThatThrownBy(() -> LanceArrowUtils.toArrowSchema(rowType,
properties))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testArrayColumnWithNegativeSize() {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("embedding.arrow.fixed-size-list.size", "-1");
+
+ assertThatThrownBy(() -> LanceArrowUtils.toArrowSchema(rowType,
properties))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testArrayColumnWithNonNumericSize() {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("embedding.arrow.fixed-size-list.size", "abc");
+
+ assertThatThrownBy(() -> LanceArrowUtils.toArrowSchema(rowType,
properties))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testToArrowSchemaWithEmptyProperties() {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Schema schema = LanceArrowUtils.toArrowSchema(rowType,
Collections.emptyMap());
+
+ Field embeddingField = schema.findField("embedding");
+
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.List.class);
+ }
+
+ @Test
+ void testColumnNameWithPeriodThrows() {
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD("my.embedding",
DataTypes.FLOAT()));
+
+ assertThatThrownBy(() -> LanceArrowUtils.toArrowSchema(rowType,
Collections.emptyMap()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("must not contain periods");
+ }
+
+ @Test
+ void testToArrowSchemaWithNullProperties() {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.FIELD("embedding",
DataTypes.ARRAY(DataTypes.FLOAT())));
+
+ Schema schema = LanceArrowUtils.toArrowSchema(rowType, null);
+
+ Field embeddingField = schema.findField("embedding");
+
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.List.class);
+ }
+}