weiqingy commented on code in PR #2063:
URL: https://github.com/apache/auron/pull/2063#discussion_r2879744303


##########
docs/PR-AURON-1851/AURON-1851-DESIGN.md:
##########
@@ -0,0 +1,461 @@
+# Design: Arrow to Flink RowData Conversion (AURON #1851)
+
+**Issue**: https://github.com/apache/auron/issues/1851
+**AIP**: AIP-1 — Introduce Flink integration of native engine
+**Author**: @weiqingy
+**Status**: Draft
+
+## 1. Motivation
+
+Per AIP-1, the Flink integration data path is:
+
+```
+Flink RowData → Arrow (Writer) → C Data Interface / JNI → Rust/DataFusion → Arrow → Flink RowData (Reader)
+                  ^^^^^^^^^^^^                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+                  #1850 (x-tong)                                             #1851 (this issue)
+```
+
+The writer side (#1850, PR #1930) converts Flink `RowData` into Arrow `VectorSchemaRoot` for export to the native engine. This issue implements the reverse: converting Arrow vectors returned by the native engine back into Flink `RowData` so downstream Flink operators can process the results.
+
+Without this component, the native execution results cannot flow back into the Flink pipeline.
+
+## 2. Design Approach
+
+### Two Candidate Approaches
+
+**Approach A — Columnar ColumnVector wrappers (recommended)**
+
+Follow Flink's own pattern from `flink-python`. Create thin `ColumnVector` wrappers around Arrow `FieldVector` types, then compose them into an `ArrowReader` that returns `ColumnarRowData`. This is how Flink itself reads Arrow data internally.
+
+- Each wrapper implements a Flink `ColumnVector` sub-interface (e.g., `IntColumnVector`, `BytesColumnVector`)
+- The wrapper delegates reads directly to the underlying Arrow vector — zero data copying
+- `ColumnarRowData` provides a row view over the batch without materializing individual rows
+
+**Approach B — Row-at-a-time FlinkArrowFieldReader**
+
+Mirror the writer pattern (PR #1930). Create `FlinkArrowFieldReader` subclasses that read field-by-field into `GenericRowData`, iterating row-by-row over the batch.
+
+- Conceptually simpler, symmetric with the writer
+- Copies data element-by-element — O(rows × columns) allocations per batch
+
+### Decision: Approach A
+
+Approach A is chosen for these reasons:
+
+1. **Performance**: Zero-copy columnar access. `ColumnarRowData` is just a view — no per-row object allocation. This aligns with AIP-1's goal of native vectorized execution.
+2. **Flink precedent**: This is exactly how Flink's own `flink-python` module reads Arrow data. The pattern is proven and maintained upstream.
+3. **Compatibility**: `ColumnarRowData` implements `RowData`, so it is transparent to all downstream Flink operators.
+4. **Memory efficiency**: Arrow vectors remain the single source of truth. No duplicate data structures.
+
+The tradeoff is more classes (one wrapper per vector type), but each is ~20 lines of delegation code.
+
+## 3. Detailed Design
+
+### 3.1 Package Structure
+
+All classes live in `org.apache.auron.flink.arrow` (same package as `FlinkArrowUtils`, `FlinkArrowWriter`).
+
+```
+auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/
+├── FlinkArrowUtils.java          (existing)
+├── FlinkArrowWriter.java         (PR #1930)
+├── FlinkArrowFieldWriter.java    (PR #1930)
+├── FlinkArrowReader.java         (NEW — orchestrator)
+└── vectors/                      (NEW — ColumnVector wrappers)
+    ├── ArrowBooleanColumnVector.java
+    ├── ArrowTinyIntColumnVector.java
+    ├── ArrowSmallIntColumnVector.java
+    ├── ArrowIntColumnVector.java
+    ├── ArrowBigIntColumnVector.java
+    ├── ArrowFloatColumnVector.java
+    ├── ArrowDoubleColumnVector.java
+    ├── ArrowDecimalColumnVector.java
+    ├── ArrowVarCharColumnVector.java
+    ├── ArrowVarBinaryColumnVector.java
+    ├── ArrowDateColumnVector.java
+    ├── ArrowTimeColumnVector.java
+    ├── ArrowTimestampColumnVector.java
+    ├── ArrowArrayColumnVector.java
+    ├── ArrowMapColumnVector.java
+    └── ArrowRowColumnVector.java
+```
+
+### 3.2 ColumnVector Wrappers
+
+Each wrapper implements the corresponding Flink `ColumnVector` sub-interface and delegates to the Arrow `FieldVector`. Example pattern:
+
+```java
+/**
+ * Arrow-backed column vector for INT columns.
+ * Wraps an Arrow IntVector and implements Flink's IntColumnVector.
+ */
+public final class ArrowIntColumnVector implements IntColumnVector {
+
+    private final IntVector vector;
+
+    public ArrowIntColumnVector(IntVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    @Override
+    public int getInt(int i) {
+        return vector.get(i);
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+}
+```
+
+All wrappers follow this same pattern. Notable type-specific handling:
+
+| Flink ColumnVector Interface | Arrow Vector | Notes |
+|---|---|---|
+| `BooleanColumnVector` | `BitVector` | |
+| `ByteColumnVector` | `TinyIntVector` | |
+| `ShortColumnVector` | `SmallIntVector` | |
+| `IntColumnVector` | `IntVector` | |
+| `LongColumnVector` | `BigIntVector` | |
+| `FloatColumnVector` | `Float4Vector` | |
+| `DoubleColumnVector` | `Float8Vector` | |
+| `DecimalColumnVector` | `DecimalVector` | `fromUnscaledLong` for precision ≤ 18, `fromBigDecimal` otherwise |
+| `BytesColumnVector` | `VarCharVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern |
+| `BytesColumnVector` | `VarBinaryVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern |
+| `IntColumnVector` | `DateDayVector` | Date stored as int (epoch days), same as Flink internal |
+| `IntColumnVector` | `TimeMicroVector` | Convert microseconds back to milliseconds (int) to match Flink's internal representation |
+| `TimestampColumnVector` | `TimeStampMicroVector` | Convert micros → `TimestampData.fromEpochMillis(millis, nanoOfMillisecond)` |
+| `TimestampColumnVector` | `TimeStampMicroTZVector` | Same conversion, for `TIMESTAMP WITH LOCAL TIME ZONE` |
+| `ArrayColumnVector` | `ListVector` | Recursively wraps element vector |
+| `MapColumnVector` | `MapVector` | Recursively wraps key/value vectors |
+| `RowColumnVector` | `StructVector` | Recursively wraps child field vectors |
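The precision ≤ 18 cutoff in the `DecimalColumnVector` row follows from the fact that a decimal with at most 18 digits has an unscaled value below 10^18, which always fits in a signed 64-bit long, enabling the compact `fromUnscaledLong` path. A standalone illustration of that boundary (class and method names are hypothetical, not part of this PR):

```java
import java.math.BigDecimal;

// Hypothetical sketch of the compact-decimal boundary check.
class DecimalPathSketch {

    // Precision <= 18 means |unscaled| <= 10^18 - 1 < Long.MAX_VALUE,
    // so the unscaled value can ride in a primitive long (fromUnscaledLong path).
    static boolean fitsInLong(int precision) {
        return precision <= 18;
    }

    // Extract the unscaled long for the compact path; throws on overflow.
    static long unscaledLong(BigDecimal value) {
        return value.unscaledValue().longValueExact();
    }
}
```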
+
+### 3.3 Timestamp Precision Handling
+
+The writer (PR #1930) normalizes all timestamps to **microsecond** precision in Arrow. The reader must reverse this:
+
+```
+Writer path:  TimestampData → microseconds (long) stored in TimeStampMicroVector
+Reader path:  TimeStampMicroVector → microseconds (long) → TimestampData
+
+Conversion:
+  long micros = vector.get(i);
+  long millis = micros / 1000;
+  int nanoOfMillisecond = (int) (micros % 1000) * 1000;
+  return TimestampData.fromEpochMillis(millis, nanoOfMillisecond);
+```
+
+Similarly for Time:
+```
+Writer path:  int millis → micros (long) stored in TimeMicroVector
+Reader path:  TimeMicroVector → micros (long) → millis (int)
+
+Conversion:
+  return (int) (vector.get(i) / 1000);
+```
+
+This matches the writer's conversions in `FlinkArrowFieldWriter.TimestampWriter` and `FlinkArrowFieldWriter.TimeWriter`.

Review Comment:
   FlinkArrowUtils.toArrowType() defines the Arrow schema type metadata, but the writer's FlinkArrowFieldWriter controls the actual data normalization. These are separate concerns: the schema metadata says "this is a Timestamp(MICROSECOND)" and the writer ensures the values are indeed in microseconds. The design doc's claim refers to the writer's data normalization, not toArrowType(). Will confirm with @x-tong on PR #1930.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
