leonardBang commented on code in PR #1441:
URL: https://github.com/apache/fluss/pull/1441#discussion_r2280920779


##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataField;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for Lance Arrow Writer. */
+public class LanceArrowWriterTest {
+    @Test
+    public void test() throws Exception {

Review Comment:
   Could you give the test case a meaningful name?



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+
+import com.lancedb.lance.Dataset;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for tiering tables to lance. */
+public class LanceTieringITCase extends FlinkLanceTieringTestBase {
+    protected static final String DEFAULT_DB = "fluss";
+    private static StreamExecutionEnvironment execEnv;
+    private static Configuration lanceConf;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkLanceTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+        lanceConf = Configuration.fromMap(getLanceCatalogConf());
+    }
+
+    @Test
+    void testTiering() throws Exception {
+        // create log table
+        TablePath t1 = TablePath.of(DEFAULT_DB, "logTable");
+        long t1Id = createLogTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        List<InternalRow> flussRows = new ArrayList<>();
+        // write records
+        for (int i = 0; i < 10; i++) {
+            List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+            flussRows.addAll(rows);
+            // write records
+            writeRows(t1, rows, true);
+        }
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of replica after synced;
+        // note: we can't update log start offset for unaware bucket mode log table
+        assertReplicaStatus(t1Bucket, 30);
+
+        LanceConfig config =
+                LanceConfig.from(
+                        lanceConf.toMap(),
+                        Collections.emptyMap(),
+                        t1.getDatabaseName(),
+                        t1.getTableName());
+
+        // check data in lance
+        checkDataInLanceAppendOnlyTable(config, flussRows);
+        // check snapshot property in lance
+        Map<String, String> properties =
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                "[{\"bucket_id\":0,\"log_offset\":30}]");
+                    }
+                };
+        checkSnapshotPropertyInLance(config, properties);
+
+        jobClient.cancel().get();
+    }
+
+    private void checkSnapshotPropertyInLance(
+            LanceConfig config, Map<String, String> expectedProperties) throws Exception {
+        Map<String, String> transactionProperties =
+                LanceDatasetAdapter.getTransactionProperties(config, null);
+        assertThat(transactionProperties).isEqualTo(expectedProperties);
+    }
+
+    private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRow> expectedRows)
+            throws Exception {
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+            //        while (reader.loadNextBatch()) {
+            //            System.out.print(readerRoot.contentToTSVString());
+            //        }
+            reader.loadNextBatch();

Review Comment:
   ditto



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** A custom arrow reader that supports writes Fluss internal rows while reading data in batches. */
+public class LanceArrowWriter extends ArrowReader {

Review Comment:
   `LanceArrowWriter` extends `ArrowReader`? The design confuses me at first glance; a reader and a writer typically represent opposite responsibilities.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java:
##########
@@ -37,12 +37,6 @@ public class LanceConfig implements Serializable {
     private static final String max_row_per_file = "max_row_per_file";
     private static final String max_rows_per_group = "max_rows_per_group";
     private static final String max_bytes_per_file = "max_bytes_per_file";
-    private static final String ak = "access_key_id";
-    private static final String sk = "secret_access_key";
-    private static final String endpoint = "aws_endpoint";
-    private static final String region = "aws_region";

Review Comment:
   Oops, could you double-check which configs are really required for Lance?



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
+public class LanceTieringTest {
+    private @TempDir File tempWarehouseDir;
+    private LanceLakeTieringFactory lanceLakeTieringFactory;
+    private Configuration configuration;
+
+    @BeforeEach
+    void beforeEach() {
+        configuration = new Configuration();
+        configuration.setString("warehouse", tempWarehouseDir.toString());
+        lanceLakeTieringFactory = new LanceLakeTieringFactory(configuration);
+    }
+
+    private static Stream<Arguments> tieringWriteArgs() {
+        return Stream.of(Arguments.of(false), Arguments.of(true));
+    }
+
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTable(boolean isPartitioned) throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "logTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createTable(config);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // should no any missing snapshot
+            assertThat(lakeCommitter.getMissingLakeSnapshot(1L)).isNull();
+        }
+
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+        List<String> partitionKeys = isPartitioned ? Arrays.asList("c3") : null;
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            genLogTableRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            long snapshot =
+                    lakeCommitter.commit(
+                            lanceCommittable,
+                            toBucketOffsetsProperty(
+                                    tableBucketOffsets, partitionIdAndName, partitionKeys));
+            assertThat(snapshot).isEqualTo(1);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+            //        while (reader.loadNextBatch()) {
+            //            System.out.print(readerRoot.contentToTSVString());
+            //        }
+
+            // then, check data
+            for (int bucket = 0; bucket < 3; bucket++) {
+                for (String partition : partitionIdAndName.values()) {
+                    reader.loadNextBatch();
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
+                    verifyLogTableRecords(
+                            readerRoot, expectRecords, bucket, isPartitioned, partition);
+                }
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+
+        // then, let's verify getMissingLakeSnapshot works
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // use snapshot id 0 as the known snapshot id
+            CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(0L);
+            assertThat(committedLakeSnapshot).isNotNull();
+            Map<Tuple2<Long, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets();
+            for (int bucket = 0; bucket < 3; bucket++) {
+                for (Long partitionId : partitionIdAndName.keySet()) {
+                    // we only write 10 records, so expected log offset should be 10
+                    assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(10);
+                }
+            }
+            assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();
+
+            // use null as the known snapshot id
+            CommittedLakeSnapshot committedLakeSnapshot2 =
+                    lakeCommitter.getMissingLakeSnapshot(null);
+            assertThat(committedLakeSnapshot2).isEqualTo(committedLakeSnapshot);
+
+            // use snapshot id 1 as the known snapshot id
+            committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(1L);
+            // no any missing committed offset since the latest snapshot is 1L
+            assertThat(committedLakeSnapshot).isNull();
+        }
+    }
+
+    private void verifyLogTableRecords(
+            VectorSchemaRoot root,
+            List<LogRecord> expectRecords,
+            int expectBucket,
+            boolean isPartitioned,
+            @Nullable String partition)
+            throws Exception {
+        assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
+        for (int i = 0; i < expectRecords.size(); i++) {
+            LogRecord expectRecord = expectRecords.get(i);
+            // check business columns:
+            assertThat((int) (root.getVector(0).getObject(i)))
+                    .isEqualTo(expectRecord.getRow().getInt(0));
+            assertThat(((VarCharVector) root.getVector(1)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(1).toString());
+            assertThat(((VarCharVector) root.getVector(2)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(2).toString());
+        }
+    }
+
+    private LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter(
+            TablePath tablePath) throws IOException {
+        return lanceLakeTieringFactory.createLakeCommitter(() -> tablePath);
+    }
+
+    private LakeWriter<LanceWriteResult> createLakeWriter(
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            Schema schema,
+            Map<String, String> customProperties)
+            throws IOException {
+        return lanceLakeTieringFactory.createLakeWriter(
+                new WriterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableBucket tableBucket() {
+                        // don't care about tableId & partitionId
+                        return new TableBucket(0, 0L, bucket);
+                    }
+
+                    @Nullable
+                    @Override
+                    public String partition() {
+                        return partition;
+                    }
+
+                    @Override
+                    public com.alibaba.fluss.metadata.Schema schema() {
+                        return schema;
+                    }
+
+                    @Override
+                    public Map<String, String> customProperties() {
+                        return customProperties;
+                    }
+                });
+    }
+
+    private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
+            @Nullable String partition, int bucket, int numRecords) {
+        List<LogRecord> logRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            GenericRow genericRow;
+            if (partition != null) {
+                // Partitioned table: include partition field in data
+                genericRow = new GenericRow(3); // c1, c2, c3(partition)
+                genericRow.setField(0, i);
+                genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
+                genericRow.setField(2, BinaryString.fromString(partition)); // partition field
+            } else {
+                // Non-partitioned table
+                genericRow = new GenericRow(3);
+                genericRow.setField(0, i);
+                genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
+                genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+            }
+            LogRecord logRecord =
+                    new GenericRecord(
+                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
+            logRecords.add(logRecord);
+        }
+        return Tuple2.of(logRecords, logRecords);
+    }
+
+    private Schema createTable(LanceConfig config) throws Exception {
+        List<Schema.Column> columns = new ArrayList<>();
+        columns.add(new Schema.Column("c1", DataTypes.INT()));
+        columns.add(new Schema.Column("c2", DataTypes.STRING()));
+        columns.add(new Schema.Column("c3", DataTypes.STRING()));
+        Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns);
+        Schema schema = schemaBuilder.build();
+        doCreateLanceTable(schema, config);
+        return schema;
+    }
+
+    private void doCreateLanceTable(Schema schema, LanceConfig config) throws Exception {
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+    }

Review Comment:
   The logic is simple; looks like a separate method is not necessary.



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java:
##########
@@ -182,25 +185,11 @@ void testLogTable() throws Exception {
                         FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
                         null);
 
-        // for __bucket, __offset, __timestamp
-        Field logC17 =
-                new Field(
-                        BUCKET_COLUMN_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null);
-        Field logC18 =
-                new Field(
-                        OFFSET_COLUMN_NAME, FieldType.nullable(new ArrowType.Int(64, true)), null);
-        Field logC19 =
-                new Field(
-                        TIMESTAMP_COLUMN_NAME,
-                        FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
-                        null);
-
         org.apache.arrow.vector.types.pojo.Schema expectedSchema =
                 new org.apache.arrow.vector.types.pojo.Schema(
                         Arrays.asList(
                                 logC1, logC2, logC3, logC4, logC5, logC6, logC7, logC8, logC9,
-                                logC10, logC11, logC12, logC13, logC14, logC15, logC16, logC17,
-                                logC18, logC19));
+                                logC10, logC11, logC12, logC13, logC14, logC15, logC16));

Review Comment:
   Why do we need to remove the three metadata columns?



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataField;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for Lance Arrow Writer. */
+public class LanceArrowWriterTest {
+    @Test
+    public void test() throws Exception {
+        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+            List<DataField> fields = Arrays.asList(new DataField("column1", DataTypes.INT()));
+
+            RowType rowType = new RowType(fields);
+            final int totalRows = 125;
+            final int batchSize = 34;
+
+            final LanceArrowWriter arrowWriter =
+                    new LanceArrowWriter(
+                            allocator, LanceArrowUtils.toArrowSchema(rowType), batchSize, rowType);
+            AtomicInteger rowsWritten = new AtomicInteger(0);
+            AtomicInteger rowsRead = new AtomicInteger(0);
+            AtomicLong expectedBytesRead = new AtomicLong(0);

Review Comment:
   `expectedBytesRead` is never verified?
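   A quick sketch of the kind of assertion I'd expect at the end of the test, assuming the reader side accumulates the bytes it actually reads into a counter (here hypothetically named `actualBytesRead`):
   ```java
   // hypothetical: compare what the reader consumed against what the writer side expected
   assertThat(actualBytesRead.get()).isEqualTo(expectedBytesRead.get());
   ```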



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/** An Arrow writer for InternalRow. */
+public class ArrowWriter {
+    /**
+     * An array of writers which are responsible for the serialization of each column of the rows.
+     */
+    private final ArrowFieldWriter<InternalRow>[] fieldWriters;
+
+    private int recordsCount;
+    private VectorSchemaRoot root;

Review Comment:
   Could be `final`?



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataField;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for Lance Arrow Writer. */
+public class LanceArrowWriterTest {
+    @Test
+    public void test() throws Exception {
+        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+            List<DataField> fields = Arrays.asList(new DataField("column1", DataTypes.INT()));
+
+            RowType rowType = new RowType(fields);
+            final int totalRows = 125;
+            final int batchSize = 34;
+
+            final LanceArrowWriter arrowWriter =
+                    new LanceArrowWriter(
+                            allocator, LanceArrowUtils.toArrowSchema(rowType), batchSize, rowType);
+            AtomicInteger rowsWritten = new AtomicInteger(0);
+            AtomicInteger rowsRead = new AtomicInteger(0);
+            AtomicLong expectedBytesRead = new AtomicLong(0);
+
+            Thread writerThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    for (int i = 0; i < totalRows; i++) {
+                                        GenericRow genericRow = new GenericRow(1);
+                                        genericRow.setField(0, rowsWritten.incrementAndGet());
+                                        LogRecord logRecord =
+                                                new GenericRecord(
+                                                        i,
+                                                        System.currentTimeMillis(),
+                                                        APPEND_ONLY,
+                                                        genericRow);
+
+                                        arrowWriter.write(logRecord);
+                                    }
+                                    arrowWriter.setFinished();
+                                } catch (Exception e) {

Review Comment:
   minor: catch `Throwable` and log it via a logger; it will help us troubleshoot later.
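   Something along these lines, assuming we add an SLF4J logger to the test class (e.g. `LoggerFactory.getLogger(LanceArrowWriterTest.class)`):
   ```java
   try {
       // ... write the records and call arrowWriter.setFinished() ...
   } catch (Throwable t) {
       // log instead of swallowing, so failures in the writer thread show up in the build output
       LOG.error("Writer thread failed unexpectedly", t);
   }
   ```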



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/** An Arrow writer for InternalRow. */

Review Comment:
   ```suggestion
   /** An Arrow writer for {@link InternalRow}. */
   ```



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/** An Arrow writer for InternalRow. */
+public class ArrowWriter {
+    /**
+     * An array of writers which are responsible for the serialization of each column of the rows.
+     */
+    private final ArrowFieldWriter<InternalRow>[] fieldWriters;
+
+    private int recordsCount;
+    private VectorSchemaRoot root;
+
+    public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters, VectorSchemaRoot root) {

Review Comment:
   We could add a short Javadoc note for the constructor.
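   Just as a sketch of the kind of note I mean (wording is only a suggestion):
   ```java
   /**
    * Creates an ArrowWriter that serializes {@link InternalRow}s into the given
    * {@link VectorSchemaRoot}, one {@link ArrowFieldWriter} per column.
    */
   public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters, VectorSchemaRoot root) {
   ```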



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
+public class LanceTieringTest {
+    private @TempDir File tempWarehouseDir;
+    private LanceLakeTieringFactory lanceLakeTieringFactory;
+    private Configuration configuration;
+
+    @BeforeEach
+    void beforeEach() {
+        configuration = new Configuration();
+        configuration.setString("warehouse", tempWarehouseDir.toString());
+        lanceLakeTieringFactory = new LanceLakeTieringFactory(configuration);
+    }
+
+    private static Stream<Arguments> tieringWriteArgs() {
+        return Stream.of(Arguments.of(false), Arguments.of(true));
+    }
+
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTable(boolean isPartitioned) throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "logTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createTable(config);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // should no any missing snapshot
+            assertThat(lakeCommitter.getMissingLakeSnapshot(1L)).isNull();
+        }
+
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+        List<String> partitionKeys = isPartitioned ? Arrays.asList("c3") : null;
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            genLogTableRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            long snapshot =
+                    lakeCommitter.commit(
+                            lanceCommittable,
+                            toBucketOffsetsProperty(
+                                    tableBucketOffsets, partitionIdAndName, partitionKeys));
+            assertThat(snapshot).isEqualTo(1);
+        }
+
+        try (Dataset dataset =
+                Dataset.open(
+                        new RootAllocator(),
+                        config.getDatasetUri(),
+                        LanceConfig.genReadOptionFromConfig(config))) {
+            ArrowReader reader = dataset.newScan().scanBatches();
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+            //        while (reader.loadNextBatch()) {
+            //            System.out.print(readerRoot.contentToTSVString());
+            //        }

Review Comment:
   Please remove the debug logs.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import com.lancedb.lance.FragmentMetadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/** The serializer of {@link LanceCommittable}. */
+public class LanceCommittableSerializer implements SimpleVersionedSerializer<LanceCommittable> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(LanceCommittable lanceCommittable) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(lanceCommittable.committable());
+        oos.close();
+        return baos.toByteArray();
+    }
+
+    @Override
+    public LanceCommittable deserialize(int version, byte[] serialized) throws IOException {
+        if (version != CURRENT_VERSION) {
+            throw new UnsupportedOperationException(
+                    "Expecting LanceCommittable version to be "
+                            + CURRENT_VERSION
+                            + ", but found "
+                            + version
+                            + ".");
+        }
+        ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        try {
+            return new LanceCommittable((List<FragmentMetadata>) ois.readObject());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }

Review Comment:
   When and why could a `ClassNotFoundException` be thrown here? Could you add an explanatory message instead of directly wrapping it in a `RuntimeException`?
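   For example, something along these lines, assuming we prefer surfacing it as an `IOException` (which the method already declares):
   ```java
   } catch (ClassNotFoundException e) {
       // can only happen if the deserializing JVM is missing the Lance classes on its classpath
       throw new IOException(
               "Failed to deserialize LanceCommittable: FragmentMetadata class not found on the classpath", e);
   }
   ```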



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+
+import com.lancedb.lance.FragmentMetadata;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+
+/** Implementation of {@link LakeCommitter} for Lance. */
+public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
+    private final LanceConfig config;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public LanceLakeCommitter(Configuration options, TablePath tablePath) {
+        this.config =
+                LanceConfig.from(
+                        options.toMap(),
+                        Collections.emptyMap(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+    }
+
+    @Override
+    public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
+            throws IOException {
+        List<FragmentMetadata> fragments =
+                lanceWriteResults.stream()
+                        .map(LanceWriteResult::commitMessage)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        return new LanceCommittable(fragments);
+    }
+
+    @Override
+    public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        return LanceDatasetAdapter.commitAppend(
+                config, committable.committable(), snapshotProperties);
+    }
+
+    @Override
+    public void abort(LanceCommittable committable) throws IOException {
+        throw new UnsupportedOperationException();

Review Comment:
   This will lead to data inconsistency, IIUC.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.record.LogRecord;
+
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+/** Implementation of {@link LakeWriter} for Lance. */
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+    private final LanceArrowWriter arrowWriter;
+    FutureTask<List<FragmentMetadata>> fragmentCreationTask;

Review Comment:
   Could this be `private final`?



##########
fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.testutils;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.exception.FlussRuntimeException;
+import com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder;
+import com.alibaba.fluss.metadata.DataLakeFormat;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.server.replica.Replica;
+import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for sync to lance by Flink. */
+public class FlinkLanceTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
+    protected StreamExecutionEnvironment execEnv;
+
+    protected static Connection conn;
+    protected static Admin admin;
+    protected static Configuration clientConf;
+    private static String warehousePath;
+
+    private static Configuration initConfig() {
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+                // not to clean snapshots for test purpose
+                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
+        conf.setString("datalake.format", "lance");
+        try {
+            warehousePath =
+                    Files.createTempDirectory("fluss-testing-datalake-tiered")
+                            .resolve("warehouse")
+                            .toString();
+        } catch (Exception e) {
+            throw new FlussRuntimeException("Failed to create warehouse path");
+        }
+        conf.setString("datalake.lance.warehouse", warehousePath);
+        return conf;
+    }
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        execEnv.setParallelism(2);
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

Review Comment:
   Useless line; `getExecutionEnvironment()` is already assigned a few lines above.



##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java:
##########
@@ -187,4 +222,66 @@ protected ArrowType defaultMethod(DataType dataType) {
                             "Unsupported data type %s currently.", 
dataType.asSummaryString()));
         }
     }
+
+    private static int getPrecision(DecimalVector decimalVector) {
+        int precision = -1;
+        try {
+            java.lang.reflect.Field precisionField =
+                    decimalVector.getClass().getDeclaredField("precision");
+            precisionField.setAccessible(true);
+            precision = (int) precisionField.get(decimalVector);

Review Comment:
   What's the reason we don't use `decimalVector.getPrecision()` here?
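   If the vector already carries the precision, the reflection could presumably be replaced with the public accessor:
   ```java
   // assumes DecimalVector#getPrecision() returns the same value the reflection reads
   int precision = decimalVector.getPrecision();
   ```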



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
