luoyuxia commented on code in PR #1441:
URL: https://github.com/apache/fluss/pull/1441#discussion_r2285135029


##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import com.lancedb.lance.FragmentMetadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/** The serializer of {@link LanceCommittable}. */
+public class LanceCommittableSerializer implements SimpleVersionedSerializer<LanceCommittable> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceCommittable lanceCommittable) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   nit:
   ```
   try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
           ObjectOutputStream oos = new ObjectOutputStream(baos)) {
       oos.writeObject(lanceCommittable.committable());
       return baos.toByteArray();
   }
   ```
   Otherwise the stream may not be closed if an exception is thrown.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java:
##########
@@ -64,11 +58,17 @@ public LanceConfig(
     }
 
     public static LanceConfig from(

Review Comment:
   nit: `LanceConfig#getDatabaseName` and `LanceConfig#getTableName` are not used.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import com.lancedb.lance.FragmentMetadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/** The serializer of {@link LanceCommittable}. */
+public class LanceCommittableSerializer implements SimpleVersionedSerializer<LanceCommittable> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceCommittable lanceCommittable) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(lanceCommittable.committable());
+        oos.close();
+        return baos.toByteArray();
+    }
+
+    @Override
+    public LanceCommittable deserialize(int version, byte[] serialized) throws IOException {
+        if (version != CURRENT_VERSION) {
+            throw new UnsupportedOperationException(
+                    "Expecting LanceCommittable version to be "
+                            + CURRENT_VERSION
+                            + ", but found "
+                            + version
+                            + ".");
+        }
+        ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        try {
+            return new LanceCommittable((List<FragmentMetadata>) ois.readObject());

Review Comment:
   nit: add
   ```
   //noinspection unchecked
   ```
   to make the IDE happy



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.Version;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+
+/** Implementation of {@link LakeCommitter} for Lance. */
+public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
+    private final LanceConfig config;
+    private final RootAllocator allocator = new RootAllocator();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public LanceLakeCommitter(Configuration options, TablePath tablePath) {
+        this.config =
+                LanceConfig.from(
+                        options.toMap(),
+                        Collections.emptyMap(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+    }
+
+    @Override
+    public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
+            throws IOException {
+        List<FragmentMetadata> fragments =
+                lanceWriteResults.stream()
+                        .map(LanceWriteResult::commitMessage)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        return new LanceCommittable(fragments);
+    }
+
+    @Override
+    public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        Map<String, String> properties = new HashMap<>(snapshotProperties);
+        properties.put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+        return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties);
+    }
+
+    @Override
+    public void abort(LanceCommittable committable) throws IOException {}
+
+    @SuppressWarnings("checkstyle:LocalVariableName")
+    @Nullable
+    @Override
+    public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
+            throws IOException {
+        Transaction latestLakeSnapshotIdOfLake =
+                getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+
+        if (latestLakeSnapshotIdOfLake == null) {
+            return null;
+        }
+
+        // we get the latest snapshot committed by fluss,
+        // but the latest snapshot is not greater than latestLakeSnapshotIdOfFluss, so there is
+        // no missing snapshot; return directly
+        if (latestLakeSnapshotIdOfFluss != null
+                && latestLakeSnapshotIdOfLake.readVersion() <= latestLakeSnapshotIdOfFluss) {
+            return null;
+        }
+
+        CommittedLakeSnapshot committedLakeSnapshot =
+                new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.readVersion());
+        String flussOffsetProperties =
+                latestLakeSnapshotIdOfLake
+                        .transactionProperties()
+                        .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+            BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+            if (bucketOffset.getPartitionId() != null) {
+                committedLakeSnapshot.addPartitionBucket(
+                        bucketOffset.getPartitionId(),
+                        bucketOffset.getPartitionQualifiedName(),
+                        bucketOffset.getBucket(),
+                        bucketOffset.getLogOffset());
+            } else {
+                committedLakeSnapshot.addBucket(
+                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
+            }
+        }
+        return committedLakeSnapshot;
+    }
+
+    @Nullable
+    private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
+        Transaction latestFlussSnapshot = null;
+
+        ReadOptions.Builder builder = new ReadOptions.Builder();
+        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            for (Version version : dataset.listVersions()) {
+                builder.setVersion((int) version.getId());
+                try (Dataset datasetVersion =
+                        Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+                    Transaction transaction = datasetVersion.readTransaction().orElse(null);
+                    if (transaction != null
+                            && commitUser.equals(
+                                    transaction.transactionProperties().get("commit-user"))) {
+                        if (latestFlussSnapshot == null
+                                || transaction.readVersion() > latestFlussSnapshot.readVersion()) {
+                            latestFlussSnapshot = transaction;
+                        }
+                    }
+                }
+            }
+        }
+        return latestFlussSnapshot;
+    }
+
+    @Override
+    public void close() throws Exception {}

Review Comment:
   close `allocator`?
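   A minimal sketch of what that could look like (assuming nothing else here needs cleanup):
   ```
   @Override
   public void close() throws Exception {
       // release the Arrow buffers owned by this committer
       allocator.close();
   }
   ```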



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.Version;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+
+/** Implementation of {@link LakeCommitter} for Lance. */
+public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
+    private final LanceConfig config;
+    private final RootAllocator allocator = new RootAllocator();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public LanceLakeCommitter(Configuration options, TablePath tablePath) {
+        this.config =
+                LanceConfig.from(
+                        options.toMap(),
+                        Collections.emptyMap(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+    }
+
+    @Override
+    public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
+            throws IOException {
+        List<FragmentMetadata> fragments =
+                lanceWriteResults.stream()
+                        .map(LanceWriteResult::commitMessage)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        return new LanceCommittable(fragments);
+    }
+
+    @Override
+    public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        Map<String, String> properties = new HashMap<>(snapshotProperties);
+        properties.put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+        return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties);
+    }
+
+    @Override
+    public void abort(LanceCommittable committable) throws IOException {}

Review Comment:
   please add a TODO; it seems lance-java is missing an abort API to delete files that were already written
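   For example, a sketch of the suggested placeholder (wording is just a suggestion):
   ```
   @Override
   public void abort(LanceCommittable committable) throws IOException {
       // TODO: lance-java does not yet expose an abort/cleanup API;
       // delete the already-written fragment files here once it does.
   }
   ```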



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.record.LogRecord;
+
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+/** Implementation of {@link LakeWriter} for Lance. */
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+    private final LanceArrowWriter arrowWriter;
+    private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+
+    public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
+            throws IOException {
+        LanceConfig config =
+                LanceConfig.from(
+                        options.toMap(),
+                        writerInitContext.customProperties(),
+                        writerInitContext.tablePath().getDatabaseName(),
+                        writerInitContext.tablePath().getTableName());
+        int batchSize = LanceConfig.getBatchSize(config);
+        Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
+        if (!schema.isPresent()) {

Review Comment:
   nit:
   ```
   schema.isEmpty()
   ```
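   (Note: `Optional#isEmpty` requires Java 11+, so this only applies if the module targets Java 11 or newer.)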



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.Version;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+
+/** Implementation of {@link LakeCommitter} for Lance. */
+public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
+    private final LanceConfig config;
+    private final RootAllocator allocator = new RootAllocator();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public LanceLakeCommitter(Configuration options, TablePath tablePath) {
+        this.config =
+                LanceConfig.from(
+                        options.toMap(),
+                        Collections.emptyMap(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+    }
+
+    @Override
+    public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
+            throws IOException {
+        List<FragmentMetadata> fragments =
+                lanceWriteResults.stream()
+                        .map(LanceWriteResult::commitMessage)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        return new LanceCommittable(fragments);
+    }
+
+    @Override
+    public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        Map<String, String> properties = new HashMap<>(snapshotProperties);
+        properties.put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+        return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties);
+    }
+
+    @Override
+    public void abort(LanceCommittable committable) throws IOException {}
+
+    @SuppressWarnings("checkstyle:LocalVariableName")
+    @Nullable
+    @Override
+    public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
+            throws IOException {
+        Transaction latestLakeSnapshotIdOfLake =
+                getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+
+        if (latestLakeSnapshotIdOfLake == null) {
+            return null;
+        }
+
+        // we get the latest snapshot committed by fluss,
+        // but the latest snapshot is not greater than latestLakeSnapshotIdOfFluss, so there is
+        // no missing snapshot; return directly
+        if (latestLakeSnapshotIdOfFluss != null
+                && latestLakeSnapshotIdOfLake.readVersion() <= latestLakeSnapshotIdOfFluss) {
+            return null;
+        }
+
+        CommittedLakeSnapshot committedLakeSnapshot =
+                new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.readVersion());
+        String flussOffsetProperties =
+                latestLakeSnapshotIdOfLake
+                        .transactionProperties()
+                        .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+            BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+            if (bucketOffset.getPartitionId() != null) {
+                committedLakeSnapshot.addPartitionBucket(
+                        bucketOffset.getPartitionId(),
+                        bucketOffset.getPartitionQualifiedName(),
+                        bucketOffset.getBucket(),
+                        bucketOffset.getLogOffset());
+            } else {
+                committedLakeSnapshot.addBucket(
+                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
+            }
+        }
+        return committedLakeSnapshot;
+    }
+
+    @Nullable
+    private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
+        Transaction latestFlussSnapshot = null;
+
+        ReadOptions.Builder builder = new ReadOptions.Builder();
+        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            for (Version version : dataset.listVersions()) {

Review Comment:
   Just wondering: can we iterate from end to start to find the latest snapshot committed by fluss faster?
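   Something like this sketch, assuming `listVersions()` returns versions in ascending order as a `List`:
   ```
   List<Version> versions = dataset.listVersions();
   for (int i = versions.size() - 1; i >= 0; i--) {
       builder.setVersion((int) versions.get(i).getId());
       try (Dataset datasetVersion =
               Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
           Transaction transaction = datasetVersion.readTransaction().orElse(null);
           if (transaction != null
                   && commitUser.equals(transaction.transactionProperties().get("commit-user"))) {
               // the first match when walking backwards is the latest fluss commit
               return transaction;
           }
       }
   }
   return null;
   ```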



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.Version;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+
+/** Implementation of {@link LakeCommitter} for Lance. */
+public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
+    private final LanceConfig config;
+    private final RootAllocator allocator = new RootAllocator();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public LanceLakeCommitter(Configuration options, TablePath tablePath) {
+        this.config =
+                LanceConfig.from(
+                        options.toMap(),
+                        Collections.emptyMap(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+    }
+
+    @Override
+    public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
+            throws IOException {
+        List<FragmentMetadata> fragments =
+                lanceWriteResults.stream()
+                        .map(LanceWriteResult::commitMessage)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        return new LanceCommittable(fragments);
+    }
+
+    @Override
+    public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        Map<String, String> properties = new HashMap<>(snapshotProperties);
+        properties.put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+        return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties);
+    }
+
+    @Override
+    public void abort(LanceCommittable committable) throws IOException {}
+
+    @SuppressWarnings("checkstyle:LocalVariableName")
+    @Nullable
+    @Override
+    public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
+            throws IOException {
+        Transaction latestLakeSnapshotIdOfLake =
+                getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+
+        if (latestLakeSnapshotIdOfLake == null) {
+            return null;
+        }
+
+        // we get the latest snapshot committed by fluss,
+        // but the latest snapshot is not greater than latestLakeSnapshotIdOfFluss, so there is
+        // no missing snapshot; return directly
+        if (latestLakeSnapshotIdOfFluss != null
+                && latestLakeSnapshotIdOfLake.readVersion() <= latestLakeSnapshotIdOfFluss) {
+            return null;
+        }
+
+        CommittedLakeSnapshot committedLakeSnapshot =
+                new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.readVersion());
+        String flussOffsetProperties =
+                latestLakeSnapshotIdOfLake
+                        .transactionProperties()
+                        .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+            BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+            if (bucketOffset.getPartitionId() != null) {
+                committedLakeSnapshot.addPartitionBucket(
+                        bucketOffset.getPartitionId(),
+                        bucketOffset.getPartitionQualifiedName(),
+                        bucketOffset.getBucket(),
+                        bucketOffset.getLogOffset());
+            } else {
+                committedLakeSnapshot.addBucket(
+                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
+            }
+        }
+        return committedLakeSnapshot;
+    }
+
+    @Nullable
+    private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
+        Transaction latestFlussSnapshot = null;
+
+        ReadOptions.Builder builder = new ReadOptions.Builder();
+        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            for (Version version : dataset.listVersions()) {
+                builder.setVersion((int) version.getId());
+                try (Dataset datasetVersion =
+                        Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+                    Transaction transaction = datasetVersion.readTransaction().orElse(null);
+                    if (transaction != null
+                            && commitUser.equals(
+                                    transaction.transactionProperties().get("commit-user"))) {

Review Comment:
   extract `commit-user` into a private static constant?
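   e.g. (name is only a suggestion):
   ```
   private static final String COMMIT_USER_KEY = "commit-user";
   ```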



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import com.lancedb.lance.FragmentMetadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/** The serializer of {@link LanceCommittable}. */
+public class LanceCommittableSerializer implements SimpleVersionedSerializer<LanceCommittable> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceCommittable lanceCommittable) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(lanceCommittable.committable());
+        oos.close();
+        return baos.toByteArray();
+    }
+
+    @Override
+    public LanceCommittable deserialize(int version, byte[] serialized) throws IOException {
+        if (version != CURRENT_VERSION) {
+            throw new UnsupportedOperationException(
+                    "Expecting LanceCommittable version to be "
+                            + CURRENT_VERSION
+                            + ", but found "
+                            + version
+                            + ".");
+        }
+        ByteArrayInputStream bais = new ByteArrayInputStream(serialized);

Review Comment:
   nit:
   ```
   try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
           ObjectInputStream ois = new ObjectInputStream(bais)) {
       //noinspection unchecked
       return new LanceCommittable((List<FragmentMetadata>) ois.readObject());
   } catch (ClassNotFoundException e) {
       throw new IOException("Couldn't deserialize LanceCommittable", e);
   }
   ```



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** The {@link SimpleVersionedSerializer} for {@link LanceWriteResult}. */
+public class LanceWriteResultSerializer implements SimpleVersionedSerializer<LanceWriteResult> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceWriteResult lanceWriteResult) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   ditto



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.record.LogRecord;
+
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+/** Implementation of {@link LakeWriter} for Lance. */
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+    private final LanceArrowWriter arrowWriter;
+    private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+
+    public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
+            throws IOException {
+        LanceConfig config =
+                LanceConfig.from(
+                        options.toMap(),
+                        writerInitContext.customProperties(),
+                        writerInitContext.tablePath().getDatabaseName(),
+                        writerInitContext.tablePath().getTableName());
+        int batchSize = LanceConfig.getBatchSize(config);
+        Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
+        if (!schema.isPresent()) {
+            throw new IOException("Fail to get dataset " + 
config.getDatasetUri() + " in Lance.");
+        }
+
+        this.arrowWriter =
+                LanceDatasetAdapter.getArrowWriter(
+                        schema.get(), batchSize, 
writerInitContext.schema().getRowType());
+
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        Callable<List<FragmentMetadata>> fragmentCreator =
+                () ->
+                        LanceDatasetAdapter.createFragment(
+                                config.getDatasetUri(), arrowWriter, params);
+        fragmentCreationTask = new FutureTask<>(fragmentCreator);
+        Thread fragmentCreationThread = new Thread(fragmentCreationTask);
+        fragmentCreationThread.start();
+    }
+
+    @Override
+    public void write(LogRecord record) throws IOException {
+        arrowWriter.write(record);
+    }
+
+    @Override
+    public LanceWriteResult complete() throws IOException {
+        arrowWriter.setFinished();
+        try {
+            List<FragmentMetadata> fragmentMetadata = fragmentCreationTask.get();
+            return new LanceWriteResult(fragmentMetadata);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while waiting for reader thread to finish", e);

Review Comment:
   why are we waiting for a `reader` thread here?



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.record.LogRecord;
+
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+/** Implementation of {@link LakeWriter} for Lance. */
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+    private final LanceArrowWriter arrowWriter;
+    private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+
+    public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
+            throws IOException {
+        LanceConfig config =
+                LanceConfig.from(
+                        options.toMap(),
+                        writerInitContext.customProperties(),
+                        writerInitContext.tablePath().getDatabaseName(),
+                        writerInitContext.tablePath().getTableName());
+        int batchSize = LanceConfig.getBatchSize(config);
+        Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
+        if (!schema.isPresent()) {
+            throw new IOException("Fail to get dataset " + 
config.getDatasetUri() + " in Lance.");
+        }
+
+        this.arrowWriter =
+                LanceDatasetAdapter.getArrowWriter(
+                        schema.get(), batchSize, 
writerInitContext.schema().getRowType());
+
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        Callable<List<FragmentMetadata>> fragmentCreator =
+                () ->
+                        LanceDatasetAdapter.createFragment(
+                                config.getDatasetUri(), arrowWriter, params);
+        fragmentCreationTask = new FutureTask<>(fragmentCreator);
+        Thread fragmentCreationThread = new Thread(fragmentCreationTask);
+        fragmentCreationThread.start();
+    }
+
+    @Override
+    public void write(LogRecord record) throws IOException {
+        arrowWriter.write(record);
+    }
+
+    @Override
+    public LanceWriteResult complete() throws IOException {
+        arrowWriter.setFinished();
+        try {
+            List<FragmentMetadata> fragmentMetadata = fragmentCreationTask.get();
+            return new LanceWriteResult(fragmentMetadata);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while waiting for reader thread to finish", e);
+        } catch (ExecutionException e) {
+            throw new IOException("Exception in reader thread", e);

Review Comment:
   why is it called `reader thread`?



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** The {@link SimpleVersionedSerializer} for {@link LanceWriteResult}. */
+public class LanceWriteResultSerializer implements SimpleVersionedSerializer<LanceWriteResult> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceWriteResult lanceWriteResult) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(lanceWriteResult);
+        oos.close();
+        return baos.toByteArray();
+    }
+
+    @Override
+    public LanceWriteResult deserialize(int version, byte[] serialized) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+        ObjectInputStream ois = new ObjectInputStream(bais);

Review Comment:
   ditto



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataField;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for Lance Arrow Writer. */
+public class LanceArrowWriterTest {
+    @Test
+    public void testLanceArrowWriter() throws Exception {
+        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+            List<DataField> fields = Arrays.asList(new DataField("column1", DataTypes.INT()));
+
+            RowType rowType = new RowType(fields);
+            final int totalRows = 125;
+            final int batchSize = 34;
+
+            final LanceArrowWriter arrowWriter =
+                    new LanceArrowWriter(
+                            allocator, LanceArrowUtils.toArrowSchema(rowType), batchSize, rowType);
+            AtomicInteger rowsWritten = new AtomicInteger(0);
+            AtomicInteger rowsRead = new AtomicInteger(0);
+
+            Thread writerThread =

Review Comment:
   The test logic looks strange to me. Do we really need this test? I'd prefer to delete it since we already have LanceTieringTest.



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataField;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for Lance Arrow Writer. */
+public class LanceArrowWriterTest {
+    @Test
+    public void testLanceArrowWriter() throws Exception {

Review Comment:
   ```suggestion
       void testLanceArrowWriter() throws Exception {
   ```



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java:
##########
@@ -46,4 +58,51 @@ public static Optional<Schema> getSchema(LanceConfig config) {
             return Optional.empty();
         }
     }
+
+    public static long appendFragments(LanceConfig config, List<FragmentMetadata> fragments) {

Review Comment:
   this method is not used



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java:
##########
@@ -46,4 +58,51 @@ public static Optional<Schema> getSchema(LanceConfig config) {
             return Optional.empty();
         }
     }
+
+    public static long appendFragments(LanceConfig config, List<FragmentMetadata> fragments) {
+        FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments);
+        String uri = config.getDatasetUri();
+        ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+        try (Dataset datasetRead = Dataset.open(allocator, uri, options)) {
+            Dataset datasetWrite =
+                    Dataset.commit(
+                            allocator,
+                            config.getDatasetUri(),
+                            appendOp,
+                            java.util.Optional.of(datasetRead.version()),
+                            options.getStorageOptions());
+            long version = datasetWrite.version();
+            datasetWrite.close();
+            // Dataset.create returns version 1
+            return version - 1;
+        }
+    }
+
+    public static long commitAppend(
+            LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
+        String uri = config.getDatasetUri();
+        ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+        try (Dataset dataset = Dataset.open(allocator, uri, options)) {
+            Transaction transaction =
+                    dataset.newTransactionBuilder()
+                            .operation(Append.builder().fragments(fragments).build())
+                            .transactionProperties(properties)
+                            .build();
+            try (Dataset appendedDataset = transaction.commit()) {

Review Comment:
   Looks strange; just to double-check: is the version of the committed result `appendedDataset` the latest version + 1?
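   If so, a sanity check could make that assumption explicit (sketch only):
   ```
   long readVersion = dataset.version();
   try (Dataset appendedDataset = transaction.commit()) {
       // expectation under discussion: commit bumps the version by exactly one
       assert appendedDataset.version() == readVersion + 1;
       return appendedDataset.version();
   }
   ```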



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for tiering tables to lance. */
+public class LanceTieringITCase extends FlinkLanceTieringTestBase {

Review Comment:
   nit:
   ```suggestion
   class LanceTieringITCase extends FlinkLanceTieringTestBase {
   ```



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
+public class LanceTieringTest {

Review Comment:
   nit:
   ```suggestion
   class LanceTieringTest {
   ```



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** A custom arrow reader that supports writing Fluss internal rows while reading data in batches. */
+public class LanceArrowWriter extends ArrowReader {

Review Comment:
   +1, it also confuses me... I think it's quite hacky.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.record.LogRecord;
+
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+/** Implementation of {@link LakeWriter} for Lance. */
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+    private final LanceArrowWriter arrowWriter;
+    private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+
+    public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
+            throws IOException {
+        LanceConfig config =
+                LanceConfig.from(
+                        options.toMap(),
+                        writerInitContext.customProperties(),
+                        writerInitContext.tablePath().getDatabaseName(),
+                        writerInitContext.tablePath().getTableName());
+        int batchSize = LanceConfig.getBatchSize(config);
+        Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
+        if (!schema.isPresent()) {
+            throw new IOException("Failed to get dataset " + config.getDatasetUri() + " in Lance.");
+        }
+
+        this.arrowWriter =
+                LanceDatasetAdapter.getArrowWriter(
+                        schema.get(), batchSize, writerInitContext.schema().getRowType());
+
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        Callable<List<FragmentMetadata>> fragmentCreator =
+                () ->
+                        LanceDatasetAdapter.createFragment(
+                                config.getDatasetUri(), arrowWriter, params);
+        fragmentCreationTask = new FutureTask<>(fragmentCreator);
+        Thread fragmentCreationThread = new Thread(fragmentCreationTask);
+        fragmentCreationThread.start();

Review Comment:
   Shouldn't we shut down the fragmentCreationThread in the close method?
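
   A minimal sketch of what I have in mind, assuming the thread is kept as a field rather than a constructor-local variable (this `close()` body is hypothetical, not the PR's implementation):
   ```
   @Override
   public void close() throws IOException {
       // stop a still-running fragment creation and interrupt the worker thread
       fragmentCreationTask.cancel(true);
       try {
           fragmentCreationThread.join();
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IOException("Interrupted while closing LanceLakeWriter", e);
       }
   }
   ```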



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java:
##########
@@ -64,11 +58,17 @@ public LanceConfig(
     }
 
     public static LanceConfig from(

Review Comment:
   `builder.setIndexCacheSize`
   `builder.setMetadataCacheSize`
   are deprecated in LanceConfig#genReadOptionFromConfig.



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataField;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for Lance Arrow Writer. */
+public class LanceArrowWriterTest {

Review Comment:
   nit:
   ```suggestion
   class LanceArrowWriterTest {
   ```



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java:
##########
@@ -130,24 +130,8 @@ public static WriteParams genWriteParamsFromConfig(LanceConfig config) {
         return builder.build();
     }
 
-    private static Map<String, String> genStorageOptions(LanceConfig config) {

Review Comment:
   Just wondering: is it possible to pass a map when creating `ReadOptions` or `WriteOptions`, so that the caller doesn't need to parse `version` and `blockSize` itself and then set them via `ReadOptions#Builder`? If a new option is added to `ReadOptions`, the caller may well forget to set it on the builder.
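
   For example, a hedged sketch of such a helper (the key names `"version"` and `"block_size"` are illustrative assumptions, not the actual Lance option keys):
   ```
   public static ReadOptions genReadOptionFromMap(Map<String, String> options) {
       ReadOptions.Builder builder = new ReadOptions.Builder();
       if (options.containsKey("version")) {
           builder.setVersion(Integer.parseInt(options.get("version")));
       }
       if (options.containsKey("block_size")) {
           builder.setBlockSize(Integer.parseInt(options.get("block_size")));
       }
       // remaining entries pass straight through as storage options
       builder.setStorageOptions(options);
       return builder.build();
   }
   ```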



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for tiering tables to Lance. */
+public class LanceTieringITCase extends FlinkLanceTieringTestBase {
+    protected static final String DEFAULT_DB = "fluss";
+    private static StreamExecutionEnvironment execEnv;
+    private static Configuration lanceConf;
+    private static final RootAllocator allocator = new RootAllocator();
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkLanceTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+        lanceConf = Configuration.fromMap(getLanceCatalogConf());
+    }
+
+    @Test
+    void testTiering() throws Exception {
+        // create log table
+        TablePath t1 = TablePath.of(DEFAULT_DB, "logTable");
+        long t1Id = createLogTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        List<InternalRow> flussRows = new ArrayList<>();
+        // write records
+        for (int i = 0; i < 10; i++) {
+            List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+            flussRows.addAll(rows);
+            // write records
+            writeRows(t1, rows, true);
+        }
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of replica after synced;
+        // note: we can't update log start offset for unaware bucket mode log table
+        assertReplicaStatus(t1Bucket, 30);
+
+        LanceConfig config =
+                LanceConfig.from(
+                        lanceConf.toMap(),
+                        Collections.emptyMap(),
+                        t1.getDatabaseName(),
+                        t1.getTableName());
+
+        // check data in lance
+        checkDataInLanceAppendOnlyTable(config, flussRows);
+        // check snapshot property in lance
+        Map<String, String> properties =
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                "[{\"bucket_id\":0,\"log_offset\":30}]");
+                        put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+                    }
+                };
+        checkSnapshotPropertyInLance(config, properties);
+
+        jobClient.cancel().get();
+    }
+
+    private void checkSnapshotPropertyInLance(
+            LanceConfig config, Map<String, String> expectedProperties) throws Exception {
+        ReadOptions.Builder builder = new ReadOptions.Builder();
+        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            Transaction transaction = dataset.readTransaction().orElse(null);
+            assertThat(transaction).isNotNull();
+            assertThat(transaction.transactionProperties()).isEqualTo(expectedProperties);
+        }
+    }
+
+    private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRow> expectedRows)
+            throws Exception {
+

Review Comment:
   nit: remove this blank line.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** A custom arrow reader that supports writing Fluss internal rows while reading data in batches. */
+public class LanceArrowWriter extends ArrowReader {

Review Comment:
   Is it possible for us to use `Fragment#create(String datasetUri, BufferAllocator allocator, VectorSchemaRoot root, WriteParams params)`?
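
   Something like this sketch, using the signature above (`schema`, `datasetUri`, and `params` would come from the writer; the return type and batching semantics of `Fragment#create` are assumptions to double-check):
   ```
   try (BufferAllocator allocator = new RootAllocator();
           VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
       // fill `root` with one converted batch of Fluss rows, then write it out
       List<FragmentMetadata> fragments = Fragment.create(datasetUri, allocator, root, params);
   }
   ```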



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
