This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new d1ac7fe7 [AURON #1850] Add ArrowFieldWriters for temporal and composite types (#2086)
d1ac7fe7 is described below

commit d1ac7fe7a478e989adbf7425db3874ea627fef60
Author: xTong <[email protected]>
AuthorDate: Tue Mar 17 10:24:56 2026 +0800

    [AURON #1850] Add ArrowFieldWriters for temporal and composite types (#2086)
    
    # Which issue does this PR close?
    
    Partially addresses #1850 (Part 2b of the Flink RowData to Arrow
    conversion).
    
    # Rationale for this change
    
    Part 2a (#2079) implemented the `ArrowFieldWriter` base class, 12 basic
    type writers, and the `FlinkArrowWriter` orchestrator. This PR adds the
    remaining 5 writer types (Time, Timestamp, Array, Map, Row), completing
    coverage of all Flink logical types supported by the Arrow type mapping
    introduced in Part 1 (#1959).
    
    The implementation follows Flink's official `flink-python` Arrow module
    as established in Part 2a, with the same `forRow()`/`forArray()`
    dual-mode factory pattern and template method design.
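
    The pattern can be sketched without the Arrow and Flink dependencies.
    In the sketch below, every name (`RowLike`, `ArrayLike`, `IntFieldWriter`)
    is invented for illustration; the real writers wrap Arrow vectors rather
    than a `List`:

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class DualModeSketch {
        // Invented stand-ins for Flink's RowData / ArrayData accessors.
        interface RowLike { boolean isNullAt(int i); int getInt(int i); }
        interface ArrayLike { boolean isNullAt(int i); int getInt(int i); }

        abstract static class IntFieldWriter<T> {
            final List<Integer> vector = new ArrayList<>(); // stand-in for an Arrow IntVector

            // Template method: subclasses only supply input-specific reads.
            public final void write(T in, int ordinal) {
                vector.add(isNullAt(in, ordinal) ? null : read(in, ordinal));
            }

            abstract boolean isNullAt(T in, int ordinal);
            abstract int read(T in, int ordinal);

            // Dual-mode factories, mirroring forRow()/forArray().
            static IntFieldWriter<RowLike> forRow() {
                return new IntFieldWriter<RowLike>() {
                    boolean isNullAt(RowLike in, int i) { return in.isNullAt(i); }
                    int read(RowLike in, int i) { return in.getInt(i); }
                };
            }

            static IntFieldWriter<ArrayLike> forArray() {
                return new IntFieldWriter<ArrayLike>() {
                    boolean isNullAt(ArrayLike in, int i) { return in.isNullAt(i); }
                    int read(ArrayLike in, int i) { return in.getInt(i); }
                };
            }
        }

        public static void main(String[] args) {
            IntFieldWriter<RowLike> w = IntFieldWriter.forRow();
            RowLike row = new RowLike() {
                public boolean isNullAt(int i) { return i == 1; }
                public int getInt(int i) { return 40 + i; }
            };
            w.write(row, 0);
            w.write(row, 1);
            System.out.println(w.vector); // [40, null]
        }
    }
    ```

    Each concrete writer only answers "how do I read ordinal `i` from my
    input type"; the shared template method owns the write sequence.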
    
    # What changes are included in this PR?
    
    ## Commit 1: 5 ArrowFieldWriters + unit tests (10 files, +1509 lines)
    
    - **`TimeWriter`** — Handles all 4 Arrow time precisions
    (`TimeSecVector`, `TimeMilliVector`, `TimeMicroVector`,
    `TimeNanoVector`) via instanceof dispatch. Flink stores TIME as int
    (milliseconds), converted to each precision with `L`-suffixed literals
    to avoid int overflow.
    - **`TimestampWriter`** — Handles all 4 Arrow timestamp precisions.
    Combines `TimestampData.getMillisecond()` (long) and
    `getNanoOfMillisecond()` (int) for sub-millisecond precision.
    Constructor validates `timezone == null` via `Preconditions.checkState`,
    matching Flink's official implementation; timezone is not handled at the
    writer layer.
    - **`ArrayWriter`** — Delegates to an `elementWriter`
    (`ArrowFieldWriter<ArrayData>`) for each array element. Overrides
    `finish()`/`reset()` to propagate to the element writer.
    - **`MapWriter`** — Arrow maps are `List<Struct{key, value}>`. Holds
    separate key and value writers operating on `ArrayData`. Sets
    `structVector.setIndexDefined()` for each entry. Overrides
    `finish()`/`reset()` to propagate to key/value writers.
    - **`RowWriter`** — Nested struct handling with
    `ArrowFieldWriter<RowData>[]` for child fields. Caches a `nullRow`
    (`GenericRowData`) in the constructor for null struct handling (avoids
    per-call allocation). Uses a single child-write loop for both null and
    non-null paths, matching Flink's official implementation.
    - **Unit tests**: `TimeWriterTest` (8), `TimestampWriterTest` (9),
    `ArrayWriterTest` (5), `MapWriterTest` (3), `RowWriterTest` (3) — 28
    tests covering all precisions, null handling, reset/multi-batch, edge
    cases (pre-epoch timestamps, empty arrays/maps).
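
    To make the overflow concern concrete, here is a dependency-free sketch
    of the conversion arithmetic (method names are invented for this example;
    the real writers call `setSafe` on the Arrow vectors):

    ```java
    public class PrecisionSketch {
        // Flink stores TIME as an int count of milliseconds-of-day; Arrow's
        // micro/nano vectors take a long. The L suffix promotes the multiply
        // to 64-bit math before it can overflow int.
        static long timeMillisToNanos(int millis) {
            return millis * 1_000_000L;
        }

        // TimestampData splits a timestamp into long milliseconds plus an int
        // nano-of-millisecond; a microsecond vector needs both parts combined.
        static long timestampToMicros(long millisecond, int nanoOfMillisecond) {
            return millisecond * 1_000 + nanoOfMillisecond / 1_000;
        }

        public static void main(String[] args) {
            int endOfDay = 86_399_999; // 23:59:59.999 as milliseconds-of-day
            System.out.println(timeMillisToNanos(endOfDay));        // 86399999000000
            System.out.println((long) (endOfDay * 1_000_000));      // wrapped int garbage
            System.out.println(timestampToMicros(1_000L, 123_456)); // 1000123
        }
    }
    ```

    Without the `L` suffix the multiply runs in 32-bit `int` arithmetic and
    silently wraps for any time of day past roughly 00:00:02.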
    
    ## Commit 2: Factory method extension + integration test (2 files, +158
    lines)
    
    - **`FlinkArrowUtils`** — Extended `createArrowFieldWriterForRow()` and
    `createArrowFieldWriterForArray()` with branches for `TimeWriter`,
    `TimestampWriter`, `ArrayWriter`, `MapWriter`, `RowWriter`. MapVector
    check is placed before ListVector (since `MapVector extends
    ListVector`). Timestamp branch extracts precision from both
    `TimestampType` and `LocalZonedTimestampType`.
    - **`FlinkArrowWriterTest`** — Added `testWriteTemporalAndComplexTypes`
    integration test covering TIME(6), TIMESTAMP(6), TIMESTAMP_LTZ(3),
    ARRAY<INT>, MAP<VARCHAR, INT>, ROW<nested_id INT>. Updated
    `testUnsupportedTypeThrows` to use `MultisetType` (since `ArrayType` is
    now supported).
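
    The ordering point can be demonstrated with plain classes (all names
    invented for this sketch):

    ```java
    public class DispatchOrderSketch {
        // MapLike extends ListLike, mirroring MapVector extends ListVector.
        static class ListLike {}
        static class MapLike extends ListLike {}

        // Checking the superclass first silently captures every MapLike.
        static String superclassFirst(ListLike v) {
            if (v instanceof ListLike) return "list";
            if (v instanceof MapLike) return "map"; // never reached
            return "other";
        }

        // Checking the subclass first routes maps correctly.
        static String subclassFirst(ListLike v) {
            if (v instanceof MapLike) return "map";
            if (v instanceof ListLike) return "list";
            return "other";
        }

        public static void main(String[] args) {
            System.out.println(superclassFirst(new MapLike())); // list (wrong)
            System.out.println(subclassFirst(new MapLike()));   // map
        }
    }
    ```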
    
    # Scope
    
    This PR completes all Flink-to-Arrow writer types. The remaining work
    for #1850 is the reverse direction (Arrow-to-Flink reader), which is
    tracked separately.
    
    # Are there any user-facing changes?
    
    No. This is internal API for the Flink integration.
    
    # How was this patch tested?
    
    36 tests across 6 test classes (28 new + 8 existing):
    
    ```bash
    ./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
      -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
    ```
    
    Result: 36 passed, 0 failures.
---
 .../apache/auron/flink/arrow/FlinkArrowUtils.java  |  90 +++++++++
 .../auron/flink/arrow/writers/ArrayWriter.java     | 114 +++++++++++
 .../auron/flink/arrow/writers/MapWriter.java       | 133 ++++++++++++
 .../auron/flink/arrow/writers/RowWriter.java       | 125 ++++++++++++
 .../auron/flink/arrow/writers/TimeWriter.java      | 120 +++++++++++
 .../auron/flink/arrow/writers/TimestampWriter.java | 126 ++++++++++++
 .../auron/flink/arrow/FlinkArrowWriterTest.java    |  74 ++++++-
 .../auron/flink/arrow/writers/ArrayWriterTest.java | 185 +++++++++++++++++
 .../auron/flink/arrow/writers/MapWriterTest.java   | 149 ++++++++++++++
 .../auron/flink/arrow/writers/RowWriterTest.java   | 146 +++++++++++++
 .../auron/flink/arrow/writers/TimeWriterTest.java  | 194 ++++++++++++++++++
 .../flink/arrow/writers/TimestampWriterTest.java   | 225 +++++++++++++++++++++
 12 files changed, 1678 insertions(+), 3 deletions(-)

diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
index ec027ec4..ccaf4f5e 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -29,11 +29,18 @@ import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.NullVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -41,6 +48,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.writers.ArrayWriter;
 import org.apache.auron.flink.arrow.writers.ArrowFieldWriter;
 import org.apache.auron.flink.arrow.writers.BigIntWriter;
 import org.apache.auron.flink.arrow.writers.BooleanWriter;
@@ -49,8 +57,12 @@ import org.apache.auron.flink.arrow.writers.DecimalWriter;
 import org.apache.auron.flink.arrow.writers.DoubleWriter;
 import org.apache.auron.flink.arrow.writers.FloatWriter;
 import org.apache.auron.flink.arrow.writers.IntWriter;
+import org.apache.auron.flink.arrow.writers.MapWriter;
 import org.apache.auron.flink.arrow.writers.NullWriter;
+import org.apache.auron.flink.arrow.writers.RowWriter;
 import org.apache.auron.flink.arrow.writers.SmallIntWriter;
+import org.apache.auron.flink.arrow.writers.TimeWriter;
+import org.apache.auron.flink.arrow.writers.TimestampWriter;
 import org.apache.auron.flink.arrow.writers.TinyIntWriter;
 import org.apache.auron.flink.arrow.writers.VarBinaryWriter;
 import org.apache.auron.flink.arrow.writers.VarCharWriter;
@@ -283,6 +295,45 @@ public final class FlinkArrowUtils {
             return DecimalWriter.forRow((DecimalVector) vector, decimalType.getPrecision(), decimalType.getScale());
         } else if (vector instanceof DateDayVector) {
             return DateWriter.forRow((DateDayVector) vector);
+        } else if (vector instanceof TimeSecVector
+                || vector instanceof TimeMilliVector
+                || vector instanceof TimeMicroVector
+                || vector instanceof TimeNanoVector) {
+            return TimeWriter.forRow(vector);
+        } else if (vector instanceof TimeStampVector) {
+            int precision;
+            if (fieldType instanceof LocalZonedTimestampType) {
+                precision = ((LocalZonedTimestampType) fieldType).getPrecision();
+            } else {
+                precision = ((TimestampType) fieldType).getPrecision();
+            }
+            return TimestampWriter.forRow(vector, precision);
+        } else if (vector instanceof MapVector) {
+            // MapVector extends ListVector, so this check must come before ListVector
+            MapVector mapVector = (MapVector) vector;
+            MapType mapType = (MapType) fieldType;
+            StructVector entriesVector = (StructVector) mapVector.getDataVector();
+            ArrowFieldWriter<ArrayData> keyWriter =
+                    createArrowFieldWriterForArray(entriesVector.getChild(MapVector.KEY_NAME), mapType.getKeyType());
+            ArrowFieldWriter<ArrayData> valueWriter = createArrowFieldWriterForArray(
+                    entriesVector.getChild(MapVector.VALUE_NAME), mapType.getValueType());
+            return MapWriter.forRow(mapVector, keyWriter, valueWriter);
+        } else if (vector instanceof ListVector) {
+            ListVector listVector = (ListVector) vector;
+            ArrayType arrayType = (ArrayType) fieldType;
+            ArrowFieldWriter<ArrayData> elementWriter =
+                    createArrowFieldWriterForArray(listVector.getDataVector(), arrayType.getElementType());
+            return ArrayWriter.forRow(listVector, elementWriter);
+        } else if (vector instanceof StructVector) {
+            StructVector structVector = (StructVector) vector;
+            RowType rowType = (RowType) fieldType;
+            @SuppressWarnings("unchecked")
+            ArrowFieldWriter<RowData>[] fieldsWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+            for (int i = 0; i < fieldsWriters.length; i++) {
+                fieldsWriters[i] =
+                        createArrowFieldWriterForRow(structVector.getChildByOrdinal(i), rowType.getTypeAt(i));
+            }
+            return RowWriter.forRow(structVector, fieldsWriters);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported vector type: " + vector.getClass().getSimpleName());
@@ -323,6 +374,45 @@ public final class FlinkArrowUtils {
             return DecimalWriter.forArray((DecimalVector) vector, decimalType.getPrecision(), decimalType.getScale());
         } else if (vector instanceof DateDayVector) {
             return DateWriter.forArray((DateDayVector) vector);
+        } else if (vector instanceof TimeSecVector
+                || vector instanceof TimeMilliVector
+                || vector instanceof TimeMicroVector
+                || vector instanceof TimeNanoVector) {
+            return TimeWriter.forArray(vector);
+        } else if (vector instanceof TimeStampVector) {
+            int precision;
+            if (fieldType instanceof LocalZonedTimestampType) {
+                precision = ((LocalZonedTimestampType) fieldType).getPrecision();
+            } else {
+                precision = ((TimestampType) fieldType).getPrecision();
+            }
+            return TimestampWriter.forArray(vector, precision);
+        } else if (vector instanceof MapVector) {
+            // MapVector extends ListVector, so this check must come before ListVector
+            MapVector mapVector = (MapVector) vector;
+            MapType mapType = (MapType) fieldType;
+            StructVector entriesVector = (StructVector) mapVector.getDataVector();
+            ArrowFieldWriter<ArrayData> keyWriter =
+                    createArrowFieldWriterForArray(entriesVector.getChild(MapVector.KEY_NAME), mapType.getKeyType());
+            ArrowFieldWriter<ArrayData> valueWriter = createArrowFieldWriterForArray(
+                    entriesVector.getChild(MapVector.VALUE_NAME), mapType.getValueType());
+            return MapWriter.forArray(mapVector, keyWriter, valueWriter);
+        } else if (vector instanceof ListVector) {
+            ListVector listVector = (ListVector) vector;
+            ArrayType arrayType = (ArrayType) fieldType;
+            ArrowFieldWriter<ArrayData> elementWriter =
+                    createArrowFieldWriterForArray(listVector.getDataVector(), arrayType.getElementType());
+            return ArrayWriter.forArray(listVector, elementWriter);
+        } else if (vector instanceof StructVector) {
+            StructVector structVector = (StructVector) vector;
+            RowType rowType = (RowType) fieldType;
+            @SuppressWarnings("unchecked")
+            ArrowFieldWriter<RowData>[] fieldsWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+            for (int i = 0; i < fieldsWriters.length; i++) {
+                fieldsWriters[i] =
+                        createArrowFieldWriterForRow(structVector.getChildByOrdinal(i), rowType.getTypeAt(i));
+            }
+            return RowWriter.forArray(structVector, fieldsWriters);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported vector type: " + vector.getClass().getSimpleName());
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrayWriter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrayWriter.java
new file mode 100644
index 00000000..f876d799
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrayWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.util.Objects;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for arrays ({@link ListVector}).
+ *
+ * <p>Holds an {@code elementWriter} that writes each array element. The element writer operates on
+ * {@link ArrayData} since array elements are accessed via {@link ArrayData} interface.
+ *
+ * @param <T> the input data type
+ */
+public abstract class ArrayWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates an ArrayWriter that reads from {@link RowData}. */
+    public static ArrayWriter<RowData> forRow(ListVector listVector, ArrowFieldWriter<ArrayData> elementWriter) {
+        return new ArrayWriterForRow(listVector, elementWriter);
+    }
+
+    /** Creates an ArrayWriter that reads from {@link ArrayData}. */
+    public static ArrayWriter<ArrayData> forArray(ListVector listVector, ArrowFieldWriter<ArrayData> elementWriter) {
+        return new ArrayWriterForArray(listVector, elementWriter);
+    }
+
+    // ------------------------------------------------------------------------------------------
+
+    private final ArrowFieldWriter<ArrayData> elementWriter;
+
+    private ArrayWriter(ListVector listVector, ArrowFieldWriter<ArrayData> elementWriter) {
+        super(listVector);
+        this.elementWriter = Objects.requireNonNull(elementWriter);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract ArrayData readArray(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        if (!isNullAt(in, ordinal)) {
+            ((ListVector) getValueVector()).startNewValue(getCount());
+            ArrayData array = readArray(in, ordinal);
+            for (int i = 0; i < array.size(); i++) {
+                elementWriter.write(array, i);
+            }
+            ((ListVector) getValueVector()).endValue(getCount(), array.size());
+        }
+    }
+
+    @Override
+    public void finish() {
+        super.finish();
+        elementWriter.finish();
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        elementWriter.reset();
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class ArrayWriterForRow extends ArrayWriter<RowData> {
+        private ArrayWriterForRow(ListVector listVector, ArrowFieldWriter<ArrayData> elementWriter) {
+            super(listVector, elementWriter);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        ArrayData readArray(RowData in, int ordinal) {
+            return in.getArray(ordinal);
+        }
+    }
+
+    public static final class ArrayWriterForArray extends ArrayWriter<ArrayData> {
+        private ArrayWriterForArray(ListVector listVector, ArrowFieldWriter<ArrayData> elementWriter) {
+            super(listVector, elementWriter);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        ArrayData readArray(ArrayData in, int ordinal) {
+            return in.getArray(ordinal);
+        }
+    }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/MapWriter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/MapWriter.java
new file mode 100644
index 00000000..f7201e17
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/MapWriter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.util.Objects;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for maps ({@link MapVector}).
+ *
+ * <p>Arrow represents maps as {@code List<Struct{key, value}>}. This writer holds separate key and
+ * value writers that operate on {@link ArrayData} (from {@link MapData#keyArray()} and {@link
+ * MapData#valueArray()}).
+ *
+ * @param <T> the input data type
+ */
+public abstract class MapWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a MapWriter that reads from {@link RowData}. */
+    public static MapWriter<RowData> forRow(
+            MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter, ArrowFieldWriter<ArrayData> valueWriter) {
+        return new MapWriterForRow(mapVector, keyWriter, valueWriter);
+    }
+
+    /** Creates a MapWriter that reads from {@link ArrayData}. */
+    public static MapWriter<ArrayData> forArray(
+            MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter, ArrowFieldWriter<ArrayData> valueWriter) {
+        return new MapWriterForArray(mapVector, keyWriter, valueWriter);
+    }
+
+    // ------------------------------------------------------------------------------------------
+
+    private final ArrowFieldWriter<ArrayData> keyWriter;
+    private final ArrowFieldWriter<ArrayData> valueWriter;
+
+    private MapWriter(
+            MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter, ArrowFieldWriter<ArrayData> valueWriter) {
+        super(mapVector);
+        this.keyWriter = Objects.requireNonNull(keyWriter);
+        this.valueWriter = Objects.requireNonNull(valueWriter);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract MapData readMap(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        if (!isNullAt(in, ordinal)) {
+            ((MapVector) getValueVector()).startNewValue(getCount());
+
+            StructVector structVector = (StructVector) ((MapVector) getValueVector()).getDataVector();
+            MapData map = readMap(in, ordinal);
+            ArrayData keys = map.keyArray();
+            ArrayData values = map.valueArray();
+            for (int i = 0; i < map.size(); i++) {
+                structVector.setIndexDefined(keyWriter.getCount());
+                keyWriter.write(keys, i);
+                valueWriter.write(values, i);
+            }
+
+            ((MapVector) getValueVector()).endValue(getCount(), map.size());
+        }
+    }
+
+    @Override
+    public void finish() {
+        super.finish();
+        keyWriter.finish();
+        valueWriter.finish();
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        keyWriter.reset();
+        valueWriter.reset();
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class MapWriterForRow extends MapWriter<RowData> {
+        private MapWriterForRow(
+                MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter, ArrowFieldWriter<ArrayData> valueWriter) {
+            super(mapVector, keyWriter, valueWriter);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        MapData readMap(RowData in, int ordinal) {
+            return in.getMap(ordinal);
+        }
+    }
+
+    public static final class MapWriterForArray extends MapWriter<ArrayData> {
+        private MapWriterForArray(
+                MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter, ArrowFieldWriter<ArrayData> valueWriter) {
+            super(mapVector, keyWriter, valueWriter);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        MapData readMap(ArrayData in, int ordinal) {
+            return in.getMap(ordinal);
+        }
+    }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/RowWriter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/RowWriter.java
new file mode 100644
index 00000000..f2fad389
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/RowWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for nested rows ({@link StructVector}).
+ *
+ * <p>Holds an array of field writers ({@code ArrowFieldWriter<RowData>[]}) — one per struct field.
+ * Both {@link #forRow} and {@link #forArray} accept {@code ArrowFieldWriter<RowData>[]} because
+ * nested rows are always accessed as {@link RowData} (via {@link RowData#getRow} or {@link
+ * ArrayData#getRow}).
+ *
+ * @param <T> the input data type
+ */
+public abstract class RowWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a RowWriter that reads from {@link RowData}. */
+    public static RowWriter<RowData> forRow(StructVector structVector, ArrowFieldWriter<RowData>[] fieldsWriters) {
+        return new RowWriterForRow(structVector, fieldsWriters);
+    }
+
+    /** Creates a RowWriter that reads from {@link ArrayData}. */
+    public static RowWriter<ArrayData> forArray(StructVector structVector, ArrowFieldWriter<RowData>[] fieldsWriters) {
+        return new RowWriterForArray(structVector, fieldsWriters);
+    }
+
+    // ------------------------------------------------------------------------------------------
+
+    protected final ArrowFieldWriter<RowData>[] fieldsWriters;
+    private final GenericRowData nullRow;
+
+    private RowWriter(StructVector structVector, ArrowFieldWriter<RowData>[] fieldsWriters) {
+        super(structVector);
+        this.fieldsWriters = fieldsWriters;
+        this.nullRow = new GenericRowData(fieldsWriters.length);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract RowData readRow(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        RowData row;
+        if (isNullAt(in, ordinal)) {
+            row = nullRow;
+            ((StructVector) getValueVector()).setNull(getCount());
+        } else {
+            row = readRow(in, ordinal);
+            ((StructVector) getValueVector()).setIndexDefined(getCount());
+        }
+        for (int i = 0; i < fieldsWriters.length; i++) {
+            fieldsWriters[i].write(row, i);
+        }
+    }
+
+    @Override
+    public void finish() {
+        super.finish();
+        for (ArrowFieldWriter<?> fieldsWriter : fieldsWriters) {
+            fieldsWriter.finish();
+        }
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        for (ArrowFieldWriter<?> fieldsWriter : fieldsWriters) {
+            fieldsWriter.reset();
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class RowWriterForRow extends RowWriter<RowData> {
+        private RowWriterForRow(StructVector structVector, ArrowFieldWriter<RowData>[] fieldsWriters) {
+            super(structVector, fieldsWriters);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        RowData readRow(RowData in, int ordinal) {
+            return in.getRow(ordinal, fieldsWriters.length);
+        }
+    }
+
+    public static final class RowWriterForArray extends RowWriter<ArrayData> {
+        private RowWriterForArray(StructVector structVector, ArrowFieldWriter<RowData>[] fieldsWriters) {
+            super(structVector, fieldsWriters);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        RowData readRow(ArrayData in, int ordinal) {
+            return in.getRow(ordinal, fieldsWriters.length);
+        }
+    }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimeWriter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimeWriter.java
new file mode 100644
index 00000000..dedceebc
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimeWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link ArrowFieldWriter} for time values stored in Arrow time vectors.
+ *
+ * <p>Supports all four Arrow time precisions: {@link TimeSecVector}, {@link TimeMilliVector},
+ * {@link TimeMicroVector}, and {@link TimeNanoVector}. Flink internally stores TIME values as
+ * milliseconds in an {@code int}; this writer converts to the target precision on write.
+ *
+ * <p>Use {@link #forRow(ValueVector)} when writing from {@link RowData} and {@link
+ * #forArray(ValueVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class TimeWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a TimeWriter that reads from {@link RowData}. */
+    public static TimeWriter<RowData> forRow(ValueVector valueVector) {
+        return new TimeWriterForRow(valueVector);
+    }
+
+    /** Creates a TimeWriter that reads from {@link ArrayData}. */
+    public static TimeWriter<ArrayData> forArray(ValueVector valueVector) {
+        return new TimeWriterForArray(valueVector);
+    }
+
+    private TimeWriter(ValueVector valueVector) {
+        super(valueVector);
+        Preconditions.checkState(
+                valueVector instanceof TimeSecVector
+                        || valueVector instanceof TimeMilliVector
+                        || valueVector instanceof TimeMicroVector
+                        || valueVector instanceof TimeNanoVector,
+                "Expected a time vector (TimeSecVector/TimeMilliVector/TimeMicroVector/TimeNanoVector), got: %s",
+                valueVector.getClass().getSimpleName());
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract int readTime(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        ValueVector vector = getValueVector();
+        if (isNullAt(in, ordinal)) {
+            ((BaseFixedWidthVector) vector).setNull(getCount());
+        } else {
+            int millis = readTime(in, ordinal);
+            if (vector instanceof TimeSecVector) {
+                ((TimeSecVector) vector).setSafe(getCount(), millis / 1000);
+            } else if (vector instanceof TimeMilliVector) {
+                ((TimeMilliVector) vector).setSafe(getCount(), millis);
+            } else if (vector instanceof TimeMicroVector) {
+                ((TimeMicroVector) vector).setSafe(getCount(), millis * 1000L);
+            } else if (vector instanceof TimeNanoVector) {
+                ((TimeNanoVector) vector).setSafe(getCount(), millis * 1_000_000L);
+            }
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class TimeWriterForRow extends TimeWriter<RowData> {
+        private TimeWriterForRow(ValueVector valueVector) {
+            super(valueVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        int readTime(RowData in, int ordinal) {
+            return in.getInt(ordinal);
+        }
+    }
+
+    public static final class TimeWriterForArray extends TimeWriter<ArrayData> {
+        private TimeWriterForArray(ValueVector valueVector) {
+            super(valueVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        int readTime(ArrayData in, int ordinal) {
+            return in.getInt(ordinal);
+        }
+    }
+}
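Editor's note: the precision scaling in `TimeWriter.doWrite` above can be illustrated as a stand-alone sketch. This is not the real writer API, just the arithmetic: TIME arrives as an `int` count of milliseconds since midnight, and the `L`-suffixed multipliers force long arithmetic so the micro/nano cases cannot overflow `int`.

```java
// Illustrative sketch of TimeWriter's precision conversion (hypothetical
// helper class, not part of the patch). Flink stores TIME as int millis;
// the long-typed multipliers mirror the L-suffixed literals in doWrite.
public class TimeConversionSketch {
    static int toSeconds(int millis) {
        return millis / 1000; // TimeSecVector case
    }

    static long toMicros(int millis) {
        return millis * 1000L; // TimeMicroVector case: int * long avoids overflow
    }

    static long toNanos(int millis) {
        return millis * 1_000_000L; // TimeNanoVector case
    }

    public static void main(String[] args) {
        int millis = 3_661_000; // 01:01:01 expressed as milliseconds since midnight
        System.out.println(toSeconds(millis)); // 3661
        System.out.println(toMicros(millis));  // 3661000000
        System.out.println(toNanos(millis));   // 3661000000000
    }
}
```

Without the `L` suffix, `3_661_000 * 1_000_000` would overflow `int` before widening to `long`.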
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimestampWriter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimestampWriter.java
new file mode 100644
index 00000000..6a927938
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimestampWriter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link ArrowFieldWriter} for timestamps ({@link TimeStampSecVector}, {@link
+ * TimeStampMilliVector}, {@link TimeStampMicroVector}, {@link TimeStampNanoVector}).
+ *
+ * <p>Handles both {@code TimestampType} and {@code LocalZonedTimestampType} — time zone is not
+ * handled at the writer layer.
+ *
+ * <p>Use {@link #forRow(ValueVector, int)} when writing from {@link RowData} and {@link
+ * #forArray(ValueVector, int)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class TimestampWriter<T> extends ArrowFieldWriter<T> {
+
+    protected final int precision;
+
+    /** Creates a TimestampWriter that reads from {@link RowData}. */
+    public static TimestampWriter<RowData> forRow(ValueVector valueVector, int precision) {
+        return new TimestampWriterForRow(valueVector, precision);
+    }
+
+    /** Creates a TimestampWriter that reads from {@link ArrayData}. */
+    public static TimestampWriter<ArrayData> forArray(ValueVector valueVector, int precision) {
+        return new TimestampWriterForArray(valueVector, precision);
+    }
+
+    private TimestampWriter(ValueVector valueVector, int precision) {
+        super(valueVector);
+        Preconditions.checkState(
+                valueVector instanceof TimeStampVector
+                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone() == null,
+                "Unexpected vector type or timezone for TimestampWriter: %s",
+                valueVector.getClass().getSimpleName());
+        this.precision = precision;
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract TimestampData readTimestamp(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        if (isNullAt(in, ordinal)) {
+            ((TimeStampVector) getValueVector()).setNull(getCount());
+        } else {
+            TimestampData ts = readTimestamp(in, ordinal);
+            long millis = ts.getMillisecond();
+            int nanoOfMilli = ts.getNanoOfMillisecond();
+            ValueVector vector = getValueVector();
+
+            if (vector instanceof TimeStampSecVector) {
+                ((TimeStampSecVector) vector).setSafe(getCount(), millis / 1000);
+            } else if (vector instanceof TimeStampMilliVector) {
+                ((TimeStampMilliVector) vector).setSafe(getCount(), millis);
+            } else if (vector instanceof TimeStampMicroVector) {
+                ((TimeStampMicroVector) vector).setSafe(getCount(), millis * 1000L + nanoOfMilli / 1000);
+            } else if (vector instanceof TimeStampNanoVector) {
+                ((TimeStampNanoVector) vector).setSafe(getCount(), millis * 1_000_000L + nanoOfMilli);
+            }
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class TimestampWriterForRow extends TimestampWriter<RowData> {
+        private TimestampWriterForRow(ValueVector valueVector, int precision) {
+            super(valueVector, precision);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        TimestampData readTimestamp(RowData in, int ordinal) {
+            return in.getTimestamp(ordinal, precision);
+        }
+    }
+
+    public static final class TimestampWriterForArray extends TimestampWriter<ArrayData> {
+        private TimestampWriterForArray(ValueVector valueVector, int precision) {
+            super(valueVector, precision);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        TimestampData readTimestamp(ArrayData in, int ordinal) {
+            return in.getTimestamp(ordinal, precision);
+        }
+    }
+}
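Editor's note: the sub-millisecond combination logic in `TimestampWriter.doWrite` above can be sketched in isolation. This hypothetical helper (not part of the patch) shows how the two `TimestampData` components — epoch milliseconds (`long`) and nano-of-millisecond (`int`, 0..999_999) — are merged into a single micro- or nano-precision value.

```java
// Illustrative sketch of TimestampWriter's precision arithmetic
// (hypothetical helper, not the real writer API).
public class TimestampConversionSketch {
    static long toMicros(long millis, int nanoOfMilli) {
        // TimeStampMicroVector case: whole micros from millis, plus
        // the micro part of the sub-millisecond remainder.
        return millis * 1000L + nanoOfMilli / 1000;
    }

    static long toNanos(long millis, int nanoOfMilli) {
        // TimeStampNanoVector case: nanoOfMilli is already in nanos.
        return millis * 1_000_000L + nanoOfMilli;
    }

    public static void main(String[] args) {
        long millis = 1704067200123L; // 2024-01-01T00:00:00.123Z
        int nanoOfMilli = 456_000;    // an extra 456 microseconds
        System.out.println(toMicros(millis, nanoOfMilli)); // 1704067200123456
        System.out.println(toNanos(millis, nanoOfMilli));  // 1704067200123456000
    }
}
```

These are exactly the values the `testWriteTemporalAndComplexTypes` assertions below check for the `f_ts` column.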
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
index 042a4794..27063e68 100644
--- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
@@ -20,6 +20,8 @@ import static org.junit.jupiter.api.Assertions.*;
 
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
@@ -31,14 +33,21 @@ import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.NullVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
@@ -47,10 +56,15 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.NullType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -250,15 +264,69 @@ public class FlinkArrowWriterTest {
         }
     }
 
-    /** Unsupported vector types (e.g., from ArrayType) throw UnsupportedOperationException. */
+    /** Unsupported types throw UnsupportedOperationException. */
     @Test
     public void testUnsupportedTypeThrows() {
-        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"f_array"});
+        // MultisetType is not supported by toArrowType, so this should throw
+        RowType rowType = RowType.of(new LogicalType[] {new MultisetType(new IntType())}, new String[] {"f_multiset"});
+
+        assertThrows(UnsupportedOperationException.class, () -> FlinkArrowUtils.toArrowSchema(rowType));
+    }
+
+    /** Writes one row containing Time, Timestamp, Array, Map, and Row types. */
+    @Test
+    public void testWriteTemporalAndComplexTypes() {
+        RowType innerRowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"nested_id"});
+
+        RowType rowType = RowType.of(
+                new LogicalType[] {
+                    new TimeType(6), // TIME(6) → TimeMicroVector
+                    new TimestampType(6), // TIMESTAMP(6) → TimeStampMicroVector
+                    new LocalZonedTimestampType(3), // TIMESTAMP_LTZ(3) → TimeStampMilliVector
+                    new ArrayType(new IntType()), // ARRAY<INT>
+                    new MapType(new VarCharType(100), new IntType()), // MAP<VARCHAR, INT>
+                    innerRowType // ROW<nested_id INT>
+                },
+                new String[] {"f_time", "f_ts", "f_ts_ltz", "f_array", "f_map", "f_row"});
 
         Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
 
         try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
-            assertThrows(UnsupportedOperationException.class, () -> FlinkArrowWriter.create(root, rowType));
+            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+
+            GenericRowData row = new GenericRowData(6);
+            row.setField(0, 3661000); // TIME: 1h 1m 1s in ms
+            row.setField(1, TimestampData.fromEpochMillis(1704067200123L, 456000)); // TIMESTAMP(6)
+            row.setField(2, TimestampData.fromEpochMillis(1704067200000L)); // TIMESTAMP_LTZ(3)
+            row.setField(3, new GenericArrayData(new int[] {10, 20, 30})); // ARRAY
+            Map<StringData, Integer> map = new HashMap<>();
+            map.put(StringData.fromString("key1"), 100);
+            row.setField(4, new GenericMapData(map)); // MAP
+            row.setField(5, GenericRowData.of(42)); // ROW
+
+            writer.write(row);
+            writer.finish();
+
+            assertEquals(1, root.getRowCount());
+
+            // TIME(6) → micro: 3661000 * 1000 = 3661000000
+            assertEquals(3661000L * 1000, ((TimeMicroVector) root.getVector("f_time")).get(0));
+
+            // TIMESTAMP(6) → micro: 1704067200123 * 1000 + 456000 / 1000 = 1704067200123456
+            assertEquals(1704067200123L * 1000 + 456, ((TimeStampMicroVector) root.getVector("f_ts")).get(0));
+
+            // TIMESTAMP_LTZ(3) → milli
+            assertEquals(1704067200000L, ((TimeStampMilliVector) root.getVector("f_ts_ltz")).get(0));
+
+            // ARRAY
+            ListVector arrayVec = (ListVector) root.getVector("f_array");
+            assertFalse(arrayVec.isNull(0));
+
+            // MAP
+            assertFalse(root.getVector("f_map").isNull(0));
+
+            // ROW
+            assertFalse(root.getVector("f_row").isNull(0));
         }
     }
 
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/ArrayWriterTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/ArrayWriterTest.java
new file mode 100644
index 00000000..979921e7
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/ArrayWriterTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link ArrayWriter}. */
+public class ArrayWriterTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    @Test
+    public void testWriteIntArrayFromRowData() {
+        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"f_array"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            ListVector listVector = (ListVector) root.getVector("f_array");
+            listVector.allocateNew();
+            IntVector dataVector = (IntVector) listVector.getDataVector();
+            dataVector.allocateNew();
+
+            ArrowFieldWriter<ArrayData> elementWriter = IntWriter.forArray(dataVector);
+            ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector, elementWriter);
+
+            GenericRowData row = new GenericRowData(1);
+            row.setField(0, new GenericArrayData(new int[] {10, 20, 30}));
+            writer.write(row, 0);
+
+            writer.finish();
+
+            assertEquals(1, listVector.getValueCount());
+            assertFalse(listVector.isNull(0));
+            // Verify the data vector content
+            assertEquals(3, dataVector.getValueCount());
+            assertEquals(10, dataVector.get(0));
+            assertEquals(20, dataVector.get(1));
+            assertEquals(30, dataVector.get(2));
+        }
+    }
+
+    @Test
+    public void testWriteNullArrayFromRowData() {
+        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"f_array"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            ListVector listVector = (ListVector) root.getVector("f_array");
+            listVector.allocateNew();
+            IntVector dataVector = (IntVector) listVector.getDataVector();
+            dataVector.allocateNew();
+
+            ArrowFieldWriter<ArrayData> elementWriter = IntWriter.forArray(dataVector);
+            ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector, elementWriter);
+
+            GenericRowData row = new GenericRowData(1);
+            row.setField(0, null);
+            writer.write(row, 0);
+
+            writer.finish();
+
+            assertEquals(1, listVector.getValueCount());
+            assertTrue(listVector.isNull(0));
+        }
+    }
+
+    @Test
+    public void testWriteEmptyArrayFromRowData() {
+        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"f_array"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            ListVector listVector = (ListVector) root.getVector("f_array");
+            listVector.allocateNew();
+            IntVector dataVector = (IntVector) listVector.getDataVector();
+            dataVector.allocateNew();
+
+            ArrowFieldWriter<ArrayData> elementWriter = IntWriter.forArray(dataVector);
+            ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector, elementWriter);
+
+            GenericRowData row = new GenericRowData(1);
+            row.setField(0, new GenericArrayData(new int[] {}));
+            writer.write(row, 0);
+
+            writer.finish();
+
+            assertEquals(1, listVector.getValueCount());
+            assertFalse(listVector.isNull(0));
+            assertEquals(0, dataVector.getValueCount());
+        }
+    }
+
+    @Test
+    public void testWriteMultipleArraysFromRowData() {
+        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"f_array"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            ListVector listVector = (ListVector) root.getVector("f_array");
+            listVector.allocateNew();
+            IntVector dataVector = (IntVector) listVector.getDataVector();
+            dataVector.allocateNew();
+
+            ArrowFieldWriter<ArrayData> elementWriter = IntWriter.forArray(dataVector);
+            ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector, elementWriter);
+
+            writer.write(GenericRowData.of((Object) new GenericArrayData(new int[] {1, 2})), 0);
+            writer.write(GenericRowData.of((Object) new GenericArrayData(new int[] {3})), 0);
+            writer.write(GenericRowData.of((Object) new GenericArrayData(new int[] {4, 5, 6})), 0);
+
+            writer.finish();
+
+            assertEquals(3, listVector.getValueCount());
+            assertEquals(6, dataVector.getValueCount());
+        }
+    }
+
+    @Test
+    public void testResetAndWriteNewBatch() {
+        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"f_array"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            ListVector listVector = (ListVector) root.getVector("f_array");
+            listVector.allocateNew();
+            IntVector dataVector = (IntVector) listVector.getDataVector();
+            dataVector.allocateNew();
+
+            ArrowFieldWriter<ArrayData> elementWriter = IntWriter.forArray(dataVector);
+            ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector, elementWriter);
+
+            writer.write(GenericRowData.of((Object) new GenericArrayData(new int[] {1, 2})), 0);
+            writer.finish();
+            assertEquals(1, listVector.getValueCount());
+
+            writer.reset();
+            writer.write(GenericRowData.of((Object) new GenericArrayData(new int[] {99})), 0);
+            writer.finish();
+            assertEquals(1, listVector.getValueCount());
+        }
+    }
+}
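Editor's note: the tests above exercise the `forRow()`/`forArray()` dual-mode factory and template-method design that every writer in this PR follows. A miniature of that pattern, with illustrative names only (none of these classes exist in the patch), looks like this:

```java
import java.util.List;

// Hypothetical miniature of the dual-mode factory + template method design:
// the abstract base owns the shared conversion path, and each static factory
// returns a subclass that only differs in how a value is read from its input.
public abstract class DualModeSketch<T> {
    // Analogue of forRow(): reads by ordinal from a List container.
    public static DualModeSketch<List<Integer>> forList() {
        return new DualModeSketch<List<Integer>>() {
            @Override
            int read(List<Integer> in, int ordinal) {
                return in.get(ordinal);
            }
        };
    }

    // Analogue of forArray(): reads by ordinal from an int[] container.
    public static DualModeSketch<int[]> forArray() {
        return new DualModeSketch<int[]>() {
            @Override
            int read(int[] in, int ordinal) {
                return in[ordinal];
            }
        };
    }

    // Container-specific hook, like isNullAt()/readTime() in the writers.
    abstract int read(T in, int ordinal);

    // Template method: shared logic in the base class calls the hook.
    public long toMicros(T in, int ordinal) {
        return read(in, ordinal) * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(DualModeSketch.forArray().toMicros(new int[] {5}, 0)); // 5000
        System.out.println(DualModeSketch.forList().toMicros(List.of(7), 0));     // 7000
    }
}
```

The payoff of this design, as in the real writers, is that the precision-conversion logic is written once and cannot drift between the RowData and ArrayData paths.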
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/MapWriterTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/MapWriterTest.java
new file mode 100644
index 00000000..e4595941
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/MapWriterTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link MapWriter}. */
+public class MapWriterTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    @Test
+    public void testWriteMapFromRowData() {
+        RowType rowType = RowType.of(
+                new LogicalType[] {new MapType(new VarCharType(100), new IntType())}, new String[] {"f_map"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            MapVector mapVector = (MapVector) root.getVector("f_map");
+            mapVector.allocateNew();
+
+            StructVector entriesVector = (StructVector) mapVector.getDataVector();
+            ArrowFieldWriter<ArrayData> keyWriter = VarCharWriter.forArray(
+                    (org.apache.arrow.vector.VarCharVector) entriesVector.getChild(MapVector.KEY_NAME));
+            ArrowFieldWriter<ArrayData> valueWriter = IntWriter.forArray(
+                    (org.apache.arrow.vector.IntVector) entriesVector.getChild(MapVector.VALUE_NAME));
+
+            MapWriter<RowData> writer = MapWriter.forRow(mapVector, keyWriter, valueWriter);
+
+            Map<StringData, Integer> map = new HashMap<>();
+            map.put(StringData.fromString("a"), 1);
+            map.put(StringData.fromString("b"), 2);
+
+            GenericRowData row = new GenericRowData(1);
+            row.setField(0, new GenericMapData(map));
+            writer.write(row, 0);
+
+            writer.finish();
+
+            assertEquals(1, mapVector.getValueCount());
+            assertFalse(mapVector.isNull(0));
+        }
+    }
+
+    @Test
+    public void testWriteNullMapFromRowData() {
+        RowType rowType = RowType.of(
+                new LogicalType[] {new MapType(new VarCharType(100), new IntType())}, new String[] {"f_map"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            MapVector mapVector = (MapVector) root.getVector("f_map");
+            mapVector.allocateNew();
+
+            StructVector entriesVector = (StructVector) mapVector.getDataVector();
+            ArrowFieldWriter<ArrayData> keyWriter = VarCharWriter.forArray(
+                    (org.apache.arrow.vector.VarCharVector) entriesVector.getChild(MapVector.KEY_NAME));
+            ArrowFieldWriter<ArrayData> valueWriter = IntWriter.forArray(
+                    (org.apache.arrow.vector.IntVector) entriesVector.getChild(MapVector.VALUE_NAME));
+
+            MapWriter<RowData> writer = MapWriter.forRow(mapVector, keyWriter, valueWriter);
+
+            GenericRowData row = new GenericRowData(1);
+            row.setField(0, null);
+            writer.write(row, 0);
+
+            writer.finish();
+
+            assertEquals(1, mapVector.getValueCount());
+            assertTrue(mapVector.isNull(0));
+        }
+    }
+
+    @Test
+    public void testWriteEmptyMapFromRowData() {
+        RowType rowType = RowType.of(
+                new LogicalType[] {new MapType(new VarCharType(100), new IntType())}, new String[] {"f_map"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            MapVector mapVector = (MapVector) root.getVector("f_map");
+            mapVector.allocateNew();
+
+            StructVector entriesVector = (StructVector) mapVector.getDataVector();
+            ArrowFieldWriter<ArrayData> keyWriter = VarCharWriter.forArray(
+                    (org.apache.arrow.vector.VarCharVector) entriesVector.getChild(MapVector.KEY_NAME));
+            ArrowFieldWriter<ArrayData> valueWriter = IntWriter.forArray(
+                    (org.apache.arrow.vector.IntVector) entriesVector.getChild(MapVector.VALUE_NAME));
+
+            MapWriter<RowData> writer = MapWriter.forRow(mapVector, keyWriter, valueWriter);
+
+            GenericRowData row = new GenericRowData(1);
+            row.setField(0, new GenericMapData(new HashMap<>()));
+            writer.write(row, 0);
+
+            writer.finish();
+
+            assertEquals(1, mapVector.getValueCount());
+            assertFalse(mapVector.isNull(0));
+        }
+    }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/RowWriterTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/RowWriterTest.java
new file mode 100644
index 00000000..619fb7ad
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/RowWriterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link RowWriter}. */
+public class RowWriterTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    @Test
+    public void testWriteRowFromRowData() {
+        RowType innerType =
+                RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"});
+        RowType outerType = RowType.of(new LogicalType[] {innerType}, new String[] {"f_row"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(outerType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            StructVector structVector = (StructVector) root.getVector("f_row");
+            structVector.allocateNew();
+
+            @SuppressWarnings("unchecked")
+            ArrowFieldWriter<RowData>[] fieldsWriters = new ArrowFieldWriter[] {
+                IntWriter.forRow((IntVector) structVector.getChild("id")),
+                VarCharWriter.forRow((VarCharVector) structVector.getChild("name"))
+            };
+
+            RowWriter<RowData> writer = RowWriter.forRow(structVector, fieldsWriters);
+
+            GenericRowData innerRow = GenericRowData.of(42, StringData.fromString("hello"));
+            GenericRowData outerRow = new GenericRowData(1);
+            outerRow.setField(0, innerRow);
+            writer.write(outerRow, 0);
+
+            writer.finish();
+
+            assertEquals(1, structVector.getValueCount());
+            assertFalse(structVector.isNull(0));
+            IntVector idVector = (IntVector) structVector.getChild("id");
+            VarCharVector nameVector = (VarCharVector) structVector.getChild("name");
+            assertEquals(42, idVector.get(0));
+            assertArrayEquals("hello".getBytes(), nameVector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteNullRowFromRowData() {
+        RowType innerType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});
+        RowType outerType = RowType.of(new LogicalType[] {innerType}, new String[] {"f_row"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(outerType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            StructVector structVector = (StructVector) root.getVector("f_row");
+            structVector.allocateNew();
+
+            @SuppressWarnings("unchecked")
+            ArrowFieldWriter<RowData>[] fieldsWriters =
+                    new ArrowFieldWriter[] {IntWriter.forRow((IntVector) structVector.getChild("id"))};
+
+            RowWriter<RowData> writer = RowWriter.forRow(structVector, fieldsWriters);
+
+            GenericRowData outerRow = new GenericRowData(1);
+            outerRow.setField(0, null);
+            writer.write(outerRow, 0);
+
+            writer.finish();
+
+            assertEquals(1, structVector.getValueCount());
+            assertTrue(structVector.isNull(0));
+        }
+    }
+
+    @Test
+    public void testWriteMultipleRowsFromRowData() {
+        RowType innerType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"val"});
+        RowType outerType = RowType.of(new LogicalType[] {innerType}, new String[] {"f_row"});
+        Schema schema = FlinkArrowUtils.toArrowSchema(outerType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            StructVector structVector = (StructVector) root.getVector("f_row");
+            structVector.allocateNew();
+
+            @SuppressWarnings("unchecked")
+            ArrowFieldWriter<RowData>[] fieldsWriters =
+                    new ArrowFieldWriter[] {IntWriter.forRow((IntVector) structVector.getChild("val"))};
+
+            RowWriter<RowData> writer = RowWriter.forRow(structVector, fieldsWriters);
+
+            writer.write(GenericRowData.of((Object) GenericRowData.of(1)), 0);
+            writer.write(GenericRowData.of((Object) GenericRowData.of(2)), 0);
+            writer.write(GenericRowData.of((Object) GenericRowData.of(3)), 0);
+
+            writer.finish();
+
+            assertEquals(3, structVector.getValueCount());
+            IntVector valVector = (IntVector) structVector.getChild("val");
+            assertEquals(1, valVector.get(0));
+            assertEquals(2, valVector.get(1));
+            assertEquals(3, valVector.get(2));
+        }
+    }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimeWriterTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimeWriterTest.java
new file mode 100644
index 00000000..120ea5a2
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimeWriterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link TimeWriter}. */
+public class TimeWriterTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    @Test
+    public void testWriteSecondPrecisionFromRowData() {
+        try (TimeSecVector vector = new TimeSecVector("test_time_sec", allocator)) {
+            vector.allocateNew();
+            TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+            // 3661000ms = 1h 1m 1s → 3661s
+            writer.write(GenericRowData.of(3661000), 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(3661, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteMillisecondPrecisionFromRowData() {
+        try (TimeMilliVector vector = new TimeMilliVector("test_time_milli", allocator)) {
+            vector.allocateNew();
+            TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+            writer.write(GenericRowData.of(3661000), 0);
+            writer.write(GenericRowData.of(0), 0);
+            writer.write(GenericRowData.of(86399999), 0);
+            writer.finish();
+
+            assertEquals(3, vector.getValueCount());
+            assertEquals(3661000, vector.get(0));
+            assertEquals(0, vector.get(1));
+            assertEquals(86399999, vector.get(2));
+        }
+    }
+
+    @Test
+    public void testWriteMicrosecondPrecisionFromRowData() {
+        try (TimeMicroVector vector = new TimeMicroVector("test_time_micro", allocator)) {
+            vector.allocateNew();
+            TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+            writer.write(GenericRowData.of(3661000), 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(3661000L * 1000L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteNanosecondPrecisionFromRowData() {
+        try (TimeNanoVector vector = new TimeNanoVector("test_time_nano", allocator)) {
+            vector.allocateNew();
+            TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+            writer.write(GenericRowData.of(3661000), 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(3661000L * 1_000_000L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteNullFromRowData() {
+        try (TimeMilliVector vector = new TimeMilliVector("test_time_null", allocator)) {
+            vector.allocateNew();
+            TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+            writer.write(GenericRowData.of(3661000), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(1000), 0);
+            writer.finish();
+
+            assertEquals(3, vector.getValueCount());
+            assertFalse(vector.isNull(0));
+            assertEquals(3661000, vector.get(0));
+            assertTrue(vector.isNull(1));
+            assertFalse(vector.isNull(2));
+            assertEquals(1000, vector.get(2));
+        }
+    }
+
+    @Test
+    public void testWriteFromArrayData() {
+        try (TimeMilliVector vector = new TimeMilliVector("test_time_array", allocator)) {
+            vector.allocateNew();
+            TimeWriter<ArrayData> writer = TimeWriter.forArray(vector);
+
+            GenericArrayData array = new GenericArrayData(new int[] {3661000, 0, 86399999});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, vector.getValueCount());
+            assertEquals(3661000, vector.get(0));
+            assertEquals(0, vector.get(1));
+            assertEquals(86399999, vector.get(2));
+        }
+    }
+
+    @Test
+    public void testWriteMicroFromArrayData() {
+        try (TimeMicroVector vector = new TimeMicroVector("test_time_micro_array", allocator)) {
+            vector.allocateNew();
+            TimeWriter<ArrayData> writer = TimeWriter.forArray(vector);
+
+            GenericArrayData array = new GenericArrayData(new int[] {5000, 12345});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.finish();
+
+            assertEquals(2, vector.getValueCount());
+            assertEquals(5000L * 1000L, vector.get(0));
+            assertEquals(12345L * 1000L, vector.get(1));
+        }
+    }
+
+    @Test
+    public void testResetAndWriteNewBatch() {
+        try (TimeMilliVector vector = new TimeMilliVector("test_time_reset", allocator)) {
+            vector.allocateNew();
+            TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+            // First batch
+            writer.write(GenericRowData.of(1000), 0);
+            writer.write(GenericRowData.of(2000), 0);
+            writer.finish();
+
+            assertEquals(2, vector.getValueCount());
+            assertEquals(1000, vector.get(0));
+            assertEquals(2000, vector.get(1));
+
+            // Reset and write second batch
+            writer.reset();
+
+            writer.write(GenericRowData.of(9000), 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(9000, vector.get(0));
+        }
+    }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimestampWriterTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimestampWriterTest.java
new file mode 100644
index 00000000..c78cba54
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimestampWriterTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link TimestampWriter}. */
+public class TimestampWriterTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    @Test
+    public void testWriteSecondPrecisionFromRowData() {
+        try (TimeStampSecVector vector = new TimeStampSecVector("test_ts_sec", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 0);
+
+            // 1704067200000ms = 2024-01-01T00:00:00Z → 1704067200 seconds
+            GenericRowData row = GenericRowData.of(TimestampData.fromEpochMillis(1704067200000L));
+            writer.write(row, 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(1704067200L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteMillisecondPrecisionFromRowData() {
+        try (TimeStampMilliVector vector = new TimeStampMilliVector("test_ts_milli", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 3);
+
+            GenericRowData row = GenericRowData.of(TimestampData.fromEpochMillis(1704067200123L));
+            writer.write(row, 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(1704067200123L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteMicrosecondPrecisionFromRowData() {
+        try (TimeStampMicroVector vector = new TimeStampMicroVector("test_ts_micro", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 6);
+
+            // millis=1704067200123, nanoOfMilli=456000 → micros = 1704067200123*1000 + 456000/1000
+            //   = 1704067200123000 + 456 = 1704067200123456
+            GenericRowData row = GenericRowData.of(TimestampData.fromEpochMillis(1704067200123L, 456000));
+            writer.write(row, 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(1704067200123456L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteNanosecondPrecisionFromRowData() {
+        try (TimeStampNanoVector vector = new TimeStampNanoVector("test_ts_nano", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 9);
+
+            // millis=1704067200123, nanoOfMilli=456789 → nanos = 1704067200123*1_000_000 + 456789
+            //   = 1704067200123000000 + 456789 = 1704067200123456789
+            GenericRowData row = GenericRowData.of(TimestampData.fromEpochMillis(1704067200123L, 456789));
+            writer.write(row, 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(1704067200123456789L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteNullFromRowData() {
+        try (TimeStampMilliVector vector = new TimeStampMilliVector("test_ts_null", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 3);
+
+            GenericRowData row0 = GenericRowData.of(TimestampData.fromEpochMillis(1000L));
+            GenericRowData row1 = GenericRowData.of((Object) null);
+            GenericRowData row2 = GenericRowData.of(TimestampData.fromEpochMillis(2000L));
+
+            writer.write(row0, 0);
+            writer.write(row1, 0);
+            writer.write(row2, 0);
+            writer.finish();
+
+            assertEquals(3, vector.getValueCount());
+            assertFalse(vector.isNull(0));
+            assertEquals(1000L, vector.get(0));
+            assertTrue(vector.isNull(1));
+            assertFalse(vector.isNull(2));
+            assertEquals(2000L, vector.get(2));
+        }
+    }
+
+    @Test
+    public void testWritePreEpochMicrosecond() {
+        try (TimeStampMicroVector vector = new TimeStampMicroVector("test_ts_pre_epoch_micro", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 6);
+
+            // millis=-1, nanoOfMilli=999000 → micros = -1*1000 + 999000/1000 = -1000 + 999 = -1
+            GenericRowData row = GenericRowData.of(TimestampData.fromEpochMillis(-1, 999000));
+            writer.write(row, 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(-1L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testWriteFromArrayData() {
+        try (TimeStampMilliVector vector = new TimeStampMilliVector("test_ts_array", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<ArrayData> writer = TimestampWriter.forArray(vector, 3);
+
+            GenericArrayData array = new GenericArrayData(new TimestampData[] {
+                TimestampData.fromEpochMillis(1000L),
+                TimestampData.fromEpochMillis(2000L),
+                TimestampData.fromEpochMillis(3000L)
+            });
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, vector.getValueCount());
+            assertEquals(1000L, vector.get(0));
+            assertEquals(2000L, vector.get(1));
+            assertEquals(3000L, vector.get(2));
+        }
+    }
+
+    @Test
+    public void testWritePreEpochNanosecond() {
+        try (TimeStampNanoVector vector = new TimeStampNanoVector("test_ts_pre_epoch_nano", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 9);
+
+            // millis=-1, nanoOfMilli=999000 → nanos = -1*1_000_000 + 999000 = -1000000 + 999000
+            //   = -1000
+            GenericRowData row = GenericRowData.of(TimestampData.fromEpochMillis(-1, 999000));
+            writer.write(row, 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(-1000L, vector.get(0));
+        }
+    }
+
+    @Test
+    public void testResetAndWriteNewBatch() {
+        try (TimeStampMilliVector vector = new TimeStampMilliVector("test_ts_reset", allocator)) {
+            vector.allocateNew();
+            TimestampWriter<RowData> writer = TimestampWriter.forRow(vector, 3);
+
+            // First batch
+            writer.write(GenericRowData.of(TimestampData.fromEpochMillis(1000L)), 0);
+            writer.write(GenericRowData.of(TimestampData.fromEpochMillis(2000L)), 0);
+            writer.finish();
+
+            assertEquals(2, vector.getValueCount());
+            assertEquals(1000L, vector.get(0));
+            assertEquals(2000L, vector.get(1));
+
+            // Reset
+            writer.reset();
+
+            assertEquals(0, vector.getValueCount());
+
+            // Second batch
+            writer.write(GenericRowData.of(TimestampData.fromEpochMillis(9999L)), 0);
+            writer.finish();
+
+            assertEquals(1, vector.getValueCount());
+            assertEquals(9999L, vector.get(0));
+        }
+    }
+}
