This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dc034a0 [AURON #1851] Introduce Arrow to Flink RowData reader (#2063)
8dc034a0 is described below

commit 8dc034a09338ffde319e6e3493508b0baf1a2e5c
Author: Weiqing Yang <[email protected]>
AuthorDate: Mon Mar 9 20:58:19 2026 -0700

    [AURON #1851] Introduce Arrow to Flink RowData reader (#2063)
    
    # Which issue does this PR close?
    
     Closes #1851
    
    # Rationale for this change
    Per AIP-1, the Flink integration data path requires converting Arrow
    vectors returned by the native engine (DataFusion/Rust) back into Flink
    RowData so downstream Flink operators can process results.
    
    # What changes are included in this PR?
    - FlinkArrowReader orchestrator — zero-copy columnar access via
      ColumnarRowData + VectorizedColumnBatch
    - 16 ArrowXxxColumnVector wrappers for all 17 supported types
    - Decimal fromUnscaledLong optimization for precision ≤ 18
    - Batch reset support for streaming pipelines
    - 21 unit tests in FlinkArrowReaderTest
    
    # Are there any user-facing changes?
     No. Internal API for Flink integration.
    
    # How was this patch tested?
    21 tests: ./build/mvn test -pl auron-flink-extension/auron-flink-runtime
    -am -Pscala-2.12 -Pflink-1.18 -Pspark-3.5 -DskipBuildNative
    -Dtest=FlinkArrowReaderTest
    Result: 21 pass, 0 failures.
---
 .../apache/auron/flink/arrow/FlinkArrowReader.java | 257 +++++++
 .../arrow/vectors/ArrowArrayColumnVector.java      |  64 ++
 .../arrow/vectors/ArrowBigIntColumnVector.java     |  53 ++
 .../arrow/vectors/ArrowBooleanColumnVector.java    |  53 ++
 .../flink/arrow/vectors/ArrowDateColumnVector.java |  54 ++
 .../arrow/vectors/ArrowDecimalColumnVector.java    |  84 +++
 .../arrow/vectors/ArrowDoubleColumnVector.java     |  53 ++
 .../arrow/vectors/ArrowFloatColumnVector.java      |  53 ++
 .../flink/arrow/vectors/ArrowIntColumnVector.java  |  53 ++
 .../flink/arrow/vectors/ArrowMapColumnVector.java  |  69 ++
 .../flink/arrow/vectors/ArrowRowColumnVector.java  |  66 ++
 .../arrow/vectors/ArrowSmallIntColumnVector.java   |  53 ++
 .../flink/arrow/vectors/ArrowTimeColumnVector.java |  64 ++
 .../arrow/vectors/ArrowTimestampColumnVector.java  |  76 +++
 .../arrow/vectors/ArrowTinyIntColumnVector.java    |  53 ++
 .../arrow/vectors/ArrowVarBinaryColumnVector.java  |  54 ++
 .../arrow/vectors/ArrowVarCharColumnVector.java    |  54 ++
 .../auron/flink/arrow/FlinkArrowReaderTest.java    | 737 +++++++++++++++++++++
 18 files changed, 1950 insertions(+)

diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
new file mode 100644
index 00000000..aa0280af
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import java.util.List;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.columnar.ColumnarRowData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
/**
 * Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}.
 *
 * <p>Uses Flink's columnar data structures ({@link ColumnarRowData} backed by {@link
 * VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow {@link FieldVector} is
 * wrapped in a Flink {@link ColumnVector} implementation that delegates reads directly to the
 * underlying Arrow vector.
 *
 * <p>Object reuse: {@link #read(int)} returns the same {@link ColumnarRowData} instance with a
 * different row ID. Callers must consume or copy the returned row before the next call. This is
 * standard Flink practice for columnar readers.
 *
 * <p>Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT
 * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible
 * for its lifecycle.
 */
public class FlinkArrowReader implements AutoCloseable {

    /** Flink schema of the produced rows; fixed for the lifetime of the reader. */
    private final RowType rowType;

    /** Per-column Flink wrappers around the current root's Arrow field vectors. */
    private ColumnVector[] columnVectors;

    /** Columnar batch view over {@link #columnVectors}; rebuilt by {@link #reset}. */
    private VectorizedColumnBatch batch;

    /** The single row instance reused by {@link #read(int)}. */
    private ColumnarRowData reusableRow;

    /** Current Arrow root; its lifecycle is owned by the caller, not this reader. */
    private VectorSchemaRoot root;

    /**
     * Internal constructor; use {@link #create(VectorSchemaRoot, RowType)}. Assumes the given
     * column vectors already wrap the root's field vectors in schema order.
     */
    private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot root, RowType rowType) {
        this.columnVectors = columnVectors;
        this.batch = new VectorizedColumnBatch(columnVectors);
        this.reusableRow = new ColumnarRowData(batch);
        this.root = root;
        this.rowType = rowType;
    }

    /**
     * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and {@link RowType}.
     *
     * <p>The RowType must match the schema of the VectorSchemaRoot (same number of fields, matching
     * types). Each Arrow field vector is wrapped in the appropriate Flink {@link ColumnVector}
     * implementation based on the corresponding Flink {@link LogicalType}.
     *
     * @param root the Arrow VectorSchemaRoot containing the data
     * @param rowType the Flink RowType describing the schema
     * @return a new FlinkArrowReader
     * @throws IllegalArgumentException if field counts do not match
     * @throws UnsupportedOperationException if a LogicalType is not supported
     */
    public static FlinkArrowReader create(VectorSchemaRoot root, RowType rowType) {
        Preconditions.checkNotNull(root, "root must not be null");
        Preconditions.checkNotNull(rowType, "rowType must not be null");
        List<FieldVector> fieldVectors = root.getFieldVectors();
        List<RowType.RowField> fields = rowType.getFields();
        if (fieldVectors.size() != fields.size()) {
            throw new IllegalArgumentException(
                    "VectorSchemaRoot has " + fieldVectors.size() + " fields but RowType has " + fields.size());
        }
        ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
        for (int i = 0; i < fieldVectors.size(); i++) {
            columns[i] = createColumnVector(fieldVectors.get(i), fields.get(i).getType());
        }
        return new FlinkArrowReader(columns, root, rowType);
    }

    /**
     * Reads a row at the given position. Returns a reused {@link RowData} object — callers must not
     * hold references across calls.
     *
     * @param rowId the row index within the current batch
     * @return the row data at the given position
     */
    public RowData read(int rowId) {
        reusableRow.setRowId(rowId);
        return reusableRow;
    }

    /**
     * Returns the number of rows in the current batch.
     *
     * @return the row count
     */
    public int getRowCount() {
        return root.getRowCount();
    }

    /**
     * Resets the reader to use a new {@link VectorSchemaRoot} with the same schema. Recreates
     * column vector wrappers for the new root's field vectors.
     *
     * <p>The new root must have the same schema (same number and types of fields) as the original.
     * Only the field count is validated here; per-field type mismatches surface as {@link
     * ClassCastException} from {@link #createColumnVector}.
     *
     * @param newRoot the new VectorSchemaRoot, must not be null
     */
    public void reset(VectorSchemaRoot newRoot) {
        Preconditions.checkNotNull(newRoot, "newRoot must not be null");
        this.root = newRoot;
        List<FieldVector> newVectors = newRoot.getFieldVectors();
        Preconditions.checkArgument(
                newVectors.size() == columnVectors.length,
                "New root has %s fields but reader expects %s",
                newVectors.size(),
                columnVectors.length);
        List<RowType.RowField> fields = rowType.getFields();
        for (int i = 0; i < columnVectors.length; i++) {
            columnVectors[i] =
                    createColumnVector(newVectors.get(i), fields.get(i).getType());
        }
        // Batch and reusable row hold the vector array/batch by reference, so rebuild both
        // to present a consistent view over the new root.
        this.batch = new VectorizedColumnBatch(columnVectors);
        this.reusableRow = new ColumnarRowData(batch);
    }

    /**
     * Implements {@link AutoCloseable} for use in resource management blocks. Note: this does NOT
     * close the underlying {@link VectorSchemaRoot}. The caller that created the root is responsible
     * for its lifecycle.
     */
    @Override
    public void close() {
        // Reader is a view; root lifecycle managed by caller.
    }

    /**
     * Creates the appropriate Flink {@link ColumnVector} wrapper for the given Arrow {@link
     * FieldVector} and Flink {@link LogicalType}.
     */
    static ColumnVector createColumnVector(FieldVector vector, LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return new ArrowBooleanColumnVector((BitVector) vector);
            case TINYINT:
                return new ArrowTinyIntColumnVector((TinyIntVector) vector);
            case SMALLINT:
                return new ArrowSmallIntColumnVector((SmallIntVector) vector);
            case INTEGER:
                return new ArrowIntColumnVector((IntVector) vector);
            case BIGINT:
                return new ArrowBigIntColumnVector((BigIntVector) vector);
            case FLOAT:
                return new ArrowFloatColumnVector((Float4Vector) vector);
            case DOUBLE:
                return new ArrowDoubleColumnVector((Float8Vector) vector);
            case VARCHAR:
            case CHAR:
                return new ArrowVarCharColumnVector((VarCharVector) vector);
            case VARBINARY:
            case BINARY:
                return new ArrowVarBinaryColumnVector((VarBinaryVector) vector);
            case DECIMAL:
                return new ArrowDecimalColumnVector((DecimalVector) vector);
            case DATE:
                return new ArrowDateColumnVector((DateDayVector) vector);
            case TIME_WITHOUT_TIME_ZONE:
                // The native engine (DataFusion) uses microsecond precision for all temporal
                // types, producing TimeMicroVector regardless of declared Flink TIME precision.
                return new ArrowTimeColumnVector((TimeMicroVector) vector);
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                // The native engine (DataFusion) uses microsecond precision for all temporal
                // types. TimeStampVector is the common parent of TimeStampMicroVector and
                // TimeStampMicroTZVector.
                return new ArrowTimestampColumnVector((TimeStampVector) vector);
            case ARRAY:
                return createArrayColumnVector((ListVector) vector, (ArrayType) type);
            case MAP:
                return createMapColumnVector((MapVector) vector, (MapType) type);
            case ROW:
                return createRowColumnVector((StructVector) vector, (RowType) type);
            default:
                throw new UnsupportedOperationException(
                        "Unsupported Flink type for Arrow reader: " + type.asSummaryString());
        }
    }

    /** Wraps a list vector, recursing into its element vector for nested types. */
    private static ColumnVector createArrayColumnVector(ListVector vector, ArrayType arrayType) {
        ColumnVector elementVector = createColumnVector(vector.getDataVector(), arrayType.getElementType());
        return new ArrowArrayColumnVector(vector, elementVector);
    }

    /** Wraps a map vector; Arrow maps are lists of key/value struct entries. */
    private static ColumnVector createMapColumnVector(MapVector vector, MapType mapType) {
        StructVector entriesVector = (StructVector) vector.getDataVector();
        ColumnVector keyVector = createColumnVector(entriesVector.getChild(MapVector.KEY_NAME), mapType.getKeyType());
        ColumnVector valueVector =
                createColumnVector(entriesVector.getChild(MapVector.VALUE_NAME), mapType.getValueType());
        return new ArrowMapColumnVector(vector, keyVector, valueVector);
    }

    /** Wraps a struct vector, recursing into each child field in declaration order. */
    private static ColumnVector createRowColumnVector(StructVector vector, RowType rowType) {
        List<FieldVector> childVectors = vector.getChildrenFromFields();
        List<RowType.RowField> fields = rowType.getFields();
        ColumnVector[] childColumns = new ColumnVector[childVectors.size()];
        for (int i = 0; i < childVectors.size(); i++) {
            childColumns[i] =
                    createColumnVector(childVectors.get(i), fields.get(i).getType());
        }
        return new ArrowRowColumnVector(vector, childColumns);
    }
}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java
new file mode 100644
index 00000000..f3af97ab
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.columnar.ColumnarArrayData;
+import org.apache.flink.table.data.columnar.vector.ArrayColumnVector;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link ArrayColumnVector} backed by an Arrow {@link ListVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow list data from Flink's columnar batch execution engine. The child 
element vector is
+ * itself a Flink {@link ColumnVector} wrapping the corresponding Arrow child 
vector, enabling
+ * recursive nesting of complex types.
+ */
+public final class ArrowArrayColumnVector implements ArrayColumnVector {
+
+    private final ListVector vector;
+    private final ColumnVector elementColumnVector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link ListVector}.
+     *
+     * @param vector the Arrow list vector to wrap, must not be null
+     * @param elementColumnVector the Flink column vector wrapping the Arrow 
child element vector,
+     *     must not be null
+     */
+    public ArrowArrayColumnVector(ListVector vector, ColumnVector 
elementColumnVector) {
+        this.vector = Preconditions.checkNotNull(vector);
+        this.elementColumnVector = 
Preconditions.checkNotNull(elementColumnVector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ArrayData getArray(int i) {
+        int offset = vector.getOffsetBuffer().getInt((long) i * 
ListVector.OFFSET_WIDTH);
+        int length = vector.getOffsetBuffer().getInt((long) (i + 1) * 
ListVector.OFFSET_WIDTH) - offset;
+        return new ColumnarArrayData(elementColumnVector, offset, length);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java
new file mode 100644
index 00000000..6800234d
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.flink.table.data.columnar.vector.LongColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link LongColumnVector} backed by an Arrow {@link BigIntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowBigIntColumnVector implements LongColumnVector {
+
+    private final BigIntVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link BigIntVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowBigIntColumnVector(BigIntVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long getLong(int i) {
+        return vector.get(i);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java
new file mode 100644
index 00000000..e3dfad39
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.flink.table.data.columnar.vector.BooleanColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link BooleanColumnVector} backed by an Arrow {@link BitVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowBooleanColumnVector implements BooleanColumnVector {
+
+    private final BitVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link BitVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowBooleanColumnVector(BitVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean getBoolean(int i) {
+        return vector.get(i) != 0;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java
new file mode 100644
index 00000000..e52d476d
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.flink.table.data.columnar.vector.IntColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link IntColumnVector} backed by an Arrow {@link DateDayVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine. {@link 
DateDayVector} stores dates
+ * as epoch days ({@code int}), which maps directly to Flink's internal DATE 
representation.
+ */
+public final class ArrowDateColumnVector implements IntColumnVector {
+
+    private final DateDayVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link DateDayVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowDateColumnVector(DateDayVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int getInt(int i) {
+        return vector.get(i);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java
new file mode 100644
index 00000000..dd3147dc
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link DecimalColumnVector} backed by an Arrow {@link 
DecimalVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ *
+ * <p>For compact decimals (precision &lt;= 18), the implementation avoids 
BigDecimal allocation by
+ * reading the unscaled value directly from Arrow's little-endian byte 
representation.
+ */
+public final class ArrowDecimalColumnVector implements DecimalColumnVector {
+
+    /**
+     * Maximum precision that fits in a long (same as Flink's internal compact 
decimal threshold).
+     * DecimalData.MAX_COMPACT_PRECISION is package-private, so we define our 
own constant.
+     */
+    private static final int MAX_COMPACT_PRECISION = 18;
+
+    private final DecimalVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link DecimalVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowDecimalColumnVector(DecimalVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public DecimalData getDecimal(int i, int precision, int scale) {
+        if (precision <= MAX_COMPACT_PRECISION) {
+            // Compact path: avoid BigDecimal allocation for precision <= 18.
+            // Arrow stores decimals as 128-bit little-endian two's complement.
+            // For precision <= 18, the value fits in a long.
+            long unscaledLong = readLittleEndianLong(vector.getDataBuffer(), 
(long) i * 16);
+            return DecimalData.fromUnscaledLong(unscaledLong, precision, 
scale);
+        }
+        return DecimalData.fromBigDecimal(vector.getObject(i), precision, 
scale);
+    }
+
+    /**
+     * Reads a little-endian long (8 bytes) from an Arrow buffer at the given 
byte offset. Arrow
+     * stores decimals as 128-bit little-endian two's complement. For values 
that fit in a long
+     * (precision &lt;= 18), the lower 8 bytes are sufficient.
+     *
+     * @param buffer the Arrow data buffer
+     * @param offset byte offset into the buffer
+     * @return the unscaled value as a long
+     */
+    private static long readLittleEndianLong(ArrowBuf buffer, long offset) {
+        return buffer.getLong(offset);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java
new file mode 100644
index 00000000..a401ebea
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.flink.table.data.columnar.vector.DoubleColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link DoubleColumnVector} backed by an Arrow {@link Float8Vector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowDoubleColumnVector implements DoubleColumnVector {
+
+    private final Float8Vector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link Float8Vector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowDoubleColumnVector(Float8Vector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double getDouble(int i) {
+        return vector.get(i);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java
new file mode 100644
index 00000000..3cd5b696
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.flink.table.data.columnar.vector.FloatColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link FloatColumnVector} backed by an Arrow {@link Float4Vector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowFloatColumnVector implements FloatColumnVector {
+
+    private final Float4Vector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link Float4Vector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowFloatColumnVector(Float4Vector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float getFloat(int i) {
+        return vector.get(i);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java
new file mode 100644
index 00000000..974f5fd0
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.flink.table.data.columnar.vector.IntColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link IntColumnVector} backed by an Arrow {@link IntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowIntColumnVector implements IntColumnVector {
+
+    private final IntVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link IntVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowIntColumnVector(IntVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int getInt(int i) {
+        return vector.get(i);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java
new file mode 100644
index 00000000..fa17ab35
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.ColumnarMapData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.MapColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link MapColumnVector} backed by an Arrow {@link MapVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow map data from Flink's columnar batch execution engine. Arrow 
represents maps as a list
+ * of struct entries, where each entry contains a "key" and "value" child 
vector. The key and value
+ * Flink {@link ColumnVector} instances wrap the corresponding Arrow child 
vectors, enabling
+ * recursive nesting of complex types.
+ */
+public final class ArrowMapColumnVector implements MapColumnVector {
+
+    private final MapVector vector;
+    private final ColumnVector keyColumnVector;
+    private final ColumnVector valueColumnVector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link MapVector}.
+     *
+     * @param vector the Arrow map vector to wrap, must not be null
+     * @param keyColumnVector the Flink column vector wrapping the Arrow key 
child vector, must not
+     *     be null
+     * @param valueColumnVector the Flink column vector wrapping the Arrow 
value child vector, must
+     *     not be null
+     */
+    public ArrowMapColumnVector(MapVector vector, ColumnVector 
keyColumnVector, ColumnVector valueColumnVector) {
+        this.vector = Preconditions.checkNotNull(vector);
+        this.keyColumnVector = Preconditions.checkNotNull(keyColumnVector);
+        this.valueColumnVector = Preconditions.checkNotNull(valueColumnVector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public MapData getMap(int i) {
+        int offset = vector.getOffsetBuffer().getInt((long) i * 
MapVector.OFFSET_WIDTH);
+        int length = vector.getOffsetBuffer().getInt((long) (i + 1) * 
MapVector.OFFSET_WIDTH) - offset;
+        return new ColumnarMapData(keyColumnVector, valueColumnVector, offset, 
length);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java
new file mode 100644
index 00000000..f6c5b2f5
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.flink.table.data.columnar.ColumnarRowData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.RowColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link RowColumnVector} backed by an Arrow {@link StructVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow struct data from Flink's columnar batch execution engine. Each 
struct field is
+ * represented as a Flink {@link ColumnVector} wrapping the corresponding 
Arrow child vector,
+ * enabling recursive nesting of complex types. The child vectors are bundled 
into a {@link
+ * VectorizedColumnBatch} and accessed through a reusable {@link 
ColumnarRowData} instance.
+ */
+public final class ArrowRowColumnVector implements RowColumnVector {
+
+    private final StructVector vector;
+    private final VectorizedColumnBatch childBatch;
+    private final ColumnarRowData reusableRow;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link StructVector}.
+     *
+     * @param vector the Arrow struct vector to wrap, must not be null
+     * @param childColumnVectors the Flink column vectors wrapping each Arrow 
child field vector,
+     *     must not be null
+     */
+    public ArrowRowColumnVector(StructVector vector, ColumnVector[] 
childColumnVectors) {
+        this.vector = Preconditions.checkNotNull(vector);
+        this.childBatch = new 
VectorizedColumnBatch(Preconditions.checkNotNull(childColumnVectors));
+        this.reusableRow = new ColumnarRowData(childBatch);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ColumnarRowData getRow(int i) {
+        reusableRow.setRowId(i);
+        return reusableRow;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java
new file mode 100644
index 00000000..0dfee501
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.flink.table.data.columnar.vector.ShortColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link ShortColumnVector} backed by an Arrow {@link SmallIntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowSmallIntColumnVector implements ShortColumnVector {
+
+    private final SmallIntVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link SmallIntVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowSmallIntColumnVector(SmallIntVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short getShort(int i) {
+        return vector.get(i);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java
new file mode 100644
index 00000000..66555256
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.flink.table.data.columnar.vector.IntColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link IntColumnVector} backed by an Arrow {@link TimeMicroVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine. {@link 
TimeMicroVector} stores time
+ * values as microseconds since midnight ({@code long}), which are converted 
to milliseconds
+ * ({@code int}) to match Flink's internal TIME representation. The native 
engine (DataFusion)
+ * uses microsecond precision for all temporal types.
+ */
+public final class ArrowTimeColumnVector implements IntColumnVector {
+
+    private final TimeMicroVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link TimeMicroVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowTimeColumnVector(TimeMicroVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /**
+     * Returns the time value at the given index as milliseconds since 
midnight.
+     *
+     * <p>The underlying Arrow vector stores microseconds; this method divides 
by 1000 to produce
+     * the millisecond value expected by Flink's internal TIME type.
+     *
+     * @param i the row index
+     * @return time of day in milliseconds since midnight
+     */
+    @Override
+    public int getInt(int i) {
+        return (int) (vector.get(i) / 1000);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java
new file mode 100644
index 00000000..d10b2d30
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.TimestampColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link TimestampColumnVector} backed by an Arrow {@link 
TimeStampVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine. It handles both 
{@code
+ * TimeStampMicroVector} (TIMESTAMP) and {@code TimeStampMicroTZVector} 
(TIMESTAMP_LTZ) by
+ * accepting their common parent type {@link TimeStampVector}. The native 
engine (DataFusion)
+ * uses microsecond precision for all temporal types. Microsecond values are 
converted to Flink's
+ * {@link TimestampData} representation (epoch millis + sub-millisecond nanos).
+ */
+public final class ArrowTimestampColumnVector implements TimestampColumnVector 
{
+
+    private final TimeStampVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link TimeStampVector}.
+     *
+     * <p>Accepts both {@code TimeStampMicroVector} and {@code 
TimeStampMicroTZVector} since they
+     * share the same storage format and parent type.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowTimestampColumnVector(TimeStampVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /**
+     * Returns the timestamp at the given index as a {@link TimestampData}.
+     *
+     * <p>The underlying Arrow vector stores microseconds since epoch. This 
method splits the value
+     * into epoch milliseconds and sub-millisecond nanoseconds to construct a 
{@link TimestampData}.
+     *
+     * @param i the row index
+     * @param precision the timestamp precision (unused; conversion is always 
from microseconds)
+     * @return the timestamp value
+     */
+    @Override
+    public TimestampData getTimestamp(int i, int precision) {
+        long micros = vector.get(i);
+        // Use floor-based division so that for negative micros (pre-epoch), 
the remainder is
+        // non-negative and nanoOfMillisecond stays within [0, 999_999], as 
required by
+        // TimestampData.fromEpochMillis.
+        long millis = Math.floorDiv(micros, 1000);
+        int nanoOfMillisecond = ((int) Math.floorMod(micros, 1000)) * 1000;
+        return TimestampData.fromEpochMillis(millis, nanoOfMillisecond);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java
new file mode 100644
index 00000000..a5ac0f8c
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.flink.table.data.columnar.vector.ByteColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link ByteColumnVector} backed by an Arrow {@link TinyIntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowTinyIntColumnVector implements ByteColumnVector {
+
+    private final TinyIntVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link TinyIntVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowTinyIntColumnVector(TinyIntVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte getByte(int i) {
+        return vector.get(i);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java
new file mode 100644
index 00000000..cec43c6c
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.flink.table.data.columnar.vector.BytesColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link BytesColumnVector} backed by an Arrow {@link 
VarBinaryVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowVarBinaryColumnVector implements BytesColumnVector {
+
+    private final VarBinaryVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link VarBinaryVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowVarBinaryColumnVector(VarBinaryVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Bytes getBytes(int i) {
+        byte[] bytes = vector.get(i);
+        return new Bytes(bytes, 0, bytes.length);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java
new file mode 100644
index 00000000..d2b19bb8
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.flink.table.data.columnar.vector.BytesColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link BytesColumnVector} backed by an Arrow {@link VarCharVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector, 
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowVarCharColumnVector implements BytesColumnVector {
+
+    private final VarCharVector vector;
+
+    /**
+     * Creates a new wrapper around the given Arrow {@link VarCharVector}.
+     *
+     * @param vector the Arrow vector to wrap, must not be null
+     */
+    public ArrowVarCharColumnVector(VarCharVector vector) {
+        this.vector = Preconditions.checkNotNull(vector);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNull(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Bytes getBytes(int i) {
+        byte[] bytes = vector.get(i);
+        return new Bytes(bytes, 0, bytes.length);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java
new file mode 100644
index 00000000..1816c6c8
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link FlinkArrowReader}. */
+public class FlinkArrowReaderTest {
+
+    @Test
+    public void testBooleanVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBoolean", 0, 
Long.MAX_VALUE)) {
+            BitVector bitVector = new BitVector("col", allocator);
+            bitVector.allocateNew(3);
+            bitVector.setSafe(0, 1);
+            bitVector.setNull(1);
+            bitVector.setSafe(2, 0);
+            bitVector.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(bitVector));
+            RowType rowType = RowType.of(new BooleanType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(3, reader.getRowCount());
+            assertTrue(reader.read(0).getBoolean(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertFalse(reader.read(2).getBoolean(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testTinyIntVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTinyInt", 0, 
Long.MAX_VALUE)) {
+            TinyIntVector vec = new TinyIntVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, 1);
+            vec.setNull(1);
+            vec.setSafe(2, -1);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new TinyIntType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals((byte) 1, reader.read(0).getByte(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals((byte) -1, reader.read(2).getByte(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testSmallIntVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testSmallInt", 0, 
Long.MAX_VALUE)) {
+            SmallIntVector vec = new SmallIntVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, 100);
+            vec.setNull(1);
+            vec.setSafe(2, -100);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new SmallIntType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals((short) 100, reader.read(0).getShort(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals((short) -100, reader.read(2).getShort(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testIntVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testInt", 0, 
Long.MAX_VALUE)) {
+            IntVector vec = new IntVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, 42);
+            vec.setNull(1);
+            vec.setSafe(2, Integer.MAX_VALUE);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new IntType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(42, reader.read(0).getInt(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals(Integer.MAX_VALUE, reader.read(2).getInt(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testBigIntVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBigInt", 
0, Long.MAX_VALUE)) {
+            BigIntVector vec = new BigIntVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, Long.MAX_VALUE);
+            vec.setNull(1);
+            vec.setSafe(2, -1L);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new BigIntType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(Long.MAX_VALUE, reader.read(0).getLong(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals(-1L, reader.read(2).getLong(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testFloatVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testFloat", 
0, Long.MAX_VALUE)) {
+            Float4Vector vec = new Float4Vector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, 3.14f);
+            vec.setNull(1);
+            vec.setSafe(2, Float.NaN);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new FloatType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(3.14f, reader.read(0).getFloat(0), 0.001f);
+            assertTrue(reader.read(1).isNullAt(0));
+            assertTrue(Float.isNaN(reader.read(2).getFloat(0)));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testDoubleVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDouble", 
0, Long.MAX_VALUE)) {
+            Float8Vector vec = new Float8Vector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, 2.718);
+            vec.setNull(1);
+            vec.setSafe(2, Double.MAX_VALUE);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new DoubleType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(2.718, reader.read(0).getDouble(0), 0.001);
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals(Double.MAX_VALUE, reader.read(2).getDouble(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testVarCharVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarChar", 0, 
Long.MAX_VALUE)) {
+            VarCharVector vec = new VarCharVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8));
+            vec.setNull(1);
+            vec.setSafe(2, "".getBytes(StandardCharsets.UTF_8));
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new VarCharType(100));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            RowData row0 = reader.read(0);
+            assertArrayEquals(
+                    "hello".getBytes(StandardCharsets.UTF_8), 
row0.getString(0).toBytes());
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals(0, reader.read(2).getString(0).toBytes().length);
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testVarBinaryVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarBinary", 0, 
Long.MAX_VALUE)) {
+            VarBinaryVector vec = new VarBinaryVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, new byte[] {0x01, 0x02});
+            vec.setNull(1);
+            vec.setSafe(2, new byte[] {});
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new VarBinaryType(100));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertArrayEquals(new byte[] {0x01, 0x02}, 
reader.read(0).getBinary(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertArrayEquals(new byte[] {}, reader.read(2).getBinary(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testDecimalVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDecimal", 0, 
Long.MAX_VALUE)) {
+            // Compact path: precision 10 (<= 18)
+            DecimalVector compactVec = new DecimalVector("compact", allocator, 
10, 2);
+            compactVec.allocateNew(2);
+            compactVec.setSafe(0, new BigDecimal("123.45"));
+            compactVec.setNull(1);
+            compactVec.setValueCount(2);
+
+            VectorSchemaRoot compactRoot = new 
VectorSchemaRoot(Collections.singletonList(compactVec));
+            RowType compactType = RowType.of(new DecimalType(10, 2));
+            FlinkArrowReader compactReader = 
FlinkArrowReader.create(compactRoot, compactType);
+
+            DecimalData compactVal = compactReader.read(0).getDecimal(0, 10, 
2);
+            assertEquals(new BigDecimal("123.45"), compactVal.toBigDecimal());
+            assertTrue(compactReader.read(1).isNullAt(0));
+
+            compactReader.close();
+            compactRoot.close();
+
+            // Wide path: precision 20 (> 18)
+            DecimalVector wideVec = new DecimalVector("wide", allocator, 20, 
2);
+            wideVec.allocateNew(1);
+            wideVec.setSafe(0, new BigDecimal("123456789012345678.90"));
+            wideVec.setValueCount(1);
+
+            VectorSchemaRoot wideRoot = new 
VectorSchemaRoot(Collections.singletonList(wideVec));
+            RowType wideType = RowType.of(new DecimalType(20, 2));
+            FlinkArrowReader wideReader = FlinkArrowReader.create(wideRoot, 
wideType);
+
+            DecimalData wideVal = wideReader.read(0).getDecimal(0, 20, 2);
+            assertEquals(0, new 
BigDecimal("123456789012345678.90").compareTo(wideVal.toBigDecimal()));
+
+            wideReader.close();
+            wideRoot.close();
+        }
+    }
+
+    @Test
+    public void testDateVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDate", 
0, Long.MAX_VALUE)) {
+            DateDayVector vec = new DateDayVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, 18000);
+            vec.setNull(1);
+            vec.setSafe(2, 0);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new DateType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(18000, reader.read(0).getInt(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals(0, reader.read(2).getInt(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testTimeVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTime", 
0, Long.MAX_VALUE)) {
+            TimeMicroVector vec = new TimeMicroVector("col", allocator);
+            vec.allocateNew(3);
+            vec.setSafe(0, 45_296_000_000L); // 45296000000 micros -> 45296000 
millis
+            vec.setNull(1);
+            vec.setSafe(2, 0L);
+            vec.setValueCount(3);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new TimeType(6));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            // ArrowTimeColumnVector divides micros by 1000 to get millis
+            assertEquals(45_296_000, reader.read(0).getInt(0));
+            assertTrue(reader.read(1).isNullAt(0));
+            assertEquals(0, reader.read(2).getInt(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testTimestampVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTimestamp", 0, 
Long.MAX_VALUE)) {
+            TimeStampMicroVector vec = new TimeStampMicroVector("col", 
allocator);
+            vec.allocateNew(2);
+            vec.setSafe(0, 1_672_531_200_000_123L); // 
2023-01-01T00:00:00.000123
+            vec.setNull(1);
+            vec.setValueCount(2);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new TimestampType(6));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            TimestampData ts = reader.read(0).getTimestamp(0, 6);
+            // millis = 1672531200000123 / 1000 = 1672531200000
+            assertEquals(1_672_531_200_000L, ts.getMillisecond());
+            // nanoOfMillisecond = (1672531200000123 % 1000) * 1000 = 123 * 
1000 = 123000
+            assertEquals(123_000, ts.getNanoOfMillisecond());
+            assertTrue(reader.read(1).isNullAt(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testTimestampLtzVector() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTimestampLtz", 0, 
Long.MAX_VALUE)) {
+            TimeStampMicroTZVector vec = new TimeStampMicroTZVector("col", 
allocator, "UTC");
+            vec.allocateNew(2);
+            vec.setSafe(0, 1_672_531_200_000_123L);
+            vec.setNull(1);
+            vec.setValueCount(2);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new LocalZonedTimestampType(6));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            TimestampData ts = reader.read(0).getTimestamp(0, 6);
+            assertEquals(1_672_531_200_000L, ts.getMillisecond());
+            assertEquals(123_000, ts.getNanoOfMillisecond());
+            assertTrue(reader.read(1).isNullAt(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testNegativeTimestamp() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testNegTs", 
0, Long.MAX_VALUE)) {
+            TimeStampMicroVector vec = new TimeStampMicroVector("col", 
allocator);
+            vec.allocateNew(1);
+            vec.setSafe(0, -1500L); // 1.5 millis before epoch
+            vec.setValueCount(1);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new TimestampType(6));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            TimestampData ts = reader.read(0).getTimestamp(0, 6);
+            // floorDiv(-1500, 1000) = -2, floorMod(-1500, 1000) = 500
+            assertEquals(-2L, ts.getMillisecond());
+            assertEquals(500_000, ts.getNanoOfMillisecond());
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testArrayVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testArray", 
0, Long.MAX_VALUE)) {
+            ListVector listVector = ListVector.empty("col", allocator);
+            listVector.addOrGetVector(FieldType.nullable(new ArrowType.Int(32, 
true)));
+            listVector.allocateNew();
+
+            UnionListWriter writer = listVector.getWriter();
+            // Row 0: [1, 2, 3]
+            writer.setPosition(0);
+            writer.startList();
+            writer.writeInt(1);
+            writer.writeInt(2);
+            writer.writeInt(3);
+            writer.endList();
+            // Row 1: null (set after writer completes)
+            writer.setPosition(1);
+            writer.startList();
+            writer.endList();
+            // Row 2: []
+            writer.setPosition(2);
+            writer.startList();
+            writer.endList();
+            writer.setValueCount(3);
+            listVector.setNull(1);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(listVector));
+            RowType rowType = RowType.of(new ArrayType(new IntType()));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            // Row 0: [1, 2, 3]
+            ArrayData arr0 = reader.read(0).getArray(0);
+            assertEquals(3, arr0.size());
+            assertEquals(1, arr0.getInt(0));
+            assertEquals(2, arr0.getInt(1));
+            assertEquals(3, arr0.getInt(2));
+
+            // Row 1: null
+            assertTrue(reader.read(1).isNullAt(0));
+
+            // Row 2: []
+            ArrayData arr2 = reader.read(2).getArray(0);
+            assertEquals(0, arr2.size());
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testMapVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testMap", 0, 
Long.MAX_VALUE)) {
+            MapVector mapVector = MapVector.empty("col", allocator, false);
+            mapVector.allocateNew();
+
+            BaseWriter.MapWriter mapWriter = mapVector.getWriter();
+            // Row 0: {"a" -> 1}
+            mapWriter.setPosition(0);
+            mapWriter.startMap();
+            mapWriter.startEntry();
+            byte[] keyBytes = "a".getBytes(StandardCharsets.UTF_8);
+            ArrowBuf keyBuf = allocator.buffer(keyBytes.length);
+            keyBuf.setBytes(0, keyBytes);
+            mapWriter.key().varChar().writeVarChar(0, keyBytes.length, keyBuf);
+            keyBuf.close();
+            mapWriter.value().integer().writeInt(1);
+            mapWriter.endEntry();
+            mapWriter.endMap();
+            // Row 1: null (write empty map, then mark null)
+            mapWriter.setPosition(1);
+            mapWriter.startMap();
+            mapWriter.endMap();
+            // Row 2: {} (empty map)
+            mapWriter.setPosition(2);
+            mapWriter.startMap();
+            mapWriter.endMap();
+            mapVector.setValueCount(3);
+            mapVector.setNull(1);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(mapVector));
+            RowType rowType = RowType.of(new MapType(new VarCharType(100), new 
IntType()));
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            // Row 0: {"a" -> 1}
+            MapData map0 = reader.read(0).getMap(0);
+            assertEquals(1, map0.size());
+
+            // Row 1: null
+            assertTrue(reader.read(1).isNullAt(0));
+
+            // Row 2: empty
+            MapData map2 = reader.read(2).getMap(0);
+            assertEquals(0, map2.size());
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testRowVector() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testRow", 0, 
Long.MAX_VALUE)) {
+            FieldType intFieldType = FieldType.nullable(new ArrowType.Int(32, 
true));
+            FieldType utf8FieldType = 
FieldType.nullable(ArrowType.Utf8.INSTANCE);
+            StructVector structVector = StructVector.empty("col", allocator);
+            structVector.addOrGet("f0", intFieldType, IntVector.class);
+            structVector.addOrGet("f1", utf8FieldType, VarCharVector.class);
+
+            IntVector intChild = (IntVector) structVector.getChild("f0");
+            VarCharVector strChild = (VarCharVector) 
structVector.getChild("f1");
+
+            structVector.allocateNew();
+            structVector.setIndexDefined(0);
+            intChild.setSafe(0, 42);
+            strChild.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8));
+            structVector.setNull(1);
+            intChild.setNull(1);
+            strChild.setNull(1);
+            structVector.setValueCount(2);
+            intChild.setValueCount(2);
+            strChild.setValueCount(2);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(structVector));
+            RowType innerType =
+                    RowType.of(new LogicalType[] {new IntType(), new 
VarCharType(100)}, new String[] {"f0", "f1"});
+            RowType rowType = RowType.of(new LogicalType[] {innerType}, new 
String[] {"col"});
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            // Row 0: struct(42, "hello")
+            RowData nested0 = reader.read(0).getRow(0, 2);
+            assertEquals(42, nested0.getInt(0));
+            assertArrayEquals(
+                    "hello".getBytes(StandardCharsets.UTF_8),
+                    nested0.getString(1).toBytes());
+
+            // Row 1: null struct
+            assertTrue(reader.read(1).isNullAt(0));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testMultiColumnBatch() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testMultiCol", 0, 
Long.MAX_VALUE)) {
+            IntVector intVec = new IntVector("id", allocator);
+            intVec.allocateNew(3);
+            intVec.setSafe(0, 1);
+            intVec.setSafe(1, 2);
+            intVec.setSafe(2, 3);
+            intVec.setValueCount(3);
+
+            VarCharVector strVec = new VarCharVector("name", allocator);
+            strVec.allocateNew(3);
+            strVec.setSafe(0, "alice".getBytes(StandardCharsets.UTF_8));
+            strVec.setSafe(1, "bob".getBytes(StandardCharsets.UTF_8));
+            strVec.setNull(2);
+            strVec.setValueCount(3);
+
+            BitVector boolVec = new BitVector("active", allocator);
+            boolVec.allocateNew(3);
+            boolVec.setSafe(0, 1);
+            boolVec.setSafe(1, 0);
+            boolVec.setSafe(2, 1);
+            boolVec.setValueCount(3);
+
+            List<FieldVector> vectors = Arrays.asList(intVec, strVec, boolVec);
+            VectorSchemaRoot root = new VectorSchemaRoot(vectors);
+            RowType rowType = RowType.of(
+                    new LogicalType[] {new IntType(), new VarCharType(100), 
new BooleanType()},
+                    new String[] {"id", "name", "active"});
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(3, reader.getRowCount());
+
+            RowData row0 = reader.read(0);
+            assertEquals(1, row0.getInt(0));
+            assertArrayEquals(
+                    "alice".getBytes(StandardCharsets.UTF_8), 
row0.getString(1).toBytes());
+            assertTrue(row0.getBoolean(2));
+
+            RowData row1 = reader.read(1);
+            assertEquals(2, row1.getInt(0));
+            assertArrayEquals(
+                    "bob".getBytes(StandardCharsets.UTF_8), 
row1.getString(1).toBytes());
+            assertFalse(row1.getBoolean(2));
+
+            RowData row2 = reader.read(2);
+            assertEquals(3, row2.getInt(0));
+            assertTrue(row2.isNullAt(1));
+            assertTrue(row2.getBoolean(2));
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testEmptyBatch() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testEmpty", 
0, Long.MAX_VALUE)) {
+            IntVector vec = new IntVector("col", allocator);
+            vec.allocateNew(0);
+            vec.setValueCount(0);
+
+            VectorSchemaRoot root = new 
VectorSchemaRoot(Collections.singletonList(vec));
+            RowType rowType = RowType.of(new IntType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);
+
+            assertEquals(0, reader.getRowCount());
+
+            reader.close();
+            root.close();
+        }
+    }
+
+    @Test
+    public void testResetWithNewRoot() {
+        try (BufferAllocator allocator =
+                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testReset", 
0, Long.MAX_VALUE)) {
+            // Batch 1
+            IntVector vec1 = new IntVector("col", allocator);
+            vec1.allocateNew(2);
+            vec1.setSafe(0, 10);
+            vec1.setSafe(1, 20);
+            vec1.setValueCount(2);
+            VectorSchemaRoot root1 = new 
VectorSchemaRoot(Collections.singletonList(vec1));
+
+            RowType rowType = RowType.of(new IntType());
+            FlinkArrowReader reader = FlinkArrowReader.create(root1, rowType);
+
+            assertEquals(10, reader.read(0).getInt(0));
+            assertEquals(20, reader.read(1).getInt(0));
+
+            // Batch 2 — different values, same schema
+            IntVector vec2 = new IntVector("col", allocator);
+            vec2.allocateNew(2);
+            vec2.setSafe(0, 99);
+            vec2.setSafe(1, 100);
+            vec2.setValueCount(2);
+            VectorSchemaRoot root2 = new 
VectorSchemaRoot(Collections.singletonList(vec2));
+
+            reader.reset(root2);
+            assertEquals(2, reader.getRowCount());
+            assertEquals(99, reader.read(0).getInt(0));
+            assertEquals(100, reader.read(1).getInt(0));
+
+            reader.close();
+            root1.close();
+            root2.close();
+        }
+    }
+
+    @Test
+    public void testUnsupportedTypeThrows() {
+        try (BufferAllocator allocator =
+                
FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testUnsupported", 0, 
Long.MAX_VALUE)) {
+            IntVector vec = new IntVector("col", allocator);
+            vec.allocateNew(1);
+            vec.setSafe(0, 1);
+            vec.setValueCount(1);
+
+            assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> FlinkArrowReader.createColumnVector(
+                            vec, new RawType<>(String.class, 
StringSerializer.INSTANCE)));
+
+            vec.close();
+        }
+    }
+}


Reply via email to