weiqingy commented on code in PR #2063: URL: https://github.com/apache/auron/pull/2063#discussion_r2879744303
########## docs/PR-AURON-1851/AURON-1851-DESIGN.md: ########## @@ -0,0 +1,461 @@ +# Design: Arrow to Flink RowData Conversion (AURON #1851) + +**Issue**: https://github.com/apache/auron/issues/1851 +**AIP**: AIP-1 — Introduce Flink integration of native engine +**Author**: @weiqingy +**Status**: Draft + +## 1. Motivation + +Per AIP-1, the Flink integration data path is: + +``` +Flink RowData → Arrow (Writer) → C Data Interface / JNI → Rust/DataFusion → Arrow → Flink RowData (Reader) + ^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + #1850 (x-tong) #1851 (this issue) +``` + +The writer side (#1850, PR #1930) converts Flink `RowData` into Arrow `VectorSchemaRoot` for export to the native engine. This issue implements the reverse: converting Arrow vectors returned by the native engine back into Flink `RowData` so downstream Flink operators can process the results. + +Without this component, the native execution results cannot flow back into the Flink pipeline. + +## 2. Design Approach + +### Two Candidate Approaches + +**Approach A — Columnar ColumnVector wrappers (recommended)** + +Follow Flink's own pattern from `flink-python`. Create thin `ColumnVector` wrappers around Arrow `FieldVector` types, then compose them into an `ArrowReader` that returns `ColumnarRowData`. This is how Flink itself reads Arrow data internally. + +- Each wrapper implements a Flink `ColumnVector` sub-interface (e.g., `IntColumnVector`, `BytesColumnVector`) +- The wrapper delegates reads directly to the underlying Arrow vector — zero data copying +- `ColumnarRowData` provides a row view over the batch without materializing individual rows + +**Approach B — Row-at-a-time FlinkArrowFieldReader** + +Mirror the writer pattern (PR #1930). Create `FlinkArrowFieldReader` subclasses that read field-by-field into `GenericRowData`, iterating row-by-row over the batch. 
+ +- Conceptually simpler, symmetric with the writer +- Copies data element-by-element — O(rows × columns) allocations per batch + +### Decision: Approach A + +Approach A is chosen for these reasons: + +1. **Performance**: Zero-copy columnar access. `ColumnarRowData` is just a view — no per-row object allocation. This aligns with AIP-1's goal of native vectorized execution. +2. **Flink precedent**: This is exactly how Flink's own `flink-python` module reads Arrow data. The pattern is proven and maintained upstream. +3. **Compatibility**: `ColumnarRowData` implements `RowData`, so it is transparent to all downstream Flink operators. +4. **Memory efficiency**: Arrow vectors remain the single source of truth. No duplicate data structures. + +The tradeoff is more classes (one wrapper per vector type), but each is ~20 lines of delegation code. + +## 3. Detailed Design + +### 3.1 Package Structure + +All classes in `org.apache.auron.flink.arrow` (same package as `FlinkArrowUtils`, `FlinkArrowWriter`). 
+ +``` +auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/ +├── FlinkArrowUtils.java (existing) +├── FlinkArrowWriter.java (PR #1930) +├── FlinkArrowFieldWriter.java (PR #1930) +├── FlinkArrowReader.java (NEW — orchestrator) +└── vectors/ (NEW — ColumnVector wrappers) + ├── ArrowBooleanColumnVector.java + ├── ArrowTinyIntColumnVector.java + ├── ArrowSmallIntColumnVector.java + ├── ArrowIntColumnVector.java + ├── ArrowBigIntColumnVector.java + ├── ArrowFloatColumnVector.java + ├── ArrowDoubleColumnVector.java + ├── ArrowDecimalColumnVector.java + ├── ArrowVarCharColumnVector.java + ├── ArrowVarBinaryColumnVector.java + ├── ArrowDateColumnVector.java + ├── ArrowTimeColumnVector.java + ├── ArrowTimestampColumnVector.java + ├── ArrowArrayColumnVector.java + ├── ArrowMapColumnVector.java + └── ArrowRowColumnVector.java +``` + +### 3.2 ColumnVector Wrappers + +Each wrapper implements the corresponding Flink `ColumnVector` sub-interface and delegates to the Arrow `FieldVector`. Example pattern: + +```java +/** + * Arrow-backed column vector for INT columns. + * Wraps an Arrow IntVector and implements Flink's IntColumnVector. + */ +public final class ArrowIntColumnVector implements IntColumnVector { + + private final IntVector vector; + + public ArrowIntColumnVector(IntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + @Override + public int getInt(int i) { + return vector.get(i); + } + + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } +} +``` + +All wrappers follow this same pattern. 
Notable type-specific handling: + +| Flink ColumnVector Interface | Arrow Vector | Notes | +|---|---|---| +| `BooleanColumnVector` | `BitVector` | | +| `ByteColumnVector` | `TinyIntVector` | | +| `ShortColumnVector` | `SmallIntVector` | | +| `IntColumnVector` | `IntVector` | | +| `LongColumnVector` | `BigIntVector` | | +| `FloatColumnVector` | `Float4Vector` | | +| `DoubleColumnVector` | `Float8Vector` | | +| `DecimalColumnVector` | `DecimalVector` | `fromUnscaledLong` for precision ≤ 18, `fromBigDecimal` otherwise | +| `BytesColumnVector` | `VarCharVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern | +| `BytesColumnVector` | `VarBinaryVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern | +| `IntColumnVector` | `DateDayVector` | Date stored as int (epoch days), same as Flink internal | +| `IntColumnVector` | `TimeMicroVector` | Convert microseconds back to milliseconds (int) to match Flink's internal representation | +| `TimestampColumnVector` | `TimeStampMicroVector` | Convert micros → `TimestampData.fromEpochMillis(millis, nanoOfMillisecond)` | +| `TimestampColumnVector` | `TimeStampMicroTZVector` | Same conversion, for `TIMESTAMP WITH LOCAL TIME ZONE` | +| `ArrayColumnVector` | `ListVector` | Recursively wraps element vector | +| `MapColumnVector` | `MapVector` | Recursively wraps key/value vectors | +| `RowColumnVector` | `StructVector` | Recursively wraps child field vectors | + +### 3.3 Timestamp Precision Handling + +The writer (PR #1930) normalizes all timestamps to **microsecond** precision in Arrow.
The reader must reverse this: + +``` +Writer path: TimestampData → microseconds (long) stored in TimeStampMicroVector +Reader path: TimeStampMicroVector → microseconds (long) → TimestampData + +Conversion: + long micros = vector.get(i); + long millis = micros / 1000; + int nanoOfMillisecond = (int) (micros % 1000) * 1000; + return TimestampData.fromEpochMillis(millis, nanoOfMillisecond); +``` + +Similarly for Time: +``` +Writer path: int millis → micros (long) stored in TimeMicroVector +Reader path: TimeMicroVector → micros (long) → millis (int) + +Conversion: + return (int) (vector.get(i) / 1000); +``` + +This matches the writer's conversions in `FlinkArrowFieldWriter.TimestampWriter` and `FlinkArrowFieldWriter.TimeWriter`. Review Comment: FlinkArrowUtils.toArrowType() defines the Arrow schema type metadata, but the writer's FlinkArrowFieldWriter controls the actual data normalization. These are separate concerns — the schema metadata says "this is a Timestamp(MICROSECOND)" and the writer ensures the values are indeed in microseconds. The design doc's claim refers to the writer's data normalization, not toArrowType(). Will confirm with @x-tong on PR #1930.