Copilot commented on code in PR #2063: URL: https://github.com/apache/auron/pull/2063#discussion_r2877127400
########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.BitVector; +import org.apache.flink.table.data.columnar.vector.BooleanColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link BooleanColumnVector} backed by an Arrow {@link BitVector}. + * + * <p>This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. + */ +public final class ArrowBooleanColumnVector implements BooleanColumnVector { + + private BitVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link BitVector}. 
+ * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowBooleanColumnVector(BitVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** {@inheritDoc} */ + @Override + public boolean getBoolean(int i) { + return vector.get(i) != 0; + } + + /** + * Replaces the underlying Arrow vector. Used during reader reset to point at a new batch + * without allocating a new wrapper. + * + * @param vector the new Arrow vector, must not be null + */ + void setVector(BitVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } Review Comment: The `setVector()` method in `ArrowBooleanColumnVector` (and all other `ArrowXxxColumnVector` wrapper classes) is dead code. The `FlinkArrowReader.reset()` method does not call `setVector()` on the existing wrappers — instead it creates entirely new wrapper instances via `createColumnVector()`. Since these package-private methods are not called from anywhere and have no external callers, they add maintenance overhead without serving any purpose. They should either be removed, or `FlinkArrowReader.reset()` should be updated to use them (which would avoid the extra wrapper allocations per batch reset). ```suggestion ``` ########## auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java: ########## @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import 
org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link FlinkArrowReader}. 
*/ +public class FlinkArrowReaderTest { + + @Test + public void testBooleanVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBoolean", 0, Long.MAX_VALUE)) { + BitVector bitVector = new BitVector("col", allocator); + bitVector.allocateNew(3); + bitVector.setSafe(0, 1); + bitVector.setNull(1); + bitVector.setSafe(2, 0); + bitVector.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(bitVector)); + RowType rowType = RowType.of(new BooleanType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(3, reader.getRowCount()); + assertTrue(reader.read(0).getBoolean(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertFalse(reader.read(2).getBoolean(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTinyIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTinyInt", 0, Long.MAX_VALUE)) { + TinyIntVector vec = new TinyIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 1); + vec.setNull(1); + vec.setSafe(2, -1); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TinyIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals((byte) 1, reader.read(0).getByte(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals((byte) -1, reader.read(2).getByte(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testSmallIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testSmallInt", 0, Long.MAX_VALUE)) { + SmallIntVector vec = new SmallIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 100); + vec.setNull(1); + vec.setSafe(2, -100); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType 
rowType = RowType.of(new SmallIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals((short) 100, reader.read(0).getShort(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals((short) -100, reader.read(2).getShort(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testInt", 0, Long.MAX_VALUE)) { + IntVector vec = new IntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 42); + vec.setNull(1); + vec.setSafe(2, Integer.MAX_VALUE); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new IntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(42, reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(Integer.MAX_VALUE, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testBigIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBigInt", 0, Long.MAX_VALUE)) { + BigIntVector vec = new BigIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, Long.MAX_VALUE); + vec.setNull(1); + vec.setSafe(2, -1L); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new BigIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(Long.MAX_VALUE, reader.read(0).getLong(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(-1L, reader.read(2).getLong(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testFloatVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testFloat", 0, Long.MAX_VALUE)) { + Float4Vector vec = new Float4Vector("col", 
allocator); + vec.allocateNew(3); + vec.setSafe(0, 3.14f); + vec.setNull(1); + vec.setSafe(2, Float.NaN); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new FloatType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(3.14f, reader.read(0).getFloat(0), 0.001f); + assertTrue(reader.read(1).isNullAt(0)); + assertTrue(Float.isNaN(reader.read(2).getFloat(0))); + + reader.close(); + root.close(); + } + } + + @Test + public void testDoubleVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDouble", 0, Long.MAX_VALUE)) { + Float8Vector vec = new Float8Vector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 2.718); + vec.setNull(1); + vec.setSafe(2, Double.MAX_VALUE); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new DoubleType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(2.718, reader.read(0).getDouble(0), 0.001); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(Double.MAX_VALUE, reader.read(2).getDouble(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testVarCharVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarChar", 0, Long.MAX_VALUE)) { + VarCharVector vec = new VarCharVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8)); + vec.setNull(1); + vec.setSafe(2, "".getBytes(StandardCharsets.UTF_8)); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new VarCharType(100)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + RowData row0 = reader.read(0); + assertArrayEquals( + 
"hello".getBytes(StandardCharsets.UTF_8), row0.getString(0).toBytes()); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getString(0).toBytes().length); + + reader.close(); + root.close(); + } + } + + @Test + public void testVarBinaryVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarBinary", 0, Long.MAX_VALUE)) { + VarBinaryVector vec = new VarBinaryVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, new byte[] {0x01, 0x02}); + vec.setNull(1); + vec.setSafe(2, new byte[] {}); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new VarBinaryType(100)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertArrayEquals(new byte[] {0x01, 0x02}, reader.read(0).getBinary(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertArrayEquals(new byte[] {}, reader.read(2).getBinary(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testDecimalVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDecimal", 0, Long.MAX_VALUE)) { + // Compact path: precision 10 (<= 18) + DecimalVector compactVec = new DecimalVector("compact", allocator, 10, 2); + compactVec.allocateNew(2); + compactVec.setSafe(0, new BigDecimal("123.45")); + compactVec.setNull(1); + compactVec.setValueCount(2); + + VectorSchemaRoot compactRoot = new VectorSchemaRoot(Collections.singletonList(compactVec)); + RowType compactType = RowType.of(new DecimalType(10, 2)); + FlinkArrowReader compactReader = FlinkArrowReader.create(compactRoot, compactType); + + DecimalData compactVal = compactReader.read(0).getDecimal(0, 10, 2); + assertEquals(new BigDecimal("123.45"), compactVal.toBigDecimal()); + assertTrue(compactReader.read(1).isNullAt(0)); + + compactReader.close(); + compactRoot.close(); + + // Wide path: precision 20 (> 18) + DecimalVector wideVec 
= new DecimalVector("wide", allocator, 20, 2); + wideVec.allocateNew(1); + wideVec.setSafe(0, new BigDecimal("123456789012345678.90")); + wideVec.setValueCount(1); + + VectorSchemaRoot wideRoot = new VectorSchemaRoot(Collections.singletonList(wideVec)); + RowType wideType = RowType.of(new DecimalType(20, 2)); + FlinkArrowReader wideReader = FlinkArrowReader.create(wideRoot, wideType); + + DecimalData wideVal = wideReader.read(0).getDecimal(0, 20, 2); + assertEquals(0, new BigDecimal("123456789012345678.90").compareTo(wideVal.toBigDecimal())); + + wideReader.close(); + wideRoot.close(); + } + } + + @Test + public void testDateVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDate", 0, Long.MAX_VALUE)) { + DateDayVector vec = new DateDayVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 18000); + vec.setNull(1); + vec.setSafe(2, 0); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new DateType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(18000, reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTimeVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTime", 0, Long.MAX_VALUE)) { + TimeMicroVector vec = new TimeMicroVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 45_296_000_000L); // 45296000000 micros -> 45296000 millis + vec.setNull(1); + vec.setSafe(2, 0L); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimeType(6)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + // ArrowTimeColumnVector divides micros by 1000 to get millis + assertEquals(45_296_000, 
reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } Review Comment: The test only covers `TIME(6)` and `TIMESTAMP(6)`, which fall within the only precision range (4–6) that works correctly with the current reader implementation. Tests for `TIME(0)`, `TIME(3)`, and `TIME(9)` (and their TIMESTAMP equivalents) are missing. Adding such tests would immediately expose the critical `ClassCastException` / incorrect unit conversion bug in `FlinkArrowReader.createColumnVector` for TIME/TIMESTAMP of non-microsecond precision. ########## docs/PR-AURON-1851/AURON-1851-DESIGN.md: ########## @@ -0,0 +1,461 @@ +# Design: Arrow to Flink RowData Conversion (AURON #1851) + +**Issue**: https://github.com/apache/auron/issues/1851 +**AIP**: AIP-1 — Introduce Flink integration of native engine +**Author**: @weiqingy +**Status**: Draft + +## 1. Motivation + +Per AIP-1, the Flink integration data path is: + +``` +Flink RowData → Arrow (Writer) → C Data Interface / JNI → Rust/DataFusion → Arrow → Flink RowData (Reader) + ^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + #1850 (x-tong) #1851 (this issue) +``` + +The writer side (#1850, PR #1930) converts Flink `RowData` into Arrow `VectorSchemaRoot` for export to the native engine. This issue implements the reverse: converting Arrow vectors returned by the native engine back into Flink `RowData` so downstream Flink operators can process the results. + +Without this component, the native execution results cannot flow back into the Flink pipeline. + +## 2. Design Approach + +### Two Candidate Approaches + +**Approach A — Columnar ColumnVector wrappers (recommended)** + +Follow Flink's own pattern from `flink-python`. Create thin `ColumnVector` wrappers around Arrow `FieldVector` types, then compose them into an `ArrowReader` that returns `ColumnarRowData`. This is how Flink itself reads Arrow data internally.
+ +- Each wrapper implements a Flink `ColumnVector` sub-interface (e.g., `IntColumnVector`, `BytesColumnVector`) +- The wrapper delegates reads directly to the underlying Arrow vector — zero data copying +- `ColumnarRowData` provides a row view over the batch without materializing individual rows + +**Approach B — Row-at-a-time FlinkArrowFieldReader** + +Mirror the writer pattern (PR #1930). Create `FlinkArrowFieldReader` subclasses that read field-by-field into `GenericRowData`, iterating row-by-row over the batch. + +- Conceptually simpler, symmetric with the writer +- Copies data element-by-element — O(rows × columns) allocations per batch + +### Decision: Approach A + +Approach A is chosen for these reasons: + +1. **Performance**: Zero-copy columnar access. `ColumnarRowData` is just a view — no per-row object allocation. This aligns with AIP-1's goal of native vectorized execution. +2. **Flink precedent**: This is exactly how Flink's own `flink-python` module reads Arrow data. The pattern is proven and maintained upstream. +3. **Compatibility**: `ColumnarRowData` implements `RowData`, so it is transparent to all downstream Flink operators. +4. **Memory efficiency**: Arrow vectors remain the single source of truth. No duplicate data structures. + +The tradeoff is more classes (one wrapper per vector type), but each is ~20 lines of delegation code. + +## 3. Detailed Design + +### 3.1 Package Structure + +All classes in `org.apache.auron.flink.arrow` (same package as `FlinkArrowUtils`, `FlinkArrowWriter`). 
+ +``` +auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/ +├── FlinkArrowUtils.java (existing) +├── FlinkArrowWriter.java (PR #1930) +├── FlinkArrowFieldWriter.java (PR #1930) +├── FlinkArrowReader.java (NEW — orchestrator) +└── vectors/ (NEW — ColumnVector wrappers) + ├── ArrowBooleanColumnVector.java + ├── ArrowTinyIntColumnVector.java + ├── ArrowSmallIntColumnVector.java + ├── ArrowIntColumnVector.java + ├── ArrowBigIntColumnVector.java + ├── ArrowFloatColumnVector.java + ├── ArrowDoubleColumnVector.java + ├── ArrowDecimalColumnVector.java + ├── ArrowVarCharColumnVector.java + ├── ArrowVarBinaryColumnVector.java + ├── ArrowDateColumnVector.java + ├── ArrowTimeColumnVector.java + ├── ArrowTimestampColumnVector.java + ├── ArrowArrayColumnVector.java + ├── ArrowMapColumnVector.java + └── ArrowRowColumnVector.java +``` + +### 3.2 ColumnVector Wrappers + +Each wrapper implements the corresponding Flink `ColumnVector` sub-interface and delegates to the Arrow `FieldVector`. Example pattern: + +```java +/** + * Arrow-backed column vector for INT columns. + * Wraps an Arrow IntVector and implements Flink's IntColumnVector. + */ +public final class ArrowIntColumnVector implements IntColumnVector { + + private final IntVector vector; + + public ArrowIntColumnVector(IntVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + @Override + public int getInt(int i) { + return vector.get(i); + } + + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } +} +``` + +All wrappers follow this same pattern. 
Notable type-specific handling: + +| Flink ColumnVector Interface | Arrow Vector | Notes | +|---|---|---| +| `BooleanColumnVector` | `BitVector` | | +| `ByteColumnVector` | `TinyIntVector` | | +| `ShortColumnVector` | `SmallIntVector` | | +| `IntColumnVector` | `IntVector` | Also used for `DateDayVector` (date as epoch days) | +| `LongColumnVector` | `BigIntVector` | | +| `FloatColumnVector` | `Float4Vector` | | +| `DoubleColumnVector` | `Float8Vector` | | +| `DecimalColumnVector` | `DecimalVector` | `fromUnscaledLong` for precision ≤ 18, `fromBigDecimal` otherwise | +| `BytesColumnVector` | `VarCharVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern | +| `BytesColumnVector` | `VarBinaryVector` | `new Bytes(vector.get(i))` — matches Flink upstream pattern | +| `IntColumnVector` | `DateDayVector` | Date stored as int (epoch days), same as Flink internal | +| `IntColumnVector` | `TimeMicroVector` | Convert microseconds back to milliseconds (int) to match Flink's internal representation | +| `TimestampColumnVector` | `TimeStampMicroVector` | Convert micros → `TimestampData.fromEpochMillis(millis, nanoOfMillisecond)` | +| `TimestampColumnVector` | `TimeStampMicroTZVector` | Same conversion, for `TIMESTAMP WITH LOCAL TIME ZONE` | +| `ArrayColumnVector` | `ListVector` | Recursively wraps element vector | +| `MapColumnVector` | `MapVector` | Recursively wraps key/value vectors | +| `RowColumnVector` | `StructVector` | Recursively wraps child field vectors | + +### 3.3 Timestamp Precision Handling + +The writer (PR #1930) normalizes all timestamps to **microsecond** precision in Arrow. 
The reader must reverse this: + +``` +Writer path: TimestampData → microseconds (long) stored in TimeStampMicroVector +Reader path: TimeStampMicroVector → microseconds (long) → TimestampData + +Conversion: + long micros = vector.get(i); + long millis = micros / 1000; + int nanoOfMillisecond = (int) (micros % 1000) * 1000; + return TimestampData.fromEpochMillis(millis, nanoOfMillisecond); +``` + +Similarly for Time: +``` +Writer path: int millis → micros (long) stored in TimeMicroVector +Reader path: TimeMicroVector → micros (long) → millis (int) + +Conversion: + return (int) (vector.get(i) / 1000); +``` + +This matches the writer's conversions in `FlinkArrowFieldWriter.TimestampWriter` and `FlinkArrowFieldWriter.TimeWriter`. Review Comment: The design document claims "The writer (PR #1930) normalizes all timestamps to **microsecond** precision in Arrow" and makes the same claim for TIME types. However, `FlinkArrowUtils.toArrowType()` (which is the existing type-mapping utility used by the writer) maps TIME and TIMESTAMP to different Arrow time units based on the declared Flink precision (SECOND for precision 0, MILLISECOND for 1-3, MICROSECOND for 4-6, NANOSECOND for 7+). The design document's claim is inaccurate and should be corrected to reflect the actual precision-dependent mapping. ########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.List; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector; +import 
org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector; +import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.columnar.ColumnarRowData; +import org.apache.flink.table.data.columnar.vector.ColumnVector; +import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +/** + * Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}. + * + * <p>Uses Flink's columnar data structures ({@link ColumnarRowData} backed by {@link + * VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow {@link FieldVector} is + * wrapped in a Flink {@link ColumnVector} implementation that delegates reads directly to the + * underlying Arrow vector. + * + * <p>Object reuse: {@link #read(int)} returns the same {@link ColumnarRowData} instance with a + * different row ID. Callers must consume or copy the returned row before the next call. This is + * standard Flink practice for columnar readers. + * + * <p>Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT + * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible + * for its lifecycle. 
+ */ +public class FlinkArrowReader implements AutoCloseable { + + private final RowType rowType; + private ColumnVector[] columnVectors; + private VectorizedColumnBatch batch; + private ColumnarRowData reusableRow; + private VectorSchemaRoot root; + + private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot root, RowType rowType) { + this.columnVectors = columnVectors; + this.batch = new VectorizedColumnBatch(columnVectors); + this.reusableRow = new ColumnarRowData(batch); + this.root = root; + this.rowType = rowType; + } + + /** + * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and {@link RowType}. + * + * <p>The RowType must match the schema of the VectorSchemaRoot (same number of fields, matching + * types). Each Arrow field vector is wrapped in the appropriate Flink {@link ColumnVector} + * implementation based on the corresponding Flink {@link LogicalType}. + * + * @param root the Arrow VectorSchemaRoot containing the data + * @param rowType the Flink RowType describing the schema + * @return a new FlinkArrowReader + * @throws IllegalArgumentException if field counts do not match + * @throws UnsupportedOperationException if a LogicalType is not supported + */ + public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType) { + Preconditions.checkNotNull(root, "root must not be null"); + Preconditions.checkNotNull(rowType, "rowType must not be null"); + List<FieldVector> fieldVectors = root.getFieldVectors(); + List<RowType.RowField> fields = rowType.getFields(); + if (fieldVectors.size() != fields.size()) { + throw new IllegalArgumentException( + "VectorSchemaRoot has " + fieldVectors.size() + " fields but RowType has " + fields.size()); + } + ColumnVector[] columns = new ColumnVector[fieldVectors.size()]; + for (int i = 0; i < fieldVectors.size(); i++) { + columns[i] = createColumnVector(fieldVectors.get(i), fields.get(i).getType()); + } + return new FlinkArrowReader(columns, root, rowType); + } + + /** + * 
Reads a row at the given position. Returns a reused {@link RowData} object — callers must not + * hold references across calls. + * + * @param rowId the row index within the current batch + * @return the row data at the given position + */ + public RowData read(int rowId) { + reusableRow.setRowId(rowId); + return reusableRow; + } + + /** + * Returns the number of rows in the current batch. + * + * @return the row count + */ + public int getRowCount() { + return root.getRowCount(); + } + + /** + * Resets the reader to use a new {@link VectorSchemaRoot} with the same schema. Recreates + * column vector wrappers for the new root's field vectors. + * + * <p>The new root must have the same schema (same number and types of fields) as the original. + * + * @param newRoot the new VectorSchemaRoot, must not be null + */ + public void reset(VectorSchemaRoot newRoot) { + Preconditions.checkNotNull(newRoot, "newRoot must not be null"); + this.root = newRoot; + List<FieldVector> newVectors = newRoot.getFieldVectors(); + Preconditions.checkArgument( + newVectors.size() == columnVectors.length, + "New root has %s fields but reader expects %s", + newVectors.size(), + columnVectors.length); + List<RowType.RowField> fields = rowType.getFields(); + for (int i = 0; i < columnVectors.length; i++) { + columnVectors[i] = + createColumnVector(newVectors.get(i), fields.get(i).getType()); + } + this.batch = new VectorizedColumnBatch(columnVectors); + this.reusableRow = new ColumnarRowData(batch); + } + + /** + * Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT + * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible + * for its lifecycle. + */ + @Override + public void close() { + // Reader is a view; root lifecycle managed by caller. + } + + /** + * Creates the appropriate Flink {@link ColumnVector} wrapper for the given Arrow {@link + * FieldVector} and Flink {@link LogicalType}. 
+ */ + static ColumnVector createColumnVector(FieldVector vector, LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return new ArrowBooleanColumnVector((BitVector) vector); + case TINYINT: + return new ArrowTinyIntColumnVector((TinyIntVector) vector); + case SMALLINT: + return new ArrowSmallIntColumnVector((SmallIntVector) vector); + case INTEGER: + return new ArrowIntColumnVector((IntVector) vector); + case BIGINT: + return new ArrowBigIntColumnVector((BigIntVector) vector); + case FLOAT: + return new ArrowFloatColumnVector((Float4Vector) vector); + case DOUBLE: + return new ArrowDoubleColumnVector((Float8Vector) vector); + case VARCHAR: + case CHAR: + return new ArrowVarCharColumnVector((VarCharVector) vector); + case VARBINARY: + case BINARY: + return new ArrowVarBinaryColumnVector((VarBinaryVector) vector); + case DECIMAL: + return new ArrowDecimalColumnVector((DecimalVector) vector); + case DATE: + return new ArrowDateColumnVector((DateDayVector) vector); + case TIME_WITHOUT_TIME_ZONE: + // The writer (FlinkArrowFieldWriter) normalizes all TIME values to microseconds + // in a TimeMicroVector, regardless of the declared Flink TIME precision. + return new ArrowTimeColumnVector((TimeMicroVector) vector); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // The writer normalizes all timestamps to microseconds. TimeStampVector is the + // common parent of TimeStampMicroVector and TimeStampMicroTZVector. 
+ return new ArrowTimestampColumnVector((TimeStampVector) vector); Review Comment: The `createColumnVector` method for `TIME_WITHOUT_TIME_ZONE` unconditionally casts the Arrow vector to `TimeMicroVector`, but `FlinkArrowUtils.toArrowType()` maps TIME to different Arrow time units based on precision: - TIME(0) → `TimeSecVector` (not `TimeMicroVector`) → `ClassCastException` at runtime - TIME(1-3) → `TimeMilliVector` (not `TimeMicroVector`) → `ClassCastException` at runtime - TIME(4-6) → `TimeMicroVector` ← only this precision range works correctly - TIME(7+) → `TimeNanoVector` (not `TimeMicroVector`) → `ClassCastException` at runtime For `TIMESTAMP_WITHOUT_TIME_ZONE`/`TIMESTAMP_WITH_LOCAL_TIME_ZONE`, the cast to `TimeStampVector` will not fail (it is the parent class of all timestamp vectors), but the conversion `micros / 1000` and `micros % 1000` is only correct for microsecond-precision vectors. For other precisions: - TIMESTAMP(0) → `TimeStampSecVector`: `vector.get(i)` returns seconds, but is treated as microseconds → result is off by 10^6 - TIMESTAMP(1-3) → `TimeStampMilliVector`: treated as microseconds → off by 10^3 - TIMESTAMP(7+) → `TimeStampNanoVector`: treated as microseconds → off by 10^3 (the code computes millis as nanos ÷ 1000 and nanoOfMillisecond as (nanos % 1000) × 1000, instead of nanos ÷ 1,000,000 and nanos % 1,000,000) The `createColumnVector` method needs to dispatch on the TIME/TIMESTAMP precision (via the `LogicalType`'s precision) and apply the correct unit conversion and vector cast for each precision range, consistent with `FlinkArrowUtils.toArrowType()`. The comment claiming "writer normalizes all TIME/TIMESTAMP values to microseconds regardless of declared precision" is incorrect — `FlinkArrowUtils.toArrowType()` selects the Arrow time unit based on precision.
########## auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow.vectors; + +import org.apache.arrow.vector.TimeStampVector; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.columnar.vector.TimestampColumnVector; +import org.apache.flink.util.Preconditions; + +/** + * A Flink {@link TimestampColumnVector} backed by an Arrow {@link TimeStampVector}. + * + * <p>This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access + * to Arrow data from Flink's columnar batch execution engine. It handles both {@code + * TimeStampMicroVector} (TIMESTAMP) and {@code TimeStampMicroTZVector} (TIMESTAMP_LTZ) by + * accepting their common parent type {@link TimeStampVector}. Microsecond values are converted to + * Flink's {@link TimestampData} representation (epoch millis + sub-millisecond nanos). 
+ */ +public final class ArrowTimestampColumnVector implements TimestampColumnVector { + + private TimeStampVector vector; + + /** + * Creates a new wrapper around the given Arrow {@link TimeStampVector}. + * + * <p>Accepts both {@code TimeStampMicroVector} and {@code TimeStampMicroTZVector} since they + * share the same storage format and parent type. + * + * @param vector the Arrow vector to wrap, must not be null + */ + public ArrowTimestampColumnVector(TimeStampVector vector) { + this.vector = Preconditions.checkNotNull(vector); + } + + /** {@inheritDoc} */ + @Override + public boolean isNullAt(int i) { + return vector.isNull(i); + } + + /** + * Returns the timestamp at the given index as a {@link TimestampData}. + * + * <p>The underlying Arrow vector stores microseconds since epoch. This method splits the value + * into epoch milliseconds and sub-millisecond nanoseconds to construct a {@link TimestampData}. + * + * @param i the row index + * @param precision the timestamp precision (unused; conversion is always from microseconds) + * @return the timestamp value + */ + @Override + public TimestampData getTimestamp(int i, int precision) { + long micros = vector.get(i); + long millis = micros / 1000; + // micros % 1000 yields the sub-millisecond remainder in microseconds; * 1000 converts to nanos. + // For negative micros (pre-epoch), Java's truncation-toward-zero produces a negative + // remainder, which is consistent with the writer's inverse conversion. + int nanoOfMillisecond = ((int) (micros % 1000)) * 1000; Review Comment: For pre-epoch timestamps (negative `micros` values), Java's truncation-toward-zero in the conversion `nanoOfMillisecond = ((int)(micros % 1000)) * 1000` produces a negative `nanoOfMillisecond`. For example, with `micros = -1500`: `millis = -1`, `nanoOfMillisecond = -500000`. `TimestampData.fromEpochMillis` documents that `nanoOfMillisecond` must be in the range `[0, 999999]`. 
Passing a negative value creates a `TimestampData` that violates this contract and may produce incorrect behavior in downstream Flink operators that rely on this invariant. The correct approach for pre-epoch timestamps is to use floor division and adjust the remainder to be non-negative. For `micros = -1500`: `millis = Math.floorDiv(-1500, 1000) = -2`, `nanoOfMillisecond = (int)(Math.floorMod(-1500, 1000)) * 1000 = 500000`. ```suggestion // Use floor-based division so that for negative micros (pre-epoch), the remainder is // non-negative and nanoOfMillisecond stays within [0, 999_999], as required by // TimestampData.fromEpochMillis. long millis = Math.floorDiv(micros, 1000); int nanoOfMillisecond = (int) Math.floorMod(micros, 1000) * 1000; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
