This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new d1ac7fe7 [AURON #1850] Add ArrowFieldWriters for temporal and
composite types (#2086)
d1ac7fe7 is described below
commit d1ac7fe7a478e989adbf7425db3874ea627fef60
Author: xTong <[email protected]>
AuthorDate: Tue Mar 17 10:24:56 2026 +0800
[AURON #1850] Add ArrowFieldWriters for temporal and composite types (#2086)
# Which issue does this PR close?
Partially addresses #1850 (Part 2b of the Flink RowData to Arrow
conversion).
# Rationale for this change
Part 2a (#2079) implemented `ArrowFieldWriter` base class, 12 basic type
writers, and `FlinkArrowWriter` orchestrator. This PR completes the
remaining 5 writer types (Time, Timestamp, Array, Map, Row), enabling
full coverage of all Flink logical types supported by the Arrow type
mapping introduced in Part 1 (#1959).
The implementation follows Flink's official `flink-python` Arrow module
as established in Part 2a, with the same `forRow()`/`forArray()`
dual-mode factory pattern and template method design.
# What changes are included in this PR?
## Commit 1: 5 ArrowFieldWriters + unit tests (10 files, +1509 lines)
- **`TimeWriter`** — Handles all 4 Arrow time precisions
(`TimeSecVector`, `TimeMilliVector`, `TimeMicroVector`,
`TimeNanoVector`) via instanceof dispatch. Flink stores TIME as int
(milliseconds), converted to each precision with `L`-suffixed literals
to avoid int overflow.
- **`TimestampWriter`** — Handles all 4 Arrow timestamp precisions.
Combines `TimestampData.getMillisecond()` (long) and
`getNanoOfMillisecond()` (int) for sub-millisecond precision.
Constructor validates `timezone == null` via `Preconditions.checkState`,
matching Flink official — timezone is not handled at the writer layer.
- **`ArrayWriter`** — Delegates to an `elementWriter`
(`ArrowFieldWriter<ArrayData>`) for each array element. Overrides
`finish()`/`reset()` to propagate to the element writer.
- **`MapWriter`** — Arrow maps are `List<Struct{key, value}>`. Holds
separate key and value writers operating on `ArrayData`. Sets
`structVector.setIndexDefined()` for each entry. Overrides
`finish()`/`reset()` to propagate to key/value writers.
- **`RowWriter`** — Nested struct handling with
`ArrowFieldWriter<RowData>[]` for child fields. Caches a `nullRow`
(`GenericRowData`) in the constructor for null struct handling (avoids
per-call allocation). Uses a single child-write loop for both null and
non-null paths, matching Flink official.
- **Unit tests**: `TimeWriterTest` (8), `TimestampWriterTest` (9),
`ArrayWriterTest` (5), `MapWriterTest` (3), `RowWriterTest` (3) — 28
tests covering all precisions, null handling, reset/multi-batch, edge
cases (pre-epoch timestamps, empty arrays/maps).
## Commit 2: Factory method extension + integration test (2 files, +158
lines)
- **`FlinkArrowUtils`** — Extended `createArrowFieldWriterForRow()` and
`createArrowFieldWriterForArray()` with branches for `TimeWriter`,
`TimestampWriter`, `ArrayWriter`, `MapWriter`, `RowWriter`. MapVector
check is placed before ListVector (since `MapVector extends
ListVector`). Timestamp branch extracts precision from both
`TimestampType` and `LocalZonedTimestampType`.
- **`FlinkArrowWriterTest`** — Added `testWriteTemporalAndComplexTypes`
integration test covering TIME(6), TIMESTAMP(6), TIMESTAMP_LTZ(3),
ARRAY\<INT\>, MAP\<VARCHAR, INT\>, ROW\<nested_id INT\>. Updated
`testUnsupportedTypeThrows` to use `MultisetType` (since `ArrayType` is
now supported).
# Scope
This PR completes all Flink-to-Arrow writer types. The remaining work
for #1850 is the reverse direction (Arrow-to-Flink reader), which is
tracked separately.
# Are there any user-facing changes?
No. Internal API for Flink integration.
# How was this patch tested?
36 tests across 6 test classes (28 new + 8 existing):
```bash
./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
-pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
```
Result: 36 pass, 0 failures.
---
.../apache/auron/flink/arrow/FlinkArrowUtils.java | 90 +++++++++
.../auron/flink/arrow/writers/ArrayWriter.java | 114 +++++++++++
.../auron/flink/arrow/writers/MapWriter.java | 133 ++++++++++++
.../auron/flink/arrow/writers/RowWriter.java | 125 ++++++++++++
.../auron/flink/arrow/writers/TimeWriter.java | 120 +++++++++++
.../auron/flink/arrow/writers/TimestampWriter.java | 126 ++++++++++++
.../auron/flink/arrow/FlinkArrowWriterTest.java | 74 ++++++-
.../auron/flink/arrow/writers/ArrayWriterTest.java | 185 +++++++++++++++++
.../auron/flink/arrow/writers/MapWriterTest.java | 149 ++++++++++++++
.../auron/flink/arrow/writers/RowWriterTest.java | 146 +++++++++++++
.../auron/flink/arrow/writers/TimeWriterTest.java | 194 ++++++++++++++++++
.../flink/arrow/writers/TimestampWriterTest.java | 225 +++++++++++++++++++++
12 files changed, 1678 insertions(+), 3 deletions(-)
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
index ec027ec4..ccaf4f5e 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -29,11 +29,18 @@ import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
@@ -41,6 +48,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.writers.ArrayWriter;
import org.apache.auron.flink.arrow.writers.ArrowFieldWriter;
import org.apache.auron.flink.arrow.writers.BigIntWriter;
import org.apache.auron.flink.arrow.writers.BooleanWriter;
@@ -49,8 +57,12 @@ import org.apache.auron.flink.arrow.writers.DecimalWriter;
import org.apache.auron.flink.arrow.writers.DoubleWriter;
import org.apache.auron.flink.arrow.writers.FloatWriter;
import org.apache.auron.flink.arrow.writers.IntWriter;
+import org.apache.auron.flink.arrow.writers.MapWriter;
import org.apache.auron.flink.arrow.writers.NullWriter;
+import org.apache.auron.flink.arrow.writers.RowWriter;
import org.apache.auron.flink.arrow.writers.SmallIntWriter;
+import org.apache.auron.flink.arrow.writers.TimeWriter;
+import org.apache.auron.flink.arrow.writers.TimestampWriter;
import org.apache.auron.flink.arrow.writers.TinyIntWriter;
import org.apache.auron.flink.arrow.writers.VarBinaryWriter;
import org.apache.auron.flink.arrow.writers.VarCharWriter;
@@ -283,6 +295,45 @@ public final class FlinkArrowUtils {
return DecimalWriter.forRow((DecimalVector) vector,
decimalType.getPrecision(), decimalType.getScale());
} else if (vector instanceof DateDayVector) {
return DateWriter.forRow((DateDayVector) vector);
+ } else if (vector instanceof TimeSecVector
+ || vector instanceof TimeMilliVector
+ || vector instanceof TimeMicroVector
+ || vector instanceof TimeNanoVector) {
+ return TimeWriter.forRow(vector);
+ } else if (vector instanceof TimeStampVector) {
+ int precision;
+ if (fieldType instanceof LocalZonedTimestampType) {
+ precision = ((LocalZonedTimestampType)
fieldType).getPrecision();
+ } else {
+ precision = ((TimestampType) fieldType).getPrecision();
+ }
+ return TimestampWriter.forRow(vector, precision);
+ } else if (vector instanceof MapVector) {
+ // MapVector extends ListVector, so this check must come before
ListVector
+ MapVector mapVector = (MapVector) vector;
+ MapType mapType = (MapType) fieldType;
+ StructVector entriesVector = (StructVector)
mapVector.getDataVector();
+ ArrowFieldWriter<ArrayData> keyWriter =
+
createArrowFieldWriterForArray(entriesVector.getChild(MapVector.KEY_NAME),
mapType.getKeyType());
+ ArrowFieldWriter<ArrayData> valueWriter =
createArrowFieldWriterForArray(
+ entriesVector.getChild(MapVector.VALUE_NAME),
mapType.getValueType());
+ return MapWriter.forRow(mapVector, keyWriter, valueWriter);
+ } else if (vector instanceof ListVector) {
+ ListVector listVector = (ListVector) vector;
+ ArrayType arrayType = (ArrayType) fieldType;
+ ArrowFieldWriter<ArrayData> elementWriter =
+ createArrowFieldWriterForArray(listVector.getDataVector(),
arrayType.getElementType());
+ return ArrayWriter.forRow(listVector, elementWriter);
+ } else if (vector instanceof StructVector) {
+ StructVector structVector = (StructVector) vector;
+ RowType rowType = (RowType) fieldType;
+ @SuppressWarnings("unchecked")
+ ArrowFieldWriter<RowData>[] fieldsWriters = new
ArrowFieldWriter[rowType.getFieldCount()];
+ for (int i = 0; i < fieldsWriters.length; i++) {
+ fieldsWriters[i] =
+
createArrowFieldWriterForRow(structVector.getChildByOrdinal(i),
rowType.getTypeAt(i));
+ }
+ return RowWriter.forRow(structVector, fieldsWriters);
} else {
throw new UnsupportedOperationException(
"Unsupported vector type: " +
vector.getClass().getSimpleName());
@@ -323,6 +374,45 @@ public final class FlinkArrowUtils {
return DecimalWriter.forArray((DecimalVector) vector,
decimalType.getPrecision(), decimalType.getScale());
} else if (vector instanceof DateDayVector) {
return DateWriter.forArray((DateDayVector) vector);
+ } else if (vector instanceof TimeSecVector
+ || vector instanceof TimeMilliVector
+ || vector instanceof TimeMicroVector
+ || vector instanceof TimeNanoVector) {
+ return TimeWriter.forArray(vector);
+ } else if (vector instanceof TimeStampVector) {
+ int precision;
+ if (fieldType instanceof LocalZonedTimestampType) {
+ precision = ((LocalZonedTimestampType)
fieldType).getPrecision();
+ } else {
+ precision = ((TimestampType) fieldType).getPrecision();
+ }
+ return TimestampWriter.forArray(vector, precision);
+ } else if (vector instanceof MapVector) {
+ // MapVector extends ListVector, so this check must come before
ListVector
+ MapVector mapVector = (MapVector) vector;
+ MapType mapType = (MapType) fieldType;
+ StructVector entriesVector = (StructVector)
mapVector.getDataVector();
+ ArrowFieldWriter<ArrayData> keyWriter =
+
createArrowFieldWriterForArray(entriesVector.getChild(MapVector.KEY_NAME),
mapType.getKeyType());
+ ArrowFieldWriter<ArrayData> valueWriter =
createArrowFieldWriterForArray(
+ entriesVector.getChild(MapVector.VALUE_NAME),
mapType.getValueType());
+ return MapWriter.forArray(mapVector, keyWriter, valueWriter);
+ } else if (vector instanceof ListVector) {
+ ListVector listVector = (ListVector) vector;
+ ArrayType arrayType = (ArrayType) fieldType;
+ ArrowFieldWriter<ArrayData> elementWriter =
+ createArrowFieldWriterForArray(listVector.getDataVector(),
arrayType.getElementType());
+ return ArrayWriter.forArray(listVector, elementWriter);
+ } else if (vector instanceof StructVector) {
+ StructVector structVector = (StructVector) vector;
+ RowType rowType = (RowType) fieldType;
+ @SuppressWarnings("unchecked")
+ ArrowFieldWriter<RowData>[] fieldsWriters = new
ArrowFieldWriter[rowType.getFieldCount()];
+ for (int i = 0; i < fieldsWriters.length; i++) {
+ fieldsWriters[i] =
+
createArrowFieldWriterForRow(structVector.getChildByOrdinal(i),
rowType.getTypeAt(i));
+ }
+ return RowWriter.forArray(structVector, fieldsWriters);
} else {
throw new UnsupportedOperationException(
"Unsupported vector type: " +
vector.getClass().getSimpleName());
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrayWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrayWriter.java
new file mode 100644
index 00000000..f876d799
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrayWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.util.Objects;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for arrays ({@link ListVector}).
+ *
+ * <p>Holds an {@code elementWriter} that writes each array element. The
element writer operates on
+ * {@link ArrayData} since array elements are accessed via {@link ArrayData}
interface.
+ *
+ * @param <T> the input data type
+ */
+public abstract class ArrayWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates an ArrayWriter that reads from {@link RowData}. */
+ public static ArrayWriter<RowData> forRow(ListVector listVector,
ArrowFieldWriter<ArrayData> elementWriter) {
+ return new ArrayWriterForRow(listVector, elementWriter);
+ }
+
+ /** Creates an ArrayWriter that reads from {@link ArrayData}. */
+ public static ArrayWriter<ArrayData> forArray(ListVector listVector,
ArrowFieldWriter<ArrayData> elementWriter) {
+ return new ArrayWriterForArray(listVector, elementWriter);
+ }
+
+ //
------------------------------------------------------------------------------------------
+
+ private final ArrowFieldWriter<ArrayData> elementWriter;
+
+ private ArrayWriter(ListVector listVector, ArrowFieldWriter<ArrayData>
elementWriter) {
+ super(listVector);
+ this.elementWriter = Objects.requireNonNull(elementWriter);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract ArrayData readArray(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ if (!isNullAt(in, ordinal)) {
+ ((ListVector) getValueVector()).startNewValue(getCount());
+ ArrayData array = readArray(in, ordinal);
+ for (int i = 0; i < array.size(); i++) {
+ elementWriter.write(array, i);
+ }
+ ((ListVector) getValueVector()).endValue(getCount(), array.size());
+ }
+ }
+
+ @Override
+ public void finish() {
+ super.finish();
+ elementWriter.finish();
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ elementWriter.reset();
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class ArrayWriterForRow extends ArrayWriter<RowData> {
+ private ArrayWriterForRow(ListVector listVector,
ArrowFieldWriter<ArrayData> elementWriter) {
+ super(listVector, elementWriter);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ ArrayData readArray(RowData in, int ordinal) {
+ return in.getArray(ordinal);
+ }
+ }
+
+ public static final class ArrayWriterForArray extends
ArrayWriter<ArrayData> {
+ private ArrayWriterForArray(ListVector listVector,
ArrowFieldWriter<ArrayData> elementWriter) {
+ super(listVector, elementWriter);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ ArrayData readArray(ArrayData in, int ordinal) {
+ return in.getArray(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/MapWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/MapWriter.java
new file mode 100644
index 00000000..f7201e17
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/MapWriter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.util.Objects;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for maps ({@link MapVector}).
+ *
+ * <p>Arrow represents maps as {@code List<Struct{key, value}>}. This writer
holds separate key and
+ * value writers that operate on {@link ArrayData} (from {@link
MapData#keyArray()} and {@link
+ * MapData#valueArray()}).
+ *
+ * @param <T> the input data type
+ */
+public abstract class MapWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a MapWriter that reads from {@link RowData}. */
+ public static MapWriter<RowData> forRow(
+ MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter,
ArrowFieldWriter<ArrayData> valueWriter) {
+ return new MapWriterForRow(mapVector, keyWriter, valueWriter);
+ }
+
+ /** Creates a MapWriter that reads from {@link ArrayData}. */
+ public static MapWriter<ArrayData> forArray(
+ MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter,
ArrowFieldWriter<ArrayData> valueWriter) {
+ return new MapWriterForArray(mapVector, keyWriter, valueWriter);
+ }
+
+ //
------------------------------------------------------------------------------------------
+
+ private final ArrowFieldWriter<ArrayData> keyWriter;
+ private final ArrowFieldWriter<ArrayData> valueWriter;
+
+ private MapWriter(
+ MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter,
ArrowFieldWriter<ArrayData> valueWriter) {
+ super(mapVector);
+ this.keyWriter = Objects.requireNonNull(keyWriter);
+ this.valueWriter = Objects.requireNonNull(valueWriter);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract MapData readMap(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ if (!isNullAt(in, ordinal)) {
+ ((MapVector) getValueVector()).startNewValue(getCount());
+
+ StructVector structVector = (StructVector) ((MapVector)
getValueVector()).getDataVector();
+ MapData map = readMap(in, ordinal);
+ ArrayData keys = map.keyArray();
+ ArrayData values = map.valueArray();
+ for (int i = 0; i < map.size(); i++) {
+ structVector.setIndexDefined(keyWriter.getCount());
+ keyWriter.write(keys, i);
+ valueWriter.write(values, i);
+ }
+
+ ((MapVector) getValueVector()).endValue(getCount(), map.size());
+ }
+ }
+
+ @Override
+ public void finish() {
+ super.finish();
+ keyWriter.finish();
+ valueWriter.finish();
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ keyWriter.reset();
+ valueWriter.reset();
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class MapWriterForRow extends MapWriter<RowData> {
+ private MapWriterForRow(
+ MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter,
ArrowFieldWriter<ArrayData> valueWriter) {
+ super(mapVector, keyWriter, valueWriter);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ MapData readMap(RowData in, int ordinal) {
+ return in.getMap(ordinal);
+ }
+ }
+
+ public static final class MapWriterForArray extends MapWriter<ArrayData> {
+ private MapWriterForArray(
+ MapVector mapVector, ArrowFieldWriter<ArrayData> keyWriter,
ArrowFieldWriter<ArrayData> valueWriter) {
+ super(mapVector, keyWriter, valueWriter);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ MapData readMap(ArrayData in, int ordinal) {
+ return in.getMap(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/RowWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/RowWriter.java
new file mode 100644
index 00000000..f2fad389
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/RowWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for nested rows ({@link StructVector}).
+ *
+ * <p>Holds an array of field writers ({@code ArrowFieldWriter<RowData>[]}) —
one per struct field.
+ * Both {@link #forRow} and {@link #forArray} accept {@code
ArrowFieldWriter<RowData>[]} because
+ * nested rows are always accessed as {@link RowData} (via {@link
RowData#getRow} or {@link
+ * ArrayData#getRow}).
+ *
+ * @param <T> the input data type
+ */
+public abstract class RowWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a RowWriter that reads from {@link RowData}. */
+ public static RowWriter<RowData> forRow(StructVector structVector,
ArrowFieldWriter<RowData>[] fieldsWriters) {
+ return new RowWriterForRow(structVector, fieldsWriters);
+ }
+
+ /** Creates a RowWriter that reads from {@link ArrayData}. */
+ public static RowWriter<ArrayData> forArray(StructVector structVector,
ArrowFieldWriter<RowData>[] fieldsWriters) {
+ return new RowWriterForArray(structVector, fieldsWriters);
+ }
+
+ //
------------------------------------------------------------------------------------------
+
+ protected final ArrowFieldWriter<RowData>[] fieldsWriters;
+ private final GenericRowData nullRow;
+
+ private RowWriter(StructVector structVector, ArrowFieldWriter<RowData>[]
fieldsWriters) {
+ super(structVector);
+ this.fieldsWriters = fieldsWriters;
+ this.nullRow = new GenericRowData(fieldsWriters.length);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract RowData readRow(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ RowData row;
+ if (isNullAt(in, ordinal)) {
+ row = nullRow;
+ ((StructVector) getValueVector()).setNull(getCount());
+ } else {
+ row = readRow(in, ordinal);
+ ((StructVector) getValueVector()).setIndexDefined(getCount());
+ }
+ for (int i = 0; i < fieldsWriters.length; i++) {
+ fieldsWriters[i].write(row, i);
+ }
+ }
+
+ @Override
+ public void finish() {
+ super.finish();
+ for (ArrowFieldWriter<?> fieldsWriter : fieldsWriters) {
+ fieldsWriter.finish();
+ }
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ for (ArrowFieldWriter<?> fieldsWriter : fieldsWriters) {
+ fieldsWriter.reset();
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class RowWriterForRow extends RowWriter<RowData> {
+ private RowWriterForRow(StructVector structVector,
ArrowFieldWriter<RowData>[] fieldsWriters) {
+ super(structVector, fieldsWriters);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ RowData readRow(RowData in, int ordinal) {
+ return in.getRow(ordinal, fieldsWriters.length);
+ }
+ }
+
+ public static final class RowWriterForArray extends RowWriter<ArrayData> {
+ private RowWriterForArray(StructVector structVector,
ArrowFieldWriter<RowData>[] fieldsWriters) {
+ super(structVector, fieldsWriters);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ RowData readRow(ArrayData in, int ordinal) {
+ return in.getRow(ordinal, fieldsWriters.length);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimeWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimeWriter.java
new file mode 100644
index 00000000..dedceebc
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimeWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link ArrowFieldWriter} for time values stored in Arrow time vectors.
+ *
+ * <p>Supports all four Arrow time precisions: {@link TimeSecVector}, {@link
TimeMilliVector}, {@link
+ * TimeMicroVector}, and {@link TimeNanoVector}. Flink internally stores TIME
values as milliseconds
+ * in an {@code int}; this writer converts to the target precision on write.
+ *
+ * <p>Use {@link #forRow(ValueVector)} when writing from {@link RowData} and
{@link
+ * #forArray(ValueVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class TimeWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a TimeWriter that reads from {@link RowData}. */
+ public static TimeWriter<RowData> forRow(ValueVector valueVector) {
+ return new TimeWriterForRow(valueVector);
+ }
+
+ /** Creates a TimeWriter that reads from {@link ArrayData}. */
+ public static TimeWriter<ArrayData> forArray(ValueVector valueVector) {
+ return new TimeWriterForArray(valueVector);
+ }
+
+ private TimeWriter(ValueVector valueVector) {
+ super(valueVector);
+ Preconditions.checkState(
+ valueVector instanceof TimeSecVector
+ || valueVector instanceof TimeMilliVector
+ || valueVector instanceof TimeMicroVector
+ || valueVector instanceof TimeNanoVector,
+ "Expected a time vector
(TimeSecVector/TimeMilliVector/TimeMicroVector/TimeNanoVector), got: %s",
+ valueVector.getClass().getSimpleName());
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract int readTime(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ ValueVector vector = getValueVector();
+ if (isNullAt(in, ordinal)) {
+ ((BaseFixedWidthVector) vector).setNull(getCount());
+ } else {
+ int millis = readTime(in, ordinal);
+ if (vector instanceof TimeSecVector) {
+ ((TimeSecVector) vector).setSafe(getCount(), millis / 1000);
+ } else if (vector instanceof TimeMilliVector) {
+ ((TimeMilliVector) vector).setSafe(getCount(), millis);
+ } else if (vector instanceof TimeMicroVector) {
+ ((TimeMicroVector) vector).setSafe(getCount(), millis * 1000L);
+ } else if (vector instanceof TimeNanoVector) {
+ ((TimeNanoVector) vector).setSafe(getCount(), millis *
1_000_000L);
+ }
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class TimeWriterForRow extends TimeWriter<RowData> {
+ private TimeWriterForRow(ValueVector valueVector) {
+ super(valueVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ int readTime(RowData in, int ordinal) {
+ return in.getInt(ordinal);
+ }
+ }
+
+ public static final class TimeWriterForArray extends TimeWriter<ArrayData>
{
+ private TimeWriterForArray(ValueVector valueVector) {
+ super(valueVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ int readTime(ArrayData in, int ordinal) {
+ return in.getInt(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimestampWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimestampWriter.java
new file mode 100644
index 00000000..6a927938
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TimestampWriter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link ArrowFieldWriter} for timestamps ({@link TimeStampSecVector}, {@link
+ * TimeStampMilliVector}, {@link TimeStampMicroVector}, {@link
TimeStampNanoVector}).
+ *
+ * <p>Handles both {@code TimestampType} and {@code LocalZonedTimestampType} —
time zone is not
+ * handled at the writer layer.
+ *
+ * <p>Use {@link #forRow(ValueVector, int)} when writing from {@link RowData}
and {@link
+ * #forArray(ValueVector, int)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class TimestampWriter<T> extends ArrowFieldWriter<T> {
+
+ protected final int precision;
+
+ /** Creates a TimestampWriter that reads from {@link RowData}. */
+ public static TimestampWriter<RowData> forRow(ValueVector valueVector, int
precision) {
+ return new TimestampWriterForRow(valueVector, precision);
+ }
+
+ /** Creates a TimestampWriter that reads from {@link ArrayData}. */
+ public static TimestampWriter<ArrayData> forArray(ValueVector valueVector,
int precision) {
+ return new TimestampWriterForArray(valueVector, precision);
+ }
+
+ private TimestampWriter(ValueVector valueVector, int precision) {
+ super(valueVector);
+ Preconditions.checkState(
+ valueVector instanceof TimeStampVector
+ && ((ArrowType.Timestamp)
valueVector.getField().getType()).getTimezone() == null,
+ "Unexpected vector type or timezone for TimestampWriter: %s",
+ valueVector.getClass().getSimpleName());
+ this.precision = precision;
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract TimestampData readTimestamp(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ if (isNullAt(in, ordinal)) {
+ ((TimeStampVector) getValueVector()).setNull(getCount());
+ } else {
+ TimestampData ts = readTimestamp(in, ordinal);
+ long millis = ts.getMillisecond();
+ int nanoOfMilli = ts.getNanoOfMillisecond();
+ ValueVector vector = getValueVector();
+
+ if (vector instanceof TimeStampSecVector) {
+ ((TimeStampSecVector) vector).setSafe(getCount(), millis /
1000);
+ } else if (vector instanceof TimeStampMilliVector) {
+ ((TimeStampMilliVector) vector).setSafe(getCount(), millis);
+ } else if (vector instanceof TimeStampMicroVector) {
+ ((TimeStampMicroVector) vector).setSafe(getCount(), millis *
1000L + nanoOfMilli / 1000);
+ } else if (vector instanceof TimeStampNanoVector) {
+ ((TimeStampNanoVector) vector).setSafe(getCount(), millis *
1_000_000L + nanoOfMilli);
+ }
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class TimestampWriterForRow extends
TimestampWriter<RowData> {
+ private TimestampWriterForRow(ValueVector valueVector, int precision) {
+ super(valueVector, precision);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ TimestampData readTimestamp(RowData in, int ordinal) {
+ return in.getTimestamp(ordinal, precision);
+ }
+ }
+
+ public static final class TimestampWriterForArray extends
TimestampWriter<ArrayData> {
+ private TimestampWriterForArray(ValueVector valueVector, int
precision) {
+ super(valueVector, precision);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ TimestampData readTimestamp(ArrayData in, int ordinal) {
+ return in.getTimestamp(ordinal, precision);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
index 042a4794..27063e68 100644
---
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
@@ -20,6 +20,8 @@ import static org.junit.jupiter.api.Assertions.*;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
@@ -31,14 +33,21 @@ import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
@@ -47,10 +56,15 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -250,15 +264,69 @@ public class FlinkArrowWriterTest {
}
}
- /** Unsupported vector types (e.g., from ArrayType) throw
UnsupportedOperationException. */
+ /** Unsupported types throw UnsupportedOperationException. */
@Test
public void testUnsupportedTypeThrows() {
- RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new
IntType())}, new String[] {"f_array"});
+ // MultisetType is not supported by toArrowType, so this should throw
+ RowType rowType = RowType.of(new LogicalType[] {new MultisetType(new
IntType())}, new String[] {"f_multiset"});
+
+ assertThrows(UnsupportedOperationException.class, () ->
FlinkArrowUtils.toArrowSchema(rowType));
+ }
+
+ /** Writes one row containing Time, Timestamp, Array, Map, and Row types.
*/
+ @Test
+ public void testWriteTemporalAndComplexTypes() {
+ RowType innerRowType = RowType.of(new LogicalType[] {new IntType()},
new String[] {"nested_id"});
+
+ RowType rowType = RowType.of(
+ new LogicalType[] {
+ new TimeType(6), // TIME(6) → TimeMicroVector
+ new TimestampType(6), // TIMESTAMP(6) →
TimeStampMicroVector
+ new LocalZonedTimestampType(3), // TIMESTAMP_LTZ(3) →
TimeStampMilliVector
+ new ArrayType(new IntType()), // ARRAY<INT>
+ new MapType(new VarCharType(100), new IntType()), //
MAP<VARCHAR, INT>
+ innerRowType // ROW<nested_id INT>
+ },
+ new String[] {"f_time", "f_ts", "f_ts_ltz", "f_array",
"f_map", "f_row"});
Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
- assertThrows(UnsupportedOperationException.class, () ->
FlinkArrowWriter.create(root, rowType));
+ FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+
+ GenericRowData row = new GenericRowData(6);
+ row.setField(0, 3661000); // TIME: 1h 1m 1s in ms
+ row.setField(1, TimestampData.fromEpochMillis(1704067200123L,
456000)); // TIMESTAMP(6)
+ row.setField(2, TimestampData.fromEpochMillis(1704067200000L)); //
TIMESTAMP_LTZ(3)
+ row.setField(3, new GenericArrayData(new int[] {10, 20, 30})); //
ARRAY
+ Map<StringData, Integer> map = new HashMap<>();
+ map.put(StringData.fromString("key1"), 100);
+ row.setField(4, new GenericMapData(map)); // MAP
+ row.setField(5, GenericRowData.of(42)); // ROW
+
+ writer.write(row);
+ writer.finish();
+
+ assertEquals(1, root.getRowCount());
+
+ // TIME(6) → micro: 3661000 * 1000 = 3661000000
+ assertEquals(3661000L * 1000, ((TimeMicroVector)
root.getVector("f_time")).get(0));
+
+ // TIMESTAMP(6) → micro: 1704067200123 * 1000 + 456000 / 1000 =
1704067200123456
+ assertEquals(1704067200123L * 1000 + 456, ((TimeStampMicroVector)
root.getVector("f_ts")).get(0));
+
+ // TIMESTAMP_LTZ(3) → milli
+ assertEquals(1704067200000L, ((TimeStampMilliVector)
root.getVector("f_ts_ltz")).get(0));
+
+ // ARRAY
+ ListVector arrayVec = (ListVector) root.getVector("f_array");
+ assertFalse(arrayVec.isNull(0));
+
+ // MAP
+ assertFalse(root.getVector("f_map").isNull(0));
+
+ // ROW
+ assertFalse(root.getVector("f_row").isNull(0));
}
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/ArrayWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/ArrayWriterTest.java
new file mode 100644
index 00000000..979921e7
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/ArrayWriterTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link ArrayWriter}. */
+public class ArrayWriterTest {
+
+ private BufferAllocator allocator;
+
+ @BeforeEach
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ allocator.close();
+ }
+
+ @Test
+ public void testWriteIntArrayFromRowData() {
+ RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new
IntType())}, new String[] {"f_array"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ ListVector listVector = (ListVector) root.getVector("f_array");
+ listVector.allocateNew();
+ IntVector dataVector = (IntVector) listVector.getDataVector();
+ dataVector.allocateNew();
+
+ ArrowFieldWriter<ArrayData> elementWriter =
IntWriter.forArray(dataVector);
+ ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector,
elementWriter);
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, new GenericArrayData(new int[] {10, 20, 30}));
+ writer.write(row, 0);
+
+ writer.finish();
+
+ assertEquals(1, listVector.getValueCount());
+ assertFalse(listVector.isNull(0));
+ // Verify the data vector content
+ assertEquals(3, dataVector.getValueCount());
+ assertEquals(10, dataVector.get(0));
+ assertEquals(20, dataVector.get(1));
+ assertEquals(30, dataVector.get(2));
+ }
+ }
+
+ @Test
+ public void testWriteNullArrayFromRowData() {
+ RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new
IntType())}, new String[] {"f_array"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ ListVector listVector = (ListVector) root.getVector("f_array");
+ listVector.allocateNew();
+ IntVector dataVector = (IntVector) listVector.getDataVector();
+ dataVector.allocateNew();
+
+ ArrowFieldWriter<ArrayData> elementWriter =
IntWriter.forArray(dataVector);
+ ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector,
elementWriter);
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, null);
+ writer.write(row, 0);
+
+ writer.finish();
+
+ assertEquals(1, listVector.getValueCount());
+ assertTrue(listVector.isNull(0));
+ }
+ }
+
+ @Test
+ public void testWriteEmptyArrayFromRowData() {
+ RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new
IntType())}, new String[] {"f_array"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ ListVector listVector = (ListVector) root.getVector("f_array");
+ listVector.allocateNew();
+ IntVector dataVector = (IntVector) listVector.getDataVector();
+ dataVector.allocateNew();
+
+ ArrowFieldWriter<ArrayData> elementWriter =
IntWriter.forArray(dataVector);
+ ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector,
elementWriter);
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, new GenericArrayData(new int[] {}));
+ writer.write(row, 0);
+
+ writer.finish();
+
+ assertEquals(1, listVector.getValueCount());
+ assertFalse(listVector.isNull(0));
+ assertEquals(0, dataVector.getValueCount());
+ }
+ }
+
+ @Test
+ public void testWriteMultipleArraysFromRowData() {
+ RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new
IntType())}, new String[] {"f_array"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ ListVector listVector = (ListVector) root.getVector("f_array");
+ listVector.allocateNew();
+ IntVector dataVector = (IntVector) listVector.getDataVector();
+ dataVector.allocateNew();
+
+ ArrowFieldWriter<ArrayData> elementWriter =
IntWriter.forArray(dataVector);
+ ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector,
elementWriter);
+
+ writer.write(GenericRowData.of((Object) new GenericArrayData(new
int[] {1, 2})), 0);
+ writer.write(GenericRowData.of((Object) new GenericArrayData(new
int[] {3})), 0);
+ writer.write(GenericRowData.of((Object) new GenericArrayData(new
int[] {4, 5, 6})), 0);
+
+ writer.finish();
+
+ assertEquals(3, listVector.getValueCount());
+ assertEquals(6, dataVector.getValueCount());
+ }
+ }
+
+ @Test
+ public void testResetAndWriteNewBatch() {
+ RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new
IntType())}, new String[] {"f_array"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ ListVector listVector = (ListVector) root.getVector("f_array");
+ listVector.allocateNew();
+ IntVector dataVector = (IntVector) listVector.getDataVector();
+ dataVector.allocateNew();
+
+ ArrowFieldWriter<ArrayData> elementWriter =
IntWriter.forArray(dataVector);
+ ArrayWriter<RowData> writer = ArrayWriter.forRow(listVector,
elementWriter);
+
+ writer.write(GenericRowData.of((Object) new GenericArrayData(new
int[] {1, 2})), 0);
+ writer.finish();
+ assertEquals(1, listVector.getValueCount());
+
+ writer.reset();
+ writer.write(GenericRowData.of((Object) new GenericArrayData(new
int[] {99})), 0);
+ writer.finish();
+ assertEquals(1, listVector.getValueCount());
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/MapWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/MapWriterTest.java
new file mode 100644
index 00000000..e4595941
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/MapWriterTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link MapWriter}. */
+public class MapWriterTest {
+
+ private BufferAllocator allocator;
+
+ @BeforeEach
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ allocator.close();
+ }
+
+ @Test
+ public void testWriteMapFromRowData() {
+ RowType rowType = RowType.of(
+ new LogicalType[] {new MapType(new VarCharType(100), new
IntType())}, new String[] {"f_map"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ MapVector mapVector = (MapVector) root.getVector("f_map");
+ mapVector.allocateNew();
+
+ StructVector entriesVector = (StructVector)
mapVector.getDataVector();
+ ArrowFieldWriter<ArrayData> keyWriter = VarCharWriter.forArray(
+ (org.apache.arrow.vector.VarCharVector)
entriesVector.getChild(MapVector.KEY_NAME));
+ ArrowFieldWriter<ArrayData> valueWriter = IntWriter.forArray(
+ (org.apache.arrow.vector.IntVector)
entriesVector.getChild(MapVector.VALUE_NAME));
+
+ MapWriter<RowData> writer = MapWriter.forRow(mapVector, keyWriter,
valueWriter);
+
+ Map<StringData, Integer> map = new HashMap<>();
+ map.put(StringData.fromString("a"), 1);
+ map.put(StringData.fromString("b"), 2);
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, new GenericMapData(map));
+ writer.write(row, 0);
+
+ writer.finish();
+
+ assertEquals(1, mapVector.getValueCount());
+ assertFalse(mapVector.isNull(0));
+ }
+ }
+
+ @Test
+ public void testWriteNullMapFromRowData() {
+ RowType rowType = RowType.of(
+ new LogicalType[] {new MapType(new VarCharType(100), new
IntType())}, new String[] {"f_map"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ MapVector mapVector = (MapVector) root.getVector("f_map");
+ mapVector.allocateNew();
+
+ StructVector entriesVector = (StructVector)
mapVector.getDataVector();
+ ArrowFieldWriter<ArrayData> keyWriter = VarCharWriter.forArray(
+ (org.apache.arrow.vector.VarCharVector)
entriesVector.getChild(MapVector.KEY_NAME));
+ ArrowFieldWriter<ArrayData> valueWriter = IntWriter.forArray(
+ (org.apache.arrow.vector.IntVector)
entriesVector.getChild(MapVector.VALUE_NAME));
+
+ MapWriter<RowData> writer = MapWriter.forRow(mapVector, keyWriter,
valueWriter);
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, null);
+ writer.write(row, 0);
+
+ writer.finish();
+
+ assertEquals(1, mapVector.getValueCount());
+ assertTrue(mapVector.isNull(0));
+ }
+ }
+
+ @Test
+ public void testWriteEmptyMapFromRowData() {
+ RowType rowType = RowType.of(
+ new LogicalType[] {new MapType(new VarCharType(100), new
IntType())}, new String[] {"f_map"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ MapVector mapVector = (MapVector) root.getVector("f_map");
+ mapVector.allocateNew();
+
+ StructVector entriesVector = (StructVector)
mapVector.getDataVector();
+ ArrowFieldWriter<ArrayData> keyWriter = VarCharWriter.forArray(
+ (org.apache.arrow.vector.VarCharVector)
entriesVector.getChild(MapVector.KEY_NAME));
+ ArrowFieldWriter<ArrayData> valueWriter = IntWriter.forArray(
+ (org.apache.arrow.vector.IntVector)
entriesVector.getChild(MapVector.VALUE_NAME));
+
+ MapWriter<RowData> writer = MapWriter.forRow(mapVector, keyWriter,
valueWriter);
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, new GenericMapData(new HashMap<>()));
+ writer.write(row, 0);
+
+ writer.finish();
+
+ assertEquals(1, mapVector.getValueCount());
+ assertFalse(mapVector.isNull(0));
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/RowWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/RowWriterTest.java
new file mode 100644
index 00000000..619fb7ad
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/RowWriterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.FlinkArrowUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link RowWriter}. */
+public class RowWriterTest {
+
+ private BufferAllocator allocator;
+
+ @BeforeEach
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ allocator.close();
+ }
+
+ @Test
+ public void testWriteRowFromRowData() {
+ RowType innerType =
+ RowType.of(new LogicalType[] {new IntType(), new
VarCharType(100)}, new String[] {"id", "name"});
+ RowType outerType = RowType.of(new LogicalType[] {innerType}, new
String[] {"f_row"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(outerType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ StructVector structVector = (StructVector) root.getVector("f_row");
+ structVector.allocateNew();
+
+ @SuppressWarnings("unchecked")
+ ArrowFieldWriter<RowData>[] fieldsWriters = new ArrowFieldWriter[]
{
+ IntWriter.forRow((IntVector) structVector.getChild("id")),
+ VarCharWriter.forRow((VarCharVector)
structVector.getChild("name"))
+ };
+
+ RowWriter<RowData> writer = RowWriter.forRow(structVector,
fieldsWriters);
+
+ GenericRowData innerRow = GenericRowData.of(42,
StringData.fromString("hello"));
+ GenericRowData outerRow = new GenericRowData(1);
+ outerRow.setField(0, innerRow);
+ writer.write(outerRow, 0);
+
+ writer.finish();
+
+ assertEquals(1, structVector.getValueCount());
+ assertFalse(structVector.isNull(0));
+ IntVector idVector = (IntVector) structVector.getChild("id");
+ VarCharVector nameVector = (VarCharVector)
structVector.getChild("name");
+ assertEquals(42, idVector.get(0));
+ assertArrayEquals("hello".getBytes(), nameVector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteNullRowFromRowData() {
+ RowType innerType = RowType.of(new LogicalType[] {new IntType()}, new
String[] {"id"});
+ RowType outerType = RowType.of(new LogicalType[] {innerType}, new
String[] {"f_row"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(outerType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ StructVector structVector = (StructVector) root.getVector("f_row");
+ structVector.allocateNew();
+
+ @SuppressWarnings("unchecked")
+ ArrowFieldWriter<RowData>[] fieldsWriters =
+ new ArrowFieldWriter[] {IntWriter.forRow((IntVector)
structVector.getChild("id"))};
+
+ RowWriter<RowData> writer = RowWriter.forRow(structVector,
fieldsWriters);
+
+ GenericRowData outerRow = new GenericRowData(1);
+ outerRow.setField(0, null);
+ writer.write(outerRow, 0);
+
+ writer.finish();
+
+ assertEquals(1, structVector.getValueCount());
+ assertTrue(structVector.isNull(0));
+ }
+ }
+
+ @Test
+ public void testWriteMultipleRowsFromRowData() {
+ RowType innerType = RowType.of(new LogicalType[] {new IntType()}, new
String[] {"val"});
+ RowType outerType = RowType.of(new LogicalType[] {innerType}, new
String[] {"f_row"});
+ Schema schema = FlinkArrowUtils.toArrowSchema(outerType);
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ StructVector structVector = (StructVector) root.getVector("f_row");
+ structVector.allocateNew();
+
+ @SuppressWarnings("unchecked")
+ ArrowFieldWriter<RowData>[] fieldsWriters =
+ new ArrowFieldWriter[] {IntWriter.forRow((IntVector)
structVector.getChild("val"))};
+
+ RowWriter<RowData> writer = RowWriter.forRow(structVector,
fieldsWriters);
+
+ writer.write(GenericRowData.of((Object) GenericRowData.of(1)), 0);
+ writer.write(GenericRowData.of((Object) GenericRowData.of(2)), 0);
+ writer.write(GenericRowData.of((Object) GenericRowData.of(3)), 0);
+
+ writer.finish();
+
+ assertEquals(3, structVector.getValueCount());
+ IntVector valVector = (IntVector) structVector.getChild("val");
+ assertEquals(1, valVector.get(0));
+ assertEquals(2, valVector.get(1));
+ assertEquals(3, valVector.get(2));
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimeWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimeWriterTest.java
new file mode 100644
index 00000000..120ea5a2
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimeWriterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link TimeWriter}. */
+public class TimeWriterTest {
+
+ private BufferAllocator allocator;
+
+ @BeforeEach
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ allocator.close();
+ }
+
+ @Test
+ public void testWriteSecondPrecisionFromRowData() {
+ try (TimeSecVector vector = new TimeSecVector("test_time_sec",
allocator)) {
+ vector.allocateNew();
+ TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+ // 3661000ms = 1h 1m 1s → 3661s
+ writer.write(GenericRowData.of(3661000), 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(3661, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteMillisecondPrecisionFromRowData() {
+ try (TimeMilliVector vector = new TimeMilliVector("test_time_milli",
allocator)) {
+ vector.allocateNew();
+ TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+ writer.write(GenericRowData.of(3661000), 0);
+ writer.write(GenericRowData.of(0), 0);
+ writer.write(GenericRowData.of(86399999), 0);
+ writer.finish();
+
+ assertEquals(3, vector.getValueCount());
+ assertEquals(3661000, vector.get(0));
+ assertEquals(0, vector.get(1));
+ assertEquals(86399999, vector.get(2));
+ }
+ }
+
+ @Test
+ public void testWriteMicrosecondPrecisionFromRowData() {
+ try (TimeMicroVector vector = new TimeMicroVector("test_time_micro",
allocator)) {
+ vector.allocateNew();
+ TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+ writer.write(GenericRowData.of(3661000), 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(3661000L * 1000L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteNanosecondPrecisionFromRowData() {
+ try (TimeNanoVector vector = new TimeNanoVector("test_time_nano",
allocator)) {
+ vector.allocateNew();
+ TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+ writer.write(GenericRowData.of(3661000), 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(3661000L * 1_000_000L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteNullFromRowData() {
+ try (TimeMilliVector vector = new TimeMilliVector("test_time_null",
allocator)) {
+ vector.allocateNew();
+ TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+ writer.write(GenericRowData.of(3661000), 0);
+ writer.write(GenericRowData.of((Object) null), 0);
+ writer.write(GenericRowData.of(1000), 0);
+ writer.finish();
+
+ assertEquals(3, vector.getValueCount());
+ assertFalse(vector.isNull(0));
+ assertEquals(3661000, vector.get(0));
+ assertTrue(vector.isNull(1));
+ assertFalse(vector.isNull(2));
+ assertEquals(1000, vector.get(2));
+ }
+ }
+
+ @Test
+ public void testWriteFromArrayData() {
+ try (TimeMilliVector vector = new TimeMilliVector("test_time_array",
allocator)) {
+ vector.allocateNew();
+ TimeWriter<ArrayData> writer = TimeWriter.forArray(vector);
+
+ GenericArrayData array = new GenericArrayData(new int[] {3661000,
0, 86399999});
+
+ writer.write(array, 0);
+ writer.write(array, 1);
+ writer.write(array, 2);
+ writer.finish();
+
+ assertEquals(3, vector.getValueCount());
+ assertEquals(3661000, vector.get(0));
+ assertEquals(0, vector.get(1));
+ assertEquals(86399999, vector.get(2));
+ }
+ }
+
+ @Test
+ public void testWriteMicroFromArrayData() {
+ try (TimeMicroVector vector = new
TimeMicroVector("test_time_micro_array", allocator)) {
+ vector.allocateNew();
+ TimeWriter<ArrayData> writer = TimeWriter.forArray(vector);
+
+ GenericArrayData array = new GenericArrayData(new int[] {5000,
12345});
+
+ writer.write(array, 0);
+ writer.write(array, 1);
+ writer.finish();
+
+ assertEquals(2, vector.getValueCount());
+ assertEquals(5000L * 1000L, vector.get(0));
+ assertEquals(12345L * 1000L, vector.get(1));
+ }
+ }
+
+ @Test
+ public void testResetAndWriteNewBatch() {
+ try (TimeMilliVector vector = new TimeMilliVector("test_time_reset",
allocator)) {
+ vector.allocateNew();
+ TimeWriter<RowData> writer = TimeWriter.forRow(vector);
+
+ // First batch
+ writer.write(GenericRowData.of(1000), 0);
+ writer.write(GenericRowData.of(2000), 0);
+ writer.finish();
+
+ assertEquals(2, vector.getValueCount());
+ assertEquals(1000, vector.get(0));
+ assertEquals(2000, vector.get(1));
+
+ // Reset and write second batch
+ writer.reset();
+
+ writer.write(GenericRowData.of(9000), 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(9000, vector.get(0));
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimestampWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimestampWriterTest.java
new file mode 100644
index 00000000..c78cba54
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/TimestampWriterTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link TimestampWriter}. */
+public class TimestampWriterTest {
+
+ private BufferAllocator allocator;
+
+ @BeforeEach
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ allocator.close();
+ }
+
+ @Test
+ public void testWriteSecondPrecisionFromRowData() {
+ try (TimeStampSecVector vector = new TimeStampSecVector("test_ts_sec",
allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
0);
+
+ // 1704067200000ms = 2024-01-01T00:00:00Z → 1704067200 seconds
+ GenericRowData row =
GenericRowData.of(TimestampData.fromEpochMillis(1704067200000L));
+ writer.write(row, 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(1704067200L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteMillisecondPrecisionFromRowData() {
+ try (TimeStampMilliVector vector = new
TimeStampMilliVector("test_ts_milli", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
3);
+
+ GenericRowData row =
GenericRowData.of(TimestampData.fromEpochMillis(1704067200123L));
+ writer.write(row, 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(1704067200123L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteMicrosecondPrecisionFromRowData() {
+ try (TimeStampMicroVector vector = new
TimeStampMicroVector("test_ts_micro", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
6);
+
+ // millis=1704067200123, nanoOfMilli=456000 → micros =
1704067200123*1000 + 456000/1000
+ // = 1704067200123000 + 456 = 1704067200123456
+ GenericRowData row =
GenericRowData.of(TimestampData.fromEpochMillis(1704067200123L, 456000));
+ writer.write(row, 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(1704067200123456L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteNanosecondPrecisionFromRowData() {
+ try (TimeStampNanoVector vector = new
TimeStampNanoVector("test_ts_nano", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
9);
+
+ // millis=1704067200123, nanoOfMilli=456789 → nanos =
1704067200123*1_000_000 + 456789
+ // = 1704067200123000000 + 456789 = 1704067200123456789
+ GenericRowData row =
GenericRowData.of(TimestampData.fromEpochMillis(1704067200123L, 456789));
+ writer.write(row, 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(1704067200123456789L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteNullFromRowData() {
+ try (TimeStampMilliVector vector = new
TimeStampMilliVector("test_ts_null", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
3);
+
+ GenericRowData row0 =
GenericRowData.of(TimestampData.fromEpochMillis(1000L));
+ GenericRowData row1 = GenericRowData.of((Object) null);
+ GenericRowData row2 =
GenericRowData.of(TimestampData.fromEpochMillis(2000L));
+
+ writer.write(row0, 0);
+ writer.write(row1, 0);
+ writer.write(row2, 0);
+ writer.finish();
+
+ assertEquals(3, vector.getValueCount());
+ assertFalse(vector.isNull(0));
+ assertEquals(1000L, vector.get(0));
+ assertTrue(vector.isNull(1));
+ assertFalse(vector.isNull(2));
+ assertEquals(2000L, vector.get(2));
+ }
+ }
+
+ @Test
+ public void testWritePreEpochMicrosecond() {
+ try (TimeStampMicroVector vector = new
TimeStampMicroVector("test_ts_pre_epoch_micro", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
6);
+
+ // millis=-1, nanoOfMilli=999000 → micros = -1*1000 + 999000/1000
= -1000 + 999 = -1
+ GenericRowData row =
GenericRowData.of(TimestampData.fromEpochMillis(-1, 999000));
+ writer.write(row, 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(-1L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testWriteFromArrayData() {
+ try (TimeStampMilliVector vector = new
TimeStampMilliVector("test_ts_array", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<ArrayData> writer =
TimestampWriter.forArray(vector, 3);
+
+ GenericArrayData array = new GenericArrayData(new TimestampData[] {
+ TimestampData.fromEpochMillis(1000L),
+ TimestampData.fromEpochMillis(2000L),
+ TimestampData.fromEpochMillis(3000L)
+ });
+
+ writer.write(array, 0);
+ writer.write(array, 1);
+ writer.write(array, 2);
+ writer.finish();
+
+ assertEquals(3, vector.getValueCount());
+ assertEquals(1000L, vector.get(0));
+ assertEquals(2000L, vector.get(1));
+ assertEquals(3000L, vector.get(2));
+ }
+ }
+
+ @Test
+ public void testWritePreEpochNanosecond() {
+ try (TimeStampNanoVector vector = new
TimeStampNanoVector("test_ts_pre_epoch_nano", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
9);
+
+ // millis=-1, nanoOfMilli=999000 → nanos = -1*1_000_000 + 999000 =
-1000000 + 999000
+ // = -1000
+ GenericRowData row =
GenericRowData.of(TimestampData.fromEpochMillis(-1, 999000));
+ writer.write(row, 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(-1000L, vector.get(0));
+ }
+ }
+
+ @Test
+ public void testResetAndWriteNewBatch() {
+ try (TimeStampMilliVector vector = new
TimeStampMilliVector("test_ts_reset", allocator)) {
+ vector.allocateNew();
+ TimestampWriter<RowData> writer = TimestampWriter.forRow(vector,
3);
+
+ // First batch
+
writer.write(GenericRowData.of(TimestampData.fromEpochMillis(1000L)), 0);
+
writer.write(GenericRowData.of(TimestampData.fromEpochMillis(2000L)), 0);
+ writer.finish();
+
+ assertEquals(2, vector.getValueCount());
+ assertEquals(1000L, vector.get(0));
+ assertEquals(2000L, vector.get(1));
+
+ // Reset
+ writer.reset();
+
+ assertEquals(0, vector.getValueCount());
+
+ // Second batch
+
writer.write(GenericRowData.of(TimestampData.fromEpochMillis(9999L)), 0);
+ writer.finish();
+
+ assertEquals(1, vector.getValueCount());
+ assertEquals(9999L, vector.get(0));
+ }
+ }
+}