This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 7ad353fa1 [lake/lance] Add Array type support for Lance (#2537)
7ad353fa1 is described below
commit 7ad353fa118521bdbdca46cd6960d1c022b8e0db
Author: ForwardXu <[email protected]>
AuthorDate: Wed Feb 4 19:08:44 2026 +0800
[lake/lance] Add Array type support for Lance (#2537)
---
.../fluss/lake/lance/utils/ArrowDataConverter.java | 53 +++++++++
.../fluss/lake/lance/utils/LanceArrowUtils.java | 12 +-
.../lance/testutils/FlinkLanceTieringTestBase.java | 19 ++++
.../lake/lance/tiering/LanceTieringITCase.java | 124 ++++++++++++++++-----
4 files changed, 177 insertions(+), 31 deletions(-)
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
index eb8c53b22..ed822e8b2 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
@@ -21,6 +21,7 @@ import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.Schema;
import java.nio.ByteBuffer;
@@ -68,6 +69,18 @@ public class ArrowDataConverter {
private static void copyVectorData(
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector
shadedVector,
FieldVector nonShadedVector) {
+
+        if (shadedVector
+                instanceof
+                        org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
+                && nonShadedVector instanceof ListVector) {
+            copyListVectorData(
+                    (org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
+                            shadedVector,
+                    (ListVector) nonShadedVector);
+            return;
+        }
+
         List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> shadedBuffers =
                 shadedVector.getFieldBuffers();
@@ -90,4 +103,44 @@ public class ArrowDataConverter {
}
}
}
+
+    private static void copyListVectorData(
+            org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
+                    shadedListVector,
+            ListVector nonShadedListVector) {
+
+        // First, recursively copy the child data vector
+        org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedDataVector =
+                shadedListVector.getDataVector();
+        FieldVector nonShadedDataVector = nonShadedListVector.getDataVector();
+
+        if (shadedDataVector != null && nonShadedDataVector != null) {
+            copyVectorData(shadedDataVector, nonShadedDataVector);
+        }
+
+        // Copy the ListVector's own buffers (validity and offset buffers)
+        List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> shadedBuffers =
+                shadedListVector.getFieldBuffers();
+        List<ArrowBuf> nonShadedBuffers = nonShadedListVector.getFieldBuffers();
+
+        for (int i = 0; i < Math.min(shadedBuffers.size(), nonShadedBuffers.size()); i++) {
+            org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf shadedBuf =
+                    shadedBuffers.get(i);
+            ArrowBuf nonShadedBuf = nonShadedBuffers.get(i);
+
+            long size = Math.min(shadedBuf.capacity(), nonShadedBuf.capacity());
+            if (size > 0) {
+                ByteBuffer srcBuffer = shadedBuf.nioBuffer(0, (int) size);
+                srcBuffer.position(0);
+                srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
+                nonShadedBuf.setBytes(0, srcBuffer);
+            }
+        }
+
+        // Do NOT call setValueCount() here: it would recompute and overwrite the
+        // offset buffer that was just copied. Setting lastSet is enough to mark
+        // the populated range of the list vector.
+        int valueCount = shadedListVector.getValueCount();
+        nonShadedListVector.setLastSet(valueCount - 1);
+    }
}
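A note on why ListVector is special-cased above: in Arrow Java, a list vector's getFieldBuffers() exposes only its validity and offset buffers, while the element values live in a separate child data vector, so a flat buffer copy alone would silently drop the elements. A minimal sketch against the non-shaded Arrow API (the demo class name is illustrative, not part of this patch):

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.complex.ListVector;
    import org.apache.arrow.vector.complex.impl.UnionListWriter;

    // Shows the buffer layout that forces the recursive copy: the list's own
    // buffers are just validity + offsets, the values sit in a child vector.
    public class ListVectorLayoutDemo {
        public static void main(String[] args) {
            try (BufferAllocator allocator = new RootAllocator();
                    ListVector list = ListVector.empty("scores", allocator)) {
                UnionListWriter writer = list.getWriter();
                writer.setPosition(0); // row 0 -> [10, 20]
                writer.startList();
                writer.writeInt(10);
                writer.writeInt(20);
                writer.endList();
                writer.setPosition(1); // row 1 -> [30]
                writer.startList();
                writer.writeInt(30);
                writer.endList();
                list.setValueCount(2);

                // Prints 2: the list itself holds only validity + offset buffers.
                System.out.println("list buffers: " + list.getFieldBuffers().size());
                // Prints 3: the element values are held by the child data vector.
                System.out.println("child values: " + list.getDataVector().getValueCount());
            }
        }
    }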
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
index 7e9448312..681367a32 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -17,6 +17,7 @@
package org.apache.fluss.lake.lance.utils;
+import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.BinaryType;
import org.apache.fluss.types.BooleanType;
@@ -44,6 +45,7 @@ import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -65,7 +67,13 @@ public class LanceArrowUtils {
private static Field toArrowField(String fieldName, DataType logicalType) {
         FieldType fieldType =
                 new FieldType(logicalType.isNullable(), toArrowType(logicalType), null);
-        return new Field(fieldName, fieldType, null);
+        List<Field> children = null;
+        if (logicalType instanceof ArrayType) {
+            children =
+                    Collections.singletonList(
+                            toArrowField("element", ((ArrayType) logicalType).getElementType()));
+        }
+        return new Field(fieldName, fieldType, children);
}
private static ArrowType toArrowType(DataType dataType) {
@@ -129,6 +137,8 @@ public class LanceArrowUtils {
} else {
return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
}
+ } else if (dataType instanceof ArrayType) {
+ return ArrowType.List.INSTANCE;
} else {
throw new UnsupportedOperationException(
String.format(
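Taken together, the two LanceArrowUtils changes mean ARRAY(t) now maps to an Arrow List field carrying exactly one child field named "element" of type t. A small sketch of the resulting field shape, built directly with the non-shaded Arrow classes (class and variable names are illustrative):

    import java.util.Collections;

    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.FieldType;

    // The field shape toArrowField() produces for a nullable ARRAY(INT) column.
    public class ArrayFieldShapeDemo {
        public static void main(String[] args) {
            Field element =
                    new Field("element", FieldType.nullable(new ArrowType.Int(32, true)), null);
            Field scores =
                    new Field(
                            "scores",
                            FieldType.nullable(ArrowType.List.INSTANCE),
                            Collections.singletonList(element));
            // Prints roughly: scores: List<element: Int(32, true)>
            System.out.println(scores);
        }
    }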
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index 8a429764d..64fc8f71f 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -178,6 +178,25 @@ public class FlinkLanceTieringTestBase {
return createTable(tablePath, tableBuilder.build());
}
+    protected long createLogTableWithAllArrayTypes(TablePath tablePath) throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("tags", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("scores", DataTypes.ARRAY(DataTypes.INT()))
+                        .column("embedding", DataTypes.ARRAY(DataTypes.FLOAT()));
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(1, "a")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
     protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append)
throws Exception {
try (Table table = conn.getTable(tablePath)) {
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
index 98dc30dd0..74be68e2d 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
@@ -23,14 +23,13 @@ import org.apache.fluss.lake.lance.LanceConfig;
import org.apache.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.GenericArray;
import org.apache.fluss.server.zk.data.lake.LakeTable;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.ReadOptions;
import com.lancedb.lance.Transaction;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.flink.core.execution.JobClient;
@@ -38,11 +37,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
 import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
@@ -67,36 +63,79 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase {
@Test
void testTiering() throws Exception {
- // create log table
+ // Test 1: Basic log table with INT and STRING columns
TablePath t1 = TablePath.of(DEFAULT_DB, "logTable");
long t1Id = createLogTable(t1);
TableBucket t1Bucket = new TableBucket(t1Id, 0);
- List<InternalRow> flussRows = new ArrayList<>();
+
// write records
for (int i = 0; i < 10; i++) {
-            List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
-            flussRows.addAll(rows);
-            // write records
-            writeRows(t1, rows, true);
+            writeRows(t1, Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")), true);
}
// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);
-        // check the status of replica after synced;
-        // note: we can't update log start offset for unaware bucket mode log table
+ // check the status of replica after synced
assertReplicaStatus(t1Bucket, 30);
- LanceConfig config =
+ LanceConfig config1 =
LanceConfig.from(
lanceConf.toMap(),
Collections.emptyMap(),
t1.getDatabaseName(),
t1.getTableName());
-        // check data in lance
-        checkDataInLanceAppendOnlyTable(config, flussRows);
-        checkSnapshotPropertyInLance(config, Collections.singletonMap(t1Bucket, 30L));
+        // check data in lance using TSV string comparison
+        String expectedTsv1 = buildExpectedTsvForBasicTable(30);
+        checkDataInLance(config1, expectedTsv1);
+        checkSnapshotPropertyInLance(config1, Collections.singletonMap(t1Bucket, 30L));
+
+        // Test 2: Log table with multiple array type columns (STRING, INT, FLOAT)
+ TablePath t2 = TablePath.of(DEFAULT_DB, "logTableWithArrays");
+ long t2Id = createLogTableWithAllArrayTypes(t2);
+ TableBucket t2Bucket = new TableBucket(t2Id, 0);
+
+ // write records with various array types
+ for (int i = 0; i < 10; i++) {
+ writeRows(
+ t2,
+ Arrays.asList(
+                            row(
+                                    1,
+                                    "v1",
+                                    new String[] {"tag1", "tag2"},
+                                    new int[] {10, 20},
+                                    new GenericArray(new float[] {0.1f, 0.2f, 0.3f, 0.4f})),
+                            row(
+                                    2,
+                                    "v2",
+                                    new String[] {"tag3"},
+                                    new int[] {30, 40, 50},
+                                    new GenericArray(new float[] {0.5f, 0.6f, 0.7f, 0.8f})),
+                            row(
+                                    3,
+                                    "v3",
+                                    new String[] {"tag4", "tag5", "tag6"},
+                                    new int[] {60},
+                                    new GenericArray(new float[] {0.9f, 1.0f, 1.1f, 1.2f}))),
+                    true);
+ }
+
+ // check the status of replica after synced
+ assertReplicaStatus(t2Bucket, 30);
+
+ LanceConfig config2 =
+ LanceConfig.from(
+ lanceConf.toMap(),
+ Collections.emptyMap(),
+ t2.getDatabaseName(),
+ t2.getTableName());
+
+ // check data in lance using TSV string comparison
+ String expectedTsv2 = buildExpectedTsvForArrayTable(30);
+ checkDataInLance(config2, expectedTsv2);
+        checkSnapshotPropertyInLance(config2, Collections.singletonMap(t2Bucket, 30L));
jobClient.cancel().get();
}
@@ -121,8 +160,7 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase {
}
}
-    private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRow> expectedRows)
-            throws Exception {
+    private void checkDataInLance(LanceConfig config, String expectedTsv) throws Exception {
try (Dataset dataset =
Dataset.open(
allocator,
@@ -130,18 +168,44 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase {
LanceConfig.genReadOptionFromConfig(config))) {
ArrowReader reader = dataset.newScan().scanBatches();
VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
- reader.loadNextBatch();
- Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
- int rowCount = readerRoot.getRowCount();
- for (int i = 0; i < rowCount; i++) {
- InternalRow flussRow = flussRowIterator.next();
- assertThat((int) (readerRoot.getVector(0).getObject(i)))
- .isEqualTo(flussRow.getInt(0));
-            assertThat(((VarCharVector) readerRoot.getVector(1)).getObject(i).toString())
- .isEqualTo(flussRow.getString(1).toString());
+ StringBuilder actualTsvBuilder = new StringBuilder();
+ boolean isFirstBatch = true;
+ while (reader.loadNextBatch()) {
+ String batchTsv = readerRoot.contentToTSVString();
+ if (isFirstBatch) {
+ actualTsvBuilder.append(batchTsv);
+ isFirstBatch = false;
+ } else {
+ // Skip header line for subsequent batches
+ int firstNewline = batchTsv.indexOf('\n');
+                    if (firstNewline >= 0 && firstNewline < batchTsv.length() - 1) {
+                        actualTsvBuilder.append(batchTsv.substring(firstNewline + 1));
+ }
+ }
}
- assertThat(reader.loadNextBatch()).isFalse();
- assertThat(flussRowIterator.hasNext()).isFalse();
+ assertThat(actualTsvBuilder.toString()).isEqualTo(expectedTsv);
+ }
+ }
+
+ private String buildExpectedTsvForBasicTable(int rowCount) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("a\tb\n");
+ for (int i = 0; i < rowCount / 3; i++) {
+ sb.append("1\tv1\n");
+ sb.append("2\tv2\n");
+ sb.append("3\tv3\n");
+ }
+ return sb.toString();
+ }
+
+ private String buildExpectedTsvForArrayTable(int rowCount) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("a\tb\ttags\tscores\tembedding\n");
+ for (int i = 0; i < rowCount / 3; i++) {
+            sb.append("1\tv1\t[\"tag1\",\"tag2\"]\t[10,20]\t[0.1,0.2,0.3,0.4]\n");
+            sb.append("2\tv2\t[\"tag3\"]\t[30,40,50]\t[0.5,0.6,0.7,0.8]\n");
+            sb.append("3\tv3\t[\"tag4\",\"tag5\",\"tag6\"]\t[60]\t[0.9,1.0,1.1,1.2]\n");
+        }
+ return sb.toString();
}
}
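For reference on the expected TSV strings used by these tests: VectorSchemaRoot.contentToTSVString() emits a header line of field names followed by one tab-separated line per row, with list values rendered as JSON-style arrays (hence ["tag1","tag2"] and [0.1,0.2,0.3,0.4] above). A minimal sketch of that rendering for the basic two-column case (the demo class name is illustrative, not part of this patch):

    import java.nio.charset.StandardCharsets;

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.arrow.vector.VarCharVector;
    import org.apache.arrow.vector.VectorSchemaRoot;

    // Demonstrates the TSV format checkDataInLance() compares against:
    // a header line of field names, then tab-separated values per row.
    public class TsvRenderingDemo {
        public static void main(String[] args) {
            try (RootAllocator allocator = new RootAllocator()) {
                IntVector a = new IntVector("a", allocator);
                VarCharVector b = new VarCharVector("b", allocator);
                a.allocateNew(2);
                b.allocateNew();
                a.set(0, 1);
                a.set(1, 2);
                b.setSafe(0, "v1".getBytes(StandardCharsets.UTF_8));
                b.setSafe(1, "v2".getBytes(StandardCharsets.UTF_8));
                // The root takes ownership; closing it releases the vectors.
                try (VectorSchemaRoot root = VectorSchemaRoot.of(a, b)) {
                    root.setRowCount(2);
                    // Expected output (tabs shown as <TAB>):
                    // a<TAB>b
                    // 1<TAB>v1
                    // 2<TAB>v2
                    System.out.print(root.contentToTSVString());
                }
            }
        }
    }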