This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 87993089a [lake/lance] Refactor LanceArrowWriter to reuse fluss-common 
ArrowWriters (#2345)
87993089a is described below

commit 87993089a04cb48491391b0bd0d02f656f8439ba
Author: ForwardXu <[email protected]>
AuthorDate: Sat Jan 17 15:12:56 2026 +0800

    [lake/lance] Refactor LanceArrowWriter to reuse fluss-common ArrowWriters 
(#2345)
---
 .../fluss/lake/lance/tiering/ArrowWriter.java      |  74 --------
 .../fluss/lake/lance/tiering/LanceArrowWriter.java | 130 -------------
 .../lance/tiering/LanceCommittableSerializer.java  |   2 +-
 .../fluss/lake/lance/tiering/LanceLakeWriter.java  | 111 +++++++----
 .../lake/lance/tiering/ShadedArrowBatchWriter.java | 114 +++++++++++
 .../fluss/lake/lance/utils/ArrowDataConverter.java |  93 +++++++++
 .../fluss/lake/lance/utils/LanceArrowUtils.java    | 210 ++++-----------------
 .../lake/lance/utils/LanceDatasetAdapter.java      |  18 --
 .../lake/lance/writers/ArrowBigIntWriter.java      |  52 -----
 .../lake/lance/writers/ArrowBinaryWriter.java      |  57 ------
 .../lake/lance/writers/ArrowBooleanWriter.java     |  54 ------
 .../fluss/lake/lance/writers/ArrowDateWriter.java  |  54 ------
 .../lake/lance/writers/ArrowDecimalWriter.java     |  70 -------
 .../lake/lance/writers/ArrowDoubleWriter.java      |  54 ------
 .../fluss/lake/lance/writers/ArrowFieldWriter.java |  63 -------
 .../fluss/lake/lance/writers/ArrowFloatWriter.java |  53 ------
 .../fluss/lake/lance/writers/ArrowIntWriter.java   |  54 ------
 .../lake/lance/writers/ArrowSmallIntWriter.java    |  54 ------
 .../fluss/lake/lance/writers/ArrowTimeWriter.java  |  90 ---------
 .../lance/writers/ArrowTimestampLtzWriter.java     |  99 ----------
 .../lance/writers/ArrowTimestampNtzWriter.java     |  97 ----------
 .../lake/lance/writers/ArrowTinyIntWriter.java     |  56 ------
 .../lake/lance/writers/ArrowVarBinaryWriter.java   |  52 -----
 .../lake/lance/writers/ArrowVarCharWriter.java     |  56 ------
 .../lake/lance/LakeEnabledTableCreateITCase.java   |   6 +-
 25 files changed, 326 insertions(+), 1447 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ArrowWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ArrowWriter.java
deleted file mode 100644
index 928a33e3c..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ArrowWriter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.tiering;
-
-import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
-import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.types.RowType;
-
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** An Arrow writer for {@link InternalRow}. */
-public class ArrowWriter {
-    private final VectorSchemaRoot root;
-
-    private final ArrowFieldWriter<InternalRow>[] fieldWriters;
-
-    private int recordsCount;
-
-    /**
-     * Writer which serializes the Fluss rows to Arrow record batches.
-     *
-     * @param fieldWriters An array of writers which are responsible for the 
serialization of each
-     *     column of the rows
-     * @param root Container that holds a set of vectors for the rows
-     */
-    public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters, 
VectorSchemaRoot root) {
-        this.fieldWriters = fieldWriters;
-        this.root = root;
-    }
-
-    public static ArrowWriter create(VectorSchemaRoot root, RowType rowType) {
-        ArrowFieldWriter<InternalRow>[] fieldWriters =
-                new ArrowFieldWriter[root.getFieldVectors().size()];
-        for (int i = 0; i < fieldWriters.length; i++) {
-            FieldVector fieldVector = root.getVector(i);
-
-            fieldWriters[i] =
-                    LanceArrowUtils.createArrowFieldWriter(fieldVector, 
rowType.getTypeAt(i));
-        }
-        return new ArrowWriter(fieldWriters, root);
-    }
-
-    /** Writes the specified row which is serialized into Arrow format. */
-    public void writeRow(InternalRow row) {
-        for (int i = 0; i < fieldWriters.length; i++) {
-            fieldWriters[i].write(row, i, true);
-        }
-        recordsCount++;
-    }
-
-    public void finish() {
-        root.setRowCount(recordsCount);
-        for (ArrowFieldWriter<InternalRow> fieldWriter : fieldWriters) {
-            fieldWriter.finish();
-        }
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceArrowWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceArrowWriter.java
deleted file mode 100644
index 2a4b1599e..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceArrowWriter.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.tiering;
-
-import org.apache.fluss.record.LogRecord;
-import org.apache.fluss.types.RowType;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.ipc.ArrowReader;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-import java.io.IOException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.fluss.utils.Preconditions.checkArgument;
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
-/** A custom arrow reader that supports writes Fluss internal rows while 
reading data in batches. */
-public class LanceArrowWriter extends ArrowReader {
-    private final Schema schema;
-    private final RowType rowType;
-    private final int batchSize;
-
-    private volatile boolean finished = false;
-
-    private final AtomicLong totalBytesRead = new AtomicLong();
-    private ArrowWriter arrowWriter = null;
-    private final AtomicInteger count = new AtomicInteger(0);
-    private final Semaphore writeToken;
-    private final Semaphore loadToken;
-
-    public LanceArrowWriter(
-            BufferAllocator allocator, Schema schema, int batchSize, RowType 
rowType) {
-        super(allocator);
-        checkNotNull(schema);
-        checkArgument(batchSize > 0);
-        this.schema = schema;
-        this.rowType = rowType;
-        this.batchSize = batchSize;
-        this.writeToken = new Semaphore(0);
-        this.loadToken = new Semaphore(0);
-    }
-
-    void write(LogRecord row) {
-        checkNotNull(row);
-        try {
-            // wait util prepareLoadNextBatch to release write token,
-            writeToken.acquire();
-            arrowWriter.writeRow(row.getRow());
-            if (count.incrementAndGet() == batchSize) {
-                // notify loadNextBatch to take the batch
-                loadToken.release();
-            }
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    void setFinished() {
-        finished = true;
-        loadToken.release();
-    }
-
-    @Override
-    public void prepareLoadNextBatch() throws IOException {
-        super.prepareLoadNextBatch();
-        arrowWriter = ArrowWriter.create(this.getVectorSchemaRoot(), rowType);
-        // release batch size token for write
-        writeToken.release(batchSize);
-    }
-
-    @Override
-    public boolean loadNextBatch() throws IOException {
-        prepareLoadNextBatch();
-        try {
-            if (finished && count.get() == 0) {
-                return false;
-            }
-            // wait util batch if full or finished
-            loadToken.acquire();
-            arrowWriter.finish();
-            if (!finished) {
-                count.set(0);
-                return true;
-            } else {
-                // true if it has some rows and return false if there is no 
record
-                if (count.get() > 0) {
-                    count.set(0);
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public long bytesRead() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected synchronized void closeReadSource() throws IOException {
-        // Implement if needed
-    }
-
-    @Override
-    protected Schema readSchema() {
-        return this.schema;
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
index d128acdc1..15b2208e6 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
@@ -47,6 +47,7 @@ public class LanceCommittableSerializer implements 
SimpleVersionedSerializer<Lan
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public LanceCommittable deserialize(int version, byte[] serialized) throws 
IOException {
         if (version != CURRENT_VERSION) {
             throw new UnsupportedOperationException(
@@ -58,7 +59,6 @@ public class LanceCommittableSerializer implements 
SimpleVersionedSerializer<Lan
         }
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 ObjectInputStream ois = new ObjectInputStream(bais)) {
-            //noinspection unchecked
             return new LanceCommittable((List<FragmentMetadata>) 
ois.readObject());
         } catch (ClassNotFoundException e) {
             throw new IOException("Couldn't deserialize LanceCommittable", e);
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
index 699a528cc..1ed6c589e 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
@@ -19,26 +19,39 @@ package org.apache.fluss.lake.lance.tiering;
 
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.lance.LanceConfig;
+import org.apache.fluss.lake.lance.utils.ArrowDataConverter;
 import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
 
+import com.lancedb.lance.Fragment;
 import com.lancedb.lance.FragmentMetadata;
 import com.lancedb.lance.WriteParams;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
 
-/** Implementation of {@link LakeWriter} for Lance. */
+/** Implementation of {@link LakeWriter} for Lance using batch processing. */
 public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
-    private final LanceArrowWriter arrowWriter;
-    private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+    private final BufferAllocator nonShadedAllocator;
+    private final 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator
+            shadedAllocator;
+    private final Schema nonShadedSchema;
+    private final RowType rowType;
+    private final int batchSize;
+    private final String datasetUri;
+    private final WriteParams writeParams;
+
+    private final ShadedArrowBatchWriter arrowWriter;
+    private final List<FragmentMetadata> allFragments;
 
     public LanceLakeWriter(Configuration options, WriterInitContext 
writerInitContext)
             throws IOException {
@@ -48,47 +61,79 @@ public class LanceLakeWriter implements 
LakeWriter<LanceWriteResult> {
                         
writerInitContext.tableInfo().getCustomProperties().toMap(),
                         writerInitContext.tablePath().getDatabaseName(),
                         writerInitContext.tablePath().getTableName());
-        int batchSize = LanceConfig.getBatchSize(config);
+
+        this.batchSize = LanceConfig.getBatchSize(config);
+        this.datasetUri = config.getDatasetUri();
+        this.writeParams = LanceConfig.genWriteParamsFromConfig(config);
+        this.rowType = writerInitContext.tableInfo().getRowType();
+        this.nonShadedAllocator = new RootAllocator();
+        this.shadedAllocator =
+                new 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator();
+        this.arrowWriter = new ShadedArrowBatchWriter(shadedAllocator, 
rowType);
+        this.allFragments = new ArrayList<>();
+
         Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
         if (!schema.isPresent()) {
-            throw new IOException("Fail to get dataset " + 
config.getDatasetUri() + " in Lance.");
+            throw new IOException("Fail to get dataset " + datasetUri + " in 
Lance.");
         }
-
-        this.arrowWriter =
-                LanceDatasetAdapter.getArrowWriter(
-                        schema.get(), batchSize, 
writerInitContext.tableInfo().getRowType());
-
-        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
-        Callable<List<FragmentMetadata>> fragmentCreator =
-                () ->
-                        LanceDatasetAdapter.createFragment(
-                                config.getDatasetUri(), arrowWriter, params);
-        fragmentCreationTask = new FutureTask<>(fragmentCreator);
-        Thread fragmentCreationThread = new Thread(fragmentCreationTask);
-        fragmentCreationThread.start();
+        this.nonShadedSchema = schema.get();
     }
 
     @Override
     public void write(LogRecord record) throws IOException {
-        arrowWriter.write(record);
+        arrowWriter.writeRow(record.getRow());
+
+        if (arrowWriter.getRecordsCount() >= batchSize) {
+            List<FragmentMetadata> fragments = flush();
+            allFragments.addAll(fragments);
+        }
     }
 
-    @Override
-    public LanceWriteResult complete() throws IOException {
-        arrowWriter.setFinished();
+    private List<FragmentMetadata> flush() throws IOException {
+        if (arrowWriter.getRecordsCount() == 0) {
+            return new ArrayList<>();
+        }
+
+        VectorSchemaRoot nonShadedRoot = null;
+
         try {
-            List<FragmentMetadata> fragmentMetadata = 
fragmentCreationTask.get();
-            return new LanceWriteResult(fragmentMetadata);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted while waiting for reader thread 
to finish", e);
-        } catch (ExecutionException e) {
-            throw new IOException("Exception in reader thread", e);
+            arrowWriter.finish();
+
+            nonShadedRoot =
+                    ArrowDataConverter.convertToNonShaded(
+                            arrowWriter.getShadedRoot(), nonShadedAllocator, 
nonShadedSchema);
+
+            List<FragmentMetadata> fragments =
+                    Fragment.create(datasetUri, nonShadedAllocator, 
nonShadedRoot, writeParams);
+
+            arrowWriter.reset();
+            return fragments;
+        } catch (Exception e) {
+            throw new IOException("Failed to write Lance fragment", e);
+        } finally {
+            if (nonShadedRoot != null) {
+                nonShadedRoot.close();
+            }
         }
     }
 
+    @Override
+    public LanceWriteResult complete() throws IOException {
+        List<FragmentMetadata> fragments = flush();
+        allFragments.addAll(fragments);
+        return new LanceWriteResult(allFragments);
+    }
+
     @Override
     public void close() throws IOException {
-        arrowWriter.close();
+        if (arrowWriter != null) {
+            arrowWriter.close();
+        }
+        if (shadedAllocator != null) {
+            shadedAllocator.close();
+        }
+        if (nonShadedAllocator != null) {
+            nonShadedAllocator.close();
+        }
     }
 }
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
new file mode 100644
index 000000000..21b2aa1ec
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.tiering;
+
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.arrow.writers.ArrowFieldWriter;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseFixedWidthVector;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/**
+ * Batch writer using shaded Arrow and ArrowFieldWriter from fluss-common.
+ *
+ * <p>This class uses shaded Arrow vectors and ArrowFieldWriters to write 
Fluss InternalRows. It can
+ * later be converted to non-shaded Arrow format for Lance compatibility.
+ */
+public class ShadedArrowBatchWriter implements AutoCloseable {
+    private static final int INITIAL_CAPACITY = 1024;
+
+    private final VectorSchemaRoot shadedRoot;
+    private final ArrowFieldWriter[] fieldWriters;
+    private int recordsCount;
+
+    public ShadedArrowBatchWriter(BufferAllocator shadedAllocator, RowType 
rowType) {
+        this.shadedRoot =
+                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), 
shadedAllocator);
+        this.fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+
+        for (int i = 0; i < fieldWriters.length; i++) {
+            FieldVector fieldVector = shadedRoot.getVector(i);
+            initFieldVector(fieldVector);
+            fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector, 
rowType.getTypeAt(i));
+        }
+        this.recordsCount = 0;
+    }
+
+    public void writeRow(InternalRow row) {
+        boolean handleSafe = recordsCount >= INITIAL_CAPACITY;
+        for (int i = 0; i < fieldWriters.length; i++) {
+            fieldWriters[i].write(recordsCount, row, i, handleSafe);
+        }
+        recordsCount++;
+    }
+
+    public void finish() {
+        shadedRoot.setRowCount(recordsCount);
+    }
+
+    public void reset() {
+        recordsCount = 0;
+        for (int i = 0; i < fieldWriters.length; i++) {
+            FieldVector fieldVector = shadedRoot.getVector(i);
+            initFieldVector(fieldVector);
+        }
+        for (ArrowFieldWriter fieldWriter : fieldWriters) {
+            fieldWriter.reset();
+        }
+        shadedRoot.setRowCount(0);
+    }
+
+    public int getRecordsCount() {
+        return recordsCount;
+    }
+
+    public VectorSchemaRoot getShadedRoot() {
+        return shadedRoot;
+    }
+
+    @Override
+    public void close() {
+        if (shadedRoot != null) {
+            shadedRoot.close();
+        }
+    }
+
+    private void initFieldVector(FieldVector fieldVector) {
+        fieldVector.setInitialCapacity(INITIAL_CAPACITY);
+
+        if (fieldVector instanceof BaseFixedWidthVector) {
+            ((BaseFixedWidthVector) fieldVector).allocateNew(INITIAL_CAPACITY);
+        } else if (fieldVector instanceof BaseVariableWidthVector) {
+            ((BaseVariableWidthVector) 
fieldVector).allocateNew(INITIAL_CAPACITY);
+        } else if (fieldVector instanceof ListVector) {
+            ListVector listVector = (ListVector) fieldVector;
+            listVector.allocateNew();
+            FieldVector dataVector = listVector.getDataVector();
+            if (dataVector != null) {
+                initFieldVector(dataVector);
+            }
+        } else {
+            fieldVector.allocateNew();
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
new file mode 100644
index 000000000..eb8c53b22
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.utils;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Utility class to convert between shaded and non-shaded Arrow 
VectorSchemaRoot by sharing off-heap
+ * memory directly.
+ *
+ * <p>Since both shaded and non-shaded Arrow use the same off-heap memory 
layout, we can share the
+ * underlying ByteBuffer/memory address directly without serialization 
overhead.
+ */
+public class ArrowDataConverter {
+
+    /**
+     * Converts a shaded Arrow VectorSchemaRoot to a non-shaded Arrow 
VectorSchemaRoot by sharing
+     * the underlying off-heap memory.
+     *
+     * @param shadedRoot The shaded Arrow VectorSchemaRoot from fluss-common
+     * @param nonShadedAllocator The non-shaded BufferAllocator for Lance
+     * @param nonShadedSchema The non-shaded Arrow Schema for Lance
+     * @return A non-shaded VectorSchemaRoot compatible with Lance
+     */
+    public static VectorSchemaRoot convertToNonShaded(
+            
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot 
shadedRoot,
+            BufferAllocator nonShadedAllocator,
+            Schema nonShadedSchema) {
+
+        VectorSchemaRoot nonShadedRoot =
+                VectorSchemaRoot.create(nonShadedSchema, nonShadedAllocator);
+        nonShadedRoot.allocateNew();
+
+        
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector> 
shadedVectors =
+                shadedRoot.getFieldVectors();
+        List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();
+
+        for (int i = 0; i < shadedVectors.size(); i++) {
+            copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
+        }
+
+        nonShadedRoot.setRowCount(shadedRoot.getRowCount());
+        return nonShadedRoot;
+    }
+
+    private static void copyVectorData(
+            org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector 
shadedVector,
+            FieldVector nonShadedVector) {
+        List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> 
shadedBuffers =
+                shadedVector.getFieldBuffers();
+
+        int valueCount = shadedVector.getValueCount();
+        nonShadedVector.setValueCount(valueCount);
+
+        List<ArrowBuf> nonShadedBuffers = nonShadedVector.getFieldBuffers();
+
+        for (int i = 0; i < Math.min(shadedBuffers.size(), 
nonShadedBuffers.size()); i++) {
+            org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf 
shadedBuf =
+                    shadedBuffers.get(i);
+            ArrowBuf nonShadedBuf = nonShadedBuffers.get(i);
+
+            long size = Math.min(shadedBuf.capacity(), 
nonShadedBuf.capacity());
+            if (size > 0) {
+                ByteBuffer srcBuffer = shadedBuf.nioBuffer(0, (int) size);
+                srcBuffer.position(0);
+                srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
+                nonShadedBuf.setBytes(0, srcBuffer);
+            }
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
index 6b4da804f..7e9448312 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -17,30 +17,12 @@
 
 package org.apache.fluss.lake.lance.utils;
 
-import org.apache.fluss.lake.lance.writers.ArrowBigIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowBinaryWriter;
-import org.apache.fluss.lake.lance.writers.ArrowBooleanWriter;
-import org.apache.fluss.lake.lance.writers.ArrowDateWriter;
-import org.apache.fluss.lake.lance.writers.ArrowDecimalWriter;
-import org.apache.fluss.lake.lance.writers.ArrowDoubleWriter;
-import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
-import org.apache.fluss.lake.lance.writers.ArrowFloatWriter;
-import org.apache.fluss.lake.lance.writers.ArrowIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowSmallIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTimeWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTimestampLtzWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTimestampNtzWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTinyIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowVarBinaryWriter;
-import org.apache.fluss.lake.lance.writers.ArrowVarCharWriter;
-import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.BigIntType;
 import org.apache.fluss.types.BinaryType;
 import org.apache.fluss.types.BooleanType;
 import org.apache.fluss.types.BytesType;
 import org.apache.fluss.types.CharType;
 import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.DataTypeDefaultVisitor;
 import org.apache.fluss.types.DateType;
 import org.apache.fluss.types.DecimalType;
 import org.apache.fluss.types.DoubleType;
@@ -54,24 +36,6 @@ import org.apache.fluss.types.TimeType;
 import org.apache.fluss.types.TimestampType;
 import org.apache.fluss.types.TinyIntType;
 
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FixedSizeBinaryVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -83,9 +47,13 @@ import org.apache.arrow.vector.types.pojo.Schema;
 import java.util.List;
 import java.util.stream.Collectors;
 
-/** Utilities for Arrow. */
+/**
+ * Utilities for converting Fluss RowType to non-shaded Arrow Schema. This is 
needed because Lance
+ * requires non-shaded Arrow API.
+ */
 public class LanceArrowUtils {
-    /** Returns the Arrow schema of the specified type. */
+
+    /** Returns the non-shaded Arrow schema of the specified Fluss RowType. */
     public static Schema toArrowSchema(RowType rowType) {
         List<Field> fields =
                 rowType.getFields().stream()
@@ -96,86 +64,40 @@ public class LanceArrowUtils {
 
     private static Field toArrowField(String fieldName, DataType logicalType) {
         FieldType fieldType =
-                new FieldType(
-                        logicalType.isNullable(),
-                        
logicalType.accept(DataTypeToArrowTypeConverter.INSTANCE),
-                        null);
+                new FieldType(logicalType.isNullable(), 
toArrowType(logicalType), null);
         return new Field(fieldName, fieldType, null);
     }
 
-    private static class DataTypeToArrowTypeConverter extends 
DataTypeDefaultVisitor<ArrowType> {
-
-        private static final DataTypeToArrowTypeConverter INSTANCE =
-                new DataTypeToArrowTypeConverter();
-
-        @Override
-        public ArrowType visit(TinyIntType tinyIntType) {
+    private static ArrowType toArrowType(DataType dataType) {
+        if (dataType instanceof TinyIntType) {
             return new ArrowType.Int(8, true);
-        }
-
-        @Override
-        public ArrowType visit(SmallIntType smallIntType) {
-            return new ArrowType.Int(2 * 8, true);
-        }
-
-        @Override
-        public ArrowType visit(IntType intType) {
-            return new ArrowType.Int(4 * 8, true);
-        }
-
-        @Override
-        public ArrowType visit(BigIntType bigIntType) {
-            return new ArrowType.Int(8 * 8, true);
-        }
-
-        @Override
-        public ArrowType visit(BooleanType booleanType) {
+        } else if (dataType instanceof SmallIntType) {
+            return new ArrowType.Int(16, true);
+        } else if (dataType instanceof IntType) {
+            return new ArrowType.Int(32, true);
+        } else if (dataType instanceof BigIntType) {
+            return new ArrowType.Int(64, true);
+        } else if (dataType instanceof BooleanType) {
             return ArrowType.Bool.INSTANCE;
-        }
-
-        @Override
-        public ArrowType visit(FloatType floatType) {
+        } else if (dataType instanceof FloatType) {
             return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
-        }
-
-        @Override
-        public ArrowType visit(DoubleType doubleType) {
+        } else if (dataType instanceof DoubleType) {
             return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
-        }
-
-        @Override
-        public ArrowType visit(CharType varCharType) {
-            return ArrowType.Utf8.INSTANCE;
-        }
-
-        @Override
-        public ArrowType visit(StringType stringType) {
+        } else if (dataType instanceof CharType || dataType instanceof 
StringType) {
             return ArrowType.Utf8.INSTANCE;
-        }
-
-        @Override
-        public ArrowType visit(BinaryType binaryType) {
+        } else if (dataType instanceof BinaryType) {
+            BinaryType binaryType = (BinaryType) dataType;
             return new ArrowType.FixedSizeBinary(binaryType.getLength());
-        }
-
-        @Override
-        public ArrowType visit(BytesType bytesType) {
+        } else if (dataType instanceof BytesType) {
             return ArrowType.Binary.INSTANCE;
-        }
-
-        @Override
-        public ArrowType visit(DecimalType decimalType) {
+        } else if (dataType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) dataType;
             return ArrowType.Decimal.createDecimal(
                     decimalType.getPrecision(), decimalType.getScale(), null);
-        }
-
-        @Override
-        public ArrowType visit(DateType dateType) {
+        } else if (dataType instanceof DateType) {
             return new ArrowType.Date(DateUnit.DAY);
-        }
-
-        @Override
-        public ArrowType visit(TimeType timeType) {
+        } else if (dataType instanceof TimeType) {
+            TimeType timeType = (TimeType) dataType;
             if (timeType.getPrecision() == 0) {
                 return new ArrowType.Time(TimeUnit.SECOND, 32);
             } else if (timeType.getPrecision() >= 1 && timeType.getPrecision() 
<= 3) {
@@ -185,25 +107,19 @@ public class LanceArrowUtils {
             } else {
                 return new ArrowType.Time(TimeUnit.NANOSECOND, 64);
             }
-        }
-
-        @Override
-        public ArrowType visit(LocalZonedTimestampType 
localZonedTimestampType) {
-            if (localZonedTimestampType.getPrecision() == 0) {
+        } else if (dataType instanceof LocalZonedTimestampType) {
+            LocalZonedTimestampType timestampType = (LocalZonedTimestampType) 
dataType;
+            if (timestampType.getPrecision() == 0) {
                 return new ArrowType.Timestamp(TimeUnit.SECOND, null);
-            } else if (localZonedTimestampType.getPrecision() >= 1
-                    && localZonedTimestampType.getPrecision() <= 3) {
+            } else if (timestampType.getPrecision() >= 1 && 
timestampType.getPrecision() <= 3) {
                 return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
-            } else if (localZonedTimestampType.getPrecision() >= 4
-                    && localZonedTimestampType.getPrecision() <= 6) {
+            } else if (timestampType.getPrecision() >= 4 && 
timestampType.getPrecision() <= 6) {
                 return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
             } else {
                 return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
             }
-        }
-
-        @Override
-        public ArrowType visit(TimestampType timestampType) {
+        } else if (dataType instanceof TimestampType) {
+            TimestampType timestampType = (TimestampType) dataType;
             if (timestampType.getPrecision() == 0) {
                 return new ArrowType.Timestamp(TimeUnit.SECOND, null);
             } else if (timestampType.getPrecision() >= 1 && 
timestampType.getPrecision() <= 3) {
@@ -213,66 +129,10 @@ public class LanceArrowUtils {
             } else {
                 return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
             }
-        }
-
-        @Override
-        protected ArrowType defaultMethod(DataType dataType) {
+        } else {
             throw new UnsupportedOperationException(
                     String.format(
                             "Unsupported data type %s currently.", 
dataType.asSummaryString()));
         }
     }
-
-    private static int getPrecision(DecimalVector decimalVector) {
-        return decimalVector.getPrecision();
-    }
-
-    public static ArrowFieldWriter<InternalRow> createArrowFieldWriter(
-            ValueVector vector, DataType dataType) {
-        if (vector instanceof TinyIntVector) {
-            return ArrowTinyIntWriter.forField((TinyIntVector) vector);
-        } else if (vector instanceof SmallIntVector) {
-            return ArrowSmallIntWriter.forField((SmallIntVector) vector);
-        } else if (vector instanceof IntVector) {
-            return ArrowIntWriter.forField((IntVector) vector);
-        } else if (vector instanceof BigIntVector) {
-            return ArrowBigIntWriter.forField((BigIntVector) vector);
-        } else if (vector instanceof BitVector) {
-            return ArrowBooleanWriter.forField((BitVector) vector);
-        } else if (vector instanceof Float4Vector) {
-            return ArrowFloatWriter.forField((Float4Vector) vector);
-        } else if (vector instanceof Float8Vector) {
-            return ArrowDoubleWriter.forField((Float8Vector) vector);
-        } else if (vector instanceof VarCharVector) {
-            return ArrowVarCharWriter.forField((VarCharVector) vector);
-        } else if (vector instanceof FixedSizeBinaryVector) {
-            return ArrowBinaryWriter.forField((FixedSizeBinaryVector) vector);
-        } else if (vector instanceof VarBinaryVector) {
-            return ArrowVarBinaryWriter.forField((VarBinaryVector) vector);
-        } else if (vector instanceof DecimalVector) {
-            DecimalVector decimalVector = (DecimalVector) vector;
-            return ArrowDecimalWriter.forField(
-                    decimalVector, getPrecision(decimalVector), 
decimalVector.getScale());
-        } else if (vector instanceof DateDayVector) {
-            return ArrowDateWriter.forField((DateDayVector) vector);
-        } else if (vector instanceof TimeSecVector
-                || vector instanceof TimeMilliVector
-                || vector instanceof TimeMicroVector
-                || vector instanceof TimeNanoVector) {
-            return ArrowTimeWriter.forField(vector);
-        } else if (vector instanceof TimeStampVector
-                && ((ArrowType.Timestamp) 
vector.getField().getType()).getTimezone() == null) {
-            int precision;
-            if (dataType instanceof LocalZonedTimestampType) {
-                precision = ((LocalZonedTimestampType) 
dataType).getPrecision();
-                return ArrowTimestampLtzWriter.forField(vector, precision);
-            } else {
-                precision = ((TimestampType) dataType).getPrecision();
-                return ArrowTimestampNtzWriter.forField(vector, precision);
-            }
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unsupported type %s.", dataType));
-        }
-    }
 }
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
index 5d98da77f..b5e4c89f7 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
@@ -18,21 +18,15 @@
 package org.apache.fluss.lake.lance.utils;
 
 import org.apache.fluss.lake.lance.LanceConfig;
-import org.apache.fluss.lake.lance.tiering.LanceArrowWriter;
-import org.apache.fluss.types.RowType;
 
 import com.lancedb.lance.Dataset;
-import com.lancedb.lance.Fragment;
 import com.lancedb.lance.FragmentMetadata;
 import com.lancedb.lance.ReadOptions;
 import com.lancedb.lance.Transaction;
 import com.lancedb.lance.WriteParams;
 import com.lancedb.lance.operation.Append;
-import org.apache.arrow.c.ArrowArrayStream;
-import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.util.List;
@@ -74,16 +68,4 @@ public class LanceDatasetAdapter {
             }
         }
     }
-
-    public static LanceArrowWriter getArrowWriter(Schema schema, int 
batchSize, RowType rowType) {
-        return new LanceArrowWriter(allocator, schema, batchSize, rowType);
-    }
-
-    public static List<FragmentMetadata> createFragment(
-            String datasetUri, ArrowReader reader, WriteParams params) {
-        try (ArrowArrayStream arrowStream = 
ArrowArrayStream.allocateNew(allocator)) {
-            Data.exportArrayStream(allocator, reader, arrowStream);
-            return Fragment.create(datasetUri, arrowStream, params);
-        }
-    }
 }
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBigIntWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBigIntWriter.java
deleted file mode 100644
index 4cc124bec..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBigIntWriter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.BigIntVector;
-
-/** {@link ArrowFieldWriter} for BigInt. */
-public class ArrowBigIntWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowBigIntWriter forField(BigIntVector bigIntVector) {
-        return new ArrowBigIntWriter(bigIntVector);
-    }
-
-    private ArrowBigIntWriter(BigIntVector bigIntVector) {
-        super(bigIntVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        BigIntVector vector = (BigIntVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readLong(row, ordinal));
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private long readLong(InternalRow row, int ordinal) {
-        return row.getLong(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBinaryWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBinaryWriter.java
deleted file mode 100644
index 75fa16d76..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBinaryWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.FixedSizeBinaryVector;
-
-/** {@link ArrowFieldWriter} for Binary. */
-public class ArrowBinaryWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowBinaryWriter forField(FixedSizeBinaryVector 
binaryVector) {
-        return new ArrowBinaryWriter(binaryVector);
-    }
-
-    private final int byteWidth;
-
-    private ArrowBinaryWriter(FixedSizeBinaryVector binaryVector) {
-        super(binaryVector);
-        this.byteWidth = binaryVector.getByteWidth();
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        FixedSizeBinaryVector vector = (FixedSizeBinaryVector) 
getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readBinary(row, ordinal));
-        } else {
-            vector.set(getCount(), readBinary(row, ordinal));
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private byte[] readBinary(InternalRow row, int ordinal) {
-        return row.getBinary(ordinal, byteWidth);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBooleanWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBooleanWriter.java
deleted file mode 100644
index 85c9b7b38..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBooleanWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.BitVector;
-
-/** {@link ArrowFieldWriter} for Boolean. */
-public class ArrowBooleanWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowBooleanWriter forField(BitVector booleanVector) {
-        return new ArrowBooleanWriter(booleanVector);
-    }
-
-    private ArrowBooleanWriter(BitVector bitVector) {
-        super(bitVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        BitVector vector = (BitVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readBoolean(row, ordinal) ? 1 : 0);
-        } else {
-            vector.set(getCount(), readBoolean(row, ordinal) ? 1 : 0);
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private boolean readBoolean(InternalRow row, int ordinal) {
-        return row.getBoolean(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDateWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDateWriter.java
deleted file mode 100644
index 8aa2ec7bd..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDateWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.DateDayVector;
-
-/** {@link ArrowFieldWriter} for Date. */
-public class ArrowDateWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowDateWriter forField(DateDayVector dateDayVector) {
-        return new ArrowDateWriter(dateDayVector);
-    }
-
-    private ArrowDateWriter(DateDayVector dateDayVector) {
-        super(dateDayVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        DateDayVector vector = (DateDayVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readDate(row, ordinal));
-        } else {
-            vector.set(getCount(), readDate(row, ordinal));
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private int readDate(InternalRow row, int ordinal) {
-        return row.getInt(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDecimalWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDecimalWriter.java
deleted file mode 100644
index 0a9d01c4e..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDecimalWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.Decimal;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.DecimalVector;
-
-import java.math.BigDecimal;
-
-/** {@link ArrowFieldWriter} for Decimal. */
-public class ArrowDecimalWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowDecimalWriter forField(
-            DecimalVector decimalVector, int precision, int scale) {
-        return new ArrowDecimalWriter(decimalVector, precision, scale);
-    }
-
-    private final int precision;
-    private final int scale;
-
-    private ArrowDecimalWriter(DecimalVector decimalVector, int precision, int 
scale) {
-        super(decimalVector);
-        this.precision = precision;
-        this.scale = scale;
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        DecimalVector vector = (DecimalVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else {
-            BigDecimal bigDecimal = readDecimal(row, ordinal).toBigDecimal();
-            if (bigDecimal == null) {
-                vector.setNull(getCount());
-            } else {
-                if (handleSafe) {
-                    vector.setSafe(getCount(), bigDecimal);
-                } else {
-                    vector.set(getCount(), bigDecimal);
-                }
-            }
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private Decimal readDecimal(InternalRow row, int ordinal) {
-        return row.getDecimal(ordinal, precision, scale);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDoubleWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDoubleWriter.java
deleted file mode 100644
index 459d1ac86..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDoubleWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.Float8Vector;
-
-/** {@link ArrowFieldWriter} for Double. */
-public class ArrowDoubleWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowDoubleWriter forField(Float8Vector doubleVector) {
-        return new ArrowDoubleWriter(doubleVector);
-    }
-
-    private ArrowDoubleWriter(Float8Vector doubleVector) {
-        super(doubleVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        Float8Vector vector = (Float8Vector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readDouble(row, ordinal));
-        } else {
-            vector.set(getCount(), readDouble(row, ordinal));
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private double readDouble(InternalRow row, int ordinal) {
-        return row.getDouble(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFieldWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFieldWriter.java
deleted file mode 100644
index 15a56fc55..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFieldWriter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.arrow.vector.ValueVector;
-
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
-/**
- * Base class for arrow field writer which is used to convert a field to an 
Arrow format.
- *
- * @param <IN> Type of the input to write. Currently, it's always InternalRow, 
may support
- *     InternalArray in the future.
- */
-public abstract class ArrowFieldWriter<IN> {
-
-    /** Container which is used to store the written sequence of values of a 
column. */
-    private final ValueVector valueVector;
-
-    private int count;
-
-    public ArrowFieldWriter(ValueVector valueVector) {
-        this.valueVector = checkNotNull(valueVector);
-    }
-
-    /** Returns the underlying container which stores the sequence of values 
of a column. */
-    public ValueVector getValueVector() {
-        return valueVector;
-    }
-
-    /** Returns the current count of elements written. */
-    public int getCount() {
-        return count;
-    }
-
-    /** Sets the field value as the field at the specified ordinal of the 
specified row. */
-    public abstract void doWrite(IN row, int ordinal, boolean handleSafe);
-
-    /** Writes the specified ordinal of the specified row. */
-    public void write(IN row, int ordinal, boolean handleSafe) {
-        doWrite(row, ordinal, handleSafe);
-        count++;
-    }
-
-    public void finish() {
-        valueVector.setValueCount(count);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFloatWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFloatWriter.java
deleted file mode 100644
index c5528580a..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFloatWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.Float4Vector;
-
-/** {@link ArrowFieldWriter} for Float. */
-public class ArrowFloatWriter extends ArrowFieldWriter<InternalRow> {
-    public static ArrowFloatWriter forField(Float4Vector float4Vector) {
-        return new ArrowFloatWriter(float4Vector);
-    }
-
-    private ArrowFloatWriter(Float4Vector float4Vector) {
-        super(float4Vector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        Float4Vector vector = (Float4Vector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readFloat(row, ordinal));
-        } else {
-            vector.set(getCount(), readFloat(row, ordinal));
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private float readFloat(InternalRow row, int ordinal) {
-        return row.getFloat(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowIntWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowIntWriter.java
deleted file mode 100644
index 6b5ee8588..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowIntWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.IntVector;
-
-/** {@link ArrowFieldWriter} for Int. */
-public class ArrowIntWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowIntWriter forField(IntVector intVector) {
-        return new ArrowIntWriter(intVector);
-    }
-
-    private ArrowIntWriter(IntVector intVector) {
-        super(intVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        IntVector vector = (IntVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readInt(row, ordinal));
-        } else {
-            vector.set(getCount(), readInt(row, ordinal));
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    int readInt(InternalRow row, int ordinal) {
-        return row.getInt(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowSmallIntWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowSmallIntWriter.java
deleted file mode 100644
index aab41753b..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowSmallIntWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.SmallIntVector;
-
-/** {@link ArrowFieldWriter} for SmallInt. */
-public class ArrowSmallIntWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowSmallIntWriter forField(SmallIntVector smallIntVector) {
-        return new ArrowSmallIntWriter(smallIntVector);
-    }
-
-    private ArrowSmallIntWriter(SmallIntVector smallIntVector) {
-        super(smallIntVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        SmallIntVector vector = (SmallIntVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else if (handleSafe) {
-            vector.setSafe(getCount(), readShort(row, ordinal));
-        } else {
-            vector.set(getCount(), readShort(row, ordinal));
-        }
-    }
-
-    public boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    public short readShort(InternalRow row, int ordinal) {
-        return row.getShort(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimeWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimeWriter.java
deleted file mode 100644
index bf1e1e901..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimeWriter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.BaseFixedWidthVector;
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.ValueVector;
-
-import static org.apache.fluss.utils.Preconditions.checkState;
-
-/** {@link ArrowFieldWriter} for Time. */
-public class ArrowTimeWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowTimeWriter forField(ValueVector valueVector) {
-        return new ArrowTimeWriter(valueVector);
-    }
-
-    private ArrowTimeWriter(ValueVector valueVector) {
-        super(valueVector);
-        checkState(
-                valueVector instanceof TimeSecVector
-                        || valueVector instanceof TimeMilliVector
-                        || valueVector instanceof TimeMicroVector
-                        || valueVector instanceof TimeNanoVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        ValueVector valueVector = getValueVector();
-        if (isNullAt(row, ordinal)) {
-            ((BaseFixedWidthVector) valueVector).setNull(getCount());
-        } else if (valueVector instanceof TimeSecVector) {
-            int sec = readTime(row, ordinal) / 1000;
-            if (handleSafe) {
-                ((TimeSecVector) valueVector).setSafe(getCount(), sec);
-            } else {
-                ((TimeSecVector) valueVector).set(getCount(), sec);
-            }
-        } else if (valueVector instanceof TimeMilliVector) {
-            int ms = readTime(row, ordinal);
-            if (handleSafe) {
-                ((TimeMilliVector) valueVector).setSafe(getCount(), ms);
-            } else {
-                ((TimeMilliVector) valueVector).set(getCount(), ms);
-            }
-        } else if (valueVector instanceof TimeMicroVector) {
-            long microSec = readTime(row, ordinal) * 1000L;
-            if (handleSafe) {
-                ((TimeMicroVector) valueVector).setSafe(getCount(), microSec);
-            } else {
-                ((TimeMicroVector) valueVector).set(getCount(), microSec);
-            }
-        } else {
-            long nanoSec = readTime(row, ordinal) * 1000000L;
-            if (handleSafe) {
-                ((TimeNanoVector) valueVector).setSafe(getCount(), nanoSec);
-            } else {
-                ((TimeNanoVector) valueVector).set(getCount(), nanoSec);
-            }
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private int readTime(InternalRow row, int ordinal) {
-        return row.getInt(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
deleted file mode 100644
index a9333c000..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.TimestampLtz;
-
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-
-import static org.apache.fluss.utils.Preconditions.checkState;
-
-/** {@link ArrowFieldWriter} for TimestampLtz. */
-public class ArrowTimestampLtzWriter extends ArrowFieldWriter<InternalRow> {
-    public static ArrowTimestampLtzWriter forField(ValueVector valueVector, 
int precision) {
-        return new ArrowTimestampLtzWriter(valueVector, precision);
-    }
-
-    private final int precision;
-
-    private ArrowTimestampLtzWriter(ValueVector valueVector, int precision) {
-        super(valueVector);
-        checkState(
-                valueVector instanceof TimeStampVector
-                        && ((ArrowType.Timestamp) 
valueVector.getField().getType()).getTimezone()
-                                == null);
-        this.precision = precision;
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        TimeStampVector vector = (TimeStampVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else {
-            TimestampLtz timestamp = readTimestamp(row, ordinal);
-            if (vector instanceof TimeStampSecVector) {
-                long sec = timestamp.getEpochMillisecond() / 1000;
-                if (handleSafe) {
-                    vector.setSafe(getCount(), sec);
-                } else {
-                    vector.set(getCount(), sec);
-                }
-            } else if (vector instanceof TimeStampMilliVector) {
-                long ms = timestamp.getEpochMillisecond();
-                if (handleSafe) {
-                    vector.setSafe(getCount(), ms);
-                } else {
-                    vector.set(getCount(), ms);
-                }
-            } else if (vector instanceof TimeStampMicroVector) {
-                long microSec =
-                        timestamp.getEpochMillisecond() * 1000
-                                + timestamp.getNanoOfMillisecond() / 1000;
-                if (handleSafe) {
-                    vector.setSafe(getCount(), microSec);
-                } else {
-                    vector.set(getCount(), microSec);
-                }
-            } else {
-                long nanoSec =
-                        timestamp.getEpochMillisecond() * 1_000_000
-                                + timestamp.getNanoOfMillisecond();
-                if (handleSafe) {
-                    vector.setSafe(getCount(), nanoSec);
-                } else {
-                    vector.set(getCount(), nanoSec);
-                }
-            }
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private TimestampLtz readTimestamp(InternalRow row, int ordinal) {
-        return row.getTimestampLtz(ordinal, precision);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
deleted file mode 100644
index c57a30eb2..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.TimestampNtz;
-
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-
-import static org.apache.fluss.utils.Preconditions.checkState;
-
-/** {@link ArrowFieldWriter} for TimestampNtz. */
-public class ArrowTimestampNtzWriter extends ArrowFieldWriter<InternalRow> {
-    public static ArrowTimestampNtzWriter forField(ValueVector valueVector, 
int precision) {
-        return new ArrowTimestampNtzWriter(valueVector, precision);
-    }
-
-    private final int precision;
-
-    private ArrowTimestampNtzWriter(ValueVector valueVector, int precision) {
-        super(valueVector);
-        checkState(
-                valueVector instanceof TimeStampVector
-                        && ((ArrowType.Timestamp) 
valueVector.getField().getType()).getTimezone()
-                                == null);
-        this.precision = precision;
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        TimeStampVector vector = (TimeStampVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else {
-            TimestampNtz timestamp = readTimestamp(row, ordinal);
-            if (vector instanceof TimeStampSecVector) {
-                long sec = timestamp.getMillisecond() / 1000;
-                if (handleSafe) {
-                    vector.setSafe(getCount(), sec);
-                } else {
-                    vector.set(getCount(), sec);
-                }
-            } else if (vector instanceof TimeStampMilliVector) {
-                long ms = timestamp.getMillisecond();
-                if (handleSafe) {
-                    vector.setSafe(getCount(), ms);
-                } else {
-                    vector.set(getCount(), ms);
-                }
-            } else if (vector instanceof TimeStampMicroVector) {
-                long microSec =
-                        timestamp.getMillisecond() * 1000 + 
timestamp.getNanoOfMillisecond() / 1000;
-                if (handleSafe) {
-                    vector.setSafe(getCount(), microSec);
-                } else {
-                    vector.set(getCount(), microSec);
-                }
-            } else {
-                long nanoSec =
-                        timestamp.getMillisecond() * 1_000_000 + 
timestamp.getNanoOfMillisecond();
-                if (handleSafe) {
-                    vector.setSafe(getCount(), nanoSec);
-                } else {
-                    vector.set(getCount(), nanoSec);
-                }
-            }
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private TimestampNtz readTimestamp(InternalRow row, int ordinal) {
-        return row.getTimestampNtz(ordinal, precision);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTinyIntWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTinyIntWriter.java
deleted file mode 100644
index 0be03aac7..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTinyIntWriter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.TinyIntVector;
-
-/** {@link ArrowFieldWriter} for TinyInt. */
-public class ArrowTinyIntWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowTinyIntWriter forField(TinyIntVector tinyIntVector) {
-        return new ArrowTinyIntWriter(tinyIntVector);
-    }
-
-    private ArrowTinyIntWriter(TinyIntVector tinyIntVector) {
-        super(tinyIntVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        TinyIntVector vector = (TinyIntVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else {
-            if (handleSafe) {
-                vector.setSafe(getCount(), readByte(row, ordinal));
-            } else {
-                vector.set(getCount(), readByte(row, ordinal));
-            }
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private byte readByte(InternalRow row, int ordinal) {
-        return row.getByte(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
deleted file mode 100644
index 3c9668a3c..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.VarBinaryVector;
-
-/** {@link ArrowFieldWriter} for VarBinary. */
-public class ArrowVarBinaryWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowVarBinaryWriter forField(VarBinaryVector 
varBinaryVector) {
-        return new ArrowVarBinaryWriter(varBinaryVector);
-    }
-
-    private ArrowVarBinaryWriter(VarBinaryVector varBinaryVector) {
-        super(varBinaryVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        VarBinaryVector vector = (VarBinaryVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else {
-            vector.setSafe(getCount(), readBinary(row, ordinal));
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private byte[] readBinary(InternalRow row, int ordinal) {
-        return row.getBytes(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarCharWriter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarCharWriter.java
deleted file mode 100644
index 72a401785..000000000
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarCharWriter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.BinaryString;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.VarCharVector;
-
-import java.nio.ByteBuffer;
-
-/** {@link ArrowFieldWriter} for VarChar. */
-public class ArrowVarCharWriter extends ArrowFieldWriter<InternalRow> {
-
-    public static ArrowVarCharWriter forField(VarCharVector varCharVector) {
-        return new ArrowVarCharWriter(varCharVector);
-    }
-
-    private ArrowVarCharWriter(VarCharVector varCharVector) {
-        super(varCharVector);
-    }
-
-    @Override
-    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
-        VarCharVector vector = (VarCharVector) getValueVector();
-        if (isNullAt(row, ordinal)) {
-            vector.setNull(getCount());
-        } else {
-            ByteBuffer buffer = readString(row, ordinal).wrapByteBuffer();
-            vector.setSafe(getCount(), buffer, buffer.position(), 
buffer.remaining());
-        }
-    }
-
-    private boolean isNullAt(InternalRow row, int ordinal) {
-        return row.isNullAt(ordinal);
-    }
-
-    private BinaryString readString(InternalRow row, int ordinal) {
-        return row.getString(ordinal);
-    }
-}
diff --git 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
index eeab99b99..45ccefdf0 100644
--- 
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
@@ -166,7 +166,11 @@ class LakeEnabledTableCreateITCase {
         Field logC10 =
                 new Field("log_c10", FieldType.nullable(new 
ArrowType.FixedSizeBinary(10)), null);
         Field logC11 = new Field("log_c11", FieldType.nullable(new 
ArrowType.Binary()), null);
-        Field logC12 = new Field("log_c12", FieldType.nullable(new 
ArrowType.Decimal(10, 2)), null);
+        Field logC12 =
+                new Field(
+                        "log_c12",
+                        FieldType.nullable(ArrowType.Decimal.createDecimal(10, 
2, null)),
+                        null);
         Field logC13 =
                 new Field("log_c13", FieldType.nullable(new 
ArrowType.Date(DateUnit.DAY)), null);
         Field logC14 =


Reply via email to