x-tong opened a new pull request, #2086:
URL: https://github.com/apache/auron/pull/2086
# Which issue does this PR close?
Partially addresses #1850 (Part 2b of the Flink RowData to Arrow conversion).
# Rationale for this change
Part 2a (#2079) implemented the `ArrowFieldWriter` base class, 12 basic type
writers, and the `FlinkArrowWriter` orchestrator. This PR adds the remaining 5
writer types (Time, Timestamp, Array, Map, Row), enabling full coverage of all
Flink logical types supported by the Arrow type mapping introduced in Part 1
(#1959).
The implementation follows Flink's official `flink-python` Arrow module as
established in Part 2a, with the same `forRow()`/`forArray()` dual-mode factory
pattern and template method design.
# What changes are included in this PR?
## Commit 1: 5 ArrowFieldWriters + unit tests (10 files, +1509 lines)
- **`TimeWriter`** — Handles all 4 Arrow time precisions (`TimeSecVector`,
`TimeMilliVector`, `TimeMicroVector`, `TimeNanoVector`) via `instanceof`
dispatch. Flink stores TIME as an `int` (milliseconds of the day); the value is
converted to each precision using `L`-suffixed literals to avoid `int` overflow.
- **`TimestampWriter`** — Handles all 4 Arrow timestamp precisions. Combines
`TimestampData.getMillisecond()` (long) and `getNanoOfMillisecond()` (int) for
sub-millisecond precision. The constructor validates `timezone == null` via
`Preconditions.checkState`, matching Flink's official implementation; time
zones are not handled at the writer layer.
- **`ArrayWriter`** — Delegates to an `elementWriter`
(`ArrowFieldWriter<ArrayData>`) for each array element. Overrides
`finish()`/`reset()` to propagate to the element writer.
- **`MapWriter`** — Arrow maps are `List<Struct{key, value}>`. Holds
separate key and value writers operating on `ArrayData`. Sets
`structVector.setIndexDefined()` for each entry. Overrides `finish()`/`reset()`
to propagate to key/value writers.
- **`RowWriter`** — Nested struct handling with
`ArrowFieldWriter<RowData>[]` for child fields. Caches a `nullRow`
(`GenericRowData`) in the constructor for null struct handling (avoids per-call
allocation). Uses a single child-write loop for both null and non-null paths,
matching Flink's official implementation.
- **Unit tests**: `TimeWriterTest` (8), `TimestampWriterTest` (9),
`ArrayWriterTest` (5), `MapWriterTest` (3), `RowWriterTest` (3), for 28 tests
in total, covering all precisions, null handling, reset/multi-batch behavior,
and edge cases (pre-epoch timestamps, empty arrays/maps).
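
To illustrate the unit conversions described for `TimeWriter` and `TimestampWriter`, here is a minimal sketch using plain arithmetic only. The class and method names (`TemporalConversionSketch`, `timeToMicros`, and so on) are hypothetical and are not taken from this PR; the sketch assumes TIME arrives as an `int` of milliseconds and that a timestamp is split into epoch milliseconds plus a non-negative nano-of-millisecond part, as the bullets above describe.

```java
/** Illustrative sketch only: plain arithmetic, no Arrow or Flink classes. */
final class TemporalConversionSketch {
    private TemporalConversionSketch() {}

    // TIME is an int of milliseconds; seconds precision drops the remainder.
    static int timeToSeconds(int millisOfDay) {
        return millisOfDay / 1000;
    }

    // The L suffix promotes the multiplication to long before it can overflow int.
    static long timeToMicros(int millisOfDay) {
        return millisOfDay * 1000L;
    }

    static long timeToNanos(int millisOfDay) {
        return millisOfDay * 1_000_000L;
    }

    // Timestamp: epoch milliseconds (long) plus nano-of-millisecond (int, 0..999_999).
    static long timestampToMicros(long epochMillis, int nanoOfMilli) {
        return epochMillis * 1000L + nanoOfMilli / 1000;
    }

    static long timestampToNanos(long epochMillis, int nanoOfMilli) {
        return epochMillis * 1_000_000L + nanoOfMilli;
    }

    public static void main(String[] args) {
        int lastMilliOfDay = 86_399_999; // 23:59:59.999
        System.out.println(timeToMicros(lastMilliOfDay));
        System.out.println(timeToNanos(lastMilliOfDay));
        // Half a millisecond before the epoch: millis = -1, nanoOfMilli = 500_000.
        System.out.println(timestampToMicros(-1L, 500_000));
    }
}
```

Without the `L` suffix, `86_399_999 * 1000` would be evaluated in `int` arithmetic and wrap around, which is the overflow the `TimeWriter` bullet refers to. The pre-epoch case works because the nano-of-millisecond part is added to an already floored millisecond value.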
## Commit 2: Factory method extension + integration test (2 files, +158 lines)
- **`FlinkArrowUtils`** — Extended `createArrowFieldWriterForRow()` and
`createArrowFieldWriterForArray()` with branches for `TimeWriter`,
`TimestampWriter`, `ArrayWriter`, `MapWriter`, `RowWriter`. The `MapVector`
check is placed before the `ListVector` check (since `MapVector extends
ListVector`). The timestamp branch extracts precision from both
`TimestampType` and `LocalZonedTimestampType`.
- **`FlinkArrowWriterTest`** — Added `testWriteTemporalAndComplexTypes`
integration test covering TIME(6), TIMESTAMP(6), TIMESTAMP_LTZ(3),
ARRAY\<INT\>, MAP\<VARCHAR, INT\>, ROW\<nested_id INT\>. Updated
`testUnsupportedTypeThrows` to use `MultisetType` (since `ArrayType` is now
supported).
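
The ordering constraint on the `MapVector` and `ListVector` checks can be shown with a small sketch. It uses hypothetical stand-in classes (`ListVec`, `MapVec`) rather than the real Arrow vectors, and the method names are illustrative, not the factory methods from this PR; the only assumption carried over is the subclass relationship stated above.

```java
/** Illustrative sketch only: stand-in classes, not the Arrow vector types. */
final class DispatchOrderSketch {
    private DispatchOrderSketch() {}

    // Stand-ins mirroring the relationship "MapVector extends ListVector".
    static class ListVec {}
    static class MapVec extends ListVec {}

    // Subclass checked first: a map vector reaches the map branch.
    static String pickWriter(Object vector) {
        if (vector instanceof MapVec) {
            return "MapWriter";
        }
        if (vector instanceof ListVec) {
            return "ArrayWriter";
        }
        throw new UnsupportedOperationException(
                "Unsupported vector: " + vector.getClass().getSimpleName());
    }

    // Superclass checked first: the map branch can never be reached.
    static String pickWriterWrongOrder(Object vector) {
        if (vector instanceof ListVec) {
            return "ArrayWriter";
        }
        if (vector instanceof MapVec) {
            return "MapWriter";
        }
        throw new UnsupportedOperationException(
                "Unsupported vector: " + vector.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println(pickWriter(new MapVec()));
        System.out.println(pickWriterWrongOrder(new MapVec()));
    }
}
```

Because every `MapVec` is also a `ListVec`, the reversed order silently routes maps to the array branch instead of failing, so this kind of mistake is best caught by an integration test that exercises a map column.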
# Scope
This PR completes all Flink-to-Arrow writer types. The remaining work for
#1850 is the reverse direction (Arrow-to-Flink reader), which is tracked
separately.
# Are there any user-facing changes?
No. This is an internal API for the Flink integration.
# How was this patch tested?
36 tests across 6 test classes (28 new + 8 existing):
```bash
./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
-pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
```
Result: 36 passed, 0 failures.