weiqingy commented on code in PR #2063:
URL: https://github.com/apache/auron/pull/2063#discussion_r2879734624


##########
auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import java.util.List;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.auron.flink.arrow.vectors.ArrowArrayColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowBigIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowBooleanColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDateColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDecimalColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowDoubleColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowFloatColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowMapColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowRowColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowSmallIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTimeColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTimestampColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowTinyIntColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowVarBinaryColumnVector;
+import org.apache.auron.flink.arrow.vectors.ArrowVarCharColumnVector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.columnar.ColumnarRowData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Reads Arrow {@link VectorSchemaRoot} data as Flink {@link RowData}.
+ *
+ * <p>Uses Flink's columnar data structures ({@link ColumnarRowData} backed by 
{@link
+ * VectorizedColumnBatch}) for zero-copy access to Arrow vectors. Each Arrow 
{@link FieldVector} is
+ * wrapped in a Flink {@link ColumnVector} implementation that delegates reads 
directly to the
+ * underlying Arrow vector.
+ *
+ * <p>Object reuse: {@link #read(int)} returns the same {@link 
ColumnarRowData} instance with a
+ * different row ID. Callers must consume or copy the returned row before the 
next call. This is
+ * standard Flink practice for columnar readers.
+ *
+ * <p>Implements {@link AutoCloseable} for use in resource management blocks. 
Note: this does NOT
+ * close the underlying {@link VectorSchemaRoot}. The caller that created the 
root is responsible
+ * for its lifecycle.
+ */
+public class FlinkArrowReader implements AutoCloseable {
+
+    private final RowType rowType;
+    private ColumnVector[] columnVectors;
+    private VectorizedColumnBatch batch;
+    private ColumnarRowData reusableRow;
+    private VectorSchemaRoot root;
+
+    private FlinkArrowReader(ColumnVector[] columnVectors, VectorSchemaRoot 
root, RowType rowType) {
+        this.columnVectors = columnVectors;
+        this.batch = new VectorizedColumnBatch(columnVectors);
+        this.reusableRow = new ColumnarRowData(batch);
+        this.root = root;
+        this.rowType = rowType;
+    }
+
+    /**
+     * Creates a {@link FlinkArrowReader} from a {@link VectorSchemaRoot} and 
{@link RowType}.
+     *
+     * <p>The RowType must match the schema of the VectorSchemaRoot (same 
number of fields, matching
+     * types). Each Arrow field vector is wrapped in the appropriate Flink 
{@link ColumnVector}
+     * implementation based on the corresponding Flink {@link LogicalType}.
+     *
+     * @param root the Arrow VectorSchemaRoot containing the data
+     * @param rowType the Flink RowType describing the schema
+     * @return a new FlinkArrowReader
+     * @throws IllegalArgumentException if field counts do not match
+     * @throws UnsupportedOperationException if a LogicalType is not supported
+     */
+    public static FlinkArrowReader create(VectorSchemaRoot root, RowType 
rowType) {
+        Preconditions.checkNotNull(root, "root must not be null");
+        Preconditions.checkNotNull(rowType, "rowType must not be null");
+        List<FieldVector> fieldVectors = root.getFieldVectors();
+        List<RowType.RowField> fields = rowType.getFields();
+        if (fieldVectors.size() != fields.size()) {
+            throw new IllegalArgumentException(
+                    "VectorSchemaRoot has " + fieldVectors.size() + " fields 
but RowType has " + fields.size());
+        }
+        ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
+        for (int i = 0; i < fieldVectors.size(); i++) {
+            columns[i] = createColumnVector(fieldVectors.get(i), 
fields.get(i).getType());
+        }
+        return new FlinkArrowReader(columns, root, rowType);
+    }
+
+    /**
+     * Reads a row at the given position. Returns a reused {@link RowData} 
object — callers must not
+     * hold references across calls.
+     *
+     * @param rowId the row index within the current batch
+     * @return the row data at the given position
+     */
+    public RowData read(int rowId) {
+        reusableRow.setRowId(rowId);
+        return reusableRow;
+    }
+
+    /**
+     * Returns the number of rows in the current batch.
+     *
+     * @return the row count
+     */
+    public int getRowCount() {
+        return root.getRowCount();
+    }
+
+    /**
+     * Resets the reader to use a new {@link VectorSchemaRoot} with the same 
schema. Recreates
+     * column vector wrappers for the new root's field vectors.
+     *
+     * <p>The new root must have the same schema (same number and types of 
fields) as the original.
+     *
+     * @param newRoot the new VectorSchemaRoot, must not be null
+     */
+    public void reset(VectorSchemaRoot newRoot) {
+        Preconditions.checkNotNull(newRoot, "newRoot must not be null");
+        this.root = newRoot;
+        List<FieldVector> newVectors = newRoot.getFieldVectors();
+        Preconditions.checkArgument(
+                newVectors.size() == columnVectors.length,
+                "New root has %s fields but reader expects %s",
+                newVectors.size(),
+                columnVectors.length);
+        List<RowType.RowField> fields = rowType.getFields();
+        for (int i = 0; i < columnVectors.length; i++) {
+            columnVectors[i] =
+                    createColumnVector(newVectors.get(i), 
fields.get(i).getType());
+        }
+        this.batch = new VectorizedColumnBatch(columnVectors);
+        this.reusableRow = new ColumnarRowData(batch);
+    }
+
+    /**
+     * Implements {@link AutoCloseable} for use in resource management blocks. 
Note: this does NOT
+     * close the underlying {@link VectorSchemaRoot}. The caller that created 
the root is responsible
+     * for its lifecycle.
+     */
+    @Override
+    public void close() {
+        // Reader is a view; root lifecycle managed by caller.
+    }
+
+    /**
+     * Creates the appropriate Flink {@link ColumnVector} wrapper for the 
given Arrow {@link
+     * FieldVector} and Flink {@link LogicalType}.
+     */
+    static ColumnVector createColumnVector(FieldVector vector, LogicalType 
type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+                return new ArrowBooleanColumnVector((BitVector) vector);
+            case TINYINT:
+                return new ArrowTinyIntColumnVector((TinyIntVector) vector);
+            case SMALLINT:
+                return new ArrowSmallIntColumnVector((SmallIntVector) vector);
+            case INTEGER:
+                return new ArrowIntColumnVector((IntVector) vector);
+            case BIGINT:
+                return new ArrowBigIntColumnVector((BigIntVector) vector);
+            case FLOAT:
+                return new ArrowFloatColumnVector((Float4Vector) vector);
+            case DOUBLE:
+                return new ArrowDoubleColumnVector((Float8Vector) vector);
+            case VARCHAR:
+            case CHAR:
+                return new ArrowVarCharColumnVector((VarCharVector) vector);
+            case VARBINARY:
+            case BINARY:
+                return new ArrowVarBinaryColumnVector((VarBinaryVector) 
vector);
+            case DECIMAL:
+                return new ArrowDecimalColumnVector((DecimalVector) vector);
+            case DATE:
+                return new ArrowDateColumnVector((DateDayVector) vector);
+            case TIME_WITHOUT_TIME_ZONE:
+                // The writer (FlinkArrowFieldWriter) normalizes all TIME 
values to microseconds
+                // in a TimeMicroVector, regardless of the declared Flink TIME 
precision.
+                return new ArrowTimeColumnVector((TimeMicroVector) vector);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                // The writer normalizes all timestamps to microseconds. 
TimeStampVector is the
+                // common parent of TimeStampMicroVector and 
TimeStampMicroTZVector.
+                return new ArrowTimestampColumnVector((TimeStampVector) 
vector);

Review Comment:
   Good catch on the FlinkArrowUtils.toArrowType() mapping. The reader is 
designed as the inverse of the
     writer (PR #1930), which per the design discussion normalizes all 
TIME/TIMESTAMP values to microsecond
     precision in Arrow, regardless of the declared Flink precision. This is 
documented in the design doc
     (section 3.3) and in the code comments on the TIME/TIMESTAMP switch cases.
   
     That said, this coupling to the writer's normalization behavior is worth 
highlighting. If the writer's
     approach changes to preserve precision-dependent time units, the reader 
would need to be updated to
     match. Will need to confirm with @x-tong on PR #1930 that the writer does 
indeed normalize to microseconds.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to