This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc034a0 [AURON #1851] Introduce Arrow to Flink RowData reader (#2063)
8dc034a0 is described below
commit 8dc034a09338ffde319e6e3493508b0baf1a2e5c
Author: Weiqing Yang <[email protected]>
AuthorDate: Mon Mar 9 20:58:19 2026 -0700
[AURON #1851] Introduce Arrow to Flink RowData reader (#2063)
# Which issue does this PR close?
Closes #1851
# Rationale for this change
Per AIP-1, the Flink integration data path requires converting Arrow
vectors returned by the native engine (DataFusion/Rust) back into Flink
RowData so downstream Flink operators can process results.
# What changes are included in this PR?
- FlinkArrowReader orchestrator — zero-copy columnar access via
ColumnarRowData + VectorizedColumnBatch
- 16 ArrowXxxColumnVector wrappers for all 17 supported types
- Decimal fromUnscaledLong optimization for precision ≤ 18
- Batch reset support for streaming pipelines
- 21 unit tests in FlinkArrowReaderTest
# Are there any user-facing changes?
No. Internal API for Flink integration.
# How was this patch tested?
21 tests: ./build/mvn test -pl auron-flink-extension/auron-flink-runtime
-am -Pscala-2.12 -Pflink-1.18 -Pspark-3.5 -DskipBuildNative
-Dtest=FlinkArrowReaderTest
Result: 21 pass, 0 failures.
---
.../apache/auron/flink/arrow/FlinkArrowReader.java | 257 +++++++
.../arrow/vectors/ArrowArrayColumnVector.java | 64 ++
.../arrow/vectors/ArrowBigIntColumnVector.java | 53 ++
.../arrow/vectors/ArrowBooleanColumnVector.java | 53 ++
.../flink/arrow/vectors/ArrowDateColumnVector.java | 54 ++
.../arrow/vectors/ArrowDecimalColumnVector.java | 84 +++
.../arrow/vectors/ArrowDoubleColumnVector.java | 53 ++
.../arrow/vectors/ArrowFloatColumnVector.java | 53 ++
.../flink/arrow/vectors/ArrowIntColumnVector.java | 53 ++
.../flink/arrow/vectors/ArrowMapColumnVector.java | 69 ++
.../flink/arrow/vectors/ArrowRowColumnVector.java | 66 ++
.../arrow/vectors/ArrowSmallIntColumnVector.java | 53 ++
.../flink/arrow/vectors/ArrowTimeColumnVector.java | 64 ++
.../arrow/vectors/ArrowTimestampColumnVector.java | 76 +++
.../arrow/vectors/ArrowTinyIntColumnVector.java | 53 ++
.../arrow/vectors/ArrowVarBinaryColumnVector.java | 54 ++
.../arrow/vectors/ArrowVarCharColumnVector.java | 54 ++
.../auron/flink/arrow/FlinkArrowReaderTest.java | 737 +++++++++++++++++++++
18 files changed, 1950 insertions(+)
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
new file mode 100644
index 00000000..aa0280af
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import java.util.List;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.columnar.ColumnarRowData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}.
+ *
+ * <p>Uses Flink's columnar data structures ({@link ColumnarRowData} backed by
{@link
+ * VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow
{@link FieldVector} is
+ * wrapped in a Flink {@link ColumnVector} implementation that delegates reads
directly to the
+ * underlying Arrow vector.
+ *
+ * <p>Object reuse: {@link #read(int)} returns the same {@link
ColumnarRowData} instance with a
+ * different row ID. Callers must consume or copy the returned row before the
next call. This is
+ * standard Flink practice for columnar readers.
+ *
+ * <p>Implements {@link AutoCloseable} for use in resource management blocks.
Note: this does NOT
+ * close the underlying {@link VectorSchemaRoot}. The caller that created the
root is responsible
+ * for its lifecycle.
+ */
+public class FlinkArrowReader implements AutoCloseable {
+
+ private final RowType rowType;
+ private ColumnVector[] columnVectors;
+ private VectorizedColumnBatch batch;
+ private ColumnarRowData reusableRow;
+ private VectorSchemaRoot root;
+
+ private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot
root, RowType rowType) {
+ this.columnVectors = columnVectors;
+ this.batch = new VectorizedColumnBatch(columnVectors);
+ this.reusableRow = new ColumnarRowData(batch);
+ this.root = root;
+ this.rowType = rowType;
+ }
+
+ /**
+ * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and
{@link RowType}.
+ *
+ * <p>The RowType must match the schema of the VectorSchemaRoot (same
number of fields, matching
+ * types). Each Arrow field vector is wrapped in the appropriate Flink
{@link ColumnVector}
+ * implementation based on the corresponding Flink {@link LogicalType}.
+ *
+ * @param root the Arrow VectorSchemaRoot containing the data
+ * @param rowType the Flink RowType describing the schema
+ * @return a new FlinkArrowReader
+ * @throws IllegalArgumentException if field counts do not match
+ * @throws UnsupportedOperationException if a LogicalType is not supported
+ */
+ public static FlinkArrowReader create(VectorSchemaRoot root, RowType
rowType) {
+ Preconditions.checkNotNull(root, "root must not be null");
+ Preconditions.checkNotNull(rowType, "rowType must not be null");
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+ List<RowType.RowField> fields = rowType.getFields();
+ if (fieldVectors.size() != fields.size()) {
+ throw new IllegalArgumentException(
+ "VectorSchemaRoot has " + fieldVectors.size() + " fields
but RowType has " + fields.size());
+ }
+ ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
+ for (int i = 0; i < fieldVectors.size(); i++) {
+ columns[i] = createColumnVector(fieldVectors.get(i),
fields.get(i).getType());
+ }
+ return new FlinkArrowReader(columns, root, rowType);
+ }
+
+ /**
+ * Reads a row at the given position. Returns a reused {@link RowData}
object — callers must not
+ * hold references across calls.
+ *
+ * @param rowId the row index within the current batch
+ * @return the row data at the given position
+ */
+ public RowData read(int rowId) {
+ reusableRow.setRowId(rowId);
+ return reusableRow;
+ }
+
+ /**
+ * Returns the number of rows in the current batch.
+ *
+ * @return the row count
+ */
+ public int getRowCount() {
+ return root.getRowCount();
+ }
+
+ /**
+ * Resets the reader to use a new {@link VectorSchemaRoot} with the same
schema. Recreates
+ * column vector wrappers for the new root's field vectors.
+ *
+ * <p>The new root must have the same schema (same number and types of
fields) as the original.
+ *
+ * @param newRoot the new VectorSchemaRoot, must not be null
+ */
+ public void reset(VectorSchemaRoot newRoot) {
+ Preconditions.checkNotNull(newRoot, "newRoot must not be null");
+ this.root = newRoot;
+ List<FieldVector> newVectors = newRoot.getFieldVectors();
+ Preconditions.checkArgument(
+ newVectors.size() == columnVectors.length,
+ "New root has %s fields but reader expects %s",
+ newVectors.size(),
+ columnVectors.length);
+ List<RowType.RowField> fields = rowType.getFields();
+ for (int i = 0; i < columnVectors.length; i++) {
+ columnVectors[i] =
+ createColumnVector(newVectors.get(i),
fields.get(i).getType());
+ }
+ this.batch = new VectorizedColumnBatch(columnVectors);
+ this.reusableRow = new ColumnarRowData(batch);
+ }
+
+ /**
+ * Implements {@link AutoCloseable} for use in resource management blocks.
Note: this does NOT
+ * close the underlying {@link VectorSchemaRoot}. The caller that created
the root is responsible
+ * for its lifecycle.
+ */
+ @Override
+ public void close() {
+ // Reader is a view; root lifecycle managed by caller.
+ }
+
+ /**
+ * Creates the appropriate Flink {@link ColumnVector} wrapper for the
given Arrow {@link
+ * FieldVector} and Flink {@link LogicalType}.
+ */
+ static ColumnVector createColumnVector(FieldVector vector, LogicalType
type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return new ArrowBooleanColumnVector((BitVector) vector);
+ case TINYINT:
+ return new ArrowTinyIntColumnVector((TinyIntVector) vector);
+ case SMALLINT:
+ return new ArrowSmallIntColumnVector((SmallIntVector) vector);
+ case INTEGER:
+ return new ArrowIntColumnVector((IntVector) vector);
+ case BIGINT:
+ return new ArrowBigIntColumnVector((BigIntVector) vector);
+ case FLOAT:
+ return new ArrowFloatColumnVector((Float4Vector) vector);
+ case DOUBLE:
+ return new ArrowDoubleColumnVector((Float8Vector) vector);
+ case VARCHAR:
+ case CHAR:
+ return new ArrowVarCharColumnVector((VarCharVector) vector);
+ case VARBINARY:
+ case BINARY:
+ return new ArrowVarBinaryColumnVector((VarBinaryVector)
vector);
+ case DECIMAL:
+ return new ArrowDecimalColumnVector((DecimalVector) vector);
+ case DATE:
+ return new ArrowDateColumnVector((DateDayVector) vector);
+ case TIME_WITHOUT_TIME_ZONE:
+ // The native engine (DataFusion) uses microsecond precision
for all temporal
+ // types, producing TimeMicroVector regardless of declared
Flink TIME precision.
+ return new ArrowTimeColumnVector((TimeMicroVector) vector);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ // The native engine (DataFusion) uses microsecond precision
for all temporal
+ // types. TimeStampVector is the common parent of
TimeStampMicroVector and
+ // TimeStampMicroTZVector.
+ return new ArrowTimestampColumnVector((TimeStampVector)
vector);
+ case ARRAY:
+ return createArrayColumnVector((ListVector) vector,
(ArrayType) type);
+ case MAP:
+ return createMapColumnVector((MapVector) vector, (MapType)
type);
+ case ROW:
+ return createRowColumnVector((StructVector) vector, (RowType)
type);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported Flink type for Arrow reader: " +
type.asSummaryString());
+ }
+ }
+
+ private static ColumnVector createArrayColumnVector(ListVector vector,
ArrayType arrayType) {
+ ColumnVector elementVector =
createColumnVector(vector.getDataVector(), arrayType.getElementType());
+ return new ArrowArrayColumnVector(vector, elementVector);
+ }
+
+ private static ColumnVector createMapColumnVector(MapVector vector,
MapType mapType) {
+ StructVector entriesVector = (StructVector) vector.getDataVector();
+ ColumnVector keyVector =
createColumnVector(entriesVector.getChild(MapVector.KEY_NAME),
mapType.getKeyType());
+ ColumnVector valueVector =
+
createColumnVector(entriesVector.getChild(MapVector.VALUE_NAME),
mapType.getValueType());
+ return new ArrowMapColumnVector(vector, keyVector, valueVector);
+ }
+
+ private static ColumnVector createRowColumnVector(StructVector vector,
RowType rowType) {
+ List<FieldVector> childVectors = vector.getChildrenFromFields();
+ List<RowType.RowField> fields = rowType.getFields();
+ ColumnVector[] childColumns = new ColumnVector[childVectors.size()];
+ for (int i = 0; i < childVectors.size(); i++) {
+ childColumns[i] =
+ createColumnVector(childVectors.get(i),
fields.get(i).getType());
+ }
+ return new ArrowRowColumnVector(vector, childColumns);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java
new file mode 100644
index 00000000..f3af97ab
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowArrayColumnVector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.columnar.ColumnarArrayData;
+import org.apache.flink.table.data.columnar.vector.ArrayColumnVector;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link ArrayColumnVector} backed by an Arrow {@link ListVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow list data from Flink's columnar batch execution engine. The child
element vector is
+ * itself a Flink {@link ColumnVector} wrapping the corresponding Arrow child
vector, enabling
+ * recursive nesting of complex types.
+ */
+public final class ArrowArrayColumnVector implements ArrayColumnVector {
+
+ private final ListVector vector;
+ private final ColumnVector elementColumnVector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link ListVector}.
+ *
+ * @param vector the Arrow list vector to wrap, must not be null
+ * @param elementColumnVector the Flink column vector wrapping the Arrow
child element vector,
+ * must not be null
+ */
+ public ArrowArrayColumnVector(ListVector vector, ColumnVector
elementColumnVector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ this.elementColumnVector =
Preconditions.checkNotNull(elementColumnVector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ArrayData getArray(int i) {
+ int offset = vector.getOffsetBuffer().getInt((long) i *
ListVector.OFFSET_WIDTH);
+ int length = vector.getOffsetBuffer().getInt((long) (i + 1) *
ListVector.OFFSET_WIDTH) - offset;
+ return new ColumnarArrayData(elementColumnVector, offset, length);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java
new file mode 100644
index 00000000..6800234d
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBigIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.flink.table.data.columnar.vector.LongColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link LongColumnVector} backed by an Arrow {@link BigIntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowBigIntColumnVector implements LongColumnVector {
+
+ private final BigIntVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link BigIntVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowBigIntColumnVector(BigIntVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getLong(int i) {
+ return vector.get(i);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java
new file mode 100644
index 00000000..e3dfad39
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowBooleanColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.flink.table.data.columnar.vector.BooleanColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link BooleanColumnVector} backed by an Arrow {@link BitVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowBooleanColumnVector implements BooleanColumnVector {
+
+ private final BitVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link BitVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowBooleanColumnVector(BitVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean getBoolean(int i) {
+ return vector.get(i) != 0;
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java
new file mode 100644
index 00000000..e52d476d
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDateColumnVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.flink.table.data.columnar.vector.IntColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link IntColumnVector} backed by an Arrow {@link DateDayVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine. {@link
DateDayVector} stores dates
+ * as epoch days ({@code int}), which maps directly to Flink's internal DATE
representation.
+ */
+public final class ArrowDateColumnVector implements IntColumnVector {
+
+ private final DateDayVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link DateDayVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowDateColumnVector(DateDayVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int getInt(int i) {
+ return vector.get(i);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java
new file mode 100644
index 00000000..dd3147dc
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDecimalColumnVector.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link DecimalColumnVector} backed by an Arrow {@link
DecimalVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ *
+ * <p>For compact decimals (precision <= 18), the implementation avoids
BigDecimal allocation by
+ * reading the unscaled value directly from Arrow's little-endian byte
representation.
+ */
+public final class ArrowDecimalColumnVector implements DecimalColumnVector {
+
+ /**
+ * Maximum precision that fits in a long (same as Flink's internal compact
decimal threshold).
+ * DecimalData.MAX_COMPACT_PRECISION is package-private, so we define our
own constant.
+ */
+ private static final int MAX_COMPACT_PRECISION = 18;
+
+ private final DecimalVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link DecimalVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowDecimalColumnVector(DecimalVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ if (precision <= MAX_COMPACT_PRECISION) {
+ // Compact path: avoid BigDecimal allocation for precision <= 18.
+ // Arrow stores decimals as 128-bit little-endian two's complement.
+ // For precision <= 18, the value fits in a long.
+ long unscaledLong = readLittleEndianLong(vector.getDataBuffer(),
(long) i * 16);
+ return DecimalData.fromUnscaledLong(unscaledLong, precision,
scale);
+ }
+ return DecimalData.fromBigDecimal(vector.getObject(i), precision,
scale);
+ }
+
+ /**
+ * Reads a little-endian long (8 bytes) from an Arrow buffer at the given
byte offset. Arrow
+ * stores decimals as 128-bit little-endian two's complement. For values
that fit in a long
+ * (precision <= 18), the lower 8 bytes are sufficient.
+ *
+ * @param buffer the Arrow data buffer
+ * @param offset byte offset into the buffer
+ * @return the unscaled value as a long
+ */
+ private static long readLittleEndianLong(ArrowBuf buffer, long offset) {
+ return buffer.getLong(offset);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java
new file mode 100644
index 00000000..a401ebea
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowDoubleColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.flink.table.data.columnar.vector.DoubleColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link DoubleColumnVector} backed by an Arrow {@link Float8Vector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowDoubleColumnVector implements DoubleColumnVector {
+
+ private final Float8Vector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link Float8Vector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowDoubleColumnVector(Float8Vector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public double getDouble(int i) {
+ return vector.get(i);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java
new file mode 100644
index 00000000..3cd5b696
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowFloatColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.flink.table.data.columnar.vector.FloatColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link FloatColumnVector} backed by an Arrow {@link Float4Vector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowFloatColumnVector implements FloatColumnVector {
+
+ private final Float4Vector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link Float4Vector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowFloatColumnVector(Float4Vector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float getFloat(int i) {
+ return vector.get(i);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java
new file mode 100644
index 00000000..974f5fd0
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.flink.table.data.columnar.vector.IntColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link IntColumnVector} backed by an Arrow {@link IntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowIntColumnVector implements IntColumnVector {
+
+ private final IntVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link IntVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowIntColumnVector(IntVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int getInt(int i) {
+ return vector.get(i);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java
new file mode 100644
index 00000000..fa17ab35
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowMapColumnVector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.ColumnarMapData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.MapColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link MapColumnVector} backed by an Arrow {@link MapVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow map data from Flink's columnar batch execution engine. Arrow
represents maps as a list
+ * of struct entries, where each entry contains a "key" and "value" child
vector. The key and value
+ * Flink {@link ColumnVector} instances wrap the corresponding Arrow child
vectors, enabling
+ * recursive nesting of complex types.
+ */
+public final class ArrowMapColumnVector implements MapColumnVector {
+
+ private final MapVector vector;
+ private final ColumnVector keyColumnVector;
+ private final ColumnVector valueColumnVector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link MapVector}.
+ *
+ * @param vector the Arrow map vector to wrap, must not be null
+ * @param keyColumnVector the Flink column vector wrapping the Arrow key
child vector, must not
+ * be null
+ * @param valueColumnVector the Flink column vector wrapping the Arrow
value child vector, must
+ * not be null
+ */
+ public ArrowMapColumnVector(MapVector vector, ColumnVector
keyColumnVector, ColumnVector valueColumnVector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ this.keyColumnVector = Preconditions.checkNotNull(keyColumnVector);
+ this.valueColumnVector = Preconditions.checkNotNull(valueColumnVector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public MapData getMap(int i) {
+ int offset = vector.getOffsetBuffer().getInt((long) i *
MapVector.OFFSET_WIDTH);
+ int length = vector.getOffsetBuffer().getInt((long) (i + 1) *
MapVector.OFFSET_WIDTH) - offset;
+ return new ColumnarMapData(keyColumnVector, valueColumnVector, offset,
length);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java
new file mode 100644
index 00000000..f6c5b2f5
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowRowColumnVector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.flink.table.data.columnar.ColumnarRowData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.RowColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link RowColumnVector} backed by an Arrow {@link StructVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow struct data from Flink's columnar batch execution engine. Each
struct field is
+ * represented as a Flink {@link ColumnVector} wrapping the corresponding
Arrow child vector,
+ * enabling recursive nesting of complex types. The child vectors are bundled
into a {@link
+ * VectorizedColumnBatch} and accessed through a reusable {@link
ColumnarRowData} instance.
+ */
+public final class ArrowRowColumnVector implements RowColumnVector {
+
+ private final StructVector vector;
+ private final VectorizedColumnBatch childBatch;
+ private final ColumnarRowData reusableRow;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link StructVector}.
+ *
+ * @param vector the Arrow struct vector to wrap, must not be null
+ * @param childColumnVectors the Flink column vectors wrapping each Arrow
child field vector,
+ * must not be null
+ */
+ public ArrowRowColumnVector(StructVector vector, ColumnVector[]
childColumnVectors) {
+ this.vector = Preconditions.checkNotNull(vector);
+ this.childBatch = new
VectorizedColumnBatch(Preconditions.checkNotNull(childColumnVectors));
+ this.reusableRow = new ColumnarRowData(childBatch);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ColumnarRowData getRow(int i) {
+ reusableRow.setRowId(i);
+ return reusableRow;
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java
new file mode 100644
index 00000000..0dfee501
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowSmallIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.flink.table.data.columnar.vector.ShortColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link ShortColumnVector} backed by an Arrow {@link SmallIntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowSmallIntColumnVector implements ShortColumnVector {
+
+ private final SmallIntVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link SmallIntVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowSmallIntColumnVector(SmallIntVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public short getShort(int i) {
+ return vector.get(i);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java
new file mode 100644
index 00000000..66555256
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimeColumnVector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.flink.table.data.columnar.vector.IntColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link IntColumnVector} backed by an Arrow {@link TimeMicroVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine. {@link
TimeMicroVector} stores time
+ * values as microseconds since midnight ({@code long}), which are converted
to milliseconds
+ * ({@code int}) to match Flink's internal TIME representation. The native
engine (DataFusion)
+ * uses microsecond precision for all temporal types.
+ */
+public final class ArrowTimeColumnVector implements IntColumnVector {
+
+ private final TimeMicroVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link TimeMicroVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowTimeColumnVector(TimeMicroVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /**
+ * Returns the time value at the given index as milliseconds since
midnight.
+ *
+ * <p>The underlying Arrow vector stores microseconds; this method divides
by 1000 to produce
+ * the millisecond value expected by Flink's internal TIME type.
+ *
+ * @param i the row index
+ * @return time of day in milliseconds since midnight
+ */
+ @Override
+ public int getInt(int i) {
+ return (int) (vector.get(i) / 1000);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java
new file mode 100644
index 00000000..d10b2d30
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.TimestampColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link TimestampColumnVector} backed by an Arrow {@link
TimeStampVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine. It handles both
{@code
+ * TimeStampMicroVector} (TIMESTAMP) and {@code TimeStampMicroTZVector}
(TIMESTAMP_LTZ) by
+ * accepting their common parent type {@link TimeStampVector}. The native
engine (DataFusion)
+ * uses microsecond precision for all temporal types. Microsecond values are
converted to Flink's
+ * {@link TimestampData} representation (epoch millis + sub-millisecond nanos).
+ */
+public final class ArrowTimestampColumnVector implements TimestampColumnVector
{
+
+ private final TimeStampVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link TimeStampVector}.
+ *
+ * <p>Accepts both {@code TimeStampMicroVector} and {@code
TimeStampMicroTZVector} since they
+ * share the same storage format and parent type.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowTimestampColumnVector(TimeStampVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /**
+ * Returns the timestamp at the given index as a {@link TimestampData}.
+ *
+ * <p>The underlying Arrow vector stores microseconds since epoch. This
method splits the value
+ * into epoch milliseconds and sub-millisecond nanoseconds to construct a
{@link TimestampData}.
+ *
+ * @param i the row index
+ * @param precision the timestamp precision (unused; conversion is always
from microseconds)
+ * @return the timestamp value
+ */
+ @Override
+ public TimestampData getTimestamp(int i, int precision) {
+ long micros = vector.get(i);
+ // Use floor-based division so that for negative micros (pre-epoch),
the remainder is
+ // non-negative and nanoOfMillisecond stays within [0, 999_999], as
required by
+ // TimestampData.fromEpochMillis.
+ long millis = Math.floorDiv(micros, 1000);
+ int nanoOfMillisecond = ((int) Math.floorMod(micros, 1000)) * 1000;
+ return TimestampData.fromEpochMillis(millis, nanoOfMillisecond);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java
new file mode 100644
index 00000000..a5ac0f8c
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTinyIntColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.flink.table.data.columnar.vector.ByteColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link ByteColumnVector} backed by an Arrow {@link TinyIntVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowTinyIntColumnVector implements ByteColumnVector {
+
+ private final TinyIntVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link TinyIntVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowTinyIntColumnVector(TinyIntVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte getByte(int i) {
+ return vector.get(i);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java
new file mode 100644
index 00000000..cec43c6c
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarBinaryColumnVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.flink.table.data.columnar.vector.BytesColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link BytesColumnVector} backed by an Arrow {@link
VarBinaryVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowVarBinaryColumnVector implements BytesColumnVector {
+
+ private final VarBinaryVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link VarBinaryVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowVarBinaryColumnVector(VarBinaryVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Bytes getBytes(int i) {
+ byte[] bytes = vector.get(i);
+ return new Bytes(bytes, 0, bytes.length);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java
new file mode 100644
index 00000000..d2b19bb8
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowVarCharColumnVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.vectors;
+
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.flink.table.data.columnar.vector.BytesColumnVector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A Flink {@link BytesColumnVector} backed by an Arrow {@link VarCharVector}.
+ *
+ * <p>This wrapper delegates all reads to the underlying Arrow vector,
providing zero-copy access
+ * to Arrow data from Flink's columnar batch execution engine.
+ */
+public final class ArrowVarCharColumnVector implements BytesColumnVector {
+
+ private final VarCharVector vector;
+
+ /**
+ * Creates a new wrapper around the given Arrow {@link VarCharVector}.
+ *
+ * @param vector the Arrow vector to wrap, must not be null
+ */
+ public ArrowVarCharColumnVector(VarCharVector vector) {
+ this.vector = Preconditions.checkNotNull(vector);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNull(i);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Bytes getBytes(int i) {
+ byte[] bytes = vector.get(i);
+ return new Bytes(bytes, 0, bytes.length);
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java
new file mode 100644
index 00000000..1816c6c8
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link FlinkArrowReader}. */
/**
 * Unit tests for {@link FlinkArrowReader}.
 *
 * <p>Each test constructs an Arrow vector (simple or complex) directly through the Arrow APIs,
 * wraps it in a {@link VectorSchemaRoot}, reads it back via {@link FlinkArrowReader}, and asserts
 * the resulting {@link RowData} values — including null slots, empty values, and boundary values
 * (MIN/MAX, NaN, negative timestamps). Every test uses a dedicated child allocator inside
 * try-with-resources so Arrow memory-leak checks fire on allocator close.
 */
public class FlinkArrowReaderTest {

    /** Boolean column: true, null, false round-trip. */
    @Test
    public void testBooleanVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBoolean", 0, Long.MAX_VALUE)) {
            BitVector bitVector = new BitVector("col", allocator);
            bitVector.allocateNew(3);
            bitVector.setSafe(0, 1);
            bitVector.setNull(1);
            bitVector.setSafe(2, 0);
            bitVector.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(bitVector));
            RowType rowType = RowType.of(new BooleanType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(3, reader.getRowCount());
            assertTrue(reader.read(0).getBoolean(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertFalse(reader.read(2).getBoolean(0));

            reader.close();
            root.close();
        }
    }

    /** TINYINT column: positive, null, negative byte values. */
    @Test
    public void testTinyIntVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTinyInt", 0, Long.MAX_VALUE)) {
            TinyIntVector vec = new TinyIntVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, 1);
            vec.setNull(1);
            vec.setSafe(2, -1);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new TinyIntType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals((byte) 1, reader.read(0).getByte(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertEquals((byte) -1, reader.read(2).getByte(0));

            reader.close();
            root.close();
        }
    }

    /** SMALLINT column: positive, null, negative short values. */
    @Test
    public void testSmallIntVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testSmallInt", 0, Long.MAX_VALUE)) {
            SmallIntVector vec = new SmallIntVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, 100);
            vec.setNull(1);
            vec.setSafe(2, -100);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new SmallIntType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals((short) 100, reader.read(0).getShort(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertEquals((short) -100, reader.read(2).getShort(0));

            reader.close();
            root.close();
        }
    }

    /** INT column: small value, null, and Integer.MAX_VALUE boundary. */
    @Test
    public void testIntVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testInt", 0, Long.MAX_VALUE)) {
            IntVector vec = new IntVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, 42);
            vec.setNull(1);
            vec.setSafe(2, Integer.MAX_VALUE);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new IntType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(42, reader.read(0).getInt(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertEquals(Integer.MAX_VALUE, reader.read(2).getInt(0));

            reader.close();
            root.close();
        }
    }

    /** BIGINT column: Long.MAX_VALUE boundary, null, negative value. */
    @Test
    public void testBigIntVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBigInt", 0, Long.MAX_VALUE)) {
            BigIntVector vec = new BigIntVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, Long.MAX_VALUE);
            vec.setNull(1);
            vec.setSafe(2, -1L);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new BigIntType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(Long.MAX_VALUE, reader.read(0).getLong(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertEquals(-1L, reader.read(2).getLong(0));

            reader.close();
            root.close();
        }
    }

    /** FLOAT column: ordinary value, null, and NaN (must survive the round-trip). */
    @Test
    public void testFloatVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testFloat", 0, Long.MAX_VALUE)) {
            Float4Vector vec = new Float4Vector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, 3.14f);
            vec.setNull(1);
            vec.setSafe(2, Float.NaN);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new FloatType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(3.14f, reader.read(0).getFloat(0), 0.001f);
            assertTrue(reader.read(1).isNullAt(0));
            assertTrue(Float.isNaN(reader.read(2).getFloat(0)));

            reader.close();
            root.close();
        }
    }

    /** DOUBLE column: ordinary value, null, and Double.MAX_VALUE boundary. */
    @Test
    public void testDoubleVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDouble", 0, Long.MAX_VALUE)) {
            Float8Vector vec = new Float8Vector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, 2.718);
            vec.setNull(1);
            vec.setSafe(2, Double.MAX_VALUE);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new DoubleType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(2.718, reader.read(0).getDouble(0), 0.001);
            assertTrue(reader.read(1).isNullAt(0));
            // Exact equality is intended here: MAX_VALUE must round-trip bit-for-bit.
            assertEquals(Double.MAX_VALUE, reader.read(2).getDouble(0));

            reader.close();
            root.close();
        }
    }

    /** VARCHAR column: non-empty string, null, and empty string. */
    @Test
    public void testVarCharVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarChar", 0, Long.MAX_VALUE)) {
            VarCharVector vec = new VarCharVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8));
            vec.setNull(1);
            vec.setSafe(2, "".getBytes(StandardCharsets.UTF_8));
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new VarCharType(100));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            RowData row0 = reader.read(0);
            assertArrayEquals(
                    "hello".getBytes(StandardCharsets.UTF_8), row0.getString(0).toBytes());
            assertTrue(reader.read(1).isNullAt(0));
            assertEquals(0, reader.read(2).getString(0).toBytes().length);

            reader.close();
            root.close();
        }
    }

    /** VARBINARY column: non-empty bytes, null, and empty byte array. */
    @Test
    public void testVarBinaryVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarBinary", 0, Long.MAX_VALUE)) {
            VarBinaryVector vec = new VarBinaryVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, new byte[] {0x01, 0x02});
            vec.setNull(1);
            vec.setSafe(2, new byte[] {});
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new VarBinaryType(100));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertArrayEquals(new byte[] {0x01, 0x02}, reader.read(0).getBinary(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertArrayEquals(new byte[] {}, reader.read(2).getBinary(0));

            reader.close();
            root.close();
        }
    }

    /**
     * DECIMAL column on both code paths: precision 10 exercises the compact
     * {@code fromUnscaledLong} path (precision &le; 18); precision 20 exercises the wide
     * BigDecimal path (precision &gt; 18).
     */
    @Test
    public void testDecimalVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDecimal", 0, Long.MAX_VALUE)) {
            // Compact path: precision 10 (<= 18)
            DecimalVector compactVec = new DecimalVector("compact", allocator, 10, 2);
            compactVec.allocateNew(2);
            compactVec.setSafe(0, new BigDecimal("123.45"));
            compactVec.setNull(1);
            compactVec.setValueCount(2);

            VectorSchemaRoot compactRoot = new VectorSchemaRoot(Collections.singletonList(compactVec));
            RowType compactType = RowType.of(new DecimalType(10, 2));
            FlinkArrowReader compactReader = FlinkArrowReader.create(compactRoot, compactType);

            DecimalData compactVal = compactReader.read(0).getDecimal(0, 10, 2);
            assertEquals(new BigDecimal("123.45"), compactVal.toBigDecimal());
            assertTrue(compactReader.read(1).isNullAt(0));

            compactReader.close();
            compactRoot.close();

            // Wide path: precision 20 (> 18)
            DecimalVector wideVec = new DecimalVector("wide", allocator, 20, 2);
            wideVec.allocateNew(1);
            wideVec.setSafe(0, new BigDecimal("123456789012345678.90"));
            wideVec.setValueCount(1);

            VectorSchemaRoot wideRoot = new VectorSchemaRoot(Collections.singletonList(wideVec));
            RowType wideType = RowType.of(new DecimalType(20, 2));
            FlinkArrowReader wideReader = FlinkArrowReader.create(wideRoot, wideType);

            DecimalData wideVal = wideReader.read(0).getDecimal(0, 20, 2);
            // compareTo (not equals) so a differing-but-equal scale representation still passes.
            assertEquals(0, new BigDecimal("123456789012345678.90").compareTo(wideVal.toBigDecimal()));

            wideReader.close();
            wideRoot.close();
        }
    }

    /** DATE column stored as days-since-epoch ints: value, null, and epoch day 0. */
    @Test
    public void testDateVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDate", 0, Long.MAX_VALUE)) {
            DateDayVector vec = new DateDayVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, 18000);
            vec.setNull(1);
            vec.setSafe(2, 0);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new DateType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(18000, reader.read(0).getInt(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertEquals(0, reader.read(2).getInt(0));

            reader.close();
            root.close();
        }
    }

    /** TIME column: Arrow stores micros; Flink expects millis-of-day as an int. */
    @Test
    public void testTimeVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTime", 0, Long.MAX_VALUE)) {
            TimeMicroVector vec = new TimeMicroVector("col", allocator);
            vec.allocateNew(3);
            vec.setSafe(0, 45_296_000_000L); // 45296000000 micros -> 45296000 millis
            vec.setNull(1);
            vec.setSafe(2, 0L);
            vec.setValueCount(3);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new TimeType(6));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            // ArrowTimeColumnVector divides micros by 1000 to get millis
            assertEquals(45_296_000, reader.read(0).getInt(0));
            assertTrue(reader.read(1).isNullAt(0));
            assertEquals(0, reader.read(2).getInt(0));

            reader.close();
            root.close();
        }
    }

    /** TIMESTAMP column: micros split into millisecond + nano-of-millisecond parts. */
    @Test
    public void testTimestampVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTimestamp", 0, Long.MAX_VALUE)) {
            TimeStampMicroVector vec = new TimeStampMicroVector("col", allocator);
            vec.allocateNew(2);
            vec.setSafe(0, 1_672_531_200_000_123L); // 2023-01-01T00:00:00.000123
            vec.setNull(1);
            vec.setValueCount(2);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new TimestampType(6));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            TimestampData ts = reader.read(0).getTimestamp(0, 6);
            // millis = 1672531200000123 / 1000 = 1672531200000
            assertEquals(1_672_531_200_000L, ts.getMillisecond());
            // nanoOfMillisecond = (1672531200000123 % 1000) * 1000 = 123 * 1000 = 123000
            assertEquals(123_000, ts.getNanoOfMillisecond());
            assertTrue(reader.read(1).isNullAt(0));

            reader.close();
            root.close();
        }
    }

    /** TIMESTAMP_LTZ column: same micro-splitting semantics as TIMESTAMP, with a UTC zone. */
    @Test
    public void testTimestampLtzVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTimestampLtz", 0, Long.MAX_VALUE)) {
            TimeStampMicroTZVector vec = new TimeStampMicroTZVector("col", allocator, "UTC");
            vec.allocateNew(2);
            vec.setSafe(0, 1_672_531_200_000_123L);
            vec.setNull(1);
            vec.setValueCount(2);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new LocalZonedTimestampType(6));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            TimestampData ts = reader.read(0).getTimestamp(0, 6);
            assertEquals(1_672_531_200_000L, ts.getMillisecond());
            assertEquals(123_000, ts.getNanoOfMillisecond());
            assertTrue(reader.read(1).isNullAt(0));

            reader.close();
            root.close();
        }
    }

    /** Pre-epoch timestamp: micros must be split with floor division, not truncation. */
    @Test
    public void testNegativeTimestamp() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testNegTs", 0, Long.MAX_VALUE)) {
            TimeStampMicroVector vec = new TimeStampMicroVector("col", allocator);
            vec.allocateNew(1);
            vec.setSafe(0, -1500L); // 1.5 millis before epoch
            vec.setValueCount(1);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new TimestampType(6));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            TimestampData ts = reader.read(0).getTimestamp(0, 6);
            // floorDiv(-1500, 1000) = -2, floorMod(-1500, 1000) = 500
            assertEquals(-2L, ts.getMillisecond());
            assertEquals(500_000, ts.getNanoOfMillisecond());

            reader.close();
            root.close();
        }
    }

    /** ARRAY&lt;INT&gt; column: populated list, null slot, and empty list. */
    @Test
    public void testArrayVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testArray", 0, Long.MAX_VALUE)) {
            ListVector listVector = ListVector.empty("col", allocator);
            listVector.addOrGetVector(FieldType.nullable(new ArrowType.Int(32, true)));
            listVector.allocateNew();

            UnionListWriter writer = listVector.getWriter();
            // Row 0: [1, 2, 3]
            writer.setPosition(0);
            writer.startList();
            writer.writeInt(1);
            writer.writeInt(2);
            writer.writeInt(3);
            writer.endList();
            // Row 1: write an empty-list placeholder here; the slot is flipped to null via
            // setNull(1) after setValueCount below, once the writer is done with the buffers.
            writer.setPosition(1);
            writer.startList();
            writer.endList();
            // Row 2: [] (genuinely empty, stays non-null)
            writer.setPosition(2);
            writer.startList();
            writer.endList();
            writer.setValueCount(3);
            listVector.setNull(1);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(listVector));
            RowType rowType = RowType.of(new ArrayType(new IntType()));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            // Row 0: [1, 2, 3]
            ArrayData arr0 = reader.read(0).getArray(0);
            assertEquals(3, arr0.size());
            assertEquals(1, arr0.getInt(0));
            assertEquals(2, arr0.getInt(1));
            assertEquals(3, arr0.getInt(2));

            // Row 1: null
            assertTrue(reader.read(1).isNullAt(0));

            // Row 2: []
            ArrayData arr2 = reader.read(2).getArray(0);
            assertEquals(0, arr2.size());

            reader.close();
            root.close();
        }
    }

    /** MAP&lt;VARCHAR, INT&gt; column: one-entry map, null slot, and empty map. */
    @Test
    public void testMapVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testMap", 0, Long.MAX_VALUE)) {
            MapVector mapVector = MapVector.empty("col", allocator, false);
            mapVector.allocateNew();

            BaseWriter.MapWriter mapWriter = mapVector.getWriter();
            // Row 0: {"a" -> 1}. VarChar keys must be staged in an ArrowBuf for the writer.
            mapWriter.setPosition(0);
            mapWriter.startMap();
            mapWriter.startEntry();
            byte[] keyBytes = "a".getBytes(StandardCharsets.UTF_8);
            ArrowBuf keyBuf = allocator.buffer(keyBytes.length);
            keyBuf.setBytes(0, keyBytes);
            mapWriter.key().varChar().writeVarChar(0, keyBytes.length, keyBuf);
            keyBuf.close();
            mapWriter.value().integer().writeInt(1);
            mapWriter.endEntry();
            mapWriter.endMap();
            // Row 1: write an empty-map placeholder; flipped to null via setNull(1) after
            // setValueCount, once the writer is finished.
            mapWriter.setPosition(1);
            mapWriter.startMap();
            mapWriter.endMap();
            // Row 2: {} (genuinely empty, stays non-null)
            mapWriter.setPosition(2);
            mapWriter.startMap();
            mapWriter.endMap();
            mapVector.setValueCount(3);
            mapVector.setNull(1);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(mapVector));
            RowType rowType = RowType.of(new MapType(new VarCharType(100), new IntType()));
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            // Row 0: {"a" -> 1}
            MapData map0 = reader.read(0).getMap(0);
            assertEquals(1, map0.size());

            // Row 1: null
            assertTrue(reader.read(1).isNullAt(0));

            // Row 2: empty
            MapData map2 = reader.read(2).getMap(0);
            assertEquals(0, map2.size());

            reader.close();
            root.close();
        }
    }

    /** ROW&lt;INT, VARCHAR&gt; column: populated struct and a null struct slot. */
    @Test
    public void testRowVector() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testRow", 0, Long.MAX_VALUE)) {
            FieldType intFieldType = FieldType.nullable(new ArrowType.Int(32, true));
            FieldType utf8FieldType = FieldType.nullable(ArrowType.Utf8.INSTANCE);
            StructVector structVector = StructVector.empty("col", allocator);
            structVector.addOrGet("f0", intFieldType, IntVector.class);
            structVector.addOrGet("f1", utf8FieldType, VarCharVector.class);

            IntVector intChild = (IntVector) structVector.getChild("f0");
            VarCharVector strChild = (VarCharVector) structVector.getChild("f1");

            structVector.allocateNew();
            // Row 0 must be explicitly marked defined on the struct's own validity buffer.
            structVector.setIndexDefined(0);
            intChild.setSafe(0, 42);
            strChild.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8));
            // Row 1: null struct; children are nulled too to keep buffers consistent.
            structVector.setNull(1);
            intChild.setNull(1);
            strChild.setNull(1);
            structVector.setValueCount(2);
            intChild.setValueCount(2);
            strChild.setValueCount(2);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(structVector));
            RowType innerType =
                    RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"f0", "f1"});
            RowType rowType = RowType.of(new LogicalType[] {innerType}, new String[] {"col"});
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            // Row 0: struct(42, "hello")
            RowData nested0 = reader.read(0).getRow(0, 2);
            assertEquals(42, nested0.getInt(0));
            assertArrayEquals(
                    "hello".getBytes(StandardCharsets.UTF_8),
                    nested0.getString(1).toBytes());

            // Row 1: null struct
            assertTrue(reader.read(1).isNullAt(0));

            reader.close();
            root.close();
        }
    }

    /** Three heterogeneous columns (INT, VARCHAR with a null, BOOLEAN) read row by row. */
    @Test
    public void testMultiColumnBatch() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testMultiCol", 0, Long.MAX_VALUE)) {
            IntVector intVec = new IntVector("id", allocator);
            intVec.allocateNew(3);
            intVec.setSafe(0, 1);
            intVec.setSafe(1, 2);
            intVec.setSafe(2, 3);
            intVec.setValueCount(3);

            VarCharVector strVec = new VarCharVector("name", allocator);
            strVec.allocateNew(3);
            strVec.setSafe(0, "alice".getBytes(StandardCharsets.UTF_8));
            strVec.setSafe(1, "bob".getBytes(StandardCharsets.UTF_8));
            strVec.setNull(2);
            strVec.setValueCount(3);

            BitVector boolVec = new BitVector("active", allocator);
            boolVec.allocateNew(3);
            boolVec.setSafe(0, 1);
            boolVec.setSafe(1, 0);
            boolVec.setSafe(2, 1);
            boolVec.setValueCount(3);

            List<FieldVector> vectors = Arrays.asList(intVec, strVec, boolVec);
            VectorSchemaRoot root = new VectorSchemaRoot(vectors);
            RowType rowType = RowType.of(
                    new LogicalType[] {new IntType(), new VarCharType(100), new BooleanType()},
                    new String[] {"id", "name", "active"});
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(3, reader.getRowCount());

            RowData row0 = reader.read(0);
            assertEquals(1, row0.getInt(0));
            assertArrayEquals(
                    "alice".getBytes(StandardCharsets.UTF_8), row0.getString(1).toBytes());
            assertTrue(row0.getBoolean(2));

            RowData row1 = reader.read(1);
            assertEquals(2, row1.getInt(0));
            assertArrayEquals(
                    "bob".getBytes(StandardCharsets.UTF_8), row1.getString(1).toBytes());
            assertFalse(row1.getBoolean(2));

            RowData row2 = reader.read(2);
            assertEquals(3, row2.getInt(0));
            assertTrue(row2.isNullAt(1));
            assertTrue(row2.getBoolean(2));

            reader.close();
            root.close();
        }
    }

    /** Zero-row batch: the reader reports a row count of 0 without failing. */
    @Test
    public void testEmptyBatch() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testEmpty", 0, Long.MAX_VALUE)) {
            IntVector vec = new IntVector("col", allocator);
            vec.allocateNew(0);
            vec.setValueCount(0);

            VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec));
            RowType rowType = RowType.of(new IntType());
            FlinkArrowReader reader = FlinkArrowReader.create(root, rowType);

            assertEquals(0, reader.getRowCount());

            reader.close();
            root.close();
        }
    }

    /** Streaming reuse: reset(...) must switch the reader to a new same-schema batch. */
    @Test
    public void testResetWithNewRoot() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testReset", 0, Long.MAX_VALUE)) {
            // Batch 1
            IntVector vec1 = new IntVector("col", allocator);
            vec1.allocateNew(2);
            vec1.setSafe(0, 10);
            vec1.setSafe(1, 20);
            vec1.setValueCount(2);
            VectorSchemaRoot root1 = new VectorSchemaRoot(Collections.singletonList(vec1));

            RowType rowType = RowType.of(new IntType());
            FlinkArrowReader reader = FlinkArrowReader.create(root1, rowType);

            assertEquals(10, reader.read(0).getInt(0));
            assertEquals(20, reader.read(1).getInt(0));

            // Batch 2 — different values, same schema
            IntVector vec2 = new IntVector("col", allocator);
            vec2.allocateNew(2);
            vec2.setSafe(0, 99);
            vec2.setSafe(1, 100);
            vec2.setValueCount(2);
            VectorSchemaRoot root2 = new VectorSchemaRoot(Collections.singletonList(vec2));

            reader.reset(root2);
            assertEquals(2, reader.getRowCount());
            assertEquals(99, reader.read(0).getInt(0));
            assertEquals(100, reader.read(1).getInt(0));

            reader.close();
            root1.close();
            root2.close();
        }
    }

    /** A Flink type with no Arrow mapping (RAW) must raise UnsupportedOperationException. */
    @Test
    public void testUnsupportedTypeThrows() {
        try (BufferAllocator allocator =
                FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testUnsupported", 0, Long.MAX_VALUE)) {
            IntVector vec = new IntVector("col", allocator);
            vec.allocateNew(1);
            vec.setSafe(0, 1);
            vec.setValueCount(1);

            assertThrows(
                    UnsupportedOperationException.class,
                    () -> FlinkArrowReader.createColumnVector(
                            vec, new RawType<>(String.class, StringSerializer.INSTANCE)));

            vec.close();
        }
    }
}