This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new a9adc70b [AURON #1850] Add FlinkArrowUtils for Flink-Arrow type 
conversion (#1959)
a9adc70b is described below

commit a9adc70b4768aaa27bcc9bd19e3987af2171a6eb
Author: xTong <[email protected]>
AuthorDate: Wed Feb 4 16:15:40 2026 +0800

    [AURON #1850] Add FlinkArrowUtils for Flink-Arrow type conversion (#1959)
    
    ## Summary
    
    Part 1/3 of Flink RowData to Arrow conversion implementation (split from
    #1930).
    
    This PR adds the foundational type conversion utilities:
    - `FlinkArrowUtils`: Conversion form Flink RowData to Arrow types
    - Support for all common Flink types including primitives, temporal, and
    complex types
    - Comprehensive unit tests for type conversion
    
    ## Follow-up PRs
    - Part 2: FlinkArrowFieldWriter + FlinkArrowWriter
    - Part 3: FlinkArrowFFIExporter
    
    ## Test plan
    - [x] Unit tests for FlinkArrowUtils type conversion
    - [x] Build passes with `./auron-build.sh --pre --sparkver 3.5
    --scalaver 2.12 -Pflink-1.18 -DskipBuildNative`
    - [x] Code formatted with `./dev/reformat`
    
    Related: #1850
---
 .gitignore                                         |   5 +-
 auron-flink-extension/auron-flink-runtime/pom.xml  |  35 +++
 .../apache/auron/flink/arrow/FlinkArrowUtils.java  | 223 +++++++++++++++++
 .../auron/flink/arrow/FlinkArrowUtilsTest.java     | 268 +++++++++++++++++++++
 4 files changed, 530 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 9bb8dcb5..9f5a6a26 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,7 @@ common/src/main/resources/auron-build-info.properties
 
 
 .flattened-pom.xml
-dependency-reduced-pom.xml
\ No newline at end of file
+dependency-reduced-pom.xml
+
+#lsp
+*.prefs
\ No newline at end of file
diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml 
b/auron-flink-extension/auron-flink-runtime/pom.xml
index 3b5dfea2..4998e04c 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -38,6 +38,41 @@
       <artifactId>proto</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <!-- Arrow dependencies -->
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-c-data</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-unsafe</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>auron-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>${junit.jupiter.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
new file mode 100644
index 00000000..0763a847
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Utility class for converting Flink {@link LogicalType} instances to Arrow 
types, fields and schemas.
+ */
+public final class FlinkArrowUtils {
+
+    /**
+     * Root allocator for Arrow memory management.
+     */
+    public static final RootAllocator ROOT_ALLOCATOR = new 
RootAllocator(Long.MAX_VALUE);
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new 
Thread(ROOT_ALLOCATOR::close));
+    }
+
+    /**
+     * Creates a child allocator from the root allocator.
+     *
+     * @param name Name for the child allocator
+     * @return A new child allocator
+     */
+    public static BufferAllocator createChildAllocator(String name) {
+        return ROOT_ALLOCATOR.newChildAllocator(name, 0, Long.MAX_VALUE);
+    }
+
+    /**
+     * Converts a Flink LogicalType to Arrow ArrowType.
+     *
+     * @param logicalType The Flink logical type
+     * @return The corresponding Arrow type
+     * @throws UnsupportedOperationException if the type is not supported
+     */
+    public static ArrowType toArrowType(LogicalType logicalType) {
+        if (logicalType == null) {
+            throw new IllegalArgumentException("logicalType cannot be null");
+        }
+        if (logicalType instanceof NullType) {
+            return ArrowType.Null.INSTANCE;
+        } else if (logicalType instanceof BooleanType) {
+            return ArrowType.Bool.INSTANCE;
+        } else if (logicalType instanceof TinyIntType) {
+            return new ArrowType.Int(8, true);
+        } else if (logicalType instanceof SmallIntType) {
+            return new ArrowType.Int(16, true);
+        } else if (logicalType instanceof IntType) {
+            return new ArrowType.Int(32, true);
+        } else if (logicalType instanceof BigIntType) {
+            return new ArrowType.Int(64, true);
+        } else if (logicalType instanceof FloatType) {
+            return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+        } else if (logicalType instanceof DoubleType) {
+            return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+        } else if (logicalType instanceof VarCharType || logicalType 
instanceof CharType) {
+            return ArrowType.Utf8.INSTANCE;
+        } else if (logicalType instanceof VarBinaryType || logicalType 
instanceof BinaryType) {
+            return ArrowType.Binary.INSTANCE;
+        } else if (logicalType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) logicalType;
+            // Note: Arrow Java only has DecimalVector (128-bit) and 
Decimal256Vector (256-bit).
+            // There's no Decimal64Vector, so we always use 128-bit to match 
the actual storage.
+            // Setting bitWidth=64 would cause FFI export issues since the 
actual data is 128-bit.
+            return new ArrowType.Decimal(decimalType.getPrecision(), 
decimalType.getScale(), 128);
+        } else if (logicalType instanceof DateType) {
+            return new ArrowType.Date(DateUnit.DAY);
+        } else if (logicalType instanceof TimeType) {
+            TimeType timeType = (TimeType) logicalType;
+            int precision = timeType.getPrecision();
+            if (precision == 0) {
+                return new ArrowType.Time(TimeUnit.SECOND, 32);
+            } else if (precision >= 1 && precision <= 3) {
+                return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+            } else if (precision >= 4 && precision <= 6) {
+                return new ArrowType.Time(TimeUnit.MICROSECOND, 64);
+            } else {
+                return new ArrowType.Time(TimeUnit.NANOSECOND, 64);
+            }
+        } else if (logicalType instanceof TimestampType) {
+            TimestampType timestampType = (TimestampType) logicalType;
+            int precision = timestampType.getPrecision();
+            if (precision == 0) {
+                return new ArrowType.Timestamp(TimeUnit.SECOND, null);
+            } else if (precision >= 1 && precision <= 3) {
+                return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+            } else if (precision >= 4 && precision <= 6) {
+                return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+            } else {
+                return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
+            }
+        } else if (logicalType instanceof LocalZonedTimestampType) {
+            LocalZonedTimestampType localZonedTimestampType = 
(LocalZonedTimestampType) logicalType;
+            int precision = localZonedTimestampType.getPrecision();
+            if (precision == 0) {
+                return new ArrowType.Timestamp(TimeUnit.SECOND, null);
+            } else if (precision >= 1 && precision <= 3) {
+                return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+            } else if (precision >= 4 && precision <= 6) {
+                return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+            } else {
+                return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported Flink type: " 
+ logicalType.asSummaryString());
+        }
+    }
+
+    /**
+     * Converts a Flink LogicalType to an Arrow Field.
+     *
+     * @param name        The field name
+     * @param logicalType The Flink logical type
+     * @return The corresponding Arrow Field
+     */
+    public static Field toArrowField(String name, LogicalType logicalType) {
+        boolean nullable = logicalType.isNullable();
+        if (logicalType instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) logicalType;
+            LogicalType elementType = arrayType.getElementType();
+            FieldType fieldType = new FieldType(nullable, 
ArrowType.List.INSTANCE, null);
+            Field elementField = toArrowField("element", elementType);
+            List<Field> children = new ArrayList<>();
+            children.add(elementField);
+            return new Field(name, fieldType, children);
+        } else if (logicalType instanceof RowType) {
+            RowType rowType = (RowType) logicalType;
+            FieldType fieldType = new FieldType(nullable, 
ArrowType.Struct.INSTANCE, null);
+            List<Field> children = new ArrayList<>();
+            for (RowType.RowField field : rowType.getFields()) {
+                children.add(toArrowField(field.getName(), field.getType()));
+            }
+            return new Field(name, fieldType, children);
+        } else if (logicalType instanceof MapType) {
+            MapType mapType = (MapType) logicalType;
+            LogicalType keyType = mapType.getKeyType();
+            LogicalType valueType = mapType.getValueType();
+
+            // Create entries field (struct<key, value>)
+            FieldType entriesFieldType = new FieldType(false, 
ArrowType.Struct.INSTANCE, null);
+            List<Field> entriesChildren = new ArrayList<>();
+            entriesChildren.add(toArrowField(MapVector.KEY_NAME, 
keyType.copy(false)));
+            entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType));
+            Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, 
entriesFieldType, entriesChildren);
+
+            // Create map field
+            FieldType mapFieldType = new FieldType(nullable, new 
ArrowType.Map(false), null);
+            List<Field> mapChildren = new ArrayList<>();
+            mapChildren.add(entriesField);
+            return new Field(name, mapFieldType, mapChildren);
+        } else {
+            ArrowType arrowType = toArrowType(logicalType);
+            FieldType fieldType = new FieldType(nullable, arrowType, null);
+            return new Field(name, fieldType, new ArrayList<>());
+        }
+    }
+
+    /**
+     * Converts a Flink RowType to an Arrow Schema.
+     *
+     * @param rowType The Flink row type
+     * @return The corresponding Arrow Schema
+     */
+    public static Schema toArrowSchema(RowType rowType) {
+        List<Field> fields = new ArrayList<>();
+        for (RowType.RowField field : rowType.getFields()) {
+            fields.add(toArrowField(field.getName(), field.getType()));
+        }
+        return new Schema(fields);
+    }
+
+    private FlinkArrowUtils() {
+        // Utility class
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java
new file mode 100644
index 00000000..14116fb5
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for FlinkArrowUtils. */
+public class FlinkArrowUtilsTest {
+
+    @Test
+    public void testBasicTypeConversion() {
+        // Null
+        assertEquals(ArrowType.Null.INSTANCE, FlinkArrowUtils.toArrowType(new 
NullType()));
+
+        // Boolean
+        assertEquals(ArrowType.Bool.INSTANCE, FlinkArrowUtils.toArrowType(new 
BooleanType()));
+
+        // Integer types
+        assertEquals(new ArrowType.Int(8, true), 
FlinkArrowUtils.toArrowType(new TinyIntType()));
+        assertEquals(new ArrowType.Int(16, true), 
FlinkArrowUtils.toArrowType(new SmallIntType()));
+        assertEquals(new ArrowType.Int(32, true), 
FlinkArrowUtils.toArrowType(new IntType()));
+        assertEquals(new ArrowType.Int(64, true), 
FlinkArrowUtils.toArrowType(new BigIntType()));
+
+        // Floating point types
+        assertEquals(
+                new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE),
+                FlinkArrowUtils.toArrowType(new FloatType()));
+        assertEquals(
+                new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE),
+                FlinkArrowUtils.toArrowType(new DoubleType()));
+
+        // String and binary types
+        assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new 
VarCharType(100)));
+        assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new 
CharType(10)));
+        assertEquals(ArrowType.Binary.INSTANCE, 
FlinkArrowUtils.toArrowType(new VarBinaryType(100)));
+        assertEquals(ArrowType.Binary.INSTANCE, 
FlinkArrowUtils.toArrowType(new BinaryType(10)));
+
+        // Decimal type
+        DecimalType decimalType = new DecimalType(10, 2);
+        ArrowType arrowDecimal = FlinkArrowUtils.toArrowType(decimalType);
+        assertTrue(arrowDecimal instanceof ArrowType.Decimal);
+        assertEquals(10, ((ArrowType.Decimal) arrowDecimal).getPrecision());
+        assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale());
+        assertEquals(128, ((ArrowType.Decimal) arrowDecimal).getBitWidth());
+
+        // Date type
+        assertEquals(new ArrowType.Date(DateUnit.DAY), 
FlinkArrowUtils.toArrowType(new DateType()));
+    }
+
+    @Test
+    public void testArrayTypeConversion() {
+        ArrayType arrayType = new ArrayType(new IntType());
+        Field field = FlinkArrowUtils.toArrowField("test_array", arrayType);
+
+        assertEquals("test_array", field.getName());
+        assertTrue(field.isNullable());
+        assertTrue(field.getType() instanceof ArrowType.List);
+        assertEquals(1, field.getChildren().size());
+
+        Field elementField = field.getChildren().get(0);
+        assertEquals("element", elementField.getName());
+        assertTrue(elementField.getType() instanceof ArrowType.Int);
+    }
+
+    @Test
+    public void testRowTypeConversion() {
+        RowType rowType =
+                RowType.of(new LogicalType[] {new IntType(), new 
VarCharType(100)}, new String[] {"id", "name"});
+        // Create a non-nullable version of the row type
+        RowType nonNullableRowType = (RowType) rowType.copy(false);
+
+        Field field = FlinkArrowUtils.toArrowField("test_row", 
nonNullableRowType);
+
+        assertEquals("test_row", field.getName());
+        assertFalse(field.isNullable());
+        assertTrue(field.getType() instanceof ArrowType.Struct);
+        assertEquals(2, field.getChildren().size());
+
+        Field idField = field.getChildren().get(0);
+        assertEquals("id", idField.getName());
+        assertTrue(idField.getType() instanceof ArrowType.Int);
+
+        Field nameField = field.getChildren().get(1);
+        assertEquals("name", nameField.getName());
+        assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType());
+    }
+
+    @Test
+    public void testMapTypeConversion() {
+        MapType mapType = new MapType(new VarCharType(100), new IntType());
+        Field field = FlinkArrowUtils.toArrowField("test_map", mapType);
+
+        assertEquals("test_map", field.getName());
+        assertTrue(field.isNullable());
+        assertTrue(field.getType() instanceof ArrowType.Map);
+        assertEquals(1, field.getChildren().size());
+
+        Field entriesField = field.getChildren().get(0);
+        assertEquals("entries", entriesField.getName());
+        assertTrue(entriesField.getType() instanceof ArrowType.Struct);
+        assertEquals(2, entriesField.getChildren().size());
+
+        Field keyField = entriesField.getChildren().get(0);
+        assertEquals("key", keyField.getName());
+        assertEquals(ArrowType.Utf8.INSTANCE, keyField.getType());
+
+        Field valueField = entriesField.getChildren().get(1);
+        assertEquals("value", valueField.getName());
+        assertTrue(valueField.getType() instanceof ArrowType.Int);
+    }
+
+    @Test
+    public void testSchemaConversion() {
+        RowType rowType = RowType.of(
+                new LogicalType[] {new IntType(), new VarCharType(100), new 
DoubleType()},
+                new String[] {"id", "name", "score"});
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        assertEquals(3, schema.getFields().size());
+
+        Field idField = schema.getFields().get(0);
+        assertEquals("id", idField.getName());
+        assertTrue(idField.getType() instanceof ArrowType.Int);
+
+        Field nameField = schema.getFields().get(1);
+        assertEquals("name", nameField.getName());
+        assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType());
+
+        Field scoreField = schema.getFields().get(2);
+        assertEquals("score", scoreField.getName());
+        assertTrue(scoreField.getType() instanceof ArrowType.FloatingPoint);
+    }
+
+    @Test
+    public void testTimeTypeConversion() {
+        // Precision 0 -> SECOND, 32-bit
+        TimeType timeType0 = new TimeType(0);
+        ArrowType.Time arrowTime0 = (ArrowType.Time) 
FlinkArrowUtils.toArrowType(timeType0);
+        assertEquals(TimeUnit.SECOND, arrowTime0.getUnit());
+        assertEquals(32, arrowTime0.getBitWidth());
+
+        // Precision 1-3 -> MILLISECOND, 32-bit
+        TimeType timeType3 = new TimeType(3);
+        ArrowType.Time arrowTime3 = (ArrowType.Time) 
FlinkArrowUtils.toArrowType(timeType3);
+        assertEquals(TimeUnit.MILLISECOND, arrowTime3.getUnit());
+        assertEquals(32, arrowTime3.getBitWidth());
+
+        // Precision 4-6 -> MICROSECOND, 64-bit
+        TimeType timeType6 = new TimeType(6);
+        ArrowType.Time arrowTime6 = (ArrowType.Time) 
FlinkArrowUtils.toArrowType(timeType6);
+        assertEquals(TimeUnit.MICROSECOND, arrowTime6.getUnit());
+        assertEquals(64, arrowTime6.getBitWidth());
+
+        // Precision 7+ -> NANOSECOND, 64-bit
+        TimeType timeType9 = new TimeType(9);
+        ArrowType.Time arrowTime9 = (ArrowType.Time) 
FlinkArrowUtils.toArrowType(timeType9);
+        assertEquals(TimeUnit.NANOSECOND, arrowTime9.getUnit());
+        assertEquals(64, arrowTime9.getBitWidth());
+    }
+
+    @Test
+    public void testTimestampTypeConversion() {
+        // Precision 0 -> SECOND
+        TimestampType ts0 = new TimestampType(0);
+        ArrowType.Timestamp arrowTs0 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(ts0);
+        assertEquals(TimeUnit.SECOND, arrowTs0.getUnit());
+        assertNull(arrowTs0.getTimezone());
+
+        // Precision 1-3 -> MILLISECOND
+        TimestampType ts3 = new TimestampType(3);
+        ArrowType.Timestamp arrowTs3 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(ts3);
+        assertEquals(TimeUnit.MILLISECOND, arrowTs3.getUnit());
+        assertNull(arrowTs3.getTimezone());
+
+        // Precision 4-6 -> MICROSECOND
+        TimestampType ts6 = new TimestampType(6);
+        ArrowType.Timestamp arrowTs6 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(ts6);
+        assertEquals(TimeUnit.MICROSECOND, arrowTs6.getUnit());
+        assertNull(arrowTs6.getTimezone());
+
+        // Precision 7+ -> NANOSECOND
+        TimestampType ts9 = new TimestampType(9);
+        ArrowType.Timestamp arrowTs9 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(ts9);
+        assertEquals(TimeUnit.NANOSECOND, arrowTs9.getUnit());
+        assertNull(arrowTs9.getTimezone());
+    }
+
+    @Test
+    public void testLocalZonedTimestampTypeConversion() {
+        // Precision 0 -> SECOND
+        LocalZonedTimestampType lzType0 = new LocalZonedTimestampType(0);
+        ArrowType.Timestamp arrowLz0 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(lzType0);
+        assertEquals(TimeUnit.SECOND, arrowLz0.getUnit());
+        assertNull(arrowLz0.getTimezone());
+
+        // Precision 1-3 -> MILLISECOND
+        LocalZonedTimestampType lzType3 = new LocalZonedTimestampType(3);
+        ArrowType.Timestamp arrowLz3 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(lzType3);
+        assertEquals(TimeUnit.MILLISECOND, arrowLz3.getUnit());
+        assertNull(arrowLz3.getTimezone());
+
+        // Precision 4-6 -> MICROSECOND
+        LocalZonedTimestampType lzType6 = new LocalZonedTimestampType(6);
+        ArrowType.Timestamp arrowLz6 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(lzType6);
+        assertEquals(TimeUnit.MICROSECOND, arrowLz6.getUnit());
+        assertNull(arrowLz6.getTimezone());
+
+        // Precision 7+ -> NANOSECOND
+        LocalZonedTimestampType lzType9 = new LocalZonedTimestampType(9);
+        ArrowType.Timestamp arrowLz9 = (ArrowType.Timestamp) 
FlinkArrowUtils.toArrowType(lzType9);
+        assertEquals(TimeUnit.NANOSECOND, arrowLz9.getUnit());
+        assertNull(arrowLz9.getTimezone());
+    }
+
+    @Test
+    public void testUnsupportedTypeThrowsException() {
+        // RawType is not supported
+        assertThrows(
+                UnsupportedOperationException.class,
+                () -> FlinkArrowUtils.toArrowType(new RawType<>(String.class, 
StringSerializer.INSTANCE)));
+    }
+}

Reply via email to