x-tong commented on code in PR #1930: URL: https://github.com/apache/auron/pull/1930#discussion_r2708946841
########## auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowFFIExporterTest.java: ########## @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import 
org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowFFIExporter. */ +public class FlinkArrowFFIExporterTest { + + private BufferAllocator testAllocator; + private static Boolean nativeLibrariesAvailable; + + /** + * Check if Arrow FFI native libraries are available. + * FFI operations require JNI, which needs native libraries. + */ + private static boolean isNativeLibraryAvailable() { + if (nativeLibrariesAvailable == null) { + try { + // Try to load the JNI library - JniWrapper.get() triggers ensureLoaded() + org.apache.arrow.c.jni.JniWrapper.get(); + nativeLibrariesAvailable = true; + } catch (Throwable e) { + // Catch all throwables including IllegalStateException, UnsatisfiedLinkError, etc. + nativeLibrariesAvailable = false; + } + } + return nativeLibrariesAvailable; + } + + @BeforeEach + public void setUp() { + testAllocator = new RootAllocator(Long.MAX_VALUE); + } + + @AfterEach + public void tearDown() { + testAllocator.close(); + } + + @Test + public void testExportSchemaCreatesExporter() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + Iterator<RowData> emptyIter = Collections.emptyIterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(emptyIter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterWithEmptyIterator() { + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + Iterator<RowData> emptyIter = Collections.emptyIterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(emptyIter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterWithMultipleRows() { + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + + GenericRowData row1 = new GenericRowData(1); 
+ row1.setField(0, 1); + GenericRowData row2 = new GenericRowData(1); + row2.setField(0, 2); + + Iterator<RowData> iter = Arrays.asList((RowData) row1, row2).iterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(iter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterWithComplexTypes() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + GenericRowData row = new GenericRowData(2); + row.setField(0, 42); + row.setField(1, StringData.fromString("test")); + + Iterator<RowData> iter = Collections.singletonList((RowData) row).iterator(); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(iter, rowType)) { + assertNotNull(exporter); + } + } + + @Test + public void testExporterCloseIsIdempotent() { + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + Iterator<RowData> emptyIter = Collections.emptyIterator(); + + FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(emptyIter, rowType); + exporter.close(); + exporter.close(); + } + + @Test + public void testExportSchemaWithArrowFFI() { + assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not available"); + + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(Collections.emptyIterator(), rowType); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(testAllocator)) { + + exporter.exportSchema(arrowSchema.memoryAddress()); + + org.apache.arrow.vector.types.pojo.Schema schema = Data.importSchema(testAllocator, arrowSchema, null); + assertEquals(2, schema.getFields().size()); + assertEquals("id", schema.getFields().get(0).getName()); + assertEquals("name", schema.getFields().get(1).getName()); + } + } + + @Test + public void testExportNextBatchEndToEnd() { + 
assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not available"); + + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + List<RowData> rows = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRowData row = new GenericRowData(2); + row.setField(0, i); + row.setField(1, StringData.fromString("value" + i)); + rows.add(row); + } + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(rows.iterator(), rowType); + ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(testAllocator)) { + + // Export schema + exporter.exportSchema(arrowSchema.memoryAddress()); + + // Export batch + assertTrue(exporter.exportNextBatch(arrowArray.memoryAddress())); + + // Import and verify - use ArrowSchema directly + try (VectorSchemaRoot root = Data.importVectorSchemaRoot(testAllocator, arrowArray, arrowSchema, null)) { + assertTrue(root.getRowCount() > 0); + + // Verify data + IntVector idVector = (IntVector) root.getVector("id"); + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + + assertEquals(0, idVector.get(0)); + assertEquals("value0", new String(nameVector.get(0))); + } + } + } + + @Test + public void testExportEmptyIteratorReturnsFalse() { + assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not available"); + + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(Collections.emptyIterator(), rowType); + ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator)) { + + // Empty iterator should return false on first call + assertFalse(exporter.exportNextBatch(arrowArray.memoryAddress())); + } + } + + @Test + public void testExportMultipleBatches() { + assumeTrue(isNativeLibraryAvailable(), "Skipping test: Arrow FFI native libraries not 
available"); + + RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"}); + + // Create enough rows to potentially span multiple batches + List<RowData> rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + GenericRowData row = new GenericRowData(1); + row.setField(0, i); + rows.add(row); + } + + int totalRowsExported = 0; + + try (FlinkArrowFFIExporter exporter = new FlinkArrowFFIExporter(rows.iterator(), rowType); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(testAllocator)) { + + exporter.exportSchema(arrowSchema.memoryAddress()); + + // Export all batches + while (true) { + try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator); + ArrowSchema batchSchema = ArrowSchema.allocateNew(testAllocator)) { + // Re-export schema for each batch since ArrowSchema is consumed + exporter.exportSchema(batchSchema.memoryAddress()); + + if (!exporter.exportNextBatch(arrowArray.memoryAddress())) { + break; + } + + try (VectorSchemaRoot root = + Data.importVectorSchemaRoot(testAllocator, arrowArray, batchSchema, null)) { Review Comment: The current Arrow Java version's Data.importVectorSchemaRoot() API does not accept an overloaded Schema object. This is a design limitation of the Arrow C Data Interface – ownership is transferred at the time of import. ########## auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.HashMap; +import java.util.Map; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowWriter. 
*/ +public class FlinkArrowWriterTest { + + private BufferAllocator allocator; + + @BeforeEach + public void setUp() { + allocator = FlinkArrowUtils.createChildAllocator("test"); + } + + @AfterEach + public void tearDown() { + allocator.close(); + } + + @Test + public void testWriteBasicTypes() { + RowType rowType = RowType.of( + new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType(), new BooleanType()}, + new String[] {"id", "name", "score", "active"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write first row + GenericRowData row1 = new GenericRowData(4); + row1.setField(0, 1); + row1.setField(1, StringData.fromString("Alice")); + row1.setField(2, 95.5); + row1.setField(3, true); + writer.write(row1); + + // Write second row + GenericRowData row2 = new GenericRowData(4); + row2.setField(0, 2); + row2.setField(1, StringData.fromString("Bob")); + row2.setField(2, 87.3); + row2.setField(3, false); + writer.write(row2); + + writer.finish(); + + assertEquals(2, root.getRowCount()); + assertEquals(1, root.getVector("id").getObject(0)); + assertEquals(2, root.getVector("id").getObject(1)); + } + } + + @Test + public void testWriteNullValues() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Write row with null values + GenericRowData row = new GenericRowData(2); + row.setField(0, null); + row.setField(1, null); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + assertTrue(root.getVector("id").isNull(0)); + assertTrue(root.getVector("name").isNull(0)); + 
} + } + + @Test + public void testWriteArrayType() { + RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"numbers"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + GenericRowData row = new GenericRowData(1); + row.setField(0, new GenericArrayData(new Integer[] {1, 2, 3})); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + } + } + + @Test + public void testWriteMapType() { + RowType rowType = RowType.of( + new LogicalType[] {new MapType(new VarCharType(100), new IntType())}, new String[] {"scores"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + Map<StringData, Integer> mapData = new HashMap<>(); + mapData.put(StringData.fromString("math"), 90); + mapData.put(StringData.fromString("english"), 85); + + GenericRowData row = new GenericRowData(1); + row.setField(0, new GenericMapData(mapData)); + writer.write(row); + + writer.finish(); + + assertEquals(1, root.getRowCount()); + } + } + + @Test + public void testTimestampPrecision() { + RowType rowType = RowType.of(new LogicalType[] {new TimestampType(6)}, new String[] {"ts"}); + + Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType); + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType); + + // Create timestamp with microsecond precision + long millis = 1705622400000L; // 2024-01-19 00:00:00.000 + int nanos = 123456; // 123.456 microseconds Review Comment: I will fix it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected]
