This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cdc9b7ef [lake/lance] Lance lake writer and committer implementation (#1441)
9cdc9b7ef is described below

commit 9cdc9b7ef596996a86b45ec472a71d9dca42e68c
Author: xx789 <xx7896...@163.com>
AuthorDate: Mon Aug 25 10:51:19 2025 +0800

    [lake/lance] Lance lake writer and committer implementation (#1441)
---
 fluss-lake/fluss-lake-lance/pom.xml                |   2 +-
 .../com/alibaba/fluss/lake/lance/LanceConfig.java  |  54 +---
 .../alibaba/fluss/lake/lance/LanceLakeCatalog.java |  24 +-
 .../alibaba/fluss/lake/lance/LanceLakeStorage.java |   3 +-
 .../fluss/lake/lance/tiering/ArrowWriter.java      |  74 +++++
 .../fluss/lake/lance/tiering/LanceArrowWriter.java | 130 +++++++++
 .../lance/tiering/LanceCommittableSerializer.java  |  67 +++++
 .../lake/lance/tiering/LanceLakeCommitter.java     | 162 +++++++++++
 .../lance/tiering/LanceLakeTieringFactory.java     |  60 ++++
 .../fluss/lake/lance/tiering/LanceLakeWriter.java  |  94 ++++++
 .../lance/tiering/LanceWriteResultSerializer.java  |  55 ++++
 .../fluss/lake/lance/utils/LanceArrowUtils.java    |  88 ++++++
 .../lake/lance/utils/LanceDatasetAdapter.java      |  40 +++
 .../lake/lance/writers/ArrowBigIntWriter.java      |  52 ++++
 .../lake/lance/writers/ArrowBinaryWriter.java      |  57 ++++
 .../lake/lance/writers/ArrowBooleanWriter.java     |  54 ++++
 .../fluss/lake/lance/writers/ArrowDateWriter.java  |  54 ++++
 .../lake/lance/writers/ArrowDecimalWriter.java     |  70 +++++
 .../lake/lance/writers/ArrowDoubleWriter.java      |  54 ++++
 .../fluss/lake/lance/writers/ArrowFieldWriter.java |  63 ++++
 .../fluss/lake/lance/writers/ArrowFloatWriter.java |  53 ++++
 .../fluss/lake/lance/writers/ArrowIntWriter.java   |  54 ++++
 .../lake/lance/writers/ArrowSmallIntWriter.java    |  54 ++++
 .../fluss/lake/lance/writers/ArrowTimeWriter.java  |  90 ++++++
 .../lance/writers/ArrowTimestampLtzWriter.java     |  99 +++++++
 .../lance/writers/ArrowTimestampNtzWriter.java     |  97 +++++++
 .../lake/lance/writers/ArrowTinyIntWriter.java     |  56 ++++
 .../lake/lance/writers/ArrowVarBinaryWriter.java   |  52 ++++
 .../lake/lance/writers/ArrowVarCharWriter.java     |  56 ++++
 .../src/main/resources/META-INF/NOTICE             |   2 +-
 .../resources/META-INF/licenses/LICENSE.zstd-jni   |  26 --
 .../lake/lance/LakeEnabledTableCreateITCase.java   |  27 +-
 .../lance/testutils/FlinkLanceTieringTestBase.java | 219 ++++++++++++++
 .../lake/lance/tiering/LanceTieringITCase.java     | 148 ++++++++++
 .../fluss/lake/lance/tiering/LanceTieringTest.java | 318 +++++++++++++++++++++
 .../testutils/FlinkPaimonTieringTestBase.java      |   1 -
 36 files changed, 2497 insertions(+), 112 deletions(-)

diff --git a/fluss-lake/fluss-lake-lance/pom.xml b/fluss-lake/fluss-lake-lance/pom.xml
index 65715230d..960a3d4db 100644
--- a/fluss-lake/fluss-lake-lance/pom.xml
+++ b/fluss-lake/fluss-lake-lance/pom.xml
@@ -33,7 +33,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <lance.version>0.26.1</lance.version>
+        <lance.version>0.33.0</lance.version>
     </properties>
 
     <dependencies>
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java
index 5d7218d89..240b387e6 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java
@@ -32,17 +32,9 @@ public class LanceConfig implements Serializable {
 
     private static final String block_size = "block_size";
     private static final String version = "version";
-    private static final String index_cache_size = "index_cache_size";
-    private static final String metadata_cache_size = "metadata_cache_size";
     private static final String max_row_per_file = "max_row_per_file";
     private static final String max_rows_per_group = "max_rows_per_group";
     private static final String max_bytes_per_file = "max_bytes_per_file";
-    private static final String ak = "access_key_id";
-    private static final String sk = "secret_access_key";
-    private static final String endpoint = "aws_endpoint";
-    private static final String region = "aws_region";
-    private static final String virtual_hosted_style = "virtual_hosted_style_request";
-    private static final String allow_http = "allow_http";
     private static final String batch_size = "batch_size";
     private static final String warehouse = "warehouse";
 
@@ -64,11 +56,17 @@ public class LanceConfig implements Serializable {
     }
 
     public static LanceConfig from(
-            Map<String, String> properties, String databaseName, String tableName) {
-        if (!properties.containsKey(warehouse)) {
+            Map<String, String> clusterConf,
+            Map<String, String> tableCustomProperties,
+            String databaseName,
+            String tableName) {
+        if (!clusterConf.containsKey(warehouse)) {
             throw new IllegalArgumentException("Missing required option " + warehouse);
         }
-        return new LanceConfig(databaseName, tableName, properties.get(warehouse), properties);
+        Map<String, String> options = new HashMap<>();
+        options.putAll(clusterConf);
+        options.putAll(tableCustomProperties);
+        return new LanceConfig(databaseName, tableName, clusterConf.get(warehouse), options);
     }
 
     public static int getBatchSize(LanceConfig config) {
@@ -83,14 +81,6 @@ public class LanceConfig implements Serializable {
         return options;
     }
 
-    public String getDatabaseName() {
-        return databaseName;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
     public String getDatasetUri() {
         return datasetUri;
     }
@@ -104,12 +94,6 @@ public class LanceConfig implements Serializable {
         if (maps.containsKey(version)) {
             builder.setVersion(Integer.parseInt(maps.get(version)));
         }
-        if (maps.containsKey(index_cache_size)) {
-            builder.setIndexCacheSize(Integer.parseInt(maps.get(index_cache_size)));
-        }
-        if (maps.containsKey(metadata_cache_size)) {
-            builder.setMetadataCacheSize(Integer.parseInt(maps.get(metadata_cache_size)));
-        }
         builder.setStorageOptions(genStorageOptions(config));
         return builder.build();
     }
@@ -130,24 +114,8 @@ public class LanceConfig implements Serializable {
         return builder.build();
     }
 
-    private static Map<String, String> genStorageOptions(LanceConfig config) {
-        Map<String, String> storageOptions = new HashMap<>();
-        Map<String, String> maps = config.getOptions();
-        if (maps.containsKey(ak) && maps.containsKey(sk) && maps.containsKey(endpoint)) {
-            storageOptions.put(ak, maps.get(ak));
-            storageOptions.put(sk, maps.get(sk));
-            storageOptions.put(endpoint, maps.get(endpoint));
-        }
-        if (maps.containsKey(region)) {
-            storageOptions.put(region, maps.get(region));
-        }
-        if (maps.containsKey(virtual_hosted_style)) {
-            storageOptions.put(virtual_hosted_style, maps.get(virtual_hosted_style));
-        }
-        if (maps.containsKey(allow_http)) {
-            storageOptions.put(allow_http, maps.get(allow_http));
-        }
-        return storageOptions;
+    public static Map<String, String> genStorageOptions(LanceConfig config) {
+        return config.getOptions();
     }
 
     @Override
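
For reference, a minimal sketch of the new four-argument LanceConfig.from: table-level custom properties are merged over the cluster configuration, so a table can override an option such as batch_size without touching the cluster conf. The concrete keys below are illustrative, except "warehouse", which the change shows is required.

    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.fluss.lake.lance.LanceConfig;

    public class LanceConfigExample {
        public static void main(String[] args) {
            Map<String, String> clusterConf = new HashMap<>();
            clusterConf.put("warehouse", "/tmp/lance-warehouse"); // required option
            clusterConf.put("batch_size", "1024");

            Map<String, String> tableProps = new HashMap<>();
            tableProps.put("batch_size", "256"); // overrides the cluster-level value

            LanceConfig config = LanceConfig.from(clusterConf, tableProps, "db", "tbl");
            System.out.println(config.getDatasetUri());
            System.out.println(LanceConfig.getBatchSize(config)); // prints 256
        }
    }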
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java
index 827c9582d..bb2df9389 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java
@@ -26,31 +26,14 @@ import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePath;
 
 import com.lancedb.lance.WriteParams;
-import org.apache.arrow.vector.types.TimeUnit;
-import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
-
 /** A Lance implementation of {@link LakeCatalog}. */
 public class LanceLakeCatalog implements LakeCatalog {
-    private static final List<Field> SYSTEM_COLUMNS = new ArrayList<>();
-
-    static {
-        SYSTEM_COLUMNS.add(Field.nullable(BUCKET_COLUMN_NAME, new ArrowType.Int(32, true)));
-        SYSTEM_COLUMNS.add(Field.nullable(OFFSET_COLUMN_NAME, new ArrowType.Int(64, true)));
-        SYSTEM_COLUMNS.add(
-                Field.nullable(
-                        TIMESTAMP_COLUMN_NAME,
-                        new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)));
-    }
-
     private final Configuration options;
 
     public LanceLakeCatalog(Configuration config) {
@@ -67,7 +50,10 @@ public class LanceLakeCatalog implements LakeCatalog {
 
         LanceConfig config =
                 LanceConfig.from(
-                        options.toMap(), tablePath.getDatabaseName(), tablePath.getTableName());
+                        options.toMap(),
+                        tableDescriptor.getCustomProperties(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
         WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
 
         List<Field> fields = new ArrayList<>();
@@ -75,8 +61,6 @@ public class LanceLakeCatalog implements LakeCatalog {
         fields.addAll(
                 LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType())
                         .getFields());
-        // add system metadata columns to schema
-        fields.addAll(SYSTEM_COLUMNS);
         try {
             LanceDatasetAdapter.createDataset(config.getDatasetUri(), new Schema(fields), params);
         } catch (RuntimeException e) {
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
index 5864ccf77..c4b51cd9c 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.lake.lance;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.lake.lakestorage.LakeStorage;
 import com.alibaba.fluss.lake.lance.tiering.LanceCommittable;
+import com.alibaba.fluss.lake.lance.tiering.LanceLakeTieringFactory;
 import com.alibaba.fluss.lake.lance.tiering.LanceWriteResult;
 import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -35,7 +36,7 @@ public class LanceLakeStorage implements LakeStorage {
 
     @Override
     public LakeTieringFactory<LanceWriteResult, LanceCommittable> createLakeTieringFactory() {
-        throw new UnsupportedOperationException("Not implemented");
+        return new LanceLakeTieringFactory(config);
     }
 
     @Override
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java
new file mode 100644
index 000000000..053b348b3
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/** An Arrow writer for {@link InternalRow}. */
+public class ArrowWriter {
+    private final VectorSchemaRoot root;
+
+    private final ArrowFieldWriter<InternalRow>[] fieldWriters;
+
+    private int recordsCount;
+
+    /**
+     * Writer which serializes the Fluss rows to Arrow record batches.
+     *
+     * @param fieldWriters An array of writers which are responsible for the serialization of each
+     *     column of the rows
+     * @param root Container that holds a set of vectors for the rows
+     */
+    public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters, VectorSchemaRoot root) {
+        this.fieldWriters = fieldWriters;
+        this.root = root;
+    }
+
+    public static ArrowWriter create(VectorSchemaRoot root, RowType rowType) {
+        ArrowFieldWriter<InternalRow>[] fieldWriters =
+                new ArrowFieldWriter[root.getFieldVectors().size()];
+        for (int i = 0; i < fieldWriters.length; i++) {
+            FieldVector fieldVector = root.getVector(i);
+
+            fieldWriters[i] =
+                    LanceArrowUtils.createArrowFieldWriter(fieldVector, rowType.getTypeAt(i));
+        }
+        return new ArrowWriter(fieldWriters, root);
+    }
+
+    /** Writes the specified row which is serialized into Arrow format. */
+    public void writeRow(InternalRow row) {
+        for (int i = 0; i < fieldWriters.length; i++) {
+            fieldWriters[i].write(row, i, true);
+        }
+        recordsCount++;
+    }
+
+    public void finish() {
+        root.setRowCount(recordsCount);
+        for (ArrowFieldWriter<InternalRow> fieldWriter : fieldWriters) {
+            fieldWriter.finish();
+        }
+    }
+}
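
As a reading aid, here is a minimal sketch of driving ArrowWriter directly; it assumes the caller supplies a Fluss RowType and a source of InternalRows, and mirrors how the tiering path uses the class.

    import com.alibaba.fluss.lake.lance.tiering.ArrowWriter;
    import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
    import com.alibaba.fluss.row.InternalRow;
    import com.alibaba.fluss.types.RowType;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.Schema;

    public class ArrowWriterExample {
        /** Serializes the given rows into one Arrow record batch held by the root. */
        static void writeBatch(RowType rowType, Iterable<InternalRow> rows) {
            Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType);
            try (BufferAllocator allocator = new RootAllocator();
                    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
                ArrowWriter writer = ArrowWriter.create(root, rowType);
                for (InternalRow row : rows) {
                    writer.writeRow(row);
                }
                writer.finish(); // sets the row count on the root
                // the root's vectors are now populated for downstream consumption
            }
        }
    }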
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java
new file mode 100644
index 000000000..e2d0f4944
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** A custom Arrow reader that supports writing Fluss internal rows while reading data in batches. */
+public class LanceArrowWriter extends ArrowReader {
+    private final Schema schema;
+    private final RowType rowType;
+    private final int batchSize;
+
+    private volatile boolean finished;
+
+    private final AtomicLong totalBytesRead = new AtomicLong();
+    private ArrowWriter arrowWriter = null;
+    private final AtomicInteger count = new AtomicInteger(0);
+    private final Semaphore writeToken;
+    private final Semaphore loadToken;
+
+    public LanceArrowWriter(
+            BufferAllocator allocator, Schema schema, int batchSize, RowType rowType) {
+        super(allocator);
+        checkNotNull(schema);
+        checkArgument(batchSize > 0);
+        this.schema = schema;
+        this.rowType = rowType;
+        this.batchSize = batchSize;
+        this.writeToken = new Semaphore(0);
+        this.loadToken = new Semaphore(0);
+    }
+
+    void write(LogRecord row) {
+        checkNotNull(row);
+        try {
+            // wait until prepareLoadNextBatch releases a write token
+            writeToken.acquire();
+            arrowWriter.writeRow(row.getRow());
+            if (count.incrementAndGet() == batchSize) {
+                // notify loadNextBatch to take the batch
+                loadToken.release();
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    void setFinished() {
+        loadToken.release();
+        finished = true;
+    }
+
+    @Override
+    public void prepareLoadNextBatch() throws IOException {
+        super.prepareLoadNextBatch();
+        arrowWriter = ArrowWriter.create(this.getVectorSchemaRoot(), rowType);
+        // release batch size token for write
+        writeToken.release(batchSize);
+    }
+
+    @Override
+    public boolean loadNextBatch() throws IOException {
+        prepareLoadNextBatch();
+        try {
+            if (finished && count.get() == 0) {
+                return false;
+            }
+            // wait until the batch is full or the writer is finished
+            loadToken.acquire();
+            arrowWriter.finish();
+            if (!finished) {
+                count.set(0);
+                return true;
+            } else {
+                // return true if there are buffered rows, false if there are no records
+                if (count.get() > 0) {
+                    count.set(0);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public long bytesRead() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected synchronized void closeReadSource() throws IOException {
+        // Implement if needed
+    }
+
+    @Override
+    protected Schema readSchema() {
+        return this.schema;
+    }
+}
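
The semaphore handshake above is worth spelling out: write() blocks on a write token that prepareLoadNextBatch() releases batchSize at a time, and loadNextBatch() blocks on a load token that is released when a batch fills up or setFinished() is called. A sketch of the two sides, assuming `records` is a source of Fluss LogRecords and `writer` came from LanceDatasetAdapter.getArrowWriter(...); in the real flow the consumer side is Lance's Fragment.create, which drains the reader:

    Thread producer = new Thread(() -> {
        for (LogRecord record : records) {
            writer.write(record); // blocks until prepareLoadNextBatch releases a write token
        }
        writer.setFinished();     // releases the consumer for the final partial batch
    });
    producer.start();

    while (writer.loadNextBatch()) { // blocks until a batch is full or the writer finished
        VectorSchemaRoot root = writer.getVectorSchemaRoot();
        // consume root; the tiering path hands it to Lance via the Arrow C data interface
    }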
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java
new file mode 100644
index 000000000..abe7efc83
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import com.lancedb.lance.FragmentMetadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/** The serializer of {@link LanceCommittable}. */
+public class LanceCommittableSerializer implements SimpleVersionedSerializer<LanceCommittable> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceCommittable lanceCommittable) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(lanceCommittable.committable());
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public LanceCommittable deserialize(int version, byte[] serialized) throws IOException {
+        if (version != CURRENT_VERSION) {
+            throw new UnsupportedOperationException(
+                    "Expecting LanceCommittable version to be "
+                            + CURRENT_VERSION
+                            + ", but found "
+                            + version
+                            + ".");
+        }
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                ObjectInputStream ois = new ObjectInputStream(bais)) {
+            //noinspection unchecked
+            return new LanceCommittable((List<FragmentMetadata>) ois.readObject());
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Couldn't deserialize LanceCommittable", e);
+        }
+    }
+}
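
A quick round-trip sketch of the codec, assuming `fragments` came from a completed write; note that deserialize rejects any version other than the current one:

    LanceCommittableSerializer serializer = new LanceCommittableSerializer();
    LanceCommittable committable = new LanceCommittable(fragments);
    byte[] bytes = serializer.serialize(committable);
    LanceCommittable restored = serializer.deserialize(serializer.getVersion(), bytes);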
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java
new file mode 100644
index 000000000..3eae2c792
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.Version;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+
+/** Implementation of {@link LakeCommitter} for Lance. */
+public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
+    private final LanceConfig config;
+    private static final String committerName = "commit-user";
+    private final RootAllocator allocator = new RootAllocator();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public LanceLakeCommitter(Configuration options, TablePath tablePath) {
+        this.config =
+                LanceConfig.from(
+                        options.toMap(),
+                        Collections.emptyMap(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+    }
+
+    @Override
+    public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
+            throws IOException {
+        List<FragmentMetadata> fragments =
+                lanceWriteResults.stream()
+                        .map(LanceWriteResult::commitMessage)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        return new LanceCommittable(fragments);
+    }
+
+    @Override
+    public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        Map<String, String> properties = new HashMap<>(snapshotProperties);
+        properties.put(committerName, FLUSS_LAKE_TIERING_COMMIT_USER);
+        return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties);
+    }
+
+    @Override
+    public void abort(LanceCommittable committable) throws IOException {
+        // TODO lance does not have the API to proactively delete the written files yet, see
+        // https://github.com/lancedb/lance/issues/4508
+    }
+
+    @SuppressWarnings("checkstyle:LocalVariableName")
+    @Nullable
+    @Override
+    public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
+            throws IOException {
+        Tuple2<Version, Transaction> latestLakeSnapshotIdOfLake =
+                getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+
+        if (latestLakeSnapshotIdOfLake == null) {
+            return null;
+        }
+
+        // we got the latest snapshot committed by fluss, but it is not greater than
+        // latestLakeSnapshotIdOfFluss, so there is no missing snapshot; return directly
+        if (latestLakeSnapshotIdOfFluss != null
+                && latestLakeSnapshotIdOfLake.f0.getId() <= latestLakeSnapshotIdOfFluss) {
+            return null;
+        }
+
+        CommittedLakeSnapshot committedLakeSnapshot =
+                new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.f0.getId());
+        String flussOffsetProperties =
+                latestLakeSnapshotIdOfLake
+                        .f1
+                        .transactionProperties()
+                        .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+            BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+            if (bucketOffset.getPartitionId() != null) {
+                committedLakeSnapshot.addPartitionBucket(
+                        bucketOffset.getPartitionId(),
+                        bucketOffset.getPartitionQualifiedName(),
+                        bucketOffset.getBucket(),
+                        bucketOffset.getLogOffset());
+            } else {
+                committedLakeSnapshot.addBucket(
+                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
+            }
+        }
+        return committedLakeSnapshot;
+    }
+
+    @Nullable
+    private Tuple2<Version, Transaction> getCommittedLatestSnapshotOfLake(String commitUser) {
+        ReadOptions.Builder builder = new ReadOptions.Builder();
+        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            List<Version> versions = dataset.listVersions();
+            for (int i = versions.size() - 1; i >= 0; i--) {
+                Version version = versions.get(i);
+                builder.setVersion((int) version.getId());
+                try (Dataset datasetRead =
+                        Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+                    Transaction transaction = datasetRead.readTransaction().orElse(null);
+                    if (transaction != null
+                            && commitUser.equals(
+                                    transaction.transactionProperties().get(committerName))) {
+                        return Tuple2.of(version, transaction);
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        allocator.close();
+    }
+}
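
Putting the committer pieces together, a minimal sketch of the commit path, assuming `writeResults` were produced by LanceLakeWriter.complete() and `snapshotProperties` carries the Fluss bucket offsets under FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY (the same key getMissingLakeSnapshot reads back):

    try (LanceLakeCommitter committer = new LanceLakeCommitter(options, tablePath)) {
        LanceCommittable committable = committer.toCommittable(writeResults);
        long snapshotId = committer.commit(committable, snapshotProperties);
        // snapshotId is the new Lance dataset version (versions start from 1)
    }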
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java
new file mode 100644
index 000000000..1bd0894b2
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.CommitterInitContext;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeTieringFactory;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+
+import java.io.IOException;
+
+/** Implementation of {@link LakeTieringFactory} for Lance. */
+public class LanceLakeTieringFactory
+        implements LakeTieringFactory<LanceWriteResult, LanceCommittable> {
+    private final Configuration config;
+
+    public LanceLakeTieringFactory(Configuration config) {
+        this.config = config;
+    }
+
+    @Override
+    public LakeWriter<LanceWriteResult> createLakeWriter(WriterInitContext writerInitContext)
+            throws IOException {
+        return new LanceLakeWriter(config, writerInitContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<LanceWriteResult> getWriteResultSerializer() {
+        return new LanceWriteResultSerializer();
+    }
+
+    @Override
+    public LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter(
+            CommitterInitContext committerInitContext) throws IOException {
+        return new LanceLakeCommitter(config, committerInitContext.tablePath());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<LanceCommittable> getCommittableSerializer() {
+        return new LanceCommittableSerializer();
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java
new file mode 100644
index 000000000..73c0fdfdb
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.record.LogRecord;
+
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+/** Implementation of {@link LakeWriter} for Lance. */
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+    private final LanceArrowWriter arrowWriter;
+    private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+
+    public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
+            throws IOException {
+        LanceConfig config =
+                LanceConfig.from(
+                        options.toMap(),
+                        writerInitContext.customProperties(),
+                        writerInitContext.tablePath().getDatabaseName(),
+                        writerInitContext.tablePath().getTableName());
+        int batchSize = LanceConfig.getBatchSize(config);
+        Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
+        if (!schema.isPresent()) {
+            throw new IOException("Fail to get dataset " + 
config.getDatasetUri() + " in Lance.");
+        }
+
+        this.arrowWriter =
+                LanceDatasetAdapter.getArrowWriter(
+                        schema.get(), batchSize, writerInitContext.schema().getRowType());
+
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        Callable<List<FragmentMetadata>> fragmentCreator =
+                () ->
+                        LanceDatasetAdapter.createFragment(
+                                config.getDatasetUri(), arrowWriter, params);
+        fragmentCreationTask = new FutureTask<>(fragmentCreator);
+        Thread fragmentCreationThread = new Thread(fragmentCreationTask);
+        fragmentCreationThread.start();
+    }
+
+    @Override
+    public void write(LogRecord record) throws IOException {
+        arrowWriter.write(record);
+    }
+
+    @Override
+    public LanceWriteResult complete() throws IOException {
+        arrowWriter.setFinished();
+        try {
+            List<FragmentMetadata> fragmentMetadata = fragmentCreationTask.get();
+            return new LanceWriteResult(fragmentMetadata);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while waiting for reader thread 
to finish", e);
+        } catch (ExecutionException e) {
+            throw new IOException("Exception in reader thread", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        arrowWriter.close();
+    }
+}
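
A lifecycle sketch for the writer: the constructor already started the background fragment-creation thread, so the caller only feeds records and completes. `records`, `options` and `writerInitContext` are assumed to be supplied by the tiering service:

    try (LanceLakeWriter writer = new LanceLakeWriter(options, writerInitContext)) {
        for (LogRecord record : records) {
            writer.write(record); // hands the row to the blocking LanceArrowWriter
        }
        LanceWriteResult result = writer.complete(); // unblocks the reader, joins the thread
        // result.commitMessage() holds the FragmentMetadata list for the committer
    }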
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java
new file mode 100644
index 000000000..81c1b8450
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** The {@link SimpleVersionedSerializer} for {@link LanceWriteResult}. */
+public class LanceWriteResultSerializer implements SimpleVersionedSerializer<LanceWriteResult> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceWriteResult lanceWriteResult) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(lanceWriteResult);
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public LanceWriteResult deserialize(int version, byte[] serialized) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                ObjectInputStream ois = new ObjectInputStream(bais)) {
+            return (LanceWriteResult) ois.readObject();
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Couldn't deserialize LanceWriteResult", e);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
index cf92952c9..4de499ffe 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -17,6 +17,23 @@
 
 package com.alibaba.fluss.lake.lance.utils;
 
+import com.alibaba.fluss.lake.lance.writers.ArrowBigIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowBinaryWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowBooleanWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDateWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDecimalWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDoubleWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowFloatWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowSmallIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimeWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimestampLtzWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimestampNtzWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTinyIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowVarBinaryWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowVarCharWriter;
+import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.types.BigIntType;
 import com.alibaba.fluss.types.BinaryType;
 import com.alibaba.fluss.types.BooleanType;
@@ -37,6 +54,24 @@ import com.alibaba.fluss.types.TimeType;
 import com.alibaba.fluss.types.TimestampType;
 import com.alibaba.fluss.types.TinyIntType;
 
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -187,4 +222,57 @@ public class LanceArrowUtils {
                             "Unsupported data type %s currently.", 
dataType.asSummaryString()));
         }
     }
+
+    private static int getPrecision(DecimalVector decimalVector) {
+        return decimalVector.getPrecision();
+    }
+
+    public static ArrowFieldWriter<InternalRow> createArrowFieldWriter(
+            ValueVector vector, DataType dataType) {
+        if (vector instanceof TinyIntVector) {
+            return ArrowTinyIntWriter.forField((TinyIntVector) vector);
+        } else if (vector instanceof SmallIntVector) {
+            return ArrowSmallIntWriter.forField((SmallIntVector) vector);
+        } else if (vector instanceof IntVector) {
+            return ArrowIntWriter.forField((IntVector) vector);
+        } else if (vector instanceof BigIntVector) {
+            return ArrowBigIntWriter.forField((BigIntVector) vector);
+        } else if (vector instanceof BitVector) {
+            return ArrowBooleanWriter.forField((BitVector) vector);
+        } else if (vector instanceof Float4Vector) {
+            return ArrowFloatWriter.forField((Float4Vector) vector);
+        } else if (vector instanceof Float8Vector) {
+            return ArrowDoubleWriter.forField((Float8Vector) vector);
+        } else if (vector instanceof VarCharVector) {
+            return ArrowVarCharWriter.forField((VarCharVector) vector);
+        } else if (vector instanceof FixedSizeBinaryVector) {
+            return ArrowBinaryWriter.forField((FixedSizeBinaryVector) vector);
+        } else if (vector instanceof VarBinaryVector) {
+            return ArrowVarBinaryWriter.forField((VarBinaryVector) vector);
+        } else if (vector instanceof DecimalVector) {
+            DecimalVector decimalVector = (DecimalVector) vector;
+            return ArrowDecimalWriter.forField(
+                    decimalVector, getPrecision(decimalVector), decimalVector.getScale());
+        } else if (vector instanceof DateDayVector) {
+            return ArrowDateWriter.forField((DateDayVector) vector);
+        } else if (vector instanceof TimeSecVector
+                || vector instanceof TimeMilliVector
+                || vector instanceof TimeMicroVector
+                || vector instanceof TimeNanoVector) {
+            return ArrowTimeWriter.forField(vector);
+        } else if (vector instanceof TimeStampVector
+                && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
+            int precision;
+            if (dataType instanceof LocalZonedTimestampType) {
+                precision = ((LocalZonedTimestampType) dataType).getPrecision();
+                return ArrowTimestampLtzWriter.forField(vector, precision);
+            } else {
+                precision = ((TimestampType) dataType).getPrecision();
+                return ArrowTimestampNtzWriter.forField(vector, precision);
+            }
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported type %s.", dataType));
+        }
+    }
 }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
index ed50abcc9..54ddefbef 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
@@ -18,14 +18,25 @@
 package com.alibaba.fluss.lake.lance.utils;
 
 import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.tiering.LanceArrowWriter;
+import com.alibaba.fluss.types.RowType;
 
 import com.lancedb.lance.Dataset;
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.FragmentMetadata;
 import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
 import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.operation.Append;
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /** Lance dataset API adapter. */
@@ -46,4 +57,33 @@ public class LanceDatasetAdapter {
             return Optional.empty();
         }
     }
+
+    public static long commitAppend(
+            LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
+        String uri = config.getDatasetUri();
+        ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+        try (Dataset dataset = Dataset.open(allocator, uri, options)) {
+            Transaction transaction =
+                    dataset.newTransactionBuilder()
+                            .operation(Append.builder().fragments(fragments).build())
+                            .transactionProperties(properties)
+                            .build();
+            try (Dataset appendedDataset = transaction.commit()) {
+                // note: lance dataset version starts from 1
+                return appendedDataset.version();
+            }
+        }
+    }
+
+    public static LanceArrowWriter getArrowWriter(Schema schema, int batchSize, RowType rowType) {
+        return new LanceArrowWriter(allocator, schema, batchSize, rowType);
+    }
+
+    public static List<FragmentMetadata> createFragment(
+            String datasetUri, ArrowReader reader, WriteParams params) {
+        try (ArrowArrayStream arrowStream = ArrowArrayStream.allocateNew(allocator)) {
+            Data.exportArrayStream(allocator, reader, arrowStream);
+            return Fragment.create(datasetUri, arrowStream, params);
+        }
+    }
 }
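
The two new adapter calls pair up in the tiering flow (a sketch): fragments are first materialized from an ArrowReader, then attached to the dataset in a separate Append transaction that carries the snapshot properties:

    List<FragmentMetadata> fragments =
            LanceDatasetAdapter.createFragment(config.getDatasetUri(), arrowReader, params);
    Map<String, String> props = new HashMap<>();
    props.put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER); // as LanceLakeCommitter does
    long version = LanceDatasetAdapter.commitAppend(config, fragments, props);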
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java
new file mode 100644
index 000000000..55026de5b
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BigIntVector;
+
+/** {@link ArrowFieldWriter} for BigInt. */
+public class ArrowBigIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowBigIntWriter forField(BigIntVector bigIntVector) {
+        return new ArrowBigIntWriter(bigIntVector);
+    }
+
+    private ArrowBigIntWriter(BigIntVector bigIntVector) {
+        super(bigIntVector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        BigIntVector vector = (BigIntVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readLong(row, ordinal));
+        } else {
+            vector.set(getCount(), readLong(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private long readLong(InternalRow row, int ordinal) {
+        return row.getLong(ordinal);
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java
new file mode 100644
index 000000000..b33fd83f4
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+
+/** {@link ArrowFieldWriter} for Binary. */
+public class ArrowBinaryWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowBinaryWriter forField(FixedSizeBinaryVector binaryVector) {
+        return new ArrowBinaryWriter(binaryVector);
+    }
+
+    private final int byteWidth;
+
+    private ArrowBinaryWriter(FixedSizeBinaryVector binaryVector) {
+        super(binaryVector);
+        this.byteWidth = binaryVector.getByteWidth();
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        FixedSizeBinaryVector vector = (FixedSizeBinaryVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readBinary(row, ordinal));
+        } else {
+            vector.set(getCount(), readBinary(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private byte[] readBinary(InternalRow row, int ordinal) {
+        return row.getBinary(ordinal, byteWidth);
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java
new file mode 100644
index 000000000..80b8e79d8
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BitVector;
+
+/** {@link ArrowFieldWriter} for Boolean. */
+public class ArrowBooleanWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowBooleanWriter forField(BitVector booleanVector) {
+        return new ArrowBooleanWriter(booleanVector);
+    }
+
+    private ArrowBooleanWriter(BitVector bitVector) {
+        super(bitVector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        BitVector vector = (BitVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readBoolean(row, ordinal) ? 1 : 0);
+        } else {
+            vector.set(getCount(), readBoolean(row, ordinal) ? 1 : 0);
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private boolean readBoolean(InternalRow row, int ordinal) {
+        return row.getBoolean(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
new file mode 100644
index 000000000..141df39a8
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.DateDayVector;
+
+/** {@link ArrowFieldWriter} for Date. */
+public class ArrowDateWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowDateWriter forField(DateDayVector dateDayVector) {
+        return new ArrowDateWriter(dateDayVector);
+    }
+
+    private ArrowDateWriter(DateDayVector dateDayVector) {
+        super(dateDayVector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        DateDayVector vector = (DateDayVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readDate(row, ordinal));
+        } else {
+            vector.set(getCount(), readDate(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private int readDate(InternalRow row, int ordinal) {
+        return row.getInt(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
new file mode 100644
index 000000000..f4e2e4ab9
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.DecimalVector;
+
+import java.math.BigDecimal;
+
+/** {@link ArrowFieldWriter} for Decimal. */
+public class ArrowDecimalWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowDecimalWriter forField(
+            DecimalVector decimalVector, int precision, int scale) {
+        return new ArrowDecimalWriter(decimalVector, precision, scale);
+    }
+
+    private final int precision;
+    private final int scale;
+
+    private ArrowDecimalWriter(DecimalVector decimalVector, int precision, int scale) {
+        super(decimalVector);
+        this.precision = precision;
+        this.scale = scale;
+    }
+
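+    // The precision and scale must match the DecimalVector's declared precision and scale;
+    // Arrow rejects BigDecimal values whose scale differs from the vector's.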
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        DecimalVector vector = (DecimalVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            BigDecimal bigDecimal = readDecimal(row, ordinal).toBigDecimal();
+            if (bigDecimal == null) {
+                vector.setNull(getCount());
+            } else {
+                if (handleSafe) {
+                    vector.setSafe(getCount(), bigDecimal);
+                } else {
+                    vector.set(getCount(), bigDecimal);
+                }
+            }
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private Decimal readDecimal(InternalRow row, int ordinal) {
+        return row.getDecimal(ordinal, precision, scale);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
new file mode 100644
index 000000000..f3d54bdf3
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.Float8Vector;
+
+/** {@link ArrowFieldWriter} for Double. */
+public class ArrowDoubleWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowDoubleWriter forField(Float8Vector doubleVector) {
+        return new ArrowDoubleWriter(doubleVector);
+    }
+
+    private ArrowDoubleWriter(Float8Vector doubleVector) {
+        super(doubleVector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        Float8Vector vector = (Float8Vector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readDouble(row, ordinal));
+        } else {
+            vector.set(getCount(), readDouble(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private double readDouble(InternalRow row, int ordinal) {
+        return row.getDouble(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
new file mode 100644
index 000000000..0278d2b29
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import org.apache.arrow.vector.ValueVector;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Base class for Arrow field writers, which convert a field to the Arrow format.
+ *
+ * @param <IN> Type of the input to write. Currently it is always InternalRow; InternalArray
+ *     may be supported in the future.
+ */
+public abstract class ArrowFieldWriter<IN> {
+
+    /** Container which is used to store the written sequence of values of a column. */
+    private final ValueVector valueVector;
+
+    private int count;
+
+    public ArrowFieldWriter(ValueVector valueVector) {
+        this.valueVector = checkNotNull(valueVector);
+    }
+
+    /** Returns the underlying container which stores the sequence of values of a column. */
+    public ValueVector getValueVector() {
+        return valueVector;
+    }
+
+    /** Returns the current count of elements written. */
+    public int getCount() {
+        return count;
+    }
+
+    /**
+     * Sets the field value as the field at the specified ordinal of the specified row. When
+     * {@code handleSafe} is true, the capacity-checked {@code setSafe} variants are used,
+     * which grow the vector's buffers on demand.
+     */
+    public abstract void doWrite(IN row, int ordinal, boolean handleSafe);
+
+    /** Writes the specified ordinal of the specified row. */
+    public void write(IN row, int ordinal, boolean handleSafe) {
+        doWrite(row, ordinal, handleSafe);
+        count++;
+    }
+
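+    /** Finishes the current batch by setting the vector's value count to the rows written. */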
+    public void finish() {
+        valueVector.setValueCount(count);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
new file mode 100644
index 000000000..e971e7735
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.Float4Vector;
+
+/** {@link ArrowFieldWriter} for Float. */
+public class ArrowFloatWriter extends ArrowFieldWriter<InternalRow> {
+    public static ArrowFloatWriter forField(Float4Vector float4Vector) {
+        return new ArrowFloatWriter(float4Vector);
+    }
+
+    private ArrowFloatWriter(Float4Vector float4Vector) {
+        super(float4Vector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        Float4Vector vector = (Float4Vector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readFloat(row, ordinal));
+        } else {
+            vector.set(getCount(), readFloat(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private float readFloat(InternalRow row, int ordinal) {
+        return row.getFloat(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
new file mode 100644
index 000000000..06f397b8e
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.IntVector;
+
+/** {@link ArrowFieldWriter} for Int. */
+public class ArrowIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowIntWriter forField(IntVector intVector) {
+        return new ArrowIntWriter(intVector);
+    }
+
+    private ArrowIntWriter(IntVector intVector) {
+        super(intVector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        IntVector vector = (IntVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readInt(row, ordinal));
+        } else {
+            vector.set(getCount(), readInt(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private int readInt(InternalRow row, int ordinal) {
+        return row.getInt(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java
new file mode 100644
index 000000000..3b410ed6a
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.SmallIntVector;
+
+/** {@link ArrowFieldWriter} for SmallInt. */
+public class ArrowSmallIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowSmallIntWriter forField(SmallIntVector smallIntVector) {
+        return new ArrowSmallIntWriter(smallIntVector);
+    }
+
+    private ArrowSmallIntWriter(SmallIntVector smallIntVector) {
+        super(smallIntVector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        SmallIntVector vector = (SmallIntVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readShort(row, ordinal));
+        } else {
+            vector.set(getCount(), readShort(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private short readShort(InternalRow row, int ordinal) {
+        return row.getShort(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java
new file mode 100644
index 000000000..fcfe9fecb
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.ValueVector;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** {@link ArrowFieldWriter} for Time. */
+public class ArrowTimeWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowTimeWriter forField(ValueVector valueVector) {
+        return new ArrowTimeWriter(valueVector);
+    }
+
+    private ArrowTimeWriter(ValueVector valueVector) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeSecVector
+                        || valueVector instanceof TimeMilliVector
+                        || valueVector instanceof TimeMicroVector
+                        || valueVector instanceof TimeNanoVector);
+    }
+
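+    // Fluss TIME values are read as milliseconds of the day (an int) and scaled to the
+    // unit of the concrete vector: seconds, milliseconds, microseconds, or nanoseconds.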
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        ValueVector valueVector = getValueVector();
+        if (isNullAt(row, ordinal)) {
+            ((BaseFixedWidthVector) valueVector).setNull(getCount());
+        } else if (valueVector instanceof TimeSecVector) {
+            int sec = readTime(row, ordinal) / 1000;
+            if (handleSafe) {
+                ((TimeSecVector) valueVector).setSafe(getCount(), sec);
+            } else {
+                ((TimeSecVector) valueVector).set(getCount(), sec);
+            }
+        } else if (valueVector instanceof TimeMilliVector) {
+            int ms = readTime(row, ordinal);
+            if (handleSafe) {
+                ((TimeMilliVector) valueVector).setSafe(getCount(), ms);
+            } else {
+                ((TimeMilliVector) valueVector).set(getCount(), ms);
+            }
+        } else if (valueVector instanceof TimeMicroVector) {
+            long microSec = readTime(row, ordinal) * 1000L;
+            if (handleSafe) {
+                ((TimeMicroVector) valueVector).setSafe(getCount(), microSec);
+            } else {
+                ((TimeMicroVector) valueVector).set(getCount(), microSec);
+            }
+        } else {
+            long nanoSec = readTime(row, ordinal) * 1000000L;
+            if (handleSafe) {
+                ((TimeNanoVector) valueVector).setSafe(getCount(), nanoSec);
+            } else {
+                ((TimeNanoVector) valueVector).set(getCount(), nanoSec);
+            }
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private int readTime(InternalRow row, int ordinal) {
+        return row.getInt(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
new file mode 100644
index 000000000..3ea10d399
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** {@link ArrowFieldWriter} for TimestampLtz. */
+public class ArrowTimestampLtzWriter extends ArrowFieldWriter<InternalRow> {
+    public static ArrowTimestampLtzWriter forField(ValueVector valueVector, int precision) {
+        return new ArrowTimestampLtzWriter(valueVector, precision);
+    }
+
+    private final int precision;
+
+    private ArrowTimestampLtzWriter(ValueVector valueVector, int precision) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeStampVector
+                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
+                                == null);
+        this.precision = precision;
+    }
+
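+    // The timestamp is decomposed into epoch milliseconds plus nanoseconds within the
+    // millisecond, then recombined at the granularity of the target vector.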
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        TimeStampVector vector = (TimeStampVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            TimestampLtz timestamp = readTimestamp(row, ordinal);
+            if (vector instanceof TimeStampSecVector) {
+                long sec = timestamp.getEpochMillisecond() / 1000;
+                if (handleSafe) {
+                    vector.setSafe(getCount(), sec);
+                } else {
+                    vector.set(getCount(), sec);
+                }
+            } else if (vector instanceof TimeStampMilliVector) {
+                long ms = timestamp.getEpochMillisecond();
+                if (handleSafe) {
+                    vector.setSafe(getCount(), ms);
+                } else {
+                    vector.set(getCount(), ms);
+                }
+            } else if (vector instanceof TimeStampMicroVector) {
+                long microSec =
+                        timestamp.getEpochMillisecond() * 1000
+                                + timestamp.getNanoOfMillisecond() / 1000;
+                if (handleSafe) {
+                    vector.setSafe(getCount(), microSec);
+                } else {
+                    vector.set(getCount(), microSec);
+                }
+            } else {
+                long nanoSec =
+                        timestamp.getEpochMillisecond() * 1_000_000
+                                + timestamp.getNanoOfMillisecond();
+                if (handleSafe) {
+                    vector.setSafe(getCount(), nanoSec);
+                } else {
+                    vector.set(getCount(), nanoSec);
+                }
+            }
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private TimestampLtz readTimestamp(InternalRow row, int ordinal) {
+        return row.getTimestampLtz(ordinal, precision);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
new file mode 100644
index 000000000..3d129be98
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** {@link ArrowFieldWriter} for TimestampNtz. */
+public class ArrowTimestampNtzWriter extends ArrowFieldWriter<InternalRow> {
+    public static ArrowTimestampNtzWriter forField(ValueVector valueVector, int precision) {
+        return new ArrowTimestampNtzWriter(valueVector, precision);
+    }
+
+    private final int precision;
+
+    private ArrowTimestampNtzWriter(ValueVector valueVector, int precision) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeStampVector
+                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
+                                == null);
+        this.precision = precision;
+    }
+
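+    // As in the LTZ writer, millisecond and nano-of-millisecond parts are recombined at
+    // the granularity of the target vector.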
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        TimeStampVector vector = (TimeStampVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            TimestampNtz timestamp = readTimestamp(row, ordinal);
+            if (vector instanceof TimeStampSecVector) {
+                long sec = timestamp.getMillisecond() / 1000;
+                if (handleSafe) {
+                    vector.setSafe(getCount(), sec);
+                } else {
+                    vector.set(getCount(), sec);
+                }
+            } else if (vector instanceof TimeStampMilliVector) {
+                long ms = timestamp.getMillisecond();
+                if (handleSafe) {
+                    vector.setSafe(getCount(), ms);
+                } else {
+                    vector.set(getCount(), ms);
+                }
+            } else if (vector instanceof TimeStampMicroVector) {
+                long microSec =
+                        timestamp.getMillisecond() * 1000 + timestamp.getNanoOfMillisecond() / 1000;
+                if (handleSafe) {
+                    vector.setSafe(getCount(), microSec);
+                } else {
+                    vector.set(getCount(), microSec);
+                }
+            } else {
+                long nanoSec =
+                        timestamp.getMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond();
+                if (handleSafe) {
+                    vector.setSafe(getCount(), nanoSec);
+                } else {
+                    vector.set(getCount(), nanoSec);
+                }
+            }
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private TimestampNtz readTimestamp(InternalRow row, int ordinal) {
+        return row.getTimestampNtz(ordinal, precision);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java
new file mode 100644
index 000000000..805d24afe
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.TinyIntVector;
+
+/** {@link ArrowFieldWriter} for TinyInt. */
+public class ArrowTinyIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowTinyIntWriter forField(TinyIntVector tinyIntVector) {
+        return new ArrowTinyIntWriter(tinyIntVector);
+    }
+
+    private ArrowTinyIntWriter(TinyIntVector tinyIntVector) {
+        super(tinyIntVector);
+    }
+
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        TinyIntVector vector = (TinyIntVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            if (handleSafe) {
+                vector.setSafe(getCount(), readByte(row, ordinal));
+            } else {
+                vector.set(getCount(), readByte(row, ordinal));
+            }
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private byte readByte(InternalRow row, int ordinal) {
+        return row.getByte(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
new file mode 100644
index 000000000..d3be0a653
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.VarBinaryVector;
+
+/** {@link ArrowFieldWriter} for VarBinary. */
+public class ArrowVarBinaryWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowVarBinaryWriter forField(VarBinaryVector varBinaryVector) {
+        return new ArrowVarBinaryWriter(varBinaryVector);
+    }
+
+    private ArrowVarBinaryWriter(VarBinaryVector varBinaryVector) {
+        super(varBinaryVector);
+    }
+
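+    // Variable-width writes always use setSafe, which grows the data buffer on demand,
+    // so the handleSafe flag is not consulted here.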
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        VarBinaryVector vector = (VarBinaryVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readBinary(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private byte[] readBinary(InternalRow row, int ordinal) {
+        return row.getBytes(ordinal);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java
new file mode 100644
index 000000000..05b2d442c
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.VarCharVector;
+
+import java.nio.ByteBuffer;
+
+/** {@link ArrowFieldWriter} for VarChar. */
+public class ArrowVarCharWriter extends ArrowFieldWriter<InternalRow> {
+
+    public static ArrowVarCharWriter forField(VarCharVector varCharVector) {
+        return new ArrowVarCharWriter(varCharVector);
+    }
+
+    private ArrowVarCharWriter(VarCharVector varCharVector) {
+        super(varCharVector);
+    }
+
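+    // The string's UTF-8 bytes are wrapped in a ByteBuffer and copied into the vector via
+    // setSafe, which grows the data buffer on demand, so no handleSafe branch is needed.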
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        VarCharVector vector = (VarCharVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            ByteBuffer buffer = readString(row, ordinal).wrapByteBuffer();
+            vector.setSafe(getCount(), buffer, buffer.position(), buffer.remaining());
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private BinaryString readString(InternalRow row, int ordinal) {
+        return row.getString(ordinal);
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE 
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE
index 786813d32..36053bfbc 100644
--- a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE
@@ -21,7 +21,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.arrow:arrow-memory-netty:15.0.0
 - org.apache.arrow:arrow-vector:15.0.0
 - org.questdb:jar-jni:1.1.1
-- com.lancedb:lance-core:0.26.1
+- com.lancedb:lance-core:0.33.0
 - commons-codec:commons-codec:1.4
 - com.google.flatbuffers:flatbuffers-java:23.5.26
 
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni
 
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni
deleted file mode 100644
index 66abb8ae7..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni
+++ /dev/null
@@ -1,26 +0,0 @@
-Zstd-jni: JNI bindings to Zstd Library
-
-Copyright (c) 2015-present, Luben Karavelov/ All rights reserved.
-
-BSD License
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this
-  list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice, this
-  list of conditions and the following disclaimer in the documentation and/or
-  other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
-ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
index 1b071e867..2be42fff4 100644
--- 
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
@@ -45,10 +45,9 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -111,6 +110,8 @@ class LakeEnabledTableCreateITCase {
 
     @Test
     void testLogTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
         // test bucket key log table
         TableDescriptor logTable =
                 TableDescriptor.builder()
@@ -134,11 +135,13 @@ class LakeEnabledTableCreateITCase {
                                         .column("log_c16", 
DataTypes.TIMESTAMP())
                                         .build())
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
                         .distributedBy(BUCKET_NUM, "log_c1", "log_c2")
                         .build();
         TablePath logTablePath = TablePath.of(DATABASE, "log_table");
         admin.createTable(logTablePath, logTable, false).get();
-        LanceConfig config = LanceConfig.from(lanceConf.toMap(), DATABASE, "log_table");
+        LanceConfig config =
+                LanceConfig.from(lanceConf.toMap(), customProperties, DATABASE, "log_table");
 
         // check the gotten log table
         Field logC1 = new Field("log_c1", FieldType.nullable(new ArrowType.Int(4 * 8, true)), null);
@@ -182,25 +185,11 @@ class LakeEnabledTableCreateITCase {
                         FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
                         null);
 
-        // for __bucket, __offset, __timestamp
-        Field logC17 =
-                new Field(
-                        BUCKET_COLUMN_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null);
-        Field logC18 =
-                new Field(
-                        OFFSET_COLUMN_NAME, FieldType.nullable(new ArrowType.Int(64, true)), null);
-        Field logC19 =
-                new Field(
-                        TIMESTAMP_COLUMN_NAME,
-                        FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
-                        null);
-                        null);
-
         org.apache.arrow.vector.types.pojo.Schema expectedSchema =
                 new org.apache.arrow.vector.types.pojo.Schema(
                         Arrays.asList(
                                 logC1, logC2, logC3, logC4, logC5, logC6, 
logC7, logC8, logC9,
-                                logC10, logC11, logC12, logC13, logC14, 
logC15, logC16, logC17,
-                                logC18, logC19));
+                                logC10, logC11, logC12, logC13, logC14, 
logC15, logC16));
         
assertThat(expectedSchema).isEqualTo(LanceDatasetAdapter.getSchema(config).get());
     }
 
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
new file mode 100644
index 000000000..728e4a8a9
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.testutils;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.exception.FlussRuntimeException;
+import com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder;
+import com.alibaba.fluss.metadata.DataLakeFormat;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.server.replica.Replica;
+import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for syncing tables to Lance via Flink. */
+public class FlinkLanceTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
+    protected StreamExecutionEnvironment execEnv;
+
+    protected static Connection conn;
+    protected static Admin admin;
+    protected static Configuration clientConf;
+    private static String warehousePath;
+
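+    // Cluster configuration: enable Lance as the datalake format and point it at a
+    // temporary warehouse directory created for each test run.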
+    private static Configuration initConfig() {
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+                // retain all snapshots for test purposes
+                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
+        conf.setString("datalake.format", "lance");
+        try {
+            warehousePath =
+                    Files.createTempDirectory("fluss-testing-datalake-tiered")
+                            .resolve("warehouse")
+                            .toString();
+        } catch (Exception e) {
+            throw new FlussRuntimeException("Failed to create warehouse path", e);
+        }
+        conf.setString("datalake.lance.warehouse", warehousePath);
+        return conf;
+    }
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        execEnv.setParallelism(2);
+    }
+
+    @AfterAll
+    static void afterAll() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
+    protected static Map<String, String> getLanceCatalogConf() {
+        Map<String, String> lanceConf = new HashMap<>();
+        lanceConf.put("warehouse", warehousePath);
+        return lanceConf;
+    }
+
+    protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+            throws Exception {
+        admin.createTable(tablePath, tableDescriptor, true).get();
+        return admin.getTableInfo(tablePath).get().getTableId();
+    }
+
+    protected long createLogTable(TablePath tablePath) throws Exception {
+        return createLogTable(tablePath, 1);
+    }
+
+    protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
+        return createLogTable(tablePath, bucketNum, false);
+    }
+
+    protected Replica getLeaderReplica(TableBucket tableBucket) {
+        return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+    }
+
+    protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) {
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    Replica replica = getLeaderReplica(tb);
+                    // datalake snapshot id should be updated
+                    assertThat(replica.getLogTablet().getLakeTableSnapshotId())
+                            .isGreaterThanOrEqualTo(0);
+                    assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
+                });
+    }
+
+    protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
+            throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(bucketNum, "a")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+
+        if (isPartitioned) {
+            schemaBuilder.column("c", DataTypes.STRING());
+            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);
+            tableBuilder.partitionedBy("c");
+            tableBuilder.property(
+                    ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
+        }
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
+    protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append)
+            throws Exception {
+        try (Table table = conn.getTable(tablePath)) {
+            TableWriter tableWriter;
+            if (append) {
+                tableWriter = table.newAppend().createWriter();
+            } else {
+                tableWriter = table.newUpsert().createWriter();
+            }
+            for (InternalRow row : rows) {
+                if (tableWriter instanceof AppendWriter) {
+                    ((AppendWriter) tableWriter).append(row);
+                } else {
+                    ((UpsertWriter) tableWriter).upsert(row);
+                }
+            }
+            tableWriter.flush();
+        }
+    }
+
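+    // Builds and submits the Flink job that tiers Fluss tables into Lance; the poll
+    // interval is shortened so newly tiering-enabled tables are picked up quickly.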
+    protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
+        Configuration flussConfig = new Configuration(clientConf);
+        flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+        return LakeTieringJobBuilder.newBuilder(
+                        execEnv,
+                        flussConfig,
+                        Configuration.fromMap(getLanceCatalogConf()),
+                        DataLakeFormat.LANCE.toString())
+                .build();
+    }
+
+    protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) {
+        for (int i = 0; i < bucketNum; i++) {
+            TableBucket tableBucket = new TableBucket(tableId, i);
+            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java
new file mode 100644
index 000000000..ee54dfd92
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for tiering tables to Lance. */
+class LanceTieringITCase extends FlinkLanceTieringTestBase {
+    protected static final String DEFAULT_DB = "fluss";
+    private static StreamExecutionEnvironment execEnv;
+    private static Configuration lanceConf;
+    private static final RootAllocator allocator = new RootAllocator();
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkLanceTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+        lanceConf = Configuration.fromMap(getLanceCatalogConf());
+    }
+
+    @Test
+    void testTiering() throws Exception {
+        // create log table
+        TablePath t1 = TablePath.of(DEFAULT_DB, "logTable");
+        long t1Id = createLogTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        List<InternalRow> flussRows = new ArrayList<>();
+        // write 30 records in 10 batches of 3
+        for (int i = 0; i < 10; i++) {
+            List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+            flussRows.addAll(rows);
+            writeRows(t1, rows, true);
+        }
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of the replica after it has been synced;
+        // note: we can't update the log start offset for a log table in bucket-unaware mode
+        assertReplicaStatus(t1Bucket, 30);
+
+        LanceConfig config =
+                LanceConfig.from(
+                        lanceConf.toMap(),
+                        Collections.emptyMap(),
+                        t1.getDatabaseName(),
+                        t1.getTableName());
+
+        // check data in lance
+        checkDataInLanceAppendOnlyTable(config, flussRows);
+        // check snapshot property in lance
+        Map<String, String> properties =
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                "[{\"bucket_id\":0,\"log_offset\":30}]");
+                        put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+                    }
+                };
+        checkSnapshotPropertyInLance(config, properties);
+
+        jobClient.cancel().get();
+    }
+
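+    // Opens the dataset at its latest version and asserts that the transaction which
+    // produced it carries the Fluss tiering metadata (per-bucket log offsets and the
+    // commit user).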
+    private void checkSnapshotPropertyInLance(
+            LanceConfig config, Map<String, String> expectedProperties) throws Exception {
+        ReadOptions.Builder builder = new ReadOptions.Builder();
+        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            Transaction transaction = dataset.readTransaction().orElse(null);
+            assertThat(transaction).isNotNull();
+            assertThat(transaction.transactionProperties()).isEqualTo(expectedProperties);
+        }
+    }
+
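+    // Scans the dataset and compares it row by row with the rows written through Fluss.
+    // All 30 rows are assumed to arrive in a single Arrow batch; the second
+    // loadNextBatch() call, asserted to return false, confirms there is no extra data.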
+    private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRow> expectedRows)
+            throws Exception {
+        try (Dataset dataset =
+                Dataset.open(
+                        allocator,
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+            reader.loadNextBatch();
+            Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
+            int rowCount = readerRoot.getRowCount();
+            for (int i = 0; i < rowCount; i++) {
+                InternalRow flussRow = flussRowIterator.next();
+                assertThat((int) (readerRoot.getVector(0).getObject(i)))
+                        .isEqualTo(flussRow.getInt(0));
+                assertThat(((VarCharVector) readerRoot.getVector(1)).getObject(i).toString())
+                        .isEqualTo(flussRow.getString(1).toString());
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+            assertThat(flussRowIterator.hasNext()).isFalse();
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
new file mode 100644
index 000000000..60b7a330e
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for tiering to Lance via {@link LanceLakeTieringFactory}. */
+class LanceTieringTest {
+    private @TempDir File tempWarehouseDir;
+    private LanceLakeTieringFactory lanceLakeTieringFactory;
+    private Configuration configuration;
+
+    @BeforeEach
+    void beforeEach() {
+        configuration = new Configuration();
+        configuration.setString("warehouse", tempWarehouseDir.toString());
+        lanceLakeTieringFactory = new LanceLakeTieringFactory(configuration);
+    }
+
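+    // Run the full write/commit round-trip once for a non-partitioned (false) and once
+    // for a partitioned (true) table.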
+    private static Stream<Arguments> tieringWriteArgs() {
+        return Stream.of(Arguments.of(false), Arguments.of(true));
+    }
+
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTable(boolean isPartitioned) throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "logTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createTable(config);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // there should not be any missing snapshot yet
+            assertThat(lakeCommitter.getMissingLakeSnapshot(2L)).isNull();
+        }
+
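+        // Simulate the tiering service: one LakeWriter per (partition, bucket) pair; each
+        // write result is round-tripped through the serializer, mirroring how Flink ships
+        // results from the writer to the committer.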
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+        List<String> partitionKeys = isPartitioned ? Arrays.asList("c3") : null;
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            genLogTableRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            long snapshot =
+                    lakeCommitter.commit(
+                            lanceCommittable,
+                            toBucketOffsetsProperty(
+                                    tableBucketOffsets, partitionIdAndName, partitionKeys));
+            // Lance dataset versions start from 1
+            assertThat(snapshot).isEqualTo(2);
+        }
+
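+        // Read back through the native Lance SDK. Each (bucket, partition) writer produced
+        // one fragment, and the scan is assumed to return batches in commit order, so the
+        // nested loops below mirror the write loops above.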
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+
+            // then, check data
+            for (int bucket = 0; bucket < bucketNum; bucket++) {
+                for (String partition : partitionIdAndName.values()) {
+                    reader.loadNextBatch();
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
+                    verifyLogTableRecords(
+                            readerRoot, expectRecords, bucket, isPartitioned, partition);
+                }
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+
+        // then, let's verify getMissingLakeSnapshot works
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // use snapshot id 1 as the known snapshot id
+            CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(1L);
+            assertThat(committedLakeSnapshot).isNotNull();
+            Map<Tuple2<Long, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets();
+            for (int bucket = 0; bucket < bucketNum; bucket++) {
+                for (Long partitionId : partitionIdAndName.keySet()) {
+                    // we only wrote 10 records, so the expected log offset should be 10
+                    assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(10);
+                }
+            }
+            assertThat(committedLakeSnapshot.getLakeSnapshotId()).isEqualTo(2L);
+
+            // use null as the known snapshot id
+            CommittedLakeSnapshot committedLakeSnapshot2 =
+                    lakeCommitter.getMissingLakeSnapshot(null);
+            assertThat(committedLakeSnapshot2).isEqualTo(committedLakeSnapshot);
+
+            // use snapshot id 2 as the known snapshot id
+            committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(2L);
+            // no missing committed offsets since the latest snapshot is 2L
+            assertThat(committedLakeSnapshot).isNull();
+        }
+    }
+
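+    // Column-level comparison of one Arrow batch against the expected records; the bucket
+    // and partition arguments are kept for context, while only the three business columns
+    // are asserted here.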
+    private void verifyLogTableRecords(
+            VectorSchemaRoot root,
+            List<LogRecord> expectRecords,
+            int expectBucket,
+            boolean isPartitioned,
+            @Nullable String partition)
+            throws Exception {
+        assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
+        for (int i = 0; i < expectRecords.size(); i++) {
+            LogRecord expectRecord = expectRecords.get(i);
+            // check business columns:
+            assertThat((int) (root.getVector(0).getObject(i)))
+                    .isEqualTo(expectRecord.getRow().getInt(0));
+            assertThat(((VarCharVector) root.getVector(1)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(1).toString());
+            assertThat(((VarCharVector) root.getVector(2)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(2).toString());
+        }
+    }
+
+    private LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter(
+            TablePath tablePath) throws IOException {
+        return lanceLakeTieringFactory.createLakeCommitter(() -> tablePath);
+    }
+
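+    // Builds a minimal WriterInitContext; the Lance writer only consumes the table path,
+    // bucket, partition, schema and custom properties, so tableId/partitionId are dummies.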
+    private LakeWriter<LanceWriteResult> createLakeWriter(
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            Schema schema,
+            Map<String, String> customProperties)
+            throws IOException {
+        return lanceLakeTieringFactory.createLakeWriter(
+                new WriterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableBucket tableBucket() {
+                        // don't care about tableId & partitionId
+                        return new TableBucket(0, 0L, bucket);
+                    }
+
+                    @Nullable
+                    @Override
+                    public String partition() {
+                        return partition;
+                    }
+
+                    @Override
+                    public com.alibaba.fluss.metadata.Schema schema() {
+                        return schema;
+                    }
+
+                    @Override
+                    public Map<String, String> customProperties() {
+                        return customProperties;
+                    }
+                });
+    }
+
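+    // Generates APPEND_ONLY records for one bucket. The same list is returned as both the
+    // written and the expected records, since tiering a log table must preserve rows
+    // verbatim.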
+    private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
+            @Nullable String partition, int bucket, int numRecords) {
+        List<LogRecord> logRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            // c1, c2, c3: c3 carries the partition value for partitioned tables and a
+            // per-bucket marker otherwise
+            GenericRow genericRow = new GenericRow(3);
+            genericRow.setField(0, i);
+            genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
+            genericRow.setField(
+                    2, BinaryString.fromString(partition != null ? partition : "bucket" + bucket));
+            LogRecord logRecord =
+                    new GenericRecord(
+                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
+            logRecords.add(logRecord);
+        }
+        return Tuple2.of(logRecords, logRecords);
+    }
+
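+    // Creates the Lance dataset directly via LanceDatasetAdapter (no catalog involved)
+    // with the c1/c2/c3 schema used above; in the partitioned variant c3 doubles as the
+    // partition column.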
+    private Schema createTable(LanceConfig config) throws Exception {
+        List<Schema.Column> columns = new ArrayList<>();
+        columns.add(new Schema.Column("c1", DataTypes.INT()));
+        columns.add(new Schema.Column("c2", DataTypes.STRING()));
+        columns.add(new Schema.Column("c3", DataTypes.STRING()));
+        Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns);
+        Schema schema = schemaBuilder.build();
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+
+        return schema;
+    }
+}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 3dcfc826d..fe2933906 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -126,7 +126,6 @@ public class FlinkPaimonTieringTestBase {
         execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         execEnv.setParallelism(2);
-        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     }
 
     protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
