This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 87993089a [lake/lance] Refactor LanceArrowWriter to reuse fluss-common
ArrowWriters (#2345)
87993089a is described below
commit 87993089a04cb48491391b0bd0d02f656f8439ba
Author: ForwardXu <[email protected]>
AuthorDate: Sat Jan 17 15:12:56 2026 +0800
[lake/lance] Refactor LanceArrowWriter to reuse fluss-common ArrowWriters
(#2345)
---
.../fluss/lake/lance/tiering/ArrowWriter.java | 74 --------
.../fluss/lake/lance/tiering/LanceArrowWriter.java | 130 -------------
.../lance/tiering/LanceCommittableSerializer.java | 2 +-
.../fluss/lake/lance/tiering/LanceLakeWriter.java | 111 +++++++----
.../lake/lance/tiering/ShadedArrowBatchWriter.java | 114 +++++++++++
.../fluss/lake/lance/utils/ArrowDataConverter.java | 93 +++++++++
.../fluss/lake/lance/utils/LanceArrowUtils.java | 210 ++++-----------------
.../lake/lance/utils/LanceDatasetAdapter.java | 18 --
.../lake/lance/writers/ArrowBigIntWriter.java | 52 -----
.../lake/lance/writers/ArrowBinaryWriter.java | 57 ------
.../lake/lance/writers/ArrowBooleanWriter.java | 54 ------
.../fluss/lake/lance/writers/ArrowDateWriter.java | 54 ------
.../lake/lance/writers/ArrowDecimalWriter.java | 70 -------
.../lake/lance/writers/ArrowDoubleWriter.java | 54 ------
.../fluss/lake/lance/writers/ArrowFieldWriter.java | 63 -------
.../fluss/lake/lance/writers/ArrowFloatWriter.java | 53 ------
.../fluss/lake/lance/writers/ArrowIntWriter.java | 54 ------
.../lake/lance/writers/ArrowSmallIntWriter.java | 54 ------
.../fluss/lake/lance/writers/ArrowTimeWriter.java | 90 ---------
.../lance/writers/ArrowTimestampLtzWriter.java | 99 ----------
.../lance/writers/ArrowTimestampNtzWriter.java | 97 ----------
.../lake/lance/writers/ArrowTinyIntWriter.java | 56 ------
.../lake/lance/writers/ArrowVarBinaryWriter.java | 52 -----
.../lake/lance/writers/ArrowVarCharWriter.java | 56 ------
.../lake/lance/LakeEnabledTableCreateITCase.java | 6 +-
25 files changed, 326 insertions(+), 1447 deletions(-)
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ArrowWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ArrowWriter.java
deleted file mode 100644
index 928a33e3c..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ArrowWriter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.tiering;
-
-import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
-import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.types.RowType;
-
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** An Arrow writer for {@link InternalRow}. */
-public class ArrowWriter {
- private final VectorSchemaRoot root;
-
- private final ArrowFieldWriter<InternalRow>[] fieldWriters;
-
- private int recordsCount;
-
- /**
- * Writer which serializes the Fluss rows to Arrow record batches.
- *
- * @param fieldWriters An array of writers which are responsible for the
serialization of each
- * column of the rows
- * @param root Container that holds a set of vectors for the rows
- */
- public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters,
VectorSchemaRoot root) {
- this.fieldWriters = fieldWriters;
- this.root = root;
- }
-
- public static ArrowWriter create(VectorSchemaRoot root, RowType rowType) {
- ArrowFieldWriter<InternalRow>[] fieldWriters =
- new ArrowFieldWriter[root.getFieldVectors().size()];
- for (int i = 0; i < fieldWriters.length; i++) {
- FieldVector fieldVector = root.getVector(i);
-
- fieldWriters[i] =
- LanceArrowUtils.createArrowFieldWriter(fieldVector,
rowType.getTypeAt(i));
- }
- return new ArrowWriter(fieldWriters, root);
- }
-
- /** Writes the specified row which is serialized into Arrow format. */
- public void writeRow(InternalRow row) {
- for (int i = 0; i < fieldWriters.length; i++) {
- fieldWriters[i].write(row, i, true);
- }
- recordsCount++;
- }
-
- public void finish() {
- root.setRowCount(recordsCount);
- for (ArrowFieldWriter<InternalRow> fieldWriter : fieldWriters) {
- fieldWriter.finish();
- }
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceArrowWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceArrowWriter.java
deleted file mode 100644
index 2a4b1599e..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceArrowWriter.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.tiering;
-
-import org.apache.fluss.record.LogRecord;
-import org.apache.fluss.types.RowType;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.ipc.ArrowReader;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-import java.io.IOException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.fluss.utils.Preconditions.checkArgument;
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
-/** A custom arrow reader that supports writes Fluss internal rows while
reading data in batches. */
-public class LanceArrowWriter extends ArrowReader {
- private final Schema schema;
- private final RowType rowType;
- private final int batchSize;
-
- private volatile boolean finished = false;
-
- private final AtomicLong totalBytesRead = new AtomicLong();
- private ArrowWriter arrowWriter = null;
- private final AtomicInteger count = new AtomicInteger(0);
- private final Semaphore writeToken;
- private final Semaphore loadToken;
-
- public LanceArrowWriter(
- BufferAllocator allocator, Schema schema, int batchSize, RowType
rowType) {
- super(allocator);
- checkNotNull(schema);
- checkArgument(batchSize > 0);
- this.schema = schema;
- this.rowType = rowType;
- this.batchSize = batchSize;
- this.writeToken = new Semaphore(0);
- this.loadToken = new Semaphore(0);
- }
-
- void write(LogRecord row) {
- checkNotNull(row);
- try {
- // wait util prepareLoadNextBatch to release write token,
- writeToken.acquire();
- arrowWriter.writeRow(row.getRow());
- if (count.incrementAndGet() == batchSize) {
- // notify loadNextBatch to take the batch
- loadToken.release();
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- void setFinished() {
- finished = true;
- loadToken.release();
- }
-
- @Override
- public void prepareLoadNextBatch() throws IOException {
- super.prepareLoadNextBatch();
- arrowWriter = ArrowWriter.create(this.getVectorSchemaRoot(), rowType);
- // release batch size token for write
- writeToken.release(batchSize);
- }
-
- @Override
- public boolean loadNextBatch() throws IOException {
- prepareLoadNextBatch();
- try {
- if (finished && count.get() == 0) {
- return false;
- }
- // wait util batch if full or finished
- loadToken.acquire();
- arrowWriter.finish();
- if (!finished) {
- count.set(0);
- return true;
- } else {
- // true if it has some rows and return false if there is no
record
- if (count.get() > 0) {
- count.set(0);
- return true;
- } else {
- return false;
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public long bytesRead() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void closeReadSource() throws IOException {
- // Implement if needed
- }
-
- @Override
- protected Schema readSchema() {
- return this.schema;
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
index d128acdc1..15b2208e6 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceCommittableSerializer.java
@@ -47,6 +47,7 @@ public class LanceCommittableSerializer implements
SimpleVersionedSerializer<Lan
}
@Override
+ @SuppressWarnings("unchecked")
public LanceCommittable deserialize(int version, byte[] serialized) throws
IOException {
if (version != CURRENT_VERSION) {
throw new UnsupportedOperationException(
@@ -58,7 +59,6 @@ public class LanceCommittableSerializer implements
SimpleVersionedSerializer<Lan
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
ObjectInputStream ois = new ObjectInputStream(bais)) {
- //noinspection unchecked
return new LanceCommittable((List<FragmentMetadata>)
ois.readObject());
} catch (ClassNotFoundException e) {
throw new IOException("Couldn't deserialize LanceCommittable", e);
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
index 699a528cc..1ed6c589e 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
@@ -19,26 +19,39 @@ package org.apache.fluss.lake.lance.tiering;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.lance.LanceConfig;
+import org.apache.fluss.lake.lance.utils.ArrowDataConverter;
import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+import com.lancedb.lance.Fragment;
import com.lancedb.lance.FragmentMetadata;
import com.lancedb.lance.WriteParams;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
-/** Implementation of {@link LakeWriter} for Lance. */
+/** Implementation of {@link LakeWriter} for Lance using batch processing. */
public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
- private final LanceArrowWriter arrowWriter;
- private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+ private final BufferAllocator nonShadedAllocator;
+ private final
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator
+ shadedAllocator;
+ private final Schema nonShadedSchema;
+ private final RowType rowType;
+ private final int batchSize;
+ private final String datasetUri;
+ private final WriteParams writeParams;
+
+ private final ShadedArrowBatchWriter arrowWriter;
+ private final List<FragmentMetadata> allFragments;
public LanceLakeWriter(Configuration options, WriterInitContext
writerInitContext)
throws IOException {
@@ -48,47 +61,79 @@ public class LanceLakeWriter implements
LakeWriter<LanceWriteResult> {
writerInitContext.tableInfo().getCustomProperties().toMap(),
writerInitContext.tablePath().getDatabaseName(),
writerInitContext.tablePath().getTableName());
- int batchSize = LanceConfig.getBatchSize(config);
+
+ this.batchSize = LanceConfig.getBatchSize(config);
+ this.datasetUri = config.getDatasetUri();
+ this.writeParams = LanceConfig.genWriteParamsFromConfig(config);
+ this.rowType = writerInitContext.tableInfo().getRowType();
+ this.nonShadedAllocator = new RootAllocator();
+ this.shadedAllocator =
+ new
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator();
+ this.arrowWriter = new ShadedArrowBatchWriter(shadedAllocator,
rowType);
+ this.allFragments = new ArrayList<>();
+
Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
if (!schema.isPresent()) {
- throw new IOException("Fail to get dataset " +
config.getDatasetUri() + " in Lance.");
+ throw new IOException("Fail to get dataset " + datasetUri + " in
Lance.");
}
-
- this.arrowWriter =
- LanceDatasetAdapter.getArrowWriter(
- schema.get(), batchSize,
writerInitContext.tableInfo().getRowType());
-
- WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
- Callable<List<FragmentMetadata>> fragmentCreator =
- () ->
- LanceDatasetAdapter.createFragment(
- config.getDatasetUri(), arrowWriter, params);
- fragmentCreationTask = new FutureTask<>(fragmentCreator);
- Thread fragmentCreationThread = new Thread(fragmentCreationTask);
- fragmentCreationThread.start();
+ this.nonShadedSchema = schema.get();
}
@Override
public void write(LogRecord record) throws IOException {
- arrowWriter.write(record);
+ arrowWriter.writeRow(record.getRow());
+
+ if (arrowWriter.getRecordsCount() >= batchSize) {
+ List<FragmentMetadata> fragments = flush();
+ allFragments.addAll(fragments);
+ }
}
- @Override
- public LanceWriteResult complete() throws IOException {
- arrowWriter.setFinished();
+ private List<FragmentMetadata> flush() throws IOException {
+ if (arrowWriter.getRecordsCount() == 0) {
+ return new ArrayList<>();
+ }
+
+ VectorSchemaRoot nonShadedRoot = null;
+
try {
- List<FragmentMetadata> fragmentMetadata =
fragmentCreationTask.get();
- return new LanceWriteResult(fragmentMetadata);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting for reader thread
to finish", e);
- } catch (ExecutionException e) {
- throw new IOException("Exception in reader thread", e);
+ arrowWriter.finish();
+
+ nonShadedRoot =
+ ArrowDataConverter.convertToNonShaded(
+ arrowWriter.getShadedRoot(), nonShadedAllocator,
nonShadedSchema);
+
+ List<FragmentMetadata> fragments =
+ Fragment.create(datasetUri, nonShadedAllocator,
nonShadedRoot, writeParams);
+
+ arrowWriter.reset();
+ return fragments;
+ } catch (Exception e) {
+ throw new IOException("Failed to write Lance fragment", e);
+ } finally {
+ if (nonShadedRoot != null) {
+ nonShadedRoot.close();
+ }
}
}
+ @Override
+ public LanceWriteResult complete() throws IOException {
+ List<FragmentMetadata> fragments = flush();
+ allFragments.addAll(fragments);
+ return new LanceWriteResult(allFragments);
+ }
+
@Override
public void close() throws IOException {
- arrowWriter.close();
+ if (arrowWriter != null) {
+ arrowWriter.close();
+ }
+ if (shadedAllocator != null) {
+ shadedAllocator.close();
+ }
+ if (nonShadedAllocator != null) {
+ nonShadedAllocator.close();
+ }
}
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
new file mode 100644
index 000000000..21b2aa1ec
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.tiering;
+
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.arrow.writers.ArrowFieldWriter;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
+import
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseFixedWidthVector;
+import
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
+import
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/**
+ * Batch writer using shaded Arrow and ArrowFieldWriter from fluss-common.
+ *
+ * <p>This class uses shaded Arrow vectors and ArrowFieldWriters to write
Fluss InternalRows. It can
+ * later be converted to non-shaded Arrow format for Lance compatibility.
+ */
+public class ShadedArrowBatchWriter implements AutoCloseable {
+ private static final int INITIAL_CAPACITY = 1024;
+
+ private final VectorSchemaRoot shadedRoot;
+ private final ArrowFieldWriter[] fieldWriters;
+ private int recordsCount;
+
+ public ShadedArrowBatchWriter(BufferAllocator shadedAllocator, RowType
rowType) {
+ this.shadedRoot =
+ VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType),
shadedAllocator);
+ this.fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+
+ for (int i = 0; i < fieldWriters.length; i++) {
+ FieldVector fieldVector = shadedRoot.getVector(i);
+ initFieldVector(fieldVector);
+ fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector,
rowType.getTypeAt(i));
+ }
+ this.recordsCount = 0;
+ }
+
+ public void writeRow(InternalRow row) {
+ boolean handleSafe = recordsCount >= INITIAL_CAPACITY;
+ for (int i = 0; i < fieldWriters.length; i++) {
+ fieldWriters[i].write(recordsCount, row, i, handleSafe);
+ }
+ recordsCount++;
+ }
+
+ public void finish() {
+ shadedRoot.setRowCount(recordsCount);
+ }
+
+ public void reset() {
+ recordsCount = 0;
+ for (int i = 0; i < fieldWriters.length; i++) {
+ FieldVector fieldVector = shadedRoot.getVector(i);
+ initFieldVector(fieldVector);
+ }
+ for (ArrowFieldWriter fieldWriter : fieldWriters) {
+ fieldWriter.reset();
+ }
+ shadedRoot.setRowCount(0);
+ }
+
+ public int getRecordsCount() {
+ return recordsCount;
+ }
+
+ public VectorSchemaRoot getShadedRoot() {
+ return shadedRoot;
+ }
+
+ @Override
+ public void close() {
+ if (shadedRoot != null) {
+ shadedRoot.close();
+ }
+ }
+
+ private void initFieldVector(FieldVector fieldVector) {
+ fieldVector.setInitialCapacity(INITIAL_CAPACITY);
+
+ if (fieldVector instanceof BaseFixedWidthVector) {
+ ((BaseFixedWidthVector) fieldVector).allocateNew(INITIAL_CAPACITY);
+ } else if (fieldVector instanceof BaseVariableWidthVector) {
+ ((BaseVariableWidthVector)
fieldVector).allocateNew(INITIAL_CAPACITY);
+ } else if (fieldVector instanceof ListVector) {
+ ListVector listVector = (ListVector) fieldVector;
+ listVector.allocateNew();
+ FieldVector dataVector = listVector.getDataVector();
+ if (dataVector != null) {
+ initFieldVector(dataVector);
+ }
+ } else {
+ fieldVector.allocateNew();
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
new file mode 100644
index 000000000..eb8c53b22
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.utils;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Utility class to convert a shaded Arrow VectorSchemaRoot to a non-shaded
VectorSchemaRoot by copying the raw
+ * off-heap buffer contents into newly allocated vectors.
+ *
+ * <p>Since both shaded and non-shaded Arrow use the same off-heap memory
layout, each buffer can be copied
+ * byte-for-byte without serialization overhead. No memory is shared, and each copy
is limited to the smaller of the source and destination buffer capacities.
+ */
+public class ArrowDataConverter {
+
+ /**
+ * Converts a shaded Arrow VectorSchemaRoot to a non-shaded Arrow
VectorSchemaRoot by copying
+ * the underlying off-heap buffer bytes into a newly allocated root.
+ *
+ * @param shadedRoot The shaded Arrow VectorSchemaRoot from fluss-common
+ * @param nonShadedAllocator The non-shaded BufferAllocator for Lance
+ * @param nonShadedSchema The non-shaded Arrow Schema for Lance
+ * @return A non-shaded VectorSchemaRoot compatible with Lance
+ */
+ public static VectorSchemaRoot convertToNonShaded(
+
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot
shadedRoot,
+ BufferAllocator nonShadedAllocator,
+ Schema nonShadedSchema) {
+
+ VectorSchemaRoot nonShadedRoot =
+ VectorSchemaRoot.create(nonShadedSchema, nonShadedAllocator);
+ nonShadedRoot.allocateNew();
+
+
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector>
shadedVectors =
+ shadedRoot.getFieldVectors();
+ List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();
+
+ for (int i = 0; i < shadedVectors.size(); i++) {
+ copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
+ }
+
+ nonShadedRoot.setRowCount(shadedRoot.getRowCount());
+ return nonShadedRoot;
+ }
+
+ private static void copyVectorData(
+ org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector
shadedVector,
+ FieldVector nonShadedVector) {
+ List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>
shadedBuffers =
+ shadedVector.getFieldBuffers();
+
+ int valueCount = shadedVector.getValueCount();
+ nonShadedVector.setValueCount(valueCount);
+
+ List<ArrowBuf> nonShadedBuffers = nonShadedVector.getFieldBuffers();
+
+ for (int i = 0; i < Math.min(shadedBuffers.size(),
nonShadedBuffers.size()); i++) {
+ org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf
shadedBuf =
+ shadedBuffers.get(i);
+ ArrowBuf nonShadedBuf = nonShadedBuffers.get(i);
+
+ long size = Math.min(shadedBuf.capacity(),
nonShadedBuf.capacity());
+ if (size > 0) {
+ ByteBuffer srcBuffer = shadedBuf.nioBuffer(0, (int) size);
+ srcBuffer.position(0);
+ srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
+ nonShadedBuf.setBytes(0, srcBuffer);
+ }
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
index 6b4da804f..7e9448312 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -17,30 +17,12 @@
package org.apache.fluss.lake.lance.utils;
-import org.apache.fluss.lake.lance.writers.ArrowBigIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowBinaryWriter;
-import org.apache.fluss.lake.lance.writers.ArrowBooleanWriter;
-import org.apache.fluss.lake.lance.writers.ArrowDateWriter;
-import org.apache.fluss.lake.lance.writers.ArrowDecimalWriter;
-import org.apache.fluss.lake.lance.writers.ArrowDoubleWriter;
-import org.apache.fluss.lake.lance.writers.ArrowFieldWriter;
-import org.apache.fluss.lake.lance.writers.ArrowFloatWriter;
-import org.apache.fluss.lake.lance.writers.ArrowIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowSmallIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTimeWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTimestampLtzWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTimestampNtzWriter;
-import org.apache.fluss.lake.lance.writers.ArrowTinyIntWriter;
-import org.apache.fluss.lake.lance.writers.ArrowVarBinaryWriter;
-import org.apache.fluss.lake.lance.writers.ArrowVarCharWriter;
-import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.BinaryType;
import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.BytesType;
import org.apache.fluss.types.CharType;
import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.DataTypeDefaultVisitor;
import org.apache.fluss.types.DateType;
import org.apache.fluss.types.DecimalType;
import org.apache.fluss.types.DoubleType;
@@ -54,24 +36,6 @@ import org.apache.fluss.types.TimeType;
import org.apache.fluss.types.TimestampType;
import org.apache.fluss.types.TinyIntType;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FixedSizeBinaryVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
@@ -83,9 +47,13 @@ import org.apache.arrow.vector.types.pojo.Schema;
import java.util.List;
import java.util.stream.Collectors;
-/** Utilities for Arrow. */
+/**
+ * Utilities for converting Fluss RowType to non-shaded Arrow Schema. This is
needed because Lance
+ * requires non-shaded Arrow API.
+ */
public class LanceArrowUtils {
- /** Returns the Arrow schema of the specified type. */
+
+ /** Returns the non-shaded Arrow schema of the specified Fluss RowType. */
public static Schema toArrowSchema(RowType rowType) {
List<Field> fields =
rowType.getFields().stream()
@@ -96,86 +64,40 @@ public class LanceArrowUtils {
private static Field toArrowField(String fieldName, DataType logicalType) {
FieldType fieldType =
- new FieldType(
- logicalType.isNullable(),
-
logicalType.accept(DataTypeToArrowTypeConverter.INSTANCE),
- null);
+ new FieldType(logicalType.isNullable(),
toArrowType(logicalType), null);
return new Field(fieldName, fieldType, null);
}
- private static class DataTypeToArrowTypeConverter extends
DataTypeDefaultVisitor<ArrowType> {
-
- private static final DataTypeToArrowTypeConverter INSTANCE =
- new DataTypeToArrowTypeConverter();
-
- @Override
- public ArrowType visit(TinyIntType tinyIntType) {
+ private static ArrowType toArrowType(DataType dataType) {
+ if (dataType instanceof TinyIntType) {
return new ArrowType.Int(8, true);
- }
-
- @Override
- public ArrowType visit(SmallIntType smallIntType) {
- return new ArrowType.Int(2 * 8, true);
- }
-
- @Override
- public ArrowType visit(IntType intType) {
- return new ArrowType.Int(4 * 8, true);
- }
-
- @Override
- public ArrowType visit(BigIntType bigIntType) {
- return new ArrowType.Int(8 * 8, true);
- }
-
- @Override
- public ArrowType visit(BooleanType booleanType) {
+ } else if (dataType instanceof SmallIntType) {
+ return new ArrowType.Int(16, true);
+ } else if (dataType instanceof IntType) {
+ return new ArrowType.Int(32, true);
+ } else if (dataType instanceof BigIntType) {
+ return new ArrowType.Int(64, true);
+ } else if (dataType instanceof BooleanType) {
return ArrowType.Bool.INSTANCE;
- }
-
- @Override
- public ArrowType visit(FloatType floatType) {
+ } else if (dataType instanceof FloatType) {
return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
- }
-
- @Override
- public ArrowType visit(DoubleType doubleType) {
+ } else if (dataType instanceof DoubleType) {
return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
- }
-
- @Override
- public ArrowType visit(CharType varCharType) {
- return ArrowType.Utf8.INSTANCE;
- }
-
- @Override
- public ArrowType visit(StringType stringType) {
+ } else if (dataType instanceof CharType || dataType instanceof
StringType) {
return ArrowType.Utf8.INSTANCE;
- }
-
- @Override
- public ArrowType visit(BinaryType binaryType) {
+ } else if (dataType instanceof BinaryType) {
+ BinaryType binaryType = (BinaryType) dataType;
return new ArrowType.FixedSizeBinary(binaryType.getLength());
- }
-
- @Override
- public ArrowType visit(BytesType bytesType) {
+ } else if (dataType instanceof BytesType) {
return ArrowType.Binary.INSTANCE;
- }
-
- @Override
- public ArrowType visit(DecimalType decimalType) {
+ } else if (dataType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) dataType;
return ArrowType.Decimal.createDecimal(
decimalType.getPrecision(), decimalType.getScale(), null);
- }
-
- @Override
- public ArrowType visit(DateType dateType) {
+ } else if (dataType instanceof DateType) {
return new ArrowType.Date(DateUnit.DAY);
- }
-
- @Override
- public ArrowType visit(TimeType timeType) {
+ } else if (dataType instanceof TimeType) {
+ TimeType timeType = (TimeType) dataType;
if (timeType.getPrecision() == 0) {
return new ArrowType.Time(TimeUnit.SECOND, 32);
} else if (timeType.getPrecision() >= 1 && timeType.getPrecision()
<= 3) {
@@ -185,25 +107,19 @@ public class LanceArrowUtils {
} else {
return new ArrowType.Time(TimeUnit.NANOSECOND, 64);
}
- }
-
- @Override
- public ArrowType visit(LocalZonedTimestampType
localZonedTimestampType) {
- if (localZonedTimestampType.getPrecision() == 0) {
+ } else if (dataType instanceof LocalZonedTimestampType) {
+ LocalZonedTimestampType timestampType = (LocalZonedTimestampType)
dataType;
+ if (timestampType.getPrecision() == 0) {
return new ArrowType.Timestamp(TimeUnit.SECOND, null);
- } else if (localZonedTimestampType.getPrecision() >= 1
- && localZonedTimestampType.getPrecision() <= 3) {
+ } else if (timestampType.getPrecision() >= 1 &&
timestampType.getPrecision() <= 3) {
return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
- } else if (localZonedTimestampType.getPrecision() >= 4
- && localZonedTimestampType.getPrecision() <= 6) {
+ } else if (timestampType.getPrecision() >= 4 &&
timestampType.getPrecision() <= 6) {
return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
} else {
return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
}
- }
-
- @Override
- public ArrowType visit(TimestampType timestampType) {
+ } else if (dataType instanceof TimestampType) {
+ TimestampType timestampType = (TimestampType) dataType;
if (timestampType.getPrecision() == 0) {
return new ArrowType.Timestamp(TimeUnit.SECOND, null);
} else if (timestampType.getPrecision() >= 1 &&
timestampType.getPrecision() <= 3) {
@@ -213,66 +129,10 @@ public class LanceArrowUtils {
} else {
return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
}
- }
-
- @Override
- protected ArrowType defaultMethod(DataType dataType) {
+ } else {
throw new UnsupportedOperationException(
String.format(
"Unsupported data type %s currently.",
dataType.asSummaryString()));
}
}
-
- private static int getPrecision(DecimalVector decimalVector) {
- return decimalVector.getPrecision();
- }
-
- public static ArrowFieldWriter<InternalRow> createArrowFieldWriter(
- ValueVector vector, DataType dataType) {
- if (vector instanceof TinyIntVector) {
- return ArrowTinyIntWriter.forField((TinyIntVector) vector);
- } else if (vector instanceof SmallIntVector) {
- return ArrowSmallIntWriter.forField((SmallIntVector) vector);
- } else if (vector instanceof IntVector) {
- return ArrowIntWriter.forField((IntVector) vector);
- } else if (vector instanceof BigIntVector) {
- return ArrowBigIntWriter.forField((BigIntVector) vector);
- } else if (vector instanceof BitVector) {
- return ArrowBooleanWriter.forField((BitVector) vector);
- } else if (vector instanceof Float4Vector) {
- return ArrowFloatWriter.forField((Float4Vector) vector);
- } else if (vector instanceof Float8Vector) {
- return ArrowDoubleWriter.forField((Float8Vector) vector);
- } else if (vector instanceof VarCharVector) {
- return ArrowVarCharWriter.forField((VarCharVector) vector);
- } else if (vector instanceof FixedSizeBinaryVector) {
- return ArrowBinaryWriter.forField((FixedSizeBinaryVector) vector);
- } else if (vector instanceof VarBinaryVector) {
- return ArrowVarBinaryWriter.forField((VarBinaryVector) vector);
- } else if (vector instanceof DecimalVector) {
- DecimalVector decimalVector = (DecimalVector) vector;
- return ArrowDecimalWriter.forField(
- decimalVector, getPrecision(decimalVector),
decimalVector.getScale());
- } else if (vector instanceof DateDayVector) {
- return ArrowDateWriter.forField((DateDayVector) vector);
- } else if (vector instanceof TimeSecVector
- || vector instanceof TimeMilliVector
- || vector instanceof TimeMicroVector
- || vector instanceof TimeNanoVector) {
- return ArrowTimeWriter.forField(vector);
- } else if (vector instanceof TimeStampVector
- && ((ArrowType.Timestamp)
vector.getField().getType()).getTimezone() == null) {
- int precision;
- if (dataType instanceof LocalZonedTimestampType) {
- precision = ((LocalZonedTimestampType)
dataType).getPrecision();
- return ArrowTimestampLtzWriter.forField(vector, precision);
- } else {
- precision = ((TimestampType) dataType).getPrecision();
- return ArrowTimestampNtzWriter.forField(vector, precision);
- }
- } else {
- throw new UnsupportedOperationException(
- String.format("Unsupported type %s.", dataType));
- }
- }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
index 5d98da77f..b5e4c89f7 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceDatasetAdapter.java
@@ -18,21 +18,15 @@
package org.apache.fluss.lake.lance.utils;
import org.apache.fluss.lake.lance.LanceConfig;
-import org.apache.fluss.lake.lance.tiering.LanceArrowWriter;
-import org.apache.fluss.types.RowType;
import com.lancedb.lance.Dataset;
-import com.lancedb.lance.Fragment;
import com.lancedb.lance.FragmentMetadata;
import com.lancedb.lance.ReadOptions;
import com.lancedb.lance.Transaction;
import com.lancedb.lance.WriteParams;
import com.lancedb.lance.operation.Append;
-import org.apache.arrow.c.ArrowArrayStream;
-import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
import java.util.List;
@@ -74,16 +68,4 @@ public class LanceDatasetAdapter {
}
}
}
-
- public static LanceArrowWriter getArrowWriter(Schema schema, int
batchSize, RowType rowType) {
- return new LanceArrowWriter(allocator, schema, batchSize, rowType);
- }
-
- public static List<FragmentMetadata> createFragment(
- String datasetUri, ArrowReader reader, WriteParams params) {
- try (ArrowArrayStream arrowStream =
ArrowArrayStream.allocateNew(allocator)) {
- Data.exportArrayStream(allocator, reader, arrowStream);
- return Fragment.create(datasetUri, arrowStream, params);
- }
- }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBigIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBigIntWriter.java
deleted file mode 100644
index 4cc124bec..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBigIntWriter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.BigIntVector;
-
-/** {@link ArrowFieldWriter} for BigInt. */
-public class ArrowBigIntWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowBigIntWriter forField(BigIntVector bigIntVector) {
- return new ArrowBigIntWriter(bigIntVector);
- }
-
- private ArrowBigIntWriter(BigIntVector bigIntVector) {
- super(bigIntVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- BigIntVector vector = (BigIntVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readLong(row, ordinal));
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private long readLong(InternalRow row, int ordinal) {
- return row.getLong(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBinaryWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBinaryWriter.java
deleted file mode 100644
index 75fa16d76..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBinaryWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.FixedSizeBinaryVector;
-
-/** {@link ArrowFieldWriter} for Binary. */
-public class ArrowBinaryWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowBinaryWriter forField(FixedSizeBinaryVector
binaryVector) {
- return new ArrowBinaryWriter(binaryVector);
- }
-
- private final int byteWidth;
-
- private ArrowBinaryWriter(FixedSizeBinaryVector binaryVector) {
- super(binaryVector);
- this.byteWidth = binaryVector.getByteWidth();
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- FixedSizeBinaryVector vector = (FixedSizeBinaryVector)
getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readBinary(row, ordinal));
- } else {
- vector.set(getCount(), readBinary(row, ordinal));
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private byte[] readBinary(InternalRow row, int ordinal) {
- return row.getBinary(ordinal, byteWidth);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBooleanWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBooleanWriter.java
deleted file mode 100644
index 85c9b7b38..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowBooleanWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.BitVector;
-
-/** {@link ArrowFieldWriter} for Boolean. */
-public class ArrowBooleanWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowBooleanWriter forField(BitVector booleanVector) {
- return new ArrowBooleanWriter(booleanVector);
- }
-
- private ArrowBooleanWriter(BitVector bitVector) {
- super(bitVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- BitVector vector = (BitVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readBoolean(row, ordinal) ? 1 : 0);
- } else {
- vector.set(getCount(), readBoolean(row, ordinal) ? 1 : 0);
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private boolean readBoolean(InternalRow row, int ordinal) {
- return row.getBoolean(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDateWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDateWriter.java
deleted file mode 100644
index 8aa2ec7bd..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDateWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.DateDayVector;
-
-/** {@link ArrowFieldWriter} for Date. */
-public class ArrowDateWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowDateWriter forField(DateDayVector dateDayVector) {
- return new ArrowDateWriter(dateDayVector);
- }
-
- private ArrowDateWriter(DateDayVector dateDayVector) {
- super(dateDayVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- DateDayVector vector = (DateDayVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readDate(row, ordinal));
- } else {
- vector.set(getCount(), readDate(row, ordinal));
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private int readDate(InternalRow row, int ordinal) {
- return row.getInt(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDecimalWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDecimalWriter.java
deleted file mode 100644
index 0a9d01c4e..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDecimalWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.Decimal;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.DecimalVector;
-
-import java.math.BigDecimal;
-
-/** {@link ArrowFieldWriter} for Decimal. */
-public class ArrowDecimalWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowDecimalWriter forField(
- DecimalVector decimalVector, int precision, int scale) {
- return new ArrowDecimalWriter(decimalVector, precision, scale);
- }
-
- private final int precision;
- private final int scale;
-
- private ArrowDecimalWriter(DecimalVector decimalVector, int precision, int
scale) {
- super(decimalVector);
- this.precision = precision;
- this.scale = scale;
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- DecimalVector vector = (DecimalVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else {
- BigDecimal bigDecimal = readDecimal(row, ordinal).toBigDecimal();
- if (bigDecimal == null) {
- vector.setNull(getCount());
- } else {
- if (handleSafe) {
- vector.setSafe(getCount(), bigDecimal);
- } else {
- vector.set(getCount(), bigDecimal);
- }
- }
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private Decimal readDecimal(InternalRow row, int ordinal) {
- return row.getDecimal(ordinal, precision, scale);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDoubleWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDoubleWriter.java
deleted file mode 100644
index 459d1ac86..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowDoubleWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.Float8Vector;
-
-/** {@link ArrowFieldWriter} for Double. */
-public class ArrowDoubleWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowDoubleWriter forField(Float8Vector doubleVector) {
- return new ArrowDoubleWriter(doubleVector);
- }
-
- private ArrowDoubleWriter(Float8Vector doubleVector) {
- super(doubleVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- Float8Vector vector = (Float8Vector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readDouble(row, ordinal));
- } else {
- vector.set(getCount(), readDouble(row, ordinal));
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private double readDouble(InternalRow row, int ordinal) {
- return row.getDouble(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFieldWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFieldWriter.java
deleted file mode 100644
index 15a56fc55..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFieldWriter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.arrow.vector.ValueVector;
-
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
-/**
- * Base class for arrow field writer which is used to convert a field to an
Arrow format.
- *
- * @param <IN> Type of the input to write. Currently, it's always InternalRow,
may support
- * InternalArray in the future.
- */
-public abstract class ArrowFieldWriter<IN> {
-
- /** Container which is used to store the written sequence of values of a
column. */
- private final ValueVector valueVector;
-
- private int count;
-
- public ArrowFieldWriter(ValueVector valueVector) {
- this.valueVector = checkNotNull(valueVector);
- }
-
- /** Returns the underlying container which stores the sequence of values
of a column. */
- public ValueVector getValueVector() {
- return valueVector;
- }
-
- /** Returns the current count of elements written. */
- public int getCount() {
- return count;
- }
-
- /** Sets the field value as the field at the specified ordinal of the
specified row. */
- public abstract void doWrite(IN row, int ordinal, boolean handleSafe);
-
- /** Writes the specified ordinal of the specified row. */
- public void write(IN row, int ordinal, boolean handleSafe) {
- doWrite(row, ordinal, handleSafe);
- count++;
- }
-
- public void finish() {
- valueVector.setValueCount(count);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFloatWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFloatWriter.java
deleted file mode 100644
index c5528580a..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowFloatWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.Float4Vector;
-
-/** {@link ArrowFieldWriter} for Float. */
-public class ArrowFloatWriter extends ArrowFieldWriter<InternalRow> {
- public static ArrowFloatWriter forField(Float4Vector float4Vector) {
- return new ArrowFloatWriter(float4Vector);
- }
-
- private ArrowFloatWriter(Float4Vector float4Vector) {
- super(float4Vector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- Float4Vector vector = (Float4Vector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readFloat(row, ordinal));
- } else {
- vector.set(getCount(), readFloat(row, ordinal));
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private float readFloat(InternalRow row, int ordinal) {
- return row.getFloat(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowIntWriter.java
deleted file mode 100644
index 6b5ee8588..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowIntWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.IntVector;
-
-/** {@link ArrowFieldWriter} for Int. */
-public class ArrowIntWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowIntWriter forField(IntVector intVector) {
- return new ArrowIntWriter(intVector);
- }
-
- private ArrowIntWriter(IntVector intVector) {
- super(intVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- IntVector vector = (IntVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readInt(row, ordinal));
- } else {
- vector.set(getCount(), readInt(row, ordinal));
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- int readInt(InternalRow row, int ordinal) {
- return row.getInt(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowSmallIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowSmallIntWriter.java
deleted file mode 100644
index aab41753b..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowSmallIntWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.SmallIntVector;
-
-/** {@link ArrowFieldWriter} for SmallInt. */
-public class ArrowSmallIntWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowSmallIntWriter forField(SmallIntVector smallIntVector) {
- return new ArrowSmallIntWriter(smallIntVector);
- }
-
- private ArrowSmallIntWriter(SmallIntVector smallIntVector) {
- super(smallIntVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- SmallIntVector vector = (SmallIntVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else if (handleSafe) {
- vector.setSafe(getCount(), readShort(row, ordinal));
- } else {
- vector.set(getCount(), readShort(row, ordinal));
- }
- }
-
- public boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- public short readShort(InternalRow row, int ordinal) {
- return row.getShort(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimeWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimeWriter.java
deleted file mode 100644
index bf1e1e901..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimeWriter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.BaseFixedWidthVector;
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.ValueVector;
-
-import static org.apache.fluss.utils.Preconditions.checkState;
-
-/** {@link ArrowFieldWriter} for Time. */
-public class ArrowTimeWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowTimeWriter forField(ValueVector valueVector) {
- return new ArrowTimeWriter(valueVector);
- }
-
- private ArrowTimeWriter(ValueVector valueVector) {
- super(valueVector);
- checkState(
- valueVector instanceof TimeSecVector
- || valueVector instanceof TimeMilliVector
- || valueVector instanceof TimeMicroVector
- || valueVector instanceof TimeNanoVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- ValueVector valueVector = getValueVector();
- if (isNullAt(row, ordinal)) {
- ((BaseFixedWidthVector) valueVector).setNull(getCount());
- } else if (valueVector instanceof TimeSecVector) {
- int sec = readTime(row, ordinal) / 1000;
- if (handleSafe) {
- ((TimeSecVector) valueVector).setSafe(getCount(), sec);
- } else {
- ((TimeSecVector) valueVector).set(getCount(), sec);
- }
- } else if (valueVector instanceof TimeMilliVector) {
- int ms = readTime(row, ordinal);
- if (handleSafe) {
- ((TimeMilliVector) valueVector).setSafe(getCount(), ms);
- } else {
- ((TimeMilliVector) valueVector).set(getCount(), ms);
- }
- } else if (valueVector instanceof TimeMicroVector) {
- long microSec = readTime(row, ordinal) * 1000L;
- if (handleSafe) {
- ((TimeMicroVector) valueVector).setSafe(getCount(), microSec);
- } else {
- ((TimeMicroVector) valueVector).set(getCount(), microSec);
- }
- } else {
- long nanoSec = readTime(row, ordinal) * 1000000L;
- if (handleSafe) {
- ((TimeNanoVector) valueVector).setSafe(getCount(), nanoSec);
- } else {
- ((TimeNanoVector) valueVector).set(getCount(), nanoSec);
- }
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private int readTime(InternalRow row, int ordinal) {
- return row.getInt(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
deleted file mode 100644
index a9333c000..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.TimestampLtz;
-
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-
-import static org.apache.fluss.utils.Preconditions.checkState;
-
-/** {@link ArrowFieldWriter} for TimestampLtz. */
-public class ArrowTimestampLtzWriter extends ArrowFieldWriter<InternalRow> {
- public static ArrowTimestampLtzWriter forField(ValueVector valueVector,
int precision) {
- return new ArrowTimestampLtzWriter(valueVector, precision);
- }
-
- private final int precision;
-
- private ArrowTimestampLtzWriter(ValueVector valueVector, int precision) {
- super(valueVector);
- checkState(
- valueVector instanceof TimeStampVector
- && ((ArrowType.Timestamp)
valueVector.getField().getType()).getTimezone()
- == null);
- this.precision = precision;
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- TimeStampVector vector = (TimeStampVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else {
- TimestampLtz timestamp = readTimestamp(row, ordinal);
- if (vector instanceof TimeStampSecVector) {
- long sec = timestamp.getEpochMillisecond() / 1000;
- if (handleSafe) {
- vector.setSafe(getCount(), sec);
- } else {
- vector.set(getCount(), sec);
- }
- } else if (vector instanceof TimeStampMilliVector) {
- long ms = timestamp.getEpochMillisecond();
- if (handleSafe) {
- vector.setSafe(getCount(), ms);
- } else {
- vector.set(getCount(), ms);
- }
- } else if (vector instanceof TimeStampMicroVector) {
- long microSec =
- timestamp.getEpochMillisecond() * 1000
- + timestamp.getNanoOfMillisecond() / 1000;
- if (handleSafe) {
- vector.setSafe(getCount(), microSec);
- } else {
- vector.set(getCount(), microSec);
- }
- } else {
- long nanoSec =
- timestamp.getEpochMillisecond() * 1_000_000
- + timestamp.getNanoOfMillisecond();
- if (handleSafe) {
- vector.setSafe(getCount(), nanoSec);
- } else {
- vector.set(getCount(), nanoSec);
- }
- }
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private TimestampLtz readTimestamp(InternalRow row, int ordinal) {
- return row.getTimestampLtz(ordinal, precision);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
deleted file mode 100644
index c57a30eb2..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.TimestampNtz;
-
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-
-import static org.apache.fluss.utils.Preconditions.checkState;
-
-/** {@link ArrowFieldWriter} for TimestampNtz. */
-public class ArrowTimestampNtzWriter extends ArrowFieldWriter<InternalRow> {
- public static ArrowTimestampNtzWriter forField(ValueVector valueVector,
int precision) {
- return new ArrowTimestampNtzWriter(valueVector, precision);
- }
-
- private final int precision;
-
- private ArrowTimestampNtzWriter(ValueVector valueVector, int precision) {
- super(valueVector);
- checkState(
- valueVector instanceof TimeStampVector
- && ((ArrowType.Timestamp)
valueVector.getField().getType()).getTimezone()
- == null);
- this.precision = precision;
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- TimeStampVector vector = (TimeStampVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else {
- TimestampNtz timestamp = readTimestamp(row, ordinal);
- if (vector instanceof TimeStampSecVector) {
- long sec = timestamp.getMillisecond() / 1000;
- if (handleSafe) {
- vector.setSafe(getCount(), sec);
- } else {
- vector.set(getCount(), sec);
- }
- } else if (vector instanceof TimeStampMilliVector) {
- long ms = timestamp.getMillisecond();
- if (handleSafe) {
- vector.setSafe(getCount(), ms);
- } else {
- vector.set(getCount(), ms);
- }
- } else if (vector instanceof TimeStampMicroVector) {
- long microSec =
- timestamp.getMillisecond() * 1000 +
timestamp.getNanoOfMillisecond() / 1000;
- if (handleSafe) {
- vector.setSafe(getCount(), microSec);
- } else {
- vector.set(getCount(), microSec);
- }
- } else {
- long nanoSec =
- timestamp.getMillisecond() * 1_000_000 +
timestamp.getNanoOfMillisecond();
- if (handleSafe) {
- vector.setSafe(getCount(), nanoSec);
- } else {
- vector.set(getCount(), nanoSec);
- }
- }
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private TimestampNtz readTimestamp(InternalRow row, int ordinal) {
- return row.getTimestampNtz(ordinal, precision);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTinyIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTinyIntWriter.java
deleted file mode 100644
index 0be03aac7..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowTinyIntWriter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.TinyIntVector;
-
-/** {@link ArrowFieldWriter} for TinyInt. */
-public class ArrowTinyIntWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowTinyIntWriter forField(TinyIntVector tinyIntVector) {
- return new ArrowTinyIntWriter(tinyIntVector);
- }
-
- private ArrowTinyIntWriter(TinyIntVector tinyIntVector) {
- super(tinyIntVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- TinyIntVector vector = (TinyIntVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else {
- if (handleSafe) {
- vector.setSafe(getCount(), readByte(row, ordinal));
- } else {
- vector.set(getCount(), readByte(row, ordinal));
- }
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private byte readByte(InternalRow row, int ordinal) {
- return row.getByte(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
deleted file mode 100644
index 3c9668a3c..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.VarBinaryVector;
-
-/** {@link ArrowFieldWriter} for VarBinary. */
-public class ArrowVarBinaryWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowVarBinaryWriter forField(VarBinaryVector
varBinaryVector) {
- return new ArrowVarBinaryWriter(varBinaryVector);
- }
-
- private ArrowVarBinaryWriter(VarBinaryVector varBinaryVector) {
- super(varBinaryVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- VarBinaryVector vector = (VarBinaryVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else {
- vector.setSafe(getCount(), readBinary(row, ordinal));
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private byte[] readBinary(InternalRow row, int ordinal) {
- return row.getBytes(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarCharWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarCharWriter.java
deleted file mode 100644
index 72a401785..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/writers/ArrowVarCharWriter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.lance.writers;
-
-import org.apache.fluss.row.BinaryString;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.arrow.vector.VarCharVector;
-
-import java.nio.ByteBuffer;
-
-/** {@link ArrowFieldWriter} for VarChar. */
-public class ArrowVarCharWriter extends ArrowFieldWriter<InternalRow> {
-
- public static ArrowVarCharWriter forField(VarCharVector varCharVector) {
- return new ArrowVarCharWriter(varCharVector);
- }
-
- private ArrowVarCharWriter(VarCharVector varCharVector) {
- super(varCharVector);
- }
-
- @Override
- public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
- VarCharVector vector = (VarCharVector) getValueVector();
- if (isNullAt(row, ordinal)) {
- vector.setNull(getCount());
- } else {
- ByteBuffer buffer = readString(row, ordinal).wrapByteBuffer();
- vector.setSafe(getCount(), buffer, buffer.position(),
buffer.remaining());
- }
- }
-
- private boolean isNullAt(InternalRow row, int ordinal) {
- return row.isNullAt(ordinal);
- }
-
- private BinaryString readString(InternalRow row, int ordinal) {
- return row.getString(ordinal);
- }
-}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
index eeab99b99..45ccefdf0 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/LakeEnabledTableCreateITCase.java
@@ -166,7 +166,11 @@ class LakeEnabledTableCreateITCase {
Field logC10 =
new Field("log_c10", FieldType.nullable(new
ArrowType.FixedSizeBinary(10)), null);
Field logC11 = new Field("log_c11", FieldType.nullable(new
ArrowType.Binary()), null);
- Field logC12 = new Field("log_c12", FieldType.nullable(new
ArrowType.Decimal(10, 2)), null);
+ Field logC12 =
+ new Field(
+ "log_c12",
+ FieldType.nullable(ArrowType.Decimal.createDecimal(10,
2, null)),
+ null);
Field logC13 =
new Field("log_c13", FieldType.nullable(new
ArrowType.Date(DateUnit.DAY)), null);
Field logC14 =