wuchong commented on code in PR #2345:
URL: https://github.com/apache/fluss/pull/2345#discussion_r2693394570


##########
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.tiering;
+
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.arrow.writers.ArrowFieldWriter;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/**
+ * Batch writer using shaded Arrow and ArrowFieldWriter from fluss-common.
+ *
+ * <p>This class uses shaded Arrow vectors and ArrowFieldWriters to write 
Fluss InternalRows. It can
+ * later be converted to non-shaded Arrow format for Lance compatibility.
+ */
+public class ShadedArrowBatchWriter implements AutoCloseable {
+    private final VectorSchemaRoot shadedRoot;
+    private final ArrowFieldWriter[] fieldWriters;
+    private int recordsCount;
+
+    public ShadedArrowBatchWriter(BufferAllocator shadedAllocator, RowType 
rowType) {
+        this.shadedRoot =
+                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), 
shadedAllocator);
+        this.shadedRoot.allocateNew();
+        this.fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+
+        for (int i = 0; i < fieldWriters.length; i++) {
+            FieldVector fieldVector = shadedRoot.getVector(i);
+            fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector, 
rowType.getTypeAt(i));
+        }
+        this.recordsCount = 0;
+    }
+
+    public void writeRow(InternalRow row) {
+        boolean handleSafe = recordsCount >= 1024;
+        for (int i = 0; i < fieldWriters.length; i++) {
+            fieldWriters[i].write(recordsCount, row, i, handleSafe);
+        }
+        recordsCount++;
+    }
+
+    public void finish() {
+        shadedRoot.setRowCount(recordsCount);
+    }
+
+    public void reset() {

Review Comment:
   This is never called, should we call it in `LanceLakeWriter#complete`?



##########
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.utils;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Utility class to convert between shaded and non-shaded Arrow 
VectorSchemaRoot by sharing off-heap
+ * memory directly.
+ *
+ * <p>Since both shaded and non-shaded Arrow use the same off-heap memory 
layout, we can share the
+ * underlying ByteBuffer/memory address directly without serialization 
overhead.
+ */
+public class ArrowDataConverter {
+
+    /**
+     * Converts a shaded Arrow VectorSchemaRoot to a non-shaded Arrow 
VectorSchemaRoot by sharing
+     * the underlying off-heap memory.
+     *
+     * @param shadedRoot The shaded Arrow VectorSchemaRoot from fluss-common
+     * @param nonShadedAllocator The non-shaded BufferAllocator for Lance
+     * @param nonShadedSchema The non-shaded Arrow Schema for Lance
+     * @return A non-shaded VectorSchemaRoot compatible with Lance
+     */
+    public static VectorSchemaRoot convertToNonShaded(
+            
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot 
shadedRoot,
+            BufferAllocator nonShadedAllocator,
+            Schema nonShadedSchema) {
+
+        VectorSchemaRoot nonShadedRoot =
+                VectorSchemaRoot.create(nonShadedSchema, nonShadedAllocator);
+        nonShadedRoot.allocateNew();
+
+        
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector> 
shadedVectors =
+                shadedRoot.getFieldVectors();
+        List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();
+
+        for (int i = 0; i < shadedVectors.size(); i++) {
+            copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
+        }
+
+        nonShadedRoot.setRowCount(shadedRoot.getRowCount());
+        return nonShadedRoot;
+    }
+
+    private static void copyVectorData(
+            org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector 
shadedVector,
+            FieldVector nonShadedVector) {
+        try {
+            
List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> 
shadedBuffers =
+                    getFieldBuffers(shadedVector);
+
+            int valueCount = getValueCount(shadedVector);
+            nonShadedVector.setValueCount(valueCount);
+
+            List<ArrowBuf> nonShadedBuffers = 
nonShadedVector.getFieldBuffers();
+
+            for (int i = 0; i < Math.min(shadedBuffers.size(), 
nonShadedBuffers.size()); i++) {
+                org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf 
shadedBuf =
+                        shadedBuffers.get(i);
+                ArrowBuf nonShadedBuf = nonShadedBuffers.get(i);
+
+                long size = Math.min(shadedBuf.capacity(), 
nonShadedBuf.capacity());
+                if (size > 0) {
+                    ByteBuffer srcBuffer = getByteBuffer(shadedBuf);
+                    if (srcBuffer != null) {
+                        srcBuffer.position(0);
+                        srcBuffer.limit((int) Math.min(size, 
Integer.MAX_VALUE));
+                        nonShadedBuf.setBytes(0, srcBuffer);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to copy vector data via 
off-heap memory", e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static 
List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>
+            getFieldBuffers(
+                    
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector vector) {
+        try {
+            Method method = vector.getClass().getMethod("getFieldBuffers");
+            return 
(List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>)
+                    method.invoke(vector);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to get field buffers from 
shaded vector", e);
+        }
+    }
+
+    private static int getValueCount(
+            org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector 
vector) {
+        try {
+            Method method = vector.getClass().getMethod("getValueCount");
+            return (int) method.invoke(vector);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to get value count from shaded 
vector", e);
+        }
+    }
+
+    private static ByteBuffer getByteBuffer(
+            org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf 
buf) {
+        try {
+            Method method = buf.getClass().getMethod("nioBuffer", long.class, 
int.class);
+            return (ByteBuffer) method.invoke(buf, 0L, (int) buf.capacity());
+        } catch (Exception e) {
+            try {
+                Field field = buf.getClass().getDeclaredField("memoryAddress");
+                field.setAccessible(true);
+                long address = (long) field.get(buf);
+                return null;
+            } catch (Exception ex) {
+                throw new RuntimeException("Failed to get ByteBuffer from 
ArrowBuf", ex);

Review Comment:
   Why introduce these reflections? It seems they provide these methods and can 
be directly invoked. 



##########
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java:
##########
@@ -48,47 +62,86 @@ public LanceLakeWriter(Configuration options, 
WriterInitContext writerInitContex
                         
writerInitContext.tableInfo().getCustomProperties().toMap(),
                         writerInitContext.tablePath().getDatabaseName(),
                         writerInitContext.tablePath().getTableName());
-        int batchSize = LanceConfig.getBatchSize(config);
+
+        this.batchSize = LanceConfig.getBatchSize(config);
+        this.datasetUri = config.getDatasetUri();
+        this.writeParams = LanceConfig.genWriteParamsFromConfig(config);
+        this.rowType = writerInitContext.tableInfo().getRowType();
+        this.nonShadedAllocator = new RootAllocator();
+        this.shadedAllocator =
+                new 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator();
+        this.buffer = new ArrayList<>(batchSize);
+        this.allFragments = new ArrayList<>();
+
         Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
         if (!schema.isPresent()) {
-            throw new IOException("Fail to get dataset " + 
config.getDatasetUri() + " in Lance.");
+            throw new IOException("Fail to get dataset " + datasetUri + " in 
Lance.");
         }
-
-        this.arrowWriter =
-                LanceDatasetAdapter.getArrowWriter(
-                        schema.get(), batchSize, 
writerInitContext.tableInfo().getRowType());
-
-        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
-        Callable<List<FragmentMetadata>> fragmentCreator =
-                () ->
-                        LanceDatasetAdapter.createFragment(
-                                config.getDatasetUri(), arrowWriter, params);
-        fragmentCreationTask = new FutureTask<>(fragmentCreator);
-        Thread fragmentCreationThread = new Thread(fragmentCreationTask);
-        fragmentCreationThread.start();
+        this.nonShadedSchema = schema.get();
     }
 
     @Override
     public void write(LogRecord record) throws IOException {
-        arrowWriter.write(record);
+        buffer.add(record.getRow());
+
+        // Flush when buffer reaches batch size
+        if (buffer.size() >= batchSize) {
+            flush();
+        }
     }
 
-    @Override
-    public LanceWriteResult complete() throws IOException {
-        arrowWriter.setFinished();
+    private void flush() throws IOException {
+        if (buffer.isEmpty()) {
+            return;
+        }
+
+        ShadedArrowBatchWriter shadedWriter = null;
+        VectorSchemaRoot nonShadedRoot = null;
+
         try {
-            List<FragmentMetadata> fragmentMetadata = 
fragmentCreationTask.get();
-            return new LanceWriteResult(fragmentMetadata);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted while waiting for reader thread 
to finish", e);
-        } catch (ExecutionException e) {
-            throw new IOException("Exception in reader thread", e);
+            shadedWriter = new ShadedArrowBatchWriter(shadedAllocator, 
rowType);
+
+            for (InternalRow row : buffer) {
+                shadedWriter.writeRow(row);
+            }
+            shadedWriter.finish();
+
+            nonShadedRoot =
+                    ArrowDataConverter.convertToNonShaded(
+                            shadedWriter.getShadedRoot(), nonShadedAllocator, 
nonShadedSchema);
+
+            List<FragmentMetadata> fragments =
+                    Fragment.create(datasetUri, nonShadedAllocator, 
nonShadedRoot, writeParams);
+
+            allFragments.addAll(fragments);

Review Comment:
   No need to use a memory shared variable `allFragments`, this can be a local 
variable and as a return value of the `flush()` method? I can't find the reset 
of `allFragments`.



##########
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lance.tiering;
+
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.arrow.writers.ArrowFieldWriter;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ArrowUtils;
+
+/**
+ * Batch writer using shaded Arrow and ArrowFieldWriter from fluss-common.
+ *
+ * <p>This class uses shaded Arrow vectors and ArrowFieldWriters to write 
Fluss InternalRows. It can
+ * later be converted to non-shaded Arrow format for Lance compatibility.
+ */
+public class ShadedArrowBatchWriter implements AutoCloseable {
+    private final VectorSchemaRoot shadedRoot;
+    private final ArrowFieldWriter[] fieldWriters;
+    private int recordsCount;
+
+    public ShadedArrowBatchWriter(BufferAllocator shadedAllocator, RowType 
rowType) {
+        this.shadedRoot =
+                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), 
shadedAllocator);
+        this.shadedRoot.allocateNew();
+        this.fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+
+        for (int i = 0; i < fieldWriters.length; i++) {
+            FieldVector fieldVector = shadedRoot.getVector(i);
+            fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector, 
rowType.getTypeAt(i));

Review Comment:
   Can we direclty use `ArrowWriter` instead of `ArrowFieldWriter`? It seems 
here missed to call `initFieldVector` which has been done in `ArrowWriter`.



##########
fluss-lake/fluss-lake-lance/pom.xml:
##########
@@ -41,7 +41,6 @@
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-common</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>

Review Comment:
   Why need to include `fluss-common` into shaded jar? If this is needed for 
testing, we can add a `test` scope. 
   
   @luoyuxia could you also help to check this ?



##########
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java:
##########
@@ -48,47 +62,86 @@ public LanceLakeWriter(Configuration options, 
WriterInitContext writerInitContex
                         
writerInitContext.tableInfo().getCustomProperties().toMap(),
                         writerInitContext.tablePath().getDatabaseName(),
                         writerInitContext.tablePath().getTableName());
-        int batchSize = LanceConfig.getBatchSize(config);
+
+        this.batchSize = LanceConfig.getBatchSize(config);
+        this.datasetUri = config.getDatasetUri();
+        this.writeParams = LanceConfig.genWriteParamsFromConfig(config);
+        this.rowType = writerInitContext.tableInfo().getRowType();
+        this.nonShadedAllocator = new RootAllocator();
+        this.shadedAllocator =
+                new 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator();
+        this.buffer = new ArrayList<>(batchSize);
+        this.allFragments = new ArrayList<>();
+
         Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
         if (!schema.isPresent()) {
-            throw new IOException("Fail to get dataset " + 
config.getDatasetUri() + " in Lance.");
+            throw new IOException("Fail to get dataset " + datasetUri + " in 
Lance.");
         }
-
-        this.arrowWriter =
-                LanceDatasetAdapter.getArrowWriter(
-                        schema.get(), batchSize, 
writerInitContext.tableInfo().getRowType());
-
-        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
-        Callable<List<FragmentMetadata>> fragmentCreator =
-                () ->
-                        LanceDatasetAdapter.createFragment(
-                                config.getDatasetUri(), arrowWriter, params);
-        fragmentCreationTask = new FutureTask<>(fragmentCreator);
-        Thread fragmentCreationThread = new Thread(fragmentCreationTask);
-        fragmentCreationThread.start();
+        this.nonShadedSchema = schema.get();
     }
 
     @Override
     public void write(LogRecord record) throws IOException {
-        arrowWriter.write(record);
+        buffer.add(record.getRow());

Review Comment:
   Why buffer it first instead of writing to arrow directly? This introduce 
doubled memory overhead. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to