Copilot commented on code in PR #1930: URL: https://github.com/apache/auron/pull/1930#discussion_r2706595758
########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java: ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.ArrayList; +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import 
org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; + +/** Utility class for converting between Flink LogicalType and Arrow types. */ +public class FlinkArrowUtils { + + /** Root allocator for Arrow memory management. */ + public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> ROOT_ALLOCATOR.close())); + } Review Comment: Using a global RootAllocator with Long.MAX_VALUE limit could lead to excessive memory consumption. The shutdown hook at line 60 may not execute in all termination scenarios (e.g., kill -9, OutOfMemoryError). Consider documenting the memory implications and whether users should be aware of memory limits, or provide a configuration option for the allocator limit. ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowFFIExporter.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.auron.arrowio.AuronArrowFFIExporter; +import org.apache.auron.configuration.AuronConfiguration; +import org.apache.auron.jni.AuronAdaptor; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Exports Flink RowData to Arrow format via FFI (Foreign Function Interface) + * for consumption by native code. + * + * <p>This exporter uses an asynchronous producer-consumer model with double + * queues to ensure safe resource management. The producer thread creates + * batches ahead of time while the consumer (native side) processes them. + * Resources are only cleaned up after the consumer confirms it has finished + * using the previous batch. 
+ * + * <p>Key design points: + * <ul> + * <li>Producer thread pre-creates batches and puts them in outputQueue</li> + * <li>Consumer takes batches and signals via processingQueue when done</li> + * <li>Previous batch resources are cleaned up only after consumer confirms</li> + * <li>No TaskContext in Flink, so cancellation uses closed flag + thread interrupt</li> + * </ul> + */ +public class FlinkArrowFFIExporter extends AuronArrowFFIExporter { + + /** Queue state representing a data batch ready for export. */ + private static final class NextBatch { + final VectorSchemaRoot root; + final BufferAllocator allocator; + + NextBatch(VectorSchemaRoot root, BufferAllocator allocator) { + this.root = root; + this.allocator = allocator; + } + } + + /** Queue state representing end of data or an error. */ + private static final class Finished { + final Throwable error; // null means normal completion + + Finished(Throwable error) { + this.error = error; + } + } + + private final Iterator<RowData> rowIterator; + private final RowType rowType; + private final Schema arrowSchema; + private final DictionaryProvider.MapDictionaryProvider emptyDictionaryProvider; + private final int maxBatchNumRows; + private final long maxBatchMemorySize; + + // Double queue synchronization (capacity 4, smaller than Spark's 16 for streaming) + private final BlockingQueue<Object> outputQueue; + private final BlockingQueue<Object> processingQueue; + + // Previous batch resources (cleaned up after consumer confirms) + private VectorSchemaRoot previousRoot; + private BufferAllocator previousAllocator; + + // Producer thread + private final Thread producerThread; + private volatile boolean closed = false; + + /** + * Creates a new FlinkArrowFFIExporter. 
+ * + * @param rowIterator Iterator over RowData to export + * @param rowType The Flink row type + */ + public FlinkArrowFFIExporter(Iterator<RowData> rowIterator, RowType rowType) { + this.rowIterator = rowIterator; + this.rowType = rowType; + this.arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + this.emptyDictionaryProvider = new DictionaryProvider.MapDictionaryProvider(); + + // Get configuration + AuronConfiguration config = AuronAdaptor.getInstance().getAuronConfiguration(); + this.maxBatchNumRows = config.getInteger(AuronConfiguration.BATCH_SIZE); + this.maxBatchMemorySize = 8 * 1024 * 1024; // 8MB default, same as Spark + + this.outputQueue = new ArrayBlockingQueue<>(4); + this.processingQueue = new ArrayBlockingQueue<>(4); + + this.producerThread = startProducerThread(); + } + + /** + * Starts the producer thread that creates batches asynchronously. + */ + private Thread startProducerThread() { + Thread thread = new Thread( + () -> { + try { + while (!closed && !Thread.currentThread().isInterrupted()) { + if (!rowIterator.hasNext()) { + outputQueue.put(new Finished(null)); + return; + } + + // Create a new batch + BufferAllocator allocator = + FlinkArrowUtils.createChildAllocator("FlinkArrowFFIExporter-producer"); + VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator); + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Fill the batch with data + while (!closed + && rowIterator.hasNext() + && allocator.getAllocatedMemory() < maxBatchMemorySize + && writer.currentCount() < maxBatchNumRows) { + writer.write(rowIterator.next()); + } + writer.finish(); + + // Put batch in output queue for consumer + outputQueue.put(new NextBatch(root, allocator)); + + // Wait for consumer to confirm it's done with previous batch + // This is critical for safe resource management! 
+ processingQueue.take(); + } + outputQueue.put(new Finished(null)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // Normal interruption, not an error + } catch (Throwable e) { + outputQueue.clear(); + try { + outputQueue.put(new Finished(e)); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + }, + "FlinkArrowFFIExporter-producer"); + + thread.setDaemon(true); + thread.setUncaughtExceptionHandler((t, e) -> { + outputQueue.clear(); + try { + outputQueue.put(new Finished(e)); + } catch (InterruptedException ignored) { + // Ignore + } + }); + thread.start(); + return thread; + } + + /** + * Exports the Arrow schema to the native side. + * + * @param exportArrowSchemaPtr Pointer to the Arrow C Data Interface schema + * structure + */ + public void exportSchema(long exportArrowSchemaPtr) { + try (ArrowSchema exportSchema = ArrowSchema.wrap(exportArrowSchemaPtr)) { + Data.exportSchema(FlinkArrowUtils.ROOT_ALLOCATOR, arrowSchema, emptyDictionaryProvider, exportSchema); + } + } + + /** + * Exports the next batch of data to the native side. + * + * <p>This method takes a batch from the producer thread and exports it + * via the Arrow C Data Interface. The previous batch's resources are + * cleaned up before taking the new batch (after consumer confirms). 
+ * + * @param exportArrowArrayPtr Pointer to the Arrow C Data Interface array + * structure + * @return true if a batch was exported, false if no more data is available + */ + @Override + public boolean exportNextBatch(long exportArrowArrayPtr) { + // Clean up previous batch resources (consumer has confirmed it's done) + cleanupPreviousBatch(); + + if (closed) { + return false; + } + + // Wait for producer to provide next batch + Object state; + try { + state = outputQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } Review Comment: There's a potential race condition between checking the closed flag at line 208 and taking from outputQueue at line 215. If close() is called between these two operations, the thread could block indefinitely on outputQueue.take(). Consider checking the closed flag again after the InterruptedException catch, or use a timeout-based poll operation instead of blocking take. ########## auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.HashMap; +import java.util.Map; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowWriter. 
*/ +public class FlinkArrowWriterTest { + + private BufferAllocator allocator; + + @BeforeEach + public void setUp() { + allocator = FlinkArrowUtils.createChildAllocator("test"); + } + + @AfterEach + public void tearDown() { + allocator.close(); + } + + @Test + public void testWriteBasicTypes() { + RowType rowType = RowType.of( + new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType(), new BooleanType()}, + new String[] {"id", "name", "score", "active"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write first row + GenericRowData row1 = new GenericRowData(4); + row1.setField(0, 1); + row1.setField(1, StringData.fromString("Alice")); + row1.setField(2, 95.5); + row1.setField(3, true); + writer.write(row1); + + // Write second row + GenericRowData row2 = new GenericRowData(4); + row2.setField(0, 2); + row2.setField(1, StringData.fromString("Bob")); + row2.setField(2, 87.3); + row2.setField(3, false); + writer.write(row2); + + writer.finish(); + + assertEquals(2, root.getRowCount()); + assertEquals(1, root.getVector("id").getObject(0)); + assertEquals(2, root.getVector("id").getObject(1)); + } + } + + @Test + public void testWriteNullValues() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write row with null values + GenericRowData row = new GenericRowData(2); + row.setField(0, null); + row.setField(1, null); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + assertTrue(root.getVector("id").isNull(0)); + assertTrue(root.getVector("name").isNull(0)); + 
} + } + + @Test + public void testWriteArrayType() { + RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"numbers"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + GenericRowData row = new GenericRowData(1); + row.setField(0, new GenericArrayData(new Integer[] {1, 2, 3})); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + } + } + + @Test + public void testWriteMapType() { + RowType rowType = RowType.of( + new LogicalType[] {new MapType(new VarCharType(100), new IntType())}, new String[] {"scores"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + Map<StringData, Integer> mapData = new HashMap<>(); + mapData.put(StringData.fromString("math"), 90); + mapData.put(StringData.fromString("english"), 85); + + GenericRowData row = new GenericRowData(1); + row.setField(0, new GenericMapData(mapData)); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + } + } + + @Test + public void testTimestampPrecision() { + RowType rowType = RowType.of(new LogicalType[] {new TimestampType(6)}, new String[] {"ts"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Create timestamp with microsecond precision + long millis = 1705622400000L; // 2024-01-19 00:00:00.000 + int nanos = 123456; // 123.456 microseconds Review Comment: The comment states "123.456 microseconds" but the value 123456 represents nanoseconds, not microseconds. 
In the context of TimestampData.fromEpochMillis(), the second parameter is the nanosecond-of-millisecond value (0-999999). The comment should be corrected to "123456 nanoseconds" or "123.456 microseconds worth of nanoseconds". ```suggestion int nanos = 123456; // 123456 nanoseconds (123.456 microseconds) ``` ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowFFIExporter.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.auron.arrowio.AuronArrowFFIExporter; +import org.apache.auron.configuration.AuronConfiguration; +import org.apache.auron.jni.AuronAdaptor; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Exports Flink RowData to Arrow format via FFI (Foreign Function Interface) + * for consumption by native code. + * + * <p>This exporter uses an asynchronous producer-consumer model with double + * queues to ensure safe resource management. The producer thread creates + * batches ahead of time while the consumer (native side) processes them. + * Resources are only cleaned up after the consumer confirms it has finished + * using the previous batch. + * + * <p>Key design points: + * <ul> + * <li>Producer thread pre-creates batches and puts them in outputQueue</li> + * <li>Consumer takes batches and signals via processingQueue when done</li> + * <li>Previous batch resources are cleaned up only after consumer confirms</li> + * <li>No TaskContext in Flink, so cancellation uses closed flag + thread interrupt</li> + * </ul> + */ +public class FlinkArrowFFIExporter extends AuronArrowFFIExporter { + + /** Queue state representing a data batch ready for export. 
*/ + private static final class NextBatch { + final VectorSchemaRoot root; + final BufferAllocator allocator; + + NextBatch(VectorSchemaRoot root, BufferAllocator allocator) { + this.root = root; + this.allocator = allocator; + } + } + + /** Queue state representing end of data or an error. */ + private static final class Finished { + final Throwable error; // null means normal completion + + Finished(Throwable error) { + this.error = error; + } + } + + private final Iterator<RowData> rowIterator; + private final RowType rowType; + private final Schema arrowSchema; + private final DictionaryProvider.MapDictionaryProvider emptyDictionaryProvider; + private final int maxBatchNumRows; + private final long maxBatchMemorySize; + + // Double queue synchronization (capacity 4, smaller than Spark's 16 for streaming) + private final BlockingQueue<Object> outputQueue; + private final BlockingQueue<Object> processingQueue; + + // Previous batch resources (cleaned up after consumer confirms) + private VectorSchemaRoot previousRoot; + private BufferAllocator previousAllocator; + + // Producer thread + private final Thread producerThread; + private volatile boolean closed = false; + + /** + * Creates a new FlinkArrowFFIExporter. 
+ * + * @param rowIterator Iterator over RowData to export + * @param rowType The Flink row type + */ + public FlinkArrowFFIExporter(Iterator<RowData> rowIterator, RowType rowType) { + this.rowIterator = rowIterator; + this.rowType = rowType; + this.arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + this.emptyDictionaryProvider = new DictionaryProvider.MapDictionaryProvider(); + + // Get configuration + AuronConfiguration config = AuronAdaptor.getInstance().getAuronConfiguration(); + this.maxBatchNumRows = config.getInteger(AuronConfiguration.BATCH_SIZE); + this.maxBatchMemorySize = 8 * 1024 * 1024; // 8MB default, same as Spark + + this.outputQueue = new ArrayBlockingQueue<>(4); + this.processingQueue = new ArrayBlockingQueue<>(4); + + this.producerThread = startProducerThread(); + } + + /** + * Starts the producer thread that creates batches asynchronously. + */ + private Thread startProducerThread() { + Thread thread = new Thread( + () -> { + try { + while (!closed && !Thread.currentThread().isInterrupted()) { + if (!rowIterator.hasNext()) { + outputQueue.put(new Finished(null)); + return; + } + + // Create a new batch + BufferAllocator allocator = + FlinkArrowUtils.createChildAllocator("FlinkArrowFFIExporter-producer"); + VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator); + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Fill the batch with data + while (!closed + && rowIterator.hasNext() + && allocator.getAllocatedMemory() < maxBatchMemorySize + && writer.currentCount() < maxBatchNumRows) { + writer.write(rowIterator.next()); + } + writer.finish(); + + // Put batch in output queue for consumer + outputQueue.put(new NextBatch(root, allocator)); + + // Wait for consumer to confirm it's done with previous batch + // This is critical for safe resource management! 
+ processingQueue.take(); + } + outputQueue.put(new Finished(null)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // Normal interruption, not an error + } catch (Throwable e) { + outputQueue.clear(); + try { + outputQueue.put(new Finished(e)); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + }, + "FlinkArrowFFIExporter-producer"); + + thread.setDaemon(true); + thread.setUncaughtExceptionHandler((t, e) -> { + outputQueue.clear(); + try { + outputQueue.put(new Finished(e)); + } catch (InterruptedException ignored) { + // Ignore + } + }); + thread.start(); + return thread; + } + + /** + * Exports the Arrow schema to the native side. + * + * @param exportArrowSchemaPtr Pointer to the Arrow C Data Interface schema + * structure + */ + public void exportSchema(long exportArrowSchemaPtr) { + try (ArrowSchema exportSchema = ArrowSchema.wrap(exportArrowSchemaPtr)) { + Data.exportSchema(FlinkArrowUtils.ROOT_ALLOCATOR, arrowSchema, emptyDictionaryProvider, exportSchema); + } + } + + /** + * Exports the next batch of data to the native side. + * + * <p>This method takes a batch from the producer thread and exports it + * via the Arrow C Data Interface. The previous batch's resources are + * cleaned up before taking the new batch (after consumer confirms). 
+ * + * @param exportArrowArrayPtr Pointer to the Arrow C Data Interface array + * structure + * @return true if a batch was exported, false if no more data is available + */ + @Override + public boolean exportNextBatch(long exportArrowArrayPtr) { + // Clean up previous batch resources (consumer has confirmed it's done) + cleanupPreviousBatch(); + + if (closed) { + return false; + } + + // Wait for producer to provide next batch + Object state; + try { + state = outputQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + + if (state instanceof Finished) { + Finished finished = (Finished) state; + if (finished.error != null) { + throw new RuntimeException("Producer thread error", finished.error); + } + return false; + } + + NextBatch batch = (NextBatch) state; + + // Export data via Arrow FFI + try (ArrowArray exportArray = ArrowArray.wrap(exportArrowArrayPtr)) { + Data.exportVectorSchemaRoot( + FlinkArrowUtils.ROOT_ALLOCATOR, batch.root, emptyDictionaryProvider, exportArray); + } + + // Save references for cleanup on next call + previousRoot = batch.root; + previousAllocator = batch.allocator; + + // Signal producer that it can continue (we've taken the batch) + try { + processingQueue.put(new Object()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); Review Comment: If an InterruptedException occurs while trying to signal the producer at line 243-246, the method swallows the exception after setting the interrupt flag but still returns true, indicating success. This could lead to the producer thread remaining blocked indefinitely on processingQueue.take(). Consider returning false or handling this more explicitly to avoid potential deadlock scenarios. 
```suggestion Thread.currentThread().interrupt(); // Failed to signal producer; indicate failure to avoid potential deadlock return false; ``` ########## auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowFFIExporterTest.java: ########## @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowFFIExporter. */ +public class FlinkArrowFFIExporterTest { + + private BufferAllocator testAllocator; + private static Boolean nativeLibrariesAvailable; + + /** + * Check if Arrow FFI native libraries are available. + * FFI operations require JNI, which needs native libraries. + */ + private static boolean isNativeLibraryAvailable() { + if (nativeLibrariesAvailable == null) { + try { + // Try to load the JNI library - JniWrapper.get() triggers ensureLoaded() + org.apache.arrow.c.jni.JniWrapper.get(); + nativeLibrariesAvailable = true; + } catch (Throwable e) { + // Catch all throwables including IllegalStateException, UnsatisfiedLinkError, etc. 
+ nativeLibrariesAvailable = false; + } + } + return nativeLibrariesAvailable; + } + + @BeforeEach + public void setUp() { + testAllocator = new RootAllocator(Long.MAX_VALUE); + } + + @AfterEach + public void tearDown() { + testAllocator.close(); + } + + @Test + public void testExportSchemaCreatesExporter() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + Iterator<RowData> emptyIter = Collections.emptyIterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(emptyIter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterWithEmptyIterator() { + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + Iterator<RowData> emptyIter = Collections.emptyIterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(emptyIter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterWithMultipleRows() { + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + + GenericRowData row1 = new GenericRowData(1); + row1.setField(0, 1); + GenericRowData row2 = new GenericRowData(1); + row2.setField(0, 2); + + Iterator<RowData> iter = Arrays.asList((RowData) row1, row2).iterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(iter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterWithComplexTypes() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + GenericRowData row = new GenericRowData(2); + row.setField(0, 42); + row.setField(1, StringData.fromString("test")); + + Iterator<RowData> iter = Collections.singletonList((RowData) row).iterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(iter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterCloseIsIdempotent() { + 
RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + Iterator<RowData> emptyIter = Collections.emptyIterator(); + + FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(emptyIter, rowType); + exporter.close(); + exporter.close(); + } + + @Test + public void testExportSchemaWithArrowFFI() { + assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not available"); + + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(Collections.emptyIterator(), rowType); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(testAllocator)) { + + exporter.exportSchema(arrowSchema.memoryAddress()); + + org.apache.arrow.vector.types.pojo.Schema schema = Data.importSchema(testAllocator, arrowSchema, null); + assertEquals(2, schema.getFields().size()); + assertEquals("id", schema.getFields().get(0).getName()); + assertEquals("name", schema.getFields().get(1).getName()); + } + } + + @Test + public void testExportNextBatchEndToEnd() { + assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not available"); + + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + List<RowData> rows = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRowData row = new GenericRowData(2); + row.setField(0, i); + row.setField(1, StringData.fromString("value" + i)); + rows.add(row); + } + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(rows.iterator(), rowType); + ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(testAllocator)) { + + // Export schema + exporter.exportSchema(arrowSchema.memoryAddress()); + + // Export batch + assertTrue(exporter.exportNextBatch(arrowArray.memoryAddress())); + + // Import and verify - use 
ArrowSchema directly + try (VectorSchemaRoot root = Data.importVectorSchemaRoot(testAllocator, arrowArray, arrowSchema, null)) { + assertTrue(root.getRowCount() > 0); + + // Verify data + IntVector idVector = (IntVector) root.getVector("id"); + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + + assertEquals(0, idVector.get(0)); + assertEquals("value0", new String(nameVector.get(0))); + } + } + } + + @Test + public void testExportEmptyIteratorReturnsFalse() { + assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not available"); + + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(Collections.emptyIterator(), rowType); + ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator)) { + + // Empty iterator should return false on first call + assertFalse(exporter.exportNextBatch(arrowArray.memoryAddress())); + } + } + + @Test + public void testExportMultipleBatches() { + assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not available"); + + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + + // Create enough rows to potentially span multiple batches + List<RowData> rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + GenericRowData row = new GenericRowData(1); + row.setField(0, i); + rows.add(row); + } + + int totalRowsExported = 0; + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(rows.iterator(), rowType); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(testAllocator)) { + + exporter.exportSchema(arrowSchema.memoryAddress()); + + // Export all batches + while (true) { + try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator); + ArrowSchema batchSchema = ArrowSchema.allocateNew(testAllocator)) { + // Re-export schema for each batch since ArrowSchema is consumed + 
exporter.exportSchema(batchSchema.memoryAddress()); + + if (!exporter.exportNextBatch(arrowArray.memoryAddress())) { + break; + } + + try (VectorSchemaRoot root = + Data.importVectorSchemaRoot(testAllocator, arrowArray, batchSchema, null)) { Review Comment: The comment at line 242 states "Re-export schema for each batch since ArrowSchema is consumed". However, calling exportSchema() multiple times is not necessary - the schema only needs to be exported once at the beginning. The ArrowSchema used for importVectorSchemaRoot can be the same arrowSchema from line 234. Re-exporting the schema for each batch is inefficient and could potentially cause issues. The test should be updated to reuse the single schema export. ```suggestion try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator)) { if (!exporter.exportNextBatch(arrowArray.memoryAddress())) { break; } try (VectorSchemaRoot root = Data.importVectorSchemaRoot(testAllocator, arrowArray, arrowSchema, null)) { ``` ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowWriter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow; + +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; + +/** Writer that converts Flink RowData to Arrow VectorSchemaRoot. 
*/ +public class FlinkArrowWriter { + + private final VectorSchemaRoot root; + private final FlinkArrowFieldWriter[] fieldWriters; + + private FlinkArrowWriter(VectorSchemaRoot root, FlinkArrowFieldWriter[] fieldWriters) { + this.root = root; + this.fieldWriters = fieldWriters; + } + + /** + * Creates a FlinkArrowWriter from a Flink RowType. + * + * @param rowType The Flink row type + * @return A new FlinkArrowWriter instance + */ + public static FlinkArrowWriter create(RowType rowType) { + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, FlinkArrowUtils.ROOT_ALLOCATOR); + return create(root, rowType); + } Review Comment: The FlinkArrowWriter class lacks a close() or cleanup method to manage the lifecycle of the VectorSchemaRoot it creates in the create(RowType) factory method. Users who call create(RowType) have no way to properly close the created VectorSchemaRoot, potentially leading to resource leaks. Consider either making FlinkArrowWriter implement AutoCloseable or documenting that users are responsible for closing the VectorSchemaRoot obtained via getRoot(). ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowFieldWriter.java: ########## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.NullVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; + +/** + * Base class for Arrow field writers that convert Flink RowData fields to Arrow + * vectors. Supports reading from both RowData and ArrayData for nested + * structures. 
+ */ +public abstract class FlinkArrowFieldWriter { + + protected final ValueVector valueVector; + protected int count = 0; + + protected FlinkArrowFieldWriter(ValueVector valueVector) { + this.valueVector = valueVector; + } + + /** + * Writes a field value from RowData at the specified position. + * + * @param row The RowData containing the field + * @param ordinal The position of the field in the row + */ + public void write(RowData row, int ordinal) { + if (row.isNullAt(ordinal)) { + setNull(); + } else { + setValue(row, ordinal); + } + count++; + } + + /** + * Writes an element value from ArrayData at the specified position. + * Used for array elements and map keys/values. + * + * @param array The ArrayData containing the element + * @param ordinal The position of the element in the array + */ + public void writeFromArray(ArrayData array, int ordinal) { + if (array.isNullAt(ordinal)) { + setNull(); + } else { + setValueFromArray(array, ordinal); + } + count++; + } + + /** Sets a null value at the current position. */ + protected abstract void setNull(); + + /** + * Sets a non-null value from RowData at the current position. + * + * @param row The RowData containing the field + * @param ordinal The position of the field in the row + */ + protected abstract void setValue(RowData row, int ordinal); + + /** + * Sets a non-null value from ArrayData at the current position. + * + * @param array The ArrayData containing the element + * @param ordinal The position of the element in the array + */ + protected abstract void setValueFromArray(ArrayData array, int ordinal); + + /** Finalizes the writing process for the current batch. */ + public void finish() { + valueVector.setValueCount(count); + } + + /** Resets the writer for a new batch. */ + public void reset() { + valueVector.reset(); + count = 0; + } + + public int getCount() { + return count; + } + + /** NullWriter for writing null values. 
*/ + public static class NullWriter extends FlinkArrowFieldWriter { + public NullWriter(NullVector vector) { + super(vector); + } + + @Override + protected void setNull() {} + + @Override + protected void setValue(RowData row, int ordinal) {} + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) {} + } + + /** BooleanWriter for writing boolean values. */ + public static class BooleanWriter extends FlinkArrowFieldWriter { + private final BitVector vector; + + public BooleanWriter(BitVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getBoolean(ordinal) ? 1 : 0); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getBoolean(ordinal) ? 1 : 0); + } + } + + /** TinyIntWriter for writing byte values. */ + public static class TinyIntWriter extends FlinkArrowFieldWriter { + private final TinyIntVector vector; + + public TinyIntWriter(TinyIntVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getByte(ordinal)); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getByte(ordinal)); + } + } + + /** SmallIntWriter for writing short values. 
*/ + public static class SmallIntWriter extends FlinkArrowFieldWriter { + private final SmallIntVector vector; + + public SmallIntWriter(SmallIntVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getShort(ordinal)); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getShort(ordinal)); + } + } + + /** IntWriter for writing integer values. */ + public static class IntWriter extends FlinkArrowFieldWriter { + private final IntVector vector; + + public IntWriter(IntVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getInt(ordinal)); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getInt(ordinal)); + } + } + + /** BigIntWriter for writing long values. */ + public static class BigIntWriter extends FlinkArrowFieldWriter { + private final BigIntVector vector; + + public BigIntWriter(BigIntVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getLong(ordinal)); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getLong(ordinal)); + } + } + + /** FloatWriter for writing float values. 
*/ + public static class FloatWriter extends FlinkArrowFieldWriter { + private final Float4Vector vector; + + public FloatWriter(Float4Vector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getFloat(ordinal)); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getFloat(ordinal)); + } + } + + /** DoubleWriter for writing double values. */ + public static class DoubleWriter extends FlinkArrowFieldWriter { + private final Float8Vector vector; + + public DoubleWriter(Float8Vector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getDouble(ordinal)); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getDouble(ordinal)); + } + } + + /** StringWriter for writing string values. */ + public static class StringWriter extends FlinkArrowFieldWriter { + private final VarCharVector vector; + + public StringWriter(VarCharVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + byte[] bytes = row.getString(ordinal).toBytes(); + vector.setSafe(count, bytes); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + byte[] bytes = array.getString(ordinal).toBytes(); + vector.setSafe(count, bytes); + } + } + + /** BinaryWriter for writing binary values. 
*/ + public static class BinaryWriter extends FlinkArrowFieldWriter { + private final VarBinaryVector vector; + + public BinaryWriter(VarBinaryVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + byte[] bytes = row.getBinary(ordinal); + vector.setSafe(count, bytes); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + byte[] bytes = array.getBinary(ordinal); + vector.setSafe(count, bytes); + } + } + + /** DecimalWriter for writing decimal values. */ + public static class DecimalWriter extends FlinkArrowFieldWriter { + private final DecimalVector vector; + private final int precision; + private final int scale; + + public DecimalWriter(DecimalVector vector, int precision, int scale) { + super(vector); + this.vector = vector; + this.precision = precision; + this.scale = scale; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + DecimalData decimal = row.getDecimal(ordinal, precision, scale); + vector.setSafe(count, decimal.toBigDecimal()); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + DecimalData decimal = array.getDecimal(ordinal, precision, scale); + vector.setSafe(count, decimal.toBigDecimal()); + } + } + + /** DateWriter for writing date values. 
*/ + public static class DateWriter extends FlinkArrowFieldWriter { + private final DateDayVector vector; + + public DateWriter(DateDayVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + vector.setSafe(count, row.getInt(ordinal)); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + vector.setSafe(count, array.getInt(ordinal)); + } + } + + /** TimestampWriter for writing timestamp values. */ + public static class TimestampWriter extends FlinkArrowFieldWriter { + private final TimeStampMicroVector vector; + private final int precision; + + public TimestampWriter(TimeStampMicroVector vector, int precision) { + super(vector); + this.vector = vector; + this.precision = precision; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + TimestampData timestamp = row.getTimestamp(ordinal, precision); + // Convert to microseconds: milliseconds * 1000 + nanoseconds / 1000 + long micros = timestamp.getMillisecond() * 1000L + timestamp.getNanoOfMillisecond() / 1000; + vector.setSafe(count, micros); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + TimestampData timestamp = array.getTimestamp(ordinal, precision); + long micros = timestamp.getMillisecond() * 1000L + timestamp.getNanoOfMillisecond() / 1000; + vector.setSafe(count, micros); + } + } + + /** TimeWriter for writing time values. 
*/ + public static class TimeWriter extends FlinkArrowFieldWriter { + private final TimeMicroVector vector; + + public TimeWriter(TimeMicroVector vector) { + super(vector); + this.vector = vector; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + // Flink TimeType stores time as milliseconds (int), convert to microseconds + int millis = row.getInt(ordinal); + vector.setSafe(count, millis * 1000L); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + int millis = array.getInt(ordinal); + vector.setSafe(count, millis * 1000L); + } + } + + /** LocalZonedTimestampWriter for writing local-zoned timestamp values with UTC timezone. */ + public static class LocalZonedTimestampWriter extends FlinkArrowFieldWriter { + private final TimeStampMicroTZVector vector; + private final int precision; + + public LocalZonedTimestampWriter(TimeStampMicroTZVector vector, int precision) { + super(vector); + this.vector = vector; + this.precision = precision; + } + + @Override + protected void setNull() { + vector.setNull(count); + } + + @Override + protected void setValue(RowData row, int ordinal) { + TimestampData timestamp = row.getTimestamp(ordinal, precision); + // Convert to microseconds: milliseconds * 1000 + nanoseconds / 1000 + long micros = timestamp.getMillisecond() * 1000L + timestamp.getNanoOfMillisecond() / 1000; + vector.setSafe(count, micros); + } + + @Override + protected void setValueFromArray(ArrayData array, int ordinal) { + TimestampData timestamp = array.getTimestamp(ordinal, precision); + long micros = timestamp.getMillisecond() * 1000L + timestamp.getNanoOfMillisecond() / 1000; + vector.setSafe(count, micros); + } + } + + /** ArrayWriter for writing array values using recursive field writers. 
 */
    public static class ArrayWriter extends FlinkArrowFieldWriter {
        private final ListVector vector;
        // Child writer whose running count serves as the element index into the
        // list's inner data vector; it must never be advanced out of band.
        private final FlinkArrowFieldWriter elementWriter;

        public ArrayWriter(ListVector vector, FlinkArrowFieldWriter elementWriter) {
            super(vector);
            this.vector = vector;
            this.elementWriter = elementWriter;
        }

        @Override
        protected void setNull() {
            vector.setNull(count);
        }

        @Override
        protected void setValue(RowData row, int ordinal) {
            ArrayData array = row.getArray(ordinal);
            // startNewValue/endValue bracket the list entry so the ListVector's
            // offset buffer records where this array's elements begin and end.
            vector.startNewValue(count);
            for (int i = 0; i < array.size(); i++) {
                elementWriter.writeFromArray(array, i);
            }
            vector.endValue(count, array.size());
        }

        @Override
        protected void setValueFromArray(ArrayData arrayData, int ordinal) {
            // Same as setValue but the nested array is read from an outer ArrayData
            // (arrays of arrays).
            ArrayData array = arrayData.getArray(ordinal);
            vector.startNewValue(count);
            for (int i = 0; i < array.size(); i++) {
                elementWriter.writeFromArray(array, i);
            }
            vector.endValue(count, array.size());
        }

        @Override
        public void finish() {
            // Seal the list vector first, then propagate to the element writer so
            // the inner vector's value count matches the total elements written.
            super.finish();
            elementWriter.finish();
        }

        @Override
        public void reset() {
            super.reset();
            elementWriter.reset();
        }
    }

    /** MapWriter for writing map values using recursive field writers.
 */
    public static class MapWriter extends FlinkArrowFieldWriter {
        private final MapVector vector;
        // The struct vector holding the per-entry {key, value} pairs inside the map.
        private final StructVector structVector;
        // Key and value writers advance in lockstep: one writeFromArray each per entry.
        private final FlinkArrowFieldWriter keyWriter;
        private final FlinkArrowFieldWriter valueWriter;

        public MapWriter(
                MapVector vector,
                StructVector structVector,
                FlinkArrowFieldWriter keyWriter,
                FlinkArrowFieldWriter valueWriter) {
            super(vector);
            this.vector = vector;
            this.structVector = structVector;
            this.keyWriter = keyWriter;
            this.valueWriter = valueWriter;
        }

        @Override
        protected void setNull() {
            vector.setNull(count);
        }

        @Override
        protected void setValue(RowData row, int ordinal) {
            MapData map = row.getMap(ordinal);
            ArrayData keys = map.keyArray();
            ArrayData values = map.valueArray();

            vector.startNewValue(count);
            for (int i = 0; i < map.size(); i++) {
                // Mark the struct entry slot as defined before writing its children.
                // keyWriter.getCount() is used as the next entry index because the
                // key writer advances exactly once per entry (map keys are assumed
                // non-null in Flink -- NOTE(review): confirm this invariant).
                structVector.setIndexDefined(keyWriter.getCount());
                keyWriter.writeFromArray(keys, i);
                valueWriter.writeFromArray(values, i);
            }
            vector.endValue(count, map.size());
        }

        @Override
        protected void setValueFromArray(ArrayData arrayData, int ordinal) {
            // Same as setValue but the map is an element of an outer array.
            MapData map = arrayData.getMap(ordinal);
            ArrayData keys = map.keyArray();
            ArrayData values = map.valueArray();

            vector.startNewValue(count);
            for (int i = 0; i < map.size(); i++) {
                structVector.setIndexDefined(keyWriter.getCount());
                keyWriter.writeFromArray(keys, i);
                valueWriter.writeFromArray(values, i);
            }
            vector.endValue(count, map.size());
        }

        @Override
        public void finish() {
            // Seal the map offsets first, then the children so their value counts
            // cover every entry written this batch.
            super.finish();
            keyWriter.finish();
            valueWriter.finish();
        }

        @Override
        public void reset() {
            super.reset();
            keyWriter.reset();
            valueWriter.reset();
        }
    }

    /** RowWriter for writing row/struct values using recursive field writers.
*/ + public static class RowWriter extends FlinkArrowFieldWriter { + private final StructVector vector; + private final FlinkArrowFieldWriter[] fieldWriters; + + public RowWriter(StructVector vector, FlinkArrowFieldWriter[] fieldWriters) { + super(vector); + this.vector = vector; + this.fieldWriters = fieldWriters; + } + + @Override + protected void setNull() { + for (FlinkArrowFieldWriter writer : fieldWriters) { + writer.setNull(); + writer.count++; + } Review Comment: In the setNull() method, directly manipulating the count field of child writers (writer.count++) breaks encapsulation and could lead to inconsistencies. The child writers' count is already incremented inside setNull() when called through their write() method. This double increment could cause the child writers to have incorrect counts. Consider calling writer.write() with a synthetic null RowData instead, or redesign to avoid direct count manipulation. ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowFFIExporter.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.auron.arrowio.AuronArrowFFIExporter; +import org.apache.auron.configuration.AuronConfiguration; +import org.apache.auron.jni.AuronAdaptor; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Exports Flink RowData to Arrow format via FFI (Foreign Function Interface) + * for consumption by native code. + * + * <p>This exporter uses an asynchronous producer-consumer model with double + * queues to ensure safe resource management. The producer thread creates + * batches ahead of time while the consumer (native side) processes them. + * Resources are only cleaned up after the consumer confirms it has finished + * using the previous batch. + * + * <p>Key design points: + * <ul> + * <li>Producer thread pre-creates batches and puts them in outputQueue</li> + * <li>Consumer takes batches and signals via processingQueue when done</li> + * <li>Previous batch resources are cleaned up only after consumer confirms</li> + * <li>No TaskContext in Flink, so cancellation uses closed flag + thread interrupt</li> + * </ul> + */ +public class FlinkArrowFFIExporter extends AuronArrowFFIExporter { + + /** Queue state representing a data batch ready for export. 
 */
    private static final class NextBatch {
        // Batch payload ready to be handed to the native consumer.
        final VectorSchemaRoot root;
        // Child allocator owning the batch's buffers; closed only after the
        // consumer confirms it has finished with the batch.
        final BufferAllocator allocator;

        NextBatch(VectorSchemaRoot root, BufferAllocator allocator) {
            this.root = root;
            this.allocator = allocator;
        }
    }

    /** Queue state representing end of data or an error. */
    private static final class Finished {
        final Throwable error; // null means normal completion

        Finished(Throwable error) {
            this.error = error;
        }
    }

    // Source of rows to export; consumed only by the producer thread.
    private final Iterator<RowData> rowIterator;
    private final RowType rowType;
    // Arrow schema derived once from rowType; reused for every batch.
    private final Schema arrowSchema;
    // No dictionary encoding is used; an empty provider satisfies the FFI API.
    private final DictionaryProvider.MapDictionaryProvider emptyDictionaryProvider;
    // Per-batch limits: row count from configuration, memory hard-coded below.
    private final int maxBatchNumRows;
    private final long maxBatchMemorySize;

    // Double queue synchronization (capacity 4, smaller than Spark's 16 for streaming)
    private final BlockingQueue<Object> outputQueue;
    private final BlockingQueue<Object> processingQueue;

    // Previous batch resources (cleaned up after consumer confirms)
    private VectorSchemaRoot previousRoot;
    private BufferAllocator previousAllocator;

    // Producer thread
    private final Thread producerThread;
    // Set by close(); read by both producer and consumer, hence volatile.
    private volatile boolean closed = false;

    /**
     * Creates a new FlinkArrowFFIExporter.
     *
     * @param rowIterator Iterator over RowData to export
     * @param rowType The Flink row type
     */
    public FlinkArrowFFIExporter(Iterator<RowData> rowIterator, RowType rowType) {
        this.rowIterator = rowIterator;
        this.rowType = rowType;
        this.arrowSchema = FlinkArrowUtils.toArrowSchema(rowType);
        this.emptyDictionaryProvider = new DictionaryProvider.MapDictionaryProvider();

        // Get configuration
        AuronConfiguration config = AuronAdaptor.getInstance().getAuronConfiguration();
        this.maxBatchNumRows = config.getInteger(AuronConfiguration.BATCH_SIZE);
        this.maxBatchMemorySize = 8 * 1024 * 1024; // 8MB default, same as Spark

        this.outputQueue = new ArrayBlockingQueue<>(4);
        this.processingQueue = new ArrayBlockingQueue<>(4);

        // Producer starts immediately and begins buffering batches ahead of the consumer.
        this.producerThread = startProducerThread();
    }

    /**
     * Starts the producer thread that creates batches asynchronously.
     *
     * <p>The thread loops: build one batch, publish it to {@code outputQueue},
     * then block on {@code processingQueue} until the consumer signals it has
     * taken the batch. Completion or failure is signalled with a {@link Finished}
     * sentinel on the output queue.
     */
    private Thread startProducerThread() {
        Thread thread = new Thread(
                () -> {
                    try {
                        while (!closed && !Thread.currentThread().isInterrupted()) {
                            if (!rowIterator.hasNext()) {
                                // No more input: signal normal completion and exit.
                                outputQueue.put(new Finished(null));
                                return;
                            }

                            // Create a new batch with its own child allocator so its
                            // memory can be released independently of other batches.
                            BufferAllocator allocator =
                                    FlinkArrowUtils.createChildAllocator("FlinkArrowFFIExporter-producer");
                            VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);
                            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);

                            // Fill the batch until input runs out or either the
                            // memory budget or the row-count budget is reached.
                            while (!closed
                                    && rowIterator.hasNext()
                                    && allocator.getAllocatedMemory() < maxBatchMemorySize
                                    && writer.currentCount() < maxBatchNumRows) {
                                writer.write(rowIterator.next());
                            }
                            writer.finish();

                            // Put batch in output queue for consumer
                            outputQueue.put(new NextBatch(root, allocator));

                            // Wait for consumer to confirm it's done with previous batch
                            // This is critical for safe resource management!
                            processingQueue.take();
                        }
                        // Loop exited via closed/interrupt: still signal completion.
                        outputQueue.put(new Finished(null));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        // Normal interruption, not an error
                    } catch (Throwable e) {
                        // Replace any pending batches with the failure sentinel so the
                        // consumer observes the error on its next take().
                        outputQueue.clear();
                        try {
                            outputQueue.put(new Finished(e));
                        } catch (InterruptedException ignored) {
                            Thread.currentThread().interrupt();
                        }
                    }
                },
                "FlinkArrowFFIExporter-producer");

        thread.setDaemon(true);
        // Last-resort propagation path for errors escaping the lambda's own handler.
        thread.setUncaughtExceptionHandler((t, e) -> {
            outputQueue.clear();
            try {
                outputQueue.put(new Finished(e));
            } catch (InterruptedException ignored) {
                // Ignore
            }
        });
        thread.start();
        return thread;
    }

    /**
     * Exports the Arrow schema to the native side.
     *
     * @param exportArrowSchemaPtr Pointer to the Arrow C Data Interface schema
     *                             structure
     */
    public void exportSchema(long exportArrowSchemaPtr) {
        try (ArrowSchema exportSchema = ArrowSchema.wrap(exportArrowSchemaPtr)) {
            Data.exportSchema(FlinkArrowUtils.ROOT_ALLOCATOR, arrowSchema, emptyDictionaryProvider, exportSchema);
        }
    }

    /**
     * Exports the next batch of data to the native side.
     *
     * <p>This method takes a batch from the producer thread and exports it
     * via the Arrow C Data Interface. The previous batch's resources are
     * cleaned up before taking the new batch (after consumer confirms).
+ * + * @param exportArrowArrayPtr Pointer to the Arrow C Data Interface array + * structure + * @return true if a batch was exported, false if no more data is available + */ + @Override + public boolean exportNextBatch(long exportArrowArrayPtr) { + // Clean up previous batch resources (consumer has confirmed it's done) + cleanupPreviousBatch(); + + if (closed) { + return false; + } + + // Wait for producer to provide next batch + Object state; + try { + state = outputQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + + if (state instanceof Finished) { + Finished finished = (Finished) state; + if (finished.error != null) { + throw new RuntimeException("Producer thread error", finished.error); + } + return false; + } + + NextBatch batch = (NextBatch) state; + + // Export data via Arrow FFI + try (ArrowArray exportArray = ArrowArray.wrap(exportArrowArrayPtr)) { + Data.exportVectorSchemaRoot( + FlinkArrowUtils.ROOT_ALLOCATOR, batch.root, emptyDictionaryProvider, exportArray); + } + + // Save references for cleanup on next call + previousRoot = batch.root; + previousAllocator = batch.allocator; + + // Signal producer that it can continue (we've taken the batch) + try { + processingQueue.put(new Object()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return true; + } + + /** + * Cleans up resources from the previous batch. 
+ */ + private void cleanupPreviousBatch() { + if (previousRoot != null) { + previousRoot.close(); + previousRoot = null; + } + if (previousAllocator != null) { + previousAllocator.close(); + previousAllocator = null; + } + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + producerThread.interrupt(); + cleanupPreviousBatch(); + + // Drain any remaining batches in the queue to prevent resource leaks + Object state; + while ((state = outputQueue.poll()) != null) { + if (state instanceof NextBatch) { + NextBatch batch = (NextBatch) state; + batch.root.close(); + batch.allocator.close(); + } + } + } Review Comment: In the close method, after draining the queue, consider also draining the processingQueue. If the producer thread is blocked on processingQueue.take() at line 150, the thread interrupt alone may not be sufficient to guarantee timely termination. Adding processingQueue.clear() and/or placing a sentinel value could help ensure the producer thread can exit cleanly. ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowFFIExporter.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.auron.arrowio.AuronArrowFFIExporter; +import org.apache.auron.configuration.AuronConfiguration; +import org.apache.auron.jni.AuronAdaptor; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Exports Flink RowData to Arrow format via FFI (Foreign Function Interface) + * for consumption by native code. + * + * <p>This exporter uses an asynchronous producer-consumer model with double + * queues to ensure safe resource management. The producer thread creates + * batches ahead of time while the consumer (native side) processes them. + * Resources are only cleaned up after the consumer confirms it has finished + * using the previous batch. + * + * <p>Key design points: + * <ul> + * <li>Producer thread pre-creates batches and puts them in outputQueue</li> + * <li>Consumer takes batches and signals via processingQueue when done</li> + * <li>Previous batch resources are cleaned up only after consumer confirms</li> + * <li>No TaskContext in Flink, so cancellation uses closed flag + thread interrupt</li> + * </ul> + */ +public class FlinkArrowFFIExporter extends AuronArrowFFIExporter { + + /** Queue state representing a data batch ready for export. 
*/ + private static final class NextBatch { + final VectorSchemaRoot root; + final BufferAllocator allocator; + + NextBatch(VectorSchemaRoot root, BufferAllocator allocator) { + this.root = root; + this.allocator = allocator; + } + } + + /** Queue state representing end of data or an error. */ + private static final class Finished { + final Throwable error; // null means normal completion + + Finished(Throwable error) { + this.error = error; + } + } + + private final Iterator<RowData> rowIterator; + private final RowType rowType; + private final Schema arrowSchema; + private final DictionaryProvider.MapDictionaryProvider emptyDictionaryProvider; + private final int maxBatchNumRows; + private final long maxBatchMemorySize; + + // Double queue synchronization (capacity 4, smaller than Spark's 16 for streaming) + private final BlockingQueue<Object> outputQueue; + private final BlockingQueue<Object> processingQueue; + + // Previous batch resources (cleaned up after consumer confirms) + private VectorSchemaRoot previousRoot; + private BufferAllocator previousAllocator; + + // Producer thread + private final Thread producerThread; + private volatile boolean closed = false; + + /** + * Creates a new FlinkArrowFFIExporter. 
+ * + * @param rowIterator Iterator over RowData to export + * @param rowType The Flink row type + */ + public FlinkArrowFFIExporter(Iterator<RowData> rowIterator, RowType rowType) { + this.rowIterator = rowIterator; + this.rowType = rowType; + this.arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + this.emptyDictionaryProvider = new DictionaryProvider.MapDictionaryProvider(); + + // Get configuration + AuronConfiguration config = AuronAdaptor.getInstance().getAuronConfiguration(); + this.maxBatchNumRows = config.getInteger(AuronConfiguration.BATCH_SIZE); + this.maxBatchMemorySize = 8 * 1024 * 1024; // 8MB default, same as Spark + + this.outputQueue = new ArrayBlockingQueue<>(4); + this.processingQueue = new ArrayBlockingQueue<>(4); + + this.producerThread = startProducerThread(); + } + + /** + * Starts the producer thread that creates batches asynchronously. + */ + private Thread startProducerThread() { + Thread thread = new Thread( + () -> { + try { + while (!closed && !Thread.currentThread().isInterrupted()) { + if (!rowIterator.hasNext()) { + outputQueue.put(new Finished(null)); + return; + } + + // Create a new batch + BufferAllocator allocator = + FlinkArrowUtils.createChildAllocator("FlinkArrowFFIExporter-producer"); + VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator); + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Fill the batch with data + while (!closed + && rowIterator.hasNext() + && allocator.getAllocatedMemory() < maxBatchMemorySize + && writer.currentCount() < maxBatchNumRows) { + writer.write(rowIterator.next()); + } + writer.finish(); + + // Put batch in output queue for consumer + outputQueue.put(new NextBatch(root, allocator)); + + // Wait for consumer to confirm it's done with previous batch + // This is critical for safe resource management! 
+ processingQueue.take(); + } + outputQueue.put(new Finished(null)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // Normal interruption, not an error + } catch (Throwable e) { + outputQueue.clear(); + try { + outputQueue.put(new Finished(e)); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + }, + "FlinkArrowFFIExporter-producer"); + + thread.setDaemon(true); + thread.setUncaughtExceptionHandler((t, e) -> { + outputQueue.clear(); + try { + outputQueue.put(new Finished(e)); + } catch (InterruptedException ignored) { + // Ignore + } Review Comment: The uncaught exception handler at lines 168-175 attempts to put into outputQueue after clearing it. However, if the queue is full or if another thread is waiting on the queue, this could cause issues. Additionally, this handler may be redundant since the producer thread's try-catch block already handles exceptions. Consider whether this handler is necessary or if it should be simplified. ```suggestion // Use non-blocking offer here to avoid potential deadlock in an exceptional state. outputQueue.offer(new Finished(e)); ``` ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowFFIExporter.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.auron.arrowio.AuronArrowFFIExporter; +import org.apache.auron.configuration.AuronConfiguration; +import org.apache.auron.jni.AuronAdaptor; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Exports Flink RowData to Arrow format via FFI (Foreign Function Interface) + * for consumption by native code. + * + * <p>This exporter uses an asynchronous producer-consumer model with double + * queues to ensure safe resource management. The producer thread creates + * batches ahead of time while the consumer (native side) processes them. + * Resources are only cleaned up after the consumer confirms it has finished + * using the previous batch. 
+ * + * <p>Key design points: + * <ul> + * <li>Producer thread pre-creates batches and puts them in outputQueue</li> + * <li>Consumer takes batches and signals via processingQueue when done</li> + * <li>Previous batch resources are cleaned up only after consumer confirms</li> + * <li>No TaskContext in Flink, so cancellation uses closed flag + thread interrupt</li> + * </ul> + */ +public class FlinkArrowFFIExporter extends AuronArrowFFIExporter { + + /** Queue state representing a data batch ready for export. */ + private static final class NextBatch { + final VectorSchemaRoot root; + final BufferAllocator allocator; + + NextBatch(VectorSchemaRoot root, BufferAllocator allocator) { + this.root = root; + this.allocator = allocator; + } + } + + /** Queue state representing end of data or an error. */ + private static final class Finished { + final Throwable error; // null means normal completion + + Finished(Throwable error) { + this.error = error; + } + } + + private final Iterator<RowData> rowIterator; + private final RowType rowType; + private final Schema arrowSchema; + private final DictionaryProvider.MapDictionaryProvider emptyDictionaryProvider; + private final int maxBatchNumRows; + private final long maxBatchMemorySize; + + // Double queue synchronization (capacity 4, smaller than Spark's 16 for streaming) + private final BlockingQueue<Object> outputQueue; + private final BlockingQueue<Object> processingQueue; + + // Previous batch resources (cleaned up after consumer confirms) + private VectorSchemaRoot previousRoot; + private BufferAllocator previousAllocator; + + // Producer thread + private final Thread producerThread; + private volatile boolean closed = false; + + /** + * Creates a new FlinkArrowFFIExporter. 
+ * + * @param rowIterator Iterator over RowData to export + * @param rowType The Flink row type + */ + public FlinkArrowFFIExporter(Iterator<RowData> rowIterator, RowType rowType) { + this.rowIterator = rowIterator; + this.rowType = rowType; + this.arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + this.emptyDictionaryProvider = new DictionaryProvider.MapDictionaryProvider(); + + // Get configuration + AuronConfiguration config = AuronAdaptor.getInstance().getAuronConfiguration(); + this.maxBatchNumRows = config.getInteger(AuronConfiguration.BATCH_SIZE); + this.maxBatchMemorySize = 8 * 1024 * 1024; // 8MB default, same as Spark + + this.outputQueue = new ArrayBlockingQueue<>(4); + this.processingQueue = new ArrayBlockingQueue<>(4); + + this.producerThread = startProducerThread(); + } + + /** + * Starts the producer thread that creates batches asynchronously. + */ + private Thread startProducerThread() { + Thread thread = new Thread( + () -> { + try { + while (!closed && !Thread.currentThread().isInterrupted()) { + if (!rowIterator.hasNext()) { + outputQueue.put(new Finished(null)); + return; + } + + // Create a new batch + BufferAllocator allocator = + FlinkArrowUtils.createChildAllocator("FlinkArrowFFIExporter-producer"); + VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator); + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Fill the batch with data + while (!closed + && rowIterator.hasNext() + && allocator.getAllocatedMemory() < maxBatchMemorySize + && writer.currentCount() < maxBatchNumRows) { + writer.write(rowIterator.next()); + } + writer.finish(); + + // Put batch in output queue for consumer + outputQueue.put(new NextBatch(root, allocator)); + Review Comment: If an exception occurs after a batch is created (lines 131-134) but before it's successfully added to the queue or if the thread is interrupted during the fill operation (lines 137-143), the VectorSchemaRoot and BufferAllocator resources may leak. 
Consider wrapping the batch creation and queue insertion in a try-catch block that ensures cleanup of the current batch resources on failure. ```suggestion boolean batchEnqueued = false; try { // Fill the batch with data while (!closed && rowIterator.hasNext() && allocator.getAllocatedMemory() < maxBatchMemorySize && writer.currentCount() < maxBatchNumRows) { writer.write(rowIterator.next()); } writer.finish(); // Put batch in output queue for consumer outputQueue.put(new NextBatch(root, allocator)); batchEnqueued = true; } finally { // If the batch was not successfully enqueued, we must release resources here if (!batchEnqueued) { try { root.close(); } finally { allocator.close(); } } } ``` ########## auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.HashMap; +import java.util.Map; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowWriter. 
*/ +public class FlinkArrowWriterTest { + + private BufferAllocator allocator; + + @BeforeEach + public void setUp() { + allocator = FlinkArrowUtils.createChildAllocator("test"); + } + + @AfterEach + public void tearDown() { + allocator.close(); + } + + @Test + public void testWriteBasicTypes() { + RowType rowType = RowType.of( + new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType(), new BooleanType()}, + new String[] {"id", "name", "score", "active"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write first row + GenericRowData row1 = new GenericRowData(4); + row1.setField(0, 1); + row1.setField(1, StringData.fromString("Alice")); + row1.setField(2, 95.5); + row1.setField(3, true); + writer.write(row1); + + // Write second row + GenericRowData row2 = new GenericRowData(4); + row2.setField(0, 2); + row2.setField(1, StringData.fromString("Bob")); + row2.setField(2, 87.3); + row2.setField(3, false); + writer.write(row2); + + writer.finish(); + + assertEquals(2, root.getRowCount()); + assertEquals(1, root.getVector("id").getObject(0)); + assertEquals(2, root.getVector("id").getObject(1)); + } + } + + @Test + public void testWriteNullValues() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write row with null values + GenericRowData row = new GenericRowData(2); + row.setField(0, null); + row.setField(1, null); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + assertTrue(root.getVector("id").isNull(0)); + assertTrue(root.getVector("name").isNull(0)); + 
} + } + + @Test + public void testWriteArrayType() { + RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"numbers"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + GenericRowData row = new GenericRowData(1); + row.setField(0, new GenericArrayData(new Integer[] {1, 2, 3})); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + } + } + + @Test + public void testWriteMapType() { + RowType rowType = RowType.of( + new LogicalType[] {new MapType(new VarCharType(100), new IntType())}, new String[] {"scores"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + Map<StringData, Integer> mapData = new HashMap<>(); + mapData.put(StringData.fromString("math"), 90); + mapData.put(StringData.fromString("english"), 85); + + GenericRowData row = new GenericRowData(1); + row.setField(0, new GenericMapData(mapData)); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + } + } + + @Test + public void testTimestampPrecision() { + RowType rowType = RowType.of(new LogicalType[] {new TimestampType(6)}, new String[] {"ts"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Create timestamp with microsecond precision + long millis = 1705622400000L; // 2024-01-19 00:00:00.000 + int nanos = 123456; // 123.456 microseconds + + GenericRowData row = new GenericRowData(1); + row.setField(0, TimestampData.fromEpochMillis(millis, nanos)); + writer.write(row); + + writer.finish(); + + 
assertEquals(1, root.getRowCount()); + // Verify the timestamp is written in microseconds + Object value = root.getVector("ts").getObject(0); + assertNotNull(value); + } + } + + @Test + public void testReset() { + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write first batch + GenericRowData row1 = new GenericRowData(1); + row1.setField(0, 1); + writer.write(row1); + writer.finish(); + assertEquals(1, root.getRowCount()); + + // Reset and write second batch + writer.reset(); + GenericRowData row2 = new GenericRowData(1); + row2.setField(0, 2); + writer.write(row2); + writer.finish(); + assertEquals(1, root.getRowCount()); + assertEquals(2, root.getVector("id").getObject(0)); + } + } + + @Test + public void testNestedArrayType() { + // Test Array<Array<Int>> + RowType rowType = RowType.of( + new LogicalType[] {new ArrayType(new ArrayType(new IntType()))}, new String[] {"nested_array"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Create nested array: [[1, 2], [3, 4, 5]] + GenericArrayData innerArray1 = new GenericArrayData(new Integer[] {1, 2}); + GenericArrayData innerArray2 = new GenericArrayData(new Integer[] {3, 4, 5}); + GenericArrayData outerArray = new GenericArrayData(new Object[] {innerArray1, innerArray2}); + + GenericRowData row = new GenericRowData(1); + row.setField(0, outerArray); + writer.write(row); + writer.finish(); + + assertEquals(1, root.getRowCount()); + assertNotNull(root.getVector("nested_array").getObject(0)); + } + } + + @Test + public void testArrayOfMapType() { + // Test Array<Map<String, Int>> + MapType 
mapType = new MapType(new VarCharType(100), new IntType()); + RowType rowType = RowType.of(new LogicalType[] {new ArrayType(mapType)}, new String[] {"array_of_map"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Create array of maps: [{a: 1, b: 2}, {c: 3}] + Map<StringData, Integer> map1 = new HashMap<>(); + map1.put(StringData.fromString("a"), 1); + map1.put(StringData.fromString("b"), 2); + + Map<StringData, Integer> map2 = new HashMap<>(); + map2.put(StringData.fromString("c"), 3); + + GenericMapData genericMap1 = new GenericMapData(map1); + GenericMapData genericMap2 = new GenericMapData(map2); + GenericArrayData arrayOfMaps = new GenericArrayData(new Object[] {genericMap1, genericMap2}); + + GenericRowData row = new GenericRowData(1); + row.setField(0, arrayOfMaps); + writer.write(row); + writer.finish(); + + assertEquals(1, root.getRowCount()); + assertNotNull(root.getVector("array_of_map").getObject(0)); + } + } + + @Test + public void testWriteEmptyBatch() { + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + writer.finish(); + assertEquals(0, root.getRowCount()); + } + } + + @Test + public void testArrayWithNullElements() { + RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"numbers"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Array with null element: [1, null, 3] + GenericRowData row = new GenericRowData(1); + 
row.setField(0, new GenericArrayData(new Integer[] {1, null, 3})); + writer.write(row); + writer.finish(); + + assertEquals(1, root.getRowCount()); + } + } + + @Test + public void testNullStruct() { + RowType innerType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"value"}); + RowType rowType = RowType.of(new LogicalType[] {innerType}, new String[] {"struct_field"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Row with null struct + GenericRowData row = new GenericRowData(1); + row.setField(0, null); + writer.write(row); + writer.finish(); + + assertEquals(1, root.getRowCount()); + assertTrue(root.getVector("struct_field").isNull(0)); + } + } + + @Test + public void testWriteTimeType() { + RowType rowType = RowType.of(new LogicalType[] {new TimeType(3)}, new String[] {"time"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write time value (milliseconds since midnight) + GenericRowData row = new GenericRowData(1); + row.setField(0, 43200000); // 12:00:00.000 = 12 hours * 60 * 60 * 1000 ms + writer.write(row); + + // Write null time + GenericRowData nullRow = new GenericRowData(1); + nullRow.setField(0, null); + writer.write(nullRow); + + writer.finish(); + + assertEquals(2, root.getRowCount()); + // Verify the time is written in microseconds (43200000 ms * 1000 = 43200000000 us) + Object value = root.getVector("time").getObject(0); + assertNotNull(value); + assertEquals(43200000000L, value); + assertTrue(root.getVector("time").isNull(1)); + } + } + + @Test + public void testWriteLocalZonedTimestampType() { + RowType rowType = RowType.of(new LogicalType[] {new LocalZonedTimestampType(6)}, new String[] 
{"lz_ts"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Create timestamp with microsecond precision + long millis = 1705622400000L; // 2024-01-19 00:00:00.000 + int nanos = 123456; // 123.456 microseconds Review Comment: The comment states "123.456 microseconds" but the value 123456 represents nanoseconds, not microseconds. In the context of TimestampData.fromEpochMillis(), the second parameter is the nanosecond-of-millisecond value (0-999999). The comment should be corrected to "123456 nanoseconds" or "123.456 microseconds worth of nanoseconds". ```suggestion int nanos = 123456; // 123456 nanoseconds (123.456 microseconds) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
