This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ad353fa1 [lake/lance] Add Array type support for Lance (#2537)
7ad353fa1 is described below

commit 7ad353fa118521bdbdca46cd6960d1c022b8e0db
Author: ForwardXu <[email protected]>
AuthorDate: Wed Feb 4 19:08:44 2026 +0800

    [lake/lance] Add Array type support for Lance (#2537)
---
 .../fluss/lake/lance/utils/ArrowDataConverter.java |  53 +++++++++
 .../fluss/lake/lance/utils/LanceArrowUtils.java    |  12 +-
 .../lance/testutils/FlinkLanceTieringTestBase.java |  19 ++++
 .../lake/lance/tiering/LanceTieringITCase.java     | 124 ++++++++++++++++-----
 4 files changed, 177 insertions(+), 31 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
index eb8c53b22..ed822e8b2 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
@@ -21,6 +21,7 @@ import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.nio.ByteBuffer;
@@ -68,6 +69,18 @@ public class ArrowDataConverter {
     private static void copyVectorData(
             org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector 
shadedVector,
             FieldVector nonShadedVector) {
+
+        if (shadedVector
+                        instanceof
+                        
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
+                && nonShadedVector instanceof ListVector) {
+            copyListVectorData(
+                    
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
+                            shadedVector,
+                    (ListVector) nonShadedVector);
+            return;
+        }
+
         List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> 
shadedBuffers =
                 shadedVector.getFieldBuffers();
 
@@ -90,4 +103,44 @@ public class ArrowDataConverter {
             }
         }
     }
+
+    private static void copyListVectorData(
+            
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
+                    shadedListVector,
+            ListVector nonShadedListVector) {
+
+        // First, recursively copy the child data vector
+        org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector 
shadedDataVector =
+                shadedListVector.getDataVector();
+        FieldVector nonShadedDataVector = nonShadedListVector.getDataVector();
+
+        if (shadedDataVector != null && nonShadedDataVector != null) {
+            copyVectorData(shadedDataVector, nonShadedDataVector);
+        }
+
+        // Copy the ListVector's own buffers (validity and offset buffers)
+        List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> 
shadedBuffers =
+                shadedListVector.getFieldBuffers();
+        List<ArrowBuf> nonShadedBuffers = 
nonShadedListVector.getFieldBuffers();
+
+        for (int i = 0; i < Math.min(shadedBuffers.size(), 
nonShadedBuffers.size()); i++) {
+            org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf 
shadedBuf =
+                    shadedBuffers.get(i);
+            ArrowBuf nonShadedBuf = nonShadedBuffers.get(i);
+
+            long size = Math.min(shadedBuf.capacity(), 
nonShadedBuf.capacity());
+            if (size > 0) {
+                ByteBuffer srcBuffer = shadedBuf.nioBuffer(0, (int) size);
+                srcBuffer.position(0);
+                srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
+                nonShadedBuf.setBytes(0, srcBuffer);
+            }
+        }
+
+        // Avoid calling setValueCount(): it would recompute and overwrite the
+        // offset buffer that was just copied. Only lastSet is updated below.
+        int valueCount = shadedListVector.getValueCount();
+        // For ListVector, we need to manually set lastSet to avoid offset 
buffer recalculation
+        nonShadedListVector.setLastSet(valueCount - 1);
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
index 7e9448312..681367a32 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.lake.lance.utils;
 
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.BigIntType;
 import org.apache.fluss.types.BinaryType;
 import org.apache.fluss.types.BooleanType;
@@ -44,6 +45,7 @@ import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -65,7 +67,13 @@ public class LanceArrowUtils {
     private static Field toArrowField(String fieldName, DataType logicalType) {
         FieldType fieldType =
                 new FieldType(logicalType.isNullable(), 
toArrowType(logicalType), null);
-        return new Field(fieldName, fieldType, null);
+        List<Field> children = null;
+        if (logicalType instanceof ArrayType) {
+            children =
+                    Collections.singletonList(
+                            toArrowField("element", ((ArrayType) 
logicalType).getElementType()));
+        }
+        return new Field(fieldName, fieldType, children);
     }
 
     private static ArrowType toArrowType(DataType dataType) {
@@ -129,6 +137,8 @@ public class LanceArrowUtils {
             } else {
                 return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
             }
+        } else if (dataType instanceof ArrayType) {
+            return ArrowType.List.INSTANCE;
         } else {
             throw new UnsupportedOperationException(
                     String.format(
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index 8a429764d..64fc8f71f 100644
--- 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -178,6 +178,25 @@ public class FlinkLanceTieringTestBase {
         return createTable(tablePath, tableBuilder.build());
     }
 
+    protected long createLogTableWithAllArrayTypes(TablePath tablePath) throws 
Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("tags", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("scores", DataTypes.ARRAY(DataTypes.INT()))
+                        .column("embedding", 
DataTypes.ARRAY(DataTypes.FLOAT()));
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(1, "a")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
     protected void writeRows(TablePath tablePath, List<InternalRow> rows, 
boolean append)
             throws Exception {
         try (Table table = conn.getTable(tablePath)) {
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
index 98dc30dd0..74be68e2d 100644
--- 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java
@@ -23,14 +23,13 @@ import org.apache.fluss.lake.lance.LanceConfig;
 import org.apache.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.server.zk.data.lake.LakeTable;
 
 import com.lancedb.lance.Dataset;
 import com.lancedb.lance.ReadOptions;
 import com.lancedb.lance.Transaction;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.flink.core.execution.JobClient;
@@ -38,11 +37,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
@@ -67,36 +63,79 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase {
 
     @Test
     void testTiering() throws Exception {
-        // create log table
+        // Test 1: Basic log table with INT and STRING columns
         TablePath t1 = TablePath.of(DEFAULT_DB, "logTable");
         long t1Id = createLogTable(t1);
         TableBucket t1Bucket = new TableBucket(t1Id, 0);
-        List<InternalRow> flussRows = new ArrayList<>();
+
         // write records
         for (int i = 0; i < 10; i++) {
-            List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
-            flussRows.addAll(rows);
-            // write records
-            writeRows(t1, rows, true);
+            writeRows(t1, Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, 
"v3")), true);
         }
 
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
 
-        // check the status of replica after synced;
-        // note: we can't update log start offset for unaware bucket mode log 
table
+        // check the status of replica after synced
         assertReplicaStatus(t1Bucket, 30);
 
-        LanceConfig config =
+        LanceConfig config1 =
                 LanceConfig.from(
                         lanceConf.toMap(),
                         Collections.emptyMap(),
                         t1.getDatabaseName(),
                         t1.getTableName());
 
-        // check data in lance
-        checkDataInLanceAppendOnlyTable(config, flussRows);
-        checkSnapshotPropertyInLance(config, 
Collections.singletonMap(t1Bucket, 30L));
+        // check data in lance using TSV string comparison
+        String expectedTsv1 = buildExpectedTsvForBasicTable(30);
+        checkDataInLance(config1, expectedTsv1);
+        checkSnapshotPropertyInLance(config1, 
Collections.singletonMap(t1Bucket, 30L));
+
+        // Test 2: Log table with multiple array type columns (STRING, INT, 
FLOAT)
+        TablePath t2 = TablePath.of(DEFAULT_DB, "logTableWithArrays");
+        long t2Id = createLogTableWithAllArrayTypes(t2);
+        TableBucket t2Bucket = new TableBucket(t2Id, 0);
+
+        // write records with various array types
+        for (int i = 0; i < 10; i++) {
+            writeRows(
+                    t2,
+                    Arrays.asList(
+                            row(
+                                    1,
+                                    "v1",
+                                    new String[] {"tag1", "tag2"},
+                                    new int[] {10, 20},
+                                    new GenericArray(new float[] {0.1f, 0.2f, 
0.3f, 0.4f})),
+                            row(
+                                    2,
+                                    "v2",
+                                    new String[] {"tag3"},
+                                    new int[] {30, 40, 50},
+                                    new GenericArray(new float[] {0.5f, 0.6f, 
0.7f, 0.8f})),
+                            row(
+                                    3,
+                                    "v3",
+                                    new String[] {"tag4", "tag5", "tag6"},
+                                    new int[] {60},
+                                    new GenericArray(new float[] {0.9f, 1.0f, 
1.1f, 1.2f}))),
+                    true);
+        }
+
+        // check the status of replica after synced
+        assertReplicaStatus(t2Bucket, 30);
+
+        LanceConfig config2 =
+                LanceConfig.from(
+                        lanceConf.toMap(),
+                        Collections.emptyMap(),
+                        t2.getDatabaseName(),
+                        t2.getTableName());
+
+        // check data in lance using TSV string comparison
+        String expectedTsv2 = buildExpectedTsvForArrayTable(30);
+        checkDataInLance(config2, expectedTsv2);
+        checkSnapshotPropertyInLance(config2, 
Collections.singletonMap(t2Bucket, 30L));
 
         jobClient.cancel().get();
     }
@@ -121,8 +160,7 @@ class LanceTieringITCase extends FlinkLanceTieringTestBase {
         }
     }
 
-    private void checkDataInLanceAppendOnlyTable(LanceConfig config, 
List<InternalRow> expectedRows)
-            throws Exception {
+    private void checkDataInLance(LanceConfig config, String expectedTsv) 
throws Exception {
         try (Dataset dataset =
                 Dataset.open(
                         allocator,
@@ -130,18 +168,44 @@ class LanceTieringITCase extends 
FlinkLanceTieringTestBase {
                         LanceConfig.genReadOptionFromConfig(config))) {
             ArrowReader reader = dataset.newScan().scanBatches();
             VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
-            reader.loadNextBatch();
-            Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
-            int rowCount = readerRoot.getRowCount();
-            for (int i = 0; i < rowCount; i++) {
-                InternalRow flussRow = flussRowIterator.next();
-                assertThat((int) (readerRoot.getVector(0).getObject(i)))
-                        .isEqualTo(flussRow.getInt(0));
-                assertThat(((VarCharVector) 
readerRoot.getVector(1)).getObject(i).toString())
-                        .isEqualTo(flussRow.getString(1).toString());
+            StringBuilder actualTsvBuilder = new StringBuilder();
+            boolean isFirstBatch = true;
+            while (reader.loadNextBatch()) {
+                String batchTsv = readerRoot.contentToTSVString();
+                if (isFirstBatch) {
+                    actualTsvBuilder.append(batchTsv);
+                    isFirstBatch = false;
+                } else {
+                    // Skip header line for subsequent batches
+                    int firstNewline = batchTsv.indexOf('\n');
+                    if (firstNewline >= 0 && firstNewline < batchTsv.length() 
- 1) {
+                        
actualTsvBuilder.append(batchTsv.substring(firstNewline + 1));
+                    }
+                }
             }
-            assertThat(reader.loadNextBatch()).isFalse();
-            assertThat(flussRowIterator.hasNext()).isFalse();
+            assertThat(actualTsvBuilder.toString()).isEqualTo(expectedTsv);
+        }
+    }
+
+    private String buildExpectedTsvForBasicTable(int rowCount) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("a\tb\n");
+        for (int i = 0; i < rowCount / 3; i++) {
+            sb.append("1\tv1\n");
+            sb.append("2\tv2\n");
+            sb.append("3\tv3\n");
+        }
+        return sb.toString();
+    }
+
+    private String buildExpectedTsvForArrayTable(int rowCount) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("a\tb\ttags\tscores\tembedding\n");
+        for (int i = 0; i < rowCount / 3; i++) {
+            
sb.append("1\tv1\t[\"tag1\",\"tag2\"]\t[10,20]\t[0.1,0.2,0.3,0.4]\n");
+            sb.append("2\tv2\t[\"tag3\"]\t[30,40,50]\t[0.5,0.6,0.7,0.8]\n");
+            
sb.append("3\tv3\t[\"tag4\",\"tag5\",\"tag6\"]\t[60]\t[0.9,1.0,1.1,1.2]\n");
         }
+        return sb.toString();
     }
 }

Reply via email to