This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-22026-81cdb1aa503c401caedf5f48ac69fd24865cc7df
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 2888bce68bc4ca745ce0c47dc004e7b71f91b0a4
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu May 28 11:56:19 2026 -0400

    feat: Plumb Parquet virtual columns (row_number) through TableSchema and ParquetOpener (#22026)
    
    ## Which issue does this PR close?
    
    - Part of #20135 (epic: virtual / metadata columns). Does not close that
    epic; see [this comment](https://github.com/apache/datafusion/issues/20135#issuecomment-4381652523)
    describing the scope split.
    - Revives #20133 (auto-closed stale) — same core plumbing, credit to
    @jkylling.
    - Unblocks apache/datafusion-comet#3432 (remove native_datafusion
    fallback for Spark's `_tmp_metadata_row_index`).
    
    ## Rationale for this change
    
    arrow-rs 57.1.0+ supports Parquet virtual columns (`row_number`,
    `row_group_index`) via `ArrowReaderOptions::with_virtual_columns`, and
    DataFusion pins a new-enough arrow-rs for the API to be available.
    DataFusion does not yet plumb the option through `ParquetOpener`, so
    consumers (notably Comet) cannot project Spark's
    `_tmp_metadata_row_index` through the native_datafusion scan path.
    
    This PR adds the minimal opener-boundary plumbing so `TableSchema` can
    carry virtual columns and the Parquet reader produces them. UX /
    SQL-layer surface for virtual columns stays deferred to the epic in
    #20135 — this follows the same framing alamb blessed for #20071 (the
    `input_file_name()` UDF).
    
    ## What changes are included in this PR?
    
    ### `TableSchema` / `TableSchemaBuilder`
    
    - `TableSchemaBuilder::with_virtual_columns(impl Into<Fields>)` setter,
    picking up #22496's "follow-up" hook: build with `[file, partition,
    virtual]` ordering in a single concatenation. Setter order on the
    builder does not matter; the layout is fixed.
    - Virtual columns are stored as `arrow::datatypes::Fields` (matches the
    `table_partition_cols` storage main switched to in #22496 — no
    `Arc<Vec<_>>` indirection, shareable zero-copy, immutable so the
    in-place mutation panic class is structurally impossible).
    - `TableSchema::virtual_columns()` getter (`&Fields`).
    - `TableSchema::schema_without_virtual_columns()` — file + partition
    schema used by pushdown-planning paths that can't evaluate virtual-col
    refs.
    - `TableSchema::with_virtual_columns(...)` chainable convenience
    preserved for API ergonomics; routes through the builder so it preserves
    any partition columns already on the source `TableSchema`.
    - The deprecated `TableSchema::with_table_partition_cols` was extended
    to preserve virtual columns when routing through the builder (it would
    otherwise drop them for callers still on the deprecated path).
    - Collision check (`virtual` vs `file`, `virtual` vs `partition`,
    duplicates within `virtual`) lives in `TableSchemaBuilder::build()` as a
    `debug_assert!` so release builds pay nothing; setter order is
    irrelevant because the check runs at `build()`.
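
    A minimal sketch of the layout rule above, assuming plain strings in
    place of Arrow fields. `table_layout` is a hypothetical helper for
    illustration only, not the `TableSchemaBuilder` API:

    ```rust
    use std::collections::HashSet;

    /// Hypothetical helper: concatenate column names in the fixed
    /// `[file, partition, virtual]` order, whatever order the caller
    /// supplied them in.
    fn table_layout(
        file: &[&str],
        partition: &[&str],
        virtual_cols: &[&str],
    ) -> Vec<String> {
        // Debug-only collision check: a virtual name may not match a file
        // column, a partition column, or another virtual column.
        debug_assert!(
            {
                let mut seen: HashSet<&str> =
                    file.iter().chain(partition.iter()).copied().collect();
                virtual_cols.iter().all(|name| seen.insert(*name))
            },
            "virtual column collides with another column"
        );
        file.iter()
            .chain(partition.iter())
            .chain(virtual_cols.iter())
            .map(|name| name.to_string())
            .collect()
    }
    ```

    Because the check sits in a `debug_assert!`, a colliding name panics
    in debug builds and goes unchecked in release builds, which mirrors
    the trade-off described above.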
    
    ### `ParquetOpener` / `ParquetSource`
    
    - `ParquetOpener` forwards the fields to
    `ArrowReaderOptions::with_virtual_columns`; augments the schemas passed
    to the expr-adapter / simplifier with virtual fields so virtual-col refs
    identity-rewrite; the virtual-col stripping (substitute with null
    literals for `ProjectionMask::roots`, append to `stream_schema` so
    `reassign_expr_columns` resolves them by name) lives inside
    `DecoderProjection::try_new` (the abstraction #22398 introduced),
    reached via a new `Option<&VirtualColumnsState>` parameter so the
    zero-virtual-column common path is unchanged.
    - New `ParquetVirtualColumn` enum with `TryFrom<&FieldRef>` (in
    `datasource-parquet::virtual_column`) gates which arrow-rs virtual
    extension types are accepted. Currently only `RowNumber`; adding a
    variant (e.g. `RowGroupIndex`) is a compile-time obligation. Replaces a
    runtime string-allowlist so the contract lives in the type system.
    - `ParquetSource::try_pushdown_filters` classifies filters against
    `schema_without_virtual_columns()` so predicates referencing virtual
    columns are reported as `PushedDown::No` and the `FilterExec` stays
    above the scan — arrow-rs's `RowFilter` addresses parquet leaves only
    and can't evaluate virtual-column refs, so silently pushing them would
    produce wrong results.
    - Defensive check in the opener: `build_virtual_columns_state` (run once
    per scan partition at morselizer-build time) errors when
    `pushdown_filters=true` and the predicate references a virtual column,
    with a clear remediation message pointing at `try_pushdown_filters`.
    This catches callers that bypass the optimizer and set the predicate on
    `ParquetSource` directly. Returns a `Result` (not a panic) so the
    contract is enforced in release builds too.
    - `VirtualColumnsState` is constructed once per scan partition:
    validates the extension-type allowlist, precomputes the
    `null_replacements` HashMap and the `logical_schema_with_virtual`
    schema. Each file's open path then borrows the precomputed state via
    `Arc`.
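
    The pushdown contract above can be sketched roughly as follows.
    `Expr`, `PushedDown`, `classify`, and `check_opener_predicate` are
    simplified, hypothetical stand-ins defined locally for illustration;
    the real code walks `PhysicalExpr` trees:

    ```rust
    /// Simplified stand-in for a physical predicate tree (hypothetical).
    enum Expr {
        Column(String),
        Literal(i64),
        Or(Box<Expr>, Box<Expr>),
    }

    /// Local stand-in for the planner's pushdown verdict.
    #[derive(Debug, PartialEq)]
    enum PushedDown {
        Yes,
        No,
    }

    /// True if any `Column` node in `expr` names one of `virtual_cols`.
    fn references_virtual(expr: &Expr, virtual_cols: &[&str]) -> bool {
        match expr {
            Expr::Column(name) => virtual_cols.iter().any(|v| *v == name.as_str()),
            Expr::Literal(_) => false,
            Expr::Or(l, r) => {
                references_virtual(l, virtual_cols)
                    || references_virtual(r, virtual_cols)
            }
        }
    }

    /// Planner boundary: filters touching a virtual column are not pushed
    /// down, so a filter above the scan still evaluates them.
    fn classify(filters: &[Expr], virtual_cols: &[&str]) -> Vec<PushedDown> {
        filters
            .iter()
            .map(|f| {
                if references_virtual(f, virtual_cols) {
                    PushedDown::No
                } else {
                    PushedDown::Yes
                }
            })
            .collect()
    }

    /// Opener boundary: a defensive check that returns an error (rather
    /// than panicking) when pushdown is on and the predicate references a
    /// virtual column.
    fn check_opener_predicate(
        predicate: Option<&Expr>,
        virtual_cols: &[&str],
        pushdown_filters: bool,
    ) -> Result<(), String> {
        match predicate {
            Some(p) if pushdown_filters && references_virtual(p, virtual_cols) => {
                Err("predicate references a virtual column".to_string())
            }
            _ => Ok(()),
        }
    }
    ```

    A mixed `OR` filter is classified as `PushedDown::No` as a whole,
    since one virtual-column reference is enough to make the predicate
    unevaluable inside a row filter.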
    
    ### Cargo
    
    - `arrow-schema` added as a direct dep (previously transitive via
    `arrow`) so the enum references `RowNumber::NAME` from arrow-rs (via
    `arrow_schema::extension::ExtensionType`) instead of hardcoding the
    string.
    
    ### Explicitly **not** in scope (follow-ups)
    
    - `ListingTable` / SQL-layer surface
    - `ParquetSource::with_virtual_columns`
    - `RowGroupIndex` support (the enum has a deliberate-rejection test for
    it)
    - Removing the `TableSchema` chainable convenience setter; it could be
    deprecated in a follow-up to align fully with #22496's builder-only
    direction
    
    ## Are these changes tested?
    
    Yes.
    
    **`opener/mod.rs`** (10 new tests, in a `virtual_columns` submodule):
    
    - `test_row_index_basic` — single row group, select data + row_number.
    - `test_row_index_projection_only` — select only row_number.
    - `test_row_index_multi_row_group` — 3 × 100 rows, verify absolute
    0..300 across boundaries.
    - `test_row_index_with_row_group_skip` — predicate stats-prunes the
    middle row group; verify row numbers stay absolute (0..100 ++ 200..300).
    Critical correctness gate for Spark (and for apache/arrow-rs#8863).
    - `test_row_index_with_partition_cols` — partition + virtual + data
    columns compose correctly.
    - `test_row_index_nullable_int64` — nullability flag flows through
    unchanged (matches Spark's `_tmp_metadata_row_index` declaration).
    - `test_unsupported_virtual_extension_type_rejected` — using
    `RowGroupIndex` (a real arrow-rs type deliberately not in the enum yet)
    errors with `NotImplemented` instead of silently forwarding.
    - `test_row_index_predicate_pushdown_mixed_or_errors` /
    `_virtual_only_errors` / `_allowed_when_pushdown_disabled` — exercise
    the opener's defensive check for virtual-col predicate refs with
    `pushdown_filters=true`, and confirm the `pushdown_filters=false` path
    is unaffected.
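
    What the row-group-skip test guards against can be shown with a small
    model, assuming row numbers are offsets from the start of the file.
    `absolute_row_numbers` is a hypothetical helper, not reader code:

    ```rust
    /// Row numbers emitted for the row groups that survive pruning.
    /// `row_group_sizes[i]` is the row count of row group `i`, and
    /// `kept[i]` says whether that row group is read.
    fn absolute_row_numbers(row_group_sizes: &[usize], kept: &[bool]) -> Vec<i64> {
        let mut out = Vec::new();
        // Absolute index of the first row of the current row group.
        let mut first_row = 0i64;
        for (size, keep) in row_group_sizes.iter().zip(kept) {
            let size = *size as i64;
            if *keep {
                out.extend(first_row..first_row + size);
            }
            // A skipped row group still advances the offset, so later
            // groups keep their absolute positions.
            first_row += size;
        }
        out
    }
    ```

    With three row groups of 100 rows and the middle one pruned, this
    model yields 0..100 followed by 200..300, the same shape the test
    asserts.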
    
    **`source.rs`**: `test_try_pushdown_filters_rejects_virtual_column_refs`
    pins the planner-boundary contract — file-col filters are
    `PushedDown::Yes`, virtual-only and mixed `OR` filters are
    `PushedDown::No`.
    
    **`virtual_column.rs`** (3 new tests): `TryFrom<&FieldRef>` for valid
    `RowNumber`, missing-extension-type, and unsupported-extension-type
    (real `RowGroupIndex`) inputs.
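
    The shape of that conversion, sketched with a hypothetical
    `VirtualColumn` enum over an optional extension-type name. The
    `example.*` names are placeholders, not the identifiers arrow-rs uses:

    ```rust
    use std::convert::TryFrom;

    /// Hypothetical allowlist of supported virtual columns.
    #[derive(Debug, PartialEq)]
    enum VirtualColumn {
        RowNumber,
    }

    impl TryFrom<Option<&str>> for VirtualColumn {
        type Error = String;

        /// `extension_name` stands in for the extension type attached to a
        /// field; `None` models a field with no extension type at all.
        fn try_from(extension_name: Option<&str>) -> Result<Self, Self::Error> {
            match extension_name {
                Some("example.row_number") => Ok(VirtualColumn::RowNumber),
                Some(other) => {
                    Err(format!("unsupported virtual extension type: {other}"))
                }
                None => Err("field has no extension type".to_string()),
            }
        }
    }
    ```

    The three arms correspond to the three test inputs listed above:
    valid, unsupported, and missing extension type.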
    
    **`table_schema.rs`** (5 new tests): `[file, partition, virtual]` layout
    regardless of builder-call order; `debug_assert!` collision panics for
    virtual-vs-file, virtual-vs-partition (both setter orderings), and
    duplicates within virtual.
    
    `cargo test -p datafusion-datasource-parquet --all-features` (137
    passing) and `cargo test -p datafusion-datasource` (148 passing). `cargo
    clippy -p datafusion-datasource-parquet -p datafusion-datasource
    --all-targets --all-features -- -D warnings` is clean.
    
    ## Are there any user-facing changes?
    
    Public API additions:
    
    - `TableSchemaBuilder::with_virtual_columns(impl Into<Fields>)`
    - `TableSchema::with_virtual_columns(Vec<FieldRef>)` (chainable
    convenience that routes through the builder)
    - `TableSchema::virtual_columns() -> &Fields`
    - `TableSchema::schema_without_virtual_columns() -> SchemaRef`
    - `ParquetVirtualColumn` (re-exported from
    `datafusion-datasource-parquet`)
    
    No breaking changes; no existing API changed.
---
 Cargo.lock                                         |   1 +
 datafusion/datasource-parquet/Cargo.toml           |   1 +
 .../datasource-parquet/src/decoder_projection.rs   |  33 +-
 datafusion/datasource-parquet/src/mod.rs           |   2 +
 datafusion/datasource-parquet/src/opener/mod.rs    | 695 ++++++++++++++++++++-
 datafusion/datasource-parquet/src/source.rs        |  96 ++-
 .../datasource-parquet/src/virtual_column.rs       | 125 ++++
 datafusion/datasource/src/table_schema.rs          | 243 ++++++-
 8 files changed, 1160 insertions(+), 36 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 66aef04c92..b0370a3e1b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2013,6 +2013,7 @@ name = "datafusion-datasource-parquet"
 version = "53.1.0"
 dependencies = [
  "arrow",
+ "arrow-schema",
  "async-trait",
  "bytes",
  "chrono",
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index a5855af17a..8aa6ca1f97 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -32,6 +32,7 @@ all-features = true
 
 [dependencies]
 arrow = { workspace = true }
+arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store", "parquet"] }
diff --git a/datafusion/datasource-parquet/src/decoder_projection.rs b/datafusion/datasource-parquet/src/decoder_projection.rs
index dcf52a37d4..27a84f2f50 100644
--- a/datafusion/datasource-parquet/src/decoder_projection.rs
+++ b/datafusion/datasource-parquet/src/decoder_projection.rs
@@ -38,10 +38,12 @@ use arrow::datatypes::SchemaRef;
 use datafusion_common::Result;
 use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
 use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr_adapter::replace_columns_with_literals;
 
 use parquet::arrow::ProjectionMask;
 use parquet::schema::types::SchemaDescriptor;
 
+use crate::opener::{VirtualColumnsState, append_fields};
 use crate::row_filter::build_projection_read_plan;
 
 /// Per-file decoder projection: the [`ProjectionMask`] installed on every
@@ -68,19 +70,46 @@ impl DecoderProjection {
     /// adapted by the per-file expr adapter); `parquet_schema` is the
     /// corresponding parquet [`SchemaDescriptor`]. `output_schema` is what
     /// consumers of the scan stream expect.
+    ///
+    /// `virtual_state`, when present, describes virtual columns the reader
+    /// will append to each decoded batch (e.g. parquet `row_number`). Virtual
+    /// columns are stripped from the projection fed into
+    /// `build_projection_read_plan` (which only understands file columns) and
+    /// appended to the stream schema so the projector can resolve them.
     pub(crate) fn try_new(
         projection: &ProjectionExprs,
         physical_file_schema: &SchemaRef,
         parquet_schema: &SchemaDescriptor,
         output_schema: &SchemaRef,
+        virtual_state: Option<&VirtualColumnsState>,
     ) -> Result<Self> {
+        // Virtual columns are produced by the reader separately from the
+        // projection mask, so strip them from the expressions we feed into
+        // `build_projection_read_plan`. We substitute each virtual column
+        // reference with a null literal; that leaves the remaining Column
+        // refs (into `physical_file_schema`) intact for
+        // `ProjectionMask::roots`, which only understands file columns.
+        let projection_for_read_plan = match virtual_state {
+            None => projection.clone(),
+            Some(state) => projection.clone().try_map_exprs(|expr| {
+                replace_columns_with_literals(expr, state.null_replacements())
+            })?,
+        };
         let read_plan = build_projection_read_plan(
-            projection.expr_iter(),
+            projection_for_read_plan.expr_iter(),
             physical_file_schema,
             parquet_schema,
         );
 
-        let stream_schema = read_plan.projected_schema;
+        // The reader produces projected file columns followed by any virtual
+        // columns (`ArrowReaderOptions::with_virtual_columns` appends them to
+        // each decoded batch).
+        let stream_schema = match virtual_state {
+            Some(state) => {
+                append_fields(&read_plan.projected_schema, state.virtual_columns())
+            }
+            None => Arc::clone(&read_plan.projected_schema),
+        };
 
         // Rebase the projection onto the decoder's stream schema (column
         // indices change because the decoder yields only the masked columns).
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index cf1caf336f..bec0736366 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -43,6 +43,7 @@ pub mod source;
 mod supported_predicates;
 #[cfg(test)]
 mod test_util;
+mod virtual_column;
 mod writer;
 
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
@@ -60,4 +61,5 @@ pub use schema_coercion::{
     transform_binary_to_string, transform_schema_to_view,
 };
 pub use sink::ParquetSink;
+pub use virtual_column::ParquetVirtualColumn;
 pub use writer::plan_to_parquet;
diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs
index 09e7763877..c78e73119e 100644
--- a/datafusion/datasource-parquet/src/opener/mod.rs
+++ b/datafusion/datasource-parquet/src/opener/mod.rs
@@ -31,7 +31,7 @@ use crate::row_filter::RowFilterGenerator;
 use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
 use crate::{
     Int96Coercer, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
-    apply_file_schema_type_coercions,
+    ParquetVirtualColumn, apply_file_schema_type_coercions,
 };
 use arrow::array::RecordBatch;
 use arrow::datatypes::DataType;
@@ -44,12 +44,16 @@ use std::future::Future;
 use std::mem;
 use std::sync::Arc;
 
-use arrow::datatypes::{SchemaRef, TimeUnit};
+use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit};
 #[cfg(feature = "parquet_encryption")]
 use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::stats::Precision;
-use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_err};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
+use datafusion_common::{
+    ColumnStatistics, HashSet, Result, ScalarValue, Statistics, exec_err, internal_err,
+};
 use datafusion_datasource::{PartitionedFile, TableSchema};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -74,6 +78,152 @@ use parquet::basic::Type;
 use parquet::bloom_filter::Sbbf;
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
 
+/// Morselizer-level state for virtual columns, precomputed once per scan
+/// partition so each file skips the validator walks, `null_replacements`
+/// rebuild, and one of the `append_fields` allocations.
+///
+/// Only constructed when the scan actually requests virtual columns;
+/// [`ParquetMorselizer`] and [`PreparedParquetOpen`] hold
+/// `Option<Arc<VirtualColumnsState>>` so the zero-virtual-column path (the
+/// common case) pays nothing.
+pub(crate) struct VirtualColumnsState {
+    /// Shared list of virtual column fields. Cloned as a `Vec` only at the
+    /// arrow-rs `with_virtual_columns` call site, which takes it by value.
+    virtual_columns: Arc<Vec<FieldRef>>,
+    /// Null-literal substitutions keyed by virtual column name, used to strip
+    /// virtual-column references from the projection fed into
+    /// `build_projection_read_plan` (which only understands file columns).
+    null_replacements: HashMap<String, ScalarValue>,
+    /// `logical_file_schema` with the virtual columns appended. Fed into the
+    /// per-file expression rewriter so virtual-column references
+    /// identity-rewrite instead of being replaced with null literals.
+    logical_schema_with_virtual: SchemaRef,
+}
+
+impl VirtualColumnsState {
+    /// Validate each field carries a supported arrow virtual extension type
+    /// and precompute the per-scan derived state.
+    fn try_new(
+        virtual_columns: Vec<FieldRef>,
+        logical_file_schema: &SchemaRef,
+    ) -> Result<Self> {
+        // Gate which extension types we forward to arrow-rs. Adding a new
+        // supported virtual column means adding a `ParquetVirtualColumn`
+        // variant — not editing a stringly-typed allowlist here.
+        for field in &virtual_columns {
+            ParquetVirtualColumn::try_from(field)?;
+        }
+        let null_replacements = virtual_columns
+            .iter()
+            .map(|f| ScalarValue::try_from(f.data_type()).map(|v| (f.name().clone(), v)))
+            .collect::<Result<HashMap<String, ScalarValue>>>()?;
+        let logical_schema_with_virtual =
+            append_fields(logical_file_schema, &virtual_columns);
+        Ok(Self {
+            virtual_columns: Arc::new(virtual_columns),
+            null_replacements,
+            logical_schema_with_virtual,
+        })
+    }
+
+    /// Validated virtual column fields, in declaration order.
+    pub(crate) fn virtual_columns(&self) -> &[FieldRef] {
+        &self.virtual_columns
+    }
+
+    /// Null-literal substitutions keyed by virtual column name. Used to strip
+    /// virtual-column references from a projection before it is fed into the
+    /// parquet `ProjectionMask` (which only understands file columns).
+    pub(crate) fn null_replacements(&self) -> &HashMap<String, ScalarValue> {
+        &self.null_replacements
+    }
+}
+
+/// Build the per-scan virtual-column state.
+///
+/// Two checks run here:
+/// - Extension-type allowlist via [`VirtualColumnsState::try_new`]: returns
+///   `Err` for unsupported virtual extension types.
+/// - Predicate-reference check (when pushdown is enabled): returns `Err` if
+///   the predicate references a virtual column. The contract is that callers
+///   route filters through
+///   [`ParquetSource::try_pushdown_filters`](crate::source::ParquetSource),
+///   which classifies virtual-col filters as `PushedDown::No`. Erroring here
+///   prevents silent wrong results for callers that bypass that path and set
+///   the predicate directly on `ParquetSource`.
+///
+/// Returns `None` when the scan has no virtual columns, so callers avoid
+/// allocating the shared state on the common path.
+pub(crate) fn build_virtual_columns_state(
+    virtual_columns: &[FieldRef],
+    logical_file_schema: &SchemaRef,
+    predicate: Option<&Arc<dyn PhysicalExpr>>,
+    pushdown_filters: bool,
+) -> Result<Option<Arc<VirtualColumnsState>>> {
+    if virtual_columns.is_empty() {
+        return Ok(None);
+    }
+    if pushdown_filters && let Some(predicate) = predicate {
+        validate_predicate_does_not_reference_virtual_columns(
+            predicate,
+            virtual_columns,
+        )?;
+    }
+    let state =
+        VirtualColumnsState::try_new(virtual_columns.to_vec(), logical_file_schema)?;
+    Ok(Some(Arc::new(state)))
+}
+
+/// Return `base` unchanged when `extra` is empty; otherwise build a new schema
+/// with `extra` appended to `base`'s fields.
+pub(crate) fn append_fields(base: &SchemaRef, extra: &[FieldRef]) -> SchemaRef {
+    if extra.is_empty() {
+        return Arc::clone(base);
+    }
+    let fields = base
+        .fields()
+        .iter()
+        .cloned()
+        .chain(extra.iter().cloned())
+        .collect::<Vec<_>>();
+    Arc::new(Schema::new(fields))
+}
+
+/// Reject predicates that reference a virtual column.
+///
+/// arrow-rs's `RowFilter` evaluates predicates against a `ProjectionMask` that
+/// addresses parquet leaves only; virtual columns (e.g. `row_number`) are
+/// synthesized by the reader *after* filter evaluation and cannot be referenced
+/// inside a row filter. Silently dropping such a predicate would produce wrong
+/// results.
+fn validate_predicate_does_not_reference_virtual_columns(
+    predicate: &Arc<dyn PhysicalExpr>,
+    virtual_columns: &[FieldRef],
+) -> Result<()> {
+    if virtual_columns.is_empty() {
+        return Ok(());
+    }
+    let virtual_names: HashSet<&str> =
+        virtual_columns.iter().map(|f| f.name().as_str()).collect();
+    let mut offender: Option<String> = None;
+    predicate.apply(|node: &Arc<dyn PhysicalExpr>| {
+        if let Some(column) = node.downcast_ref::<Column>()
+            && virtual_names.contains(column.name())
+        {
+            offender = Some(column.name().to_string());
+            return Ok(TreeNodeRecursion::Stop);
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;
+    if let Some(name) = offender {
+        return internal_err!(
+            "Predicate references virtual column '{name}'; route via \
+             ParquetSource::try_pushdown_filters."
+        );
+    }
+    Ok(())
+}
+
 /// Stateless Parquet morselizer implementation.
 ///
 /// Reading a Parquet file is a multi-stage process, with multiple CPU-intensive
@@ -139,6 +289,9 @@ pub(super) struct ParquetMorselizer {
     pub reverse_row_groups: bool,
     /// Optional sort order used to reorder row groups by their min/max statistics.
     pub sort_order_for_reorder: Option<LexOrdering>,
+    /// Per-scan virtual-column state (validation already performed). `None`
+    /// when no virtual columns are requested — the common path.
+    pub(crate) virtual_state: Option<Arc<VirtualColumnsState>>,
 }
 
 impl fmt::Debug for ParquetMorselizer {
@@ -277,6 +430,11 @@ struct PreparedParquetOpen {
     output_schema: SchemaRef,
     projection: ProjectionExprs,
     predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Per-scan virtual-column state, Arc-cloned from [`ParquetMorselizer`] so
+    /// each file shares validated fields, precomputed null replacements, and
+    /// the logical-with-virtual schema. `None` when no virtual columns were
+    /// requested.
+    virtual_state: Option<Arc<VirtualColumnsState>>,
     reorder_predicates: bool,
     pushdown_filters: bool,
     force_filter_selections: bool,
@@ -649,6 +807,7 @@ impl ParquetMorselizer {
             output_schema,
             projection,
             predicate,
+            virtual_state: self.virtual_state.as_ref().map(Arc::clone),
             reorder_predicates: self.reorder_filters,
             pushdown_filters: self.pushdown_filters,
             force_filter_selections: self.force_filter_selections,
@@ -757,22 +916,22 @@ impl MetadataLoadedParquetOpen {
         // - The logical file schema: this is the table schema minus any hive partition columns and projections.
         //   This is what the physical file schema is coerced to.
         // - The physical file schema: this is the schema that the arrow-rs
-        //   parquet reader will actually produce.
+        //   parquet reader will actually produce for the file's columns. Any
+        //   virtual columns (see [`crate::TableSchema::virtual_columns`]) are
+        //   produced separately by the reader and are not part of this schema.
         let mut physical_file_schema = Arc::clone(reader_metadata.schema());
 
         // The schema loaded from the file may not be the same as the
         // desired schema (for example if we want to instruct the parquet
         // reader to read strings using Utf8View instead). Update if necessary
+        let mut metadata_dirty = false;
         if let Some(merged) = apply_file_schema_type_coercions(
             &prepared.logical_file_schema,
             &physical_file_schema,
         ) {
             physical_file_schema = Arc::new(merged);
             options = options.with_schema(Arc::clone(&physical_file_schema));
-            reader_metadata = ArrowReaderMetadata::try_new(
-                Arc::clone(reader_metadata.metadata()),
-                options.clone(),
-            )?;
+            metadata_dirty = true;
         }
 
         if let Some(ref coerce) = prepared.coerce_int96
@@ -786,6 +945,17 @@ impl MetadataLoadedParquetOpen {
         {
             physical_file_schema = Arc::new(merged);
             options = options.with_schema(Arc::clone(&physical_file_schema));
+            metadata_dirty = true;
+        }
+
+        // Arrow-rs appends virtual columns to the supplied schema internally,
+        // so any `with_schema` coercion above must stay limited to file columns.
+        if let Some(state) = prepared.virtual_state.as_ref() {
+            options = options.with_virtual_columns((*state.virtual_columns).clone())?;
+            metadata_dirty = true;
+        }
+
+        if metadata_dirty {
             reader_metadata = ArrowReaderMetadata::try_new(
                 Arc::clone(reader_metadata.metadata()),
                 options.clone(),
@@ -808,11 +978,32 @@ impl MetadataLoadedParquetOpen {
         let needs_rewrite = prepared.predicate.is_some()
             || prepared.logical_file_schema != physical_file_schema;
         if needs_rewrite {
+            // When virtual columns are requested, augment the logical and
+            // physical schemas passed to the rewriter/simplifier with those
+            // fields. The rewriter identity-rewrites references found in both
+            // schemas, keeping virtual-column references as `Column` rather
+            // than replacing them with null literals; the simplifier needs
+            // them present so it can resolve their data types while walking
+            // expression trees. We keep `physical_file_schema` itself as the
+            // pure file schema so downstream predicate pushdown, pruning, and
+            // row filter construction stay unaffected.
+            let (logical_for_rewrite, physical_for_rewrite) =
+                if let Some(state) = prepared.virtual_state.as_ref() {
+                    (
+                        Arc::clone(&state.logical_schema_with_virtual),
+                        append_fields(&physical_file_schema, &state.virtual_columns),
+                    )
+                } else {
+                    (
+                        Arc::clone(&prepared.logical_file_schema),
+                        Arc::clone(&physical_file_schema),
+                    )
+                };
             let rewriter = prepared.expr_adapter_factory.create(
-                Arc::clone(&prepared.logical_file_schema),
-                Arc::clone(&physical_file_schema),
+                Arc::clone(&logical_for_rewrite),
+                Arc::clone(&physical_for_rewrite),
             )?;
-            let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
+            let simplifier = PhysicalExprSimplifier::new(&physical_for_rewrite);
             prepared.predicate = prepared
                 .predicate
                 .map(|p| simplifier.simplify(rewriter.rewrite(p)?))
@@ -1156,6 +1347,7 @@ impl RowGroupsPrunedParquetOpen {
             &prepared.physical_file_schema,
             reader_metadata.parquet_schema(),
             &prepared.output_schema,
+            prepared.virtual_state.as_deref(),
         )?;
 
         let (decoder, pending_decoders, remaining_limit) = {
@@ -1505,12 +1697,28 @@ mod test {
             self
         }
 
-        /// Set projection by column indices (convenience method for common case).
+        /// Set projection by column indices.
+        ///
+        /// The indices are resolved against the **file schema**, not the full
+        /// table schema. Callers that need to project partition columns or
+        /// virtual columns must use [`Self::with_projection`] and construct a
+        /// [`ProjectionExprs`] against [`TableSchema::table_schema`].
         fn with_projection_indices(mut self, indices: &[usize]) -> Self {
             self.projection_indices = Some(indices.to_vec());
             self
         }
 
+        /// Set an explicit projection.
+        ///
+        /// Prefer this over [`Self::with_projection_indices`] whenever the
+        /// projection must reference partition or virtual columns, since
+        /// `with_projection_indices` resolves its indices against the file
+        /// schema only.
+        fn with_projection(mut self, projection: ProjectionExprs) -> Self {
+            self.projection = Some(projection);
+            self
+        }
+
         /// Set the predicate.
         fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
             self.predicate = Some(predicate);
@@ -1553,12 +1761,26 @@ mod test {
             self
         }
 
-        /// Build the ParquetMorselizer instance.
+        /// Build the ParquetMorselizer instance, unwrapping validation errors.
         ///
         /// # Panics
         ///
-        /// Panics if required fields (store, schema/table_schema) are not set.
+        /// Panics if required fields (store, schema/table_schema) are not set,
+        /// or if virtual-column validation fails. Use [`Self::try_build`]
+        /// when the test wants to assert on the validation error.
         fn build(self) -> ParquetMorselizer {
+            self.try_build().expect("ParquetMorselizerBuilder::build")
+        }
+
+        /// Build the ParquetMorselizer instance, returning any morselizer-level
+        /// validation error (e.g. unsupported virtual extension type, or a
+        /// predicate that references a virtual column with
+        /// `pushdown_filters=true`).
+        ///
+        /// # Panics
+        ///
+        /// Panics if required fields (store, schema/table_schema) are not set.
+        fn try_build(self) -> Result<ParquetMorselizer> {
             let store = self
                 .store
                 .expect("ParquetMorselizerBuilder: store must be set via with_store()");
@@ -1577,7 +1799,14 @@ mod test {
                 ProjectionExprs::from_indices(&all_indices, &file_schema)
             };
 
-            ParquetMorselizer {
+            let virtual_state = build_virtual_columns_state(
+                table_schema.virtual_columns(),
+                table_schema.file_schema(),
+                self.predicate.as_ref(),
+                self.pushdown_filters,
+            )?;
+
+            Ok(ParquetMorselizer {
                 partition_index: self.partition_index,
                 projection,
                 batch_size: self.batch_size,
@@ -1609,7 +1838,8 @@ mod test {
                 max_predicate_cache_size: self.max_predicate_cache_size,
                 reverse_row_groups: self.reverse_row_groups,
                 sort_order_for_reorder: None,
-            }
+                virtual_state,
+            })
         }
     }
 
@@ -1770,7 +2000,7 @@ mod test {
     async fn write_parquet(
         store: Arc<dyn ObjectStore>,
         filename: &str,
-        batch: arrow::record_batch::RecordBatch,
+        batch: RecordBatch,
     ) -> usize {
         write_parquet_batches(store, filename, vec![batch], None).await
     }
@@ -1779,7 +2009,7 @@ mod test {
     async fn write_parquet_batches(
         store: Arc<dyn ObjectStore>,
         filename: &str,
-        batches: Vec<arrow::record_batch::RecordBatch>,
+        batches: Vec<RecordBatch>,
         props: Option<WriterProperties>,
     ) -> usize {
         let mut out = BytesMut::new().writer();
@@ -2683,4 +2913,433 @@ mod test {
         assert!(runs[2].needs_filter);
         assert_eq!(runs[2].access_plan.row_group_indexes(), vec![3]);
     }
+
+    /// Helpers for tests that exercise parquet virtual columns
+    /// (e.g. `row_number`) plumbed through `TableSchema`/`ParquetOpener`.
+    mod virtual_columns {
+        use super::*;
+        use arrow::array::{Array, Int64Array};
+        use arrow::datatypes::FieldRef;
+        use parquet::arrow::RowNumber;
+
+        /// Build a parquet `row_number` virtual column field. Spark's
+        /// `_tmp_metadata_row_index` is declared nullable, so tests that
+        /// mirror that contract pass `nullable = true`; the remaining tests
+        /// pass `nullable = false`.
+        fn row_number_field(name: &str, nullable: bool) -> FieldRef {
+            Arc::new(
+                Field::new(name, DataType::Int64, nullable)
+                    .with_extension_type(RowNumber),
+            )
+        }
+
+        /// Collect every `Int64` value from the given column in every batch
+        /// of a stream. Used to verify the `row_number` column end to end.
+        async fn collect_int64_values(
+            mut stream: BoxStream<'static, Result<RecordBatch>>,
+            column: usize,
+        ) -> Vec<i64> {
+            let mut out = vec![];
+            while let Some(batch) = stream.next().await {
+                let batch = batch.unwrap();
+                let array = batch
+                    .column(column)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .expect("expected Int64 column");
+                for i in 0..array.len() {
+                    assert!(
+                        !array.is_null(i),
+                        "row_number values produced by the reader must not be null"
+                    );
+                    out.push(array.value(i));
+                }
+            }
+            out
+        }
+
+        /// Write a parquet file containing `num_row_groups` groups of
+        /// `rows_per_group` rows with a single `value` Int64 column.
+        /// Values are `0..num_row_groups*rows_per_group`.
+        async fn write_grouped_file(
+            store: &Arc<dyn ObjectStore>,
+            path: &str,
+            num_row_groups: usize,
+            rows_per_group: usize,
+        ) -> (SchemaRef, usize) {
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "value",
+                DataType::Int64,
+                false,
+            )]));
+            let mut batches = Vec::with_capacity(num_row_groups);
+            for g in 0..num_row_groups {
+                let start = (g * rows_per_group) as i64;
+                let values: Vec<i64> = (start..start + rows_per_group as i64).collect();
+                batches.push(
+                    RecordBatch::try_new(
+                        Arc::clone(&schema),
+                        vec![Arc::new(Int64Array::from(values))],
+                    )
+                    .unwrap(),
+                );
+            }
+            let props = WriterProperties::builder()
+                .set_max_row_group_row_count(Some(rows_per_group))
+                .build();
+            let data_size =
+                write_parquet_batches(Arc::clone(store), path, batches, Some(props))
+                    .await;
+            (schema, data_size)
+        }
+
+        #[tokio::test]
+        async fn test_row_index_basic() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "basic.parquet", 1, 5).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)])
+                .build();
+            // Project [value, row_number] — indices in table_schema are
+            // [0 file:value, 1 virtual:row_number].
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "basic.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 1).await;
+            assert_eq!(row_numbers, vec![0, 1, 2, 3, 4]);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_projection_only() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "proj_only.parquet", 1, 4).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)])
+                .build();
+            // Project only the virtual column (index 1).
+            let projection =
+                ProjectionExprs::from_indices(&[1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "proj_only.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 0).await;
+            assert_eq!(row_numbers, vec![0, 1, 2, 3]);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_multi_row_group() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "multi_rg.parquet", 3, 100).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)])
+                .build();
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "multi_rg.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 1).await;
+            let expected: Vec<i64> = (0..300).collect();
+            assert_eq!(row_numbers, expected);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_with_row_group_skip() {
+            // 3 row groups of 100 rows. A predicate that excludes the middle
+            // row group (values 100..200) must leave absolute row numbers
+            // 0..100 and 200..300 intact — not 0..200. This guards against
+            // the arrow-rs bug fixed in apache/arrow-rs#8863.
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "rg_skip.parquet", 3, 100).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)])
+                .build();
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            // `value < 100 OR value >= 200` prunes the middle row group via
+            // min/max statistics.
+            let expr = col("value")
+                .lt(lit(100i64))
+                .or(col("value").gt_eq(lit(200i64)));
+            let predicate = logical2physical(&expr, table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .with_predicate(predicate)
+                .with_row_group_stats_pruning(true)
+                .build();
+
+            let file = PartitionedFile::new(
+                "rg_skip.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 1).await;
+            let expected: Vec<i64> = (0..100).chain(200..300).collect();
+            assert_eq!(row_numbers, expected);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_with_partition_cols() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "part=5/data.parquet", 1, 3).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let partition_col = Arc::new(Field::new("part", DataType::Int32, false));
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_table_partition_cols(vec![Arc::clone(&partition_col)])
+                .with_virtual_columns(vec![Arc::clone(&rn_field)])
+                .build();
+            // table_schema layout: [value(0), part(1), row_number(2)].
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1, 2], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let mut file = PartitionedFile::new(
+                "part=5/data.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            file.partition_values = vec![ScalarValue::Int32(Some(5))];
+
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let mut stream = stream;
+            let batch = stream.next().await.unwrap().unwrap();
+            assert!(stream.next().await.is_none());
+
+            assert_eq!(batch.num_columns(), 3);
+            assert_eq!(batch.schema().field(0).name(), "value");
+            assert_eq!(batch.schema().field(1).name(), "part");
+            assert_eq!(batch.schema().field(2).name(), "row_number");
+
+            let part = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<arrow::array::Int32Array>()
+                .unwrap();
+            assert!(part.iter().all(|v| v == Some(5)));
+
+            let rn = batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap();
+            let rn_values: Vec<i64> = (0..rn.len()).map(|i| rn.value(i)).collect();
+            assert_eq!(rn_values, vec![0, 1, 2]);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_nullable_int64() {
+            // Spark declares `_tmp_metadata_row_index` nullable. Verify the
+            // nullability flag flows through unchanged.
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "nullable.parquet", 1, 3).await;
+
+            let rn_field = row_number_field("_tmp_metadata_row_index", true);
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)])
+                .build();
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "nullable.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let mut stream = open_file(&morselizer, file).await.unwrap();
+            let batch = stream.next().await.unwrap().unwrap();
+
+            let schema_field = batch.schema().field(1).clone();
+            assert_eq!(schema_field.name(), "_tmp_metadata_row_index");
+            assert_eq!(schema_field.data_type(), &DataType::Int64);
+            assert!(
+                schema_field.is_nullable(),
+                "nullable flag should be preserved for Spark's row index field"
+            );
+        }
+
+        #[tokio::test]
+        async fn test_unsupported_virtual_extension_type_rejected() {
+            // Guard: opener must reject virtual columns carrying extension
+            // types outside the tested allowlist, rather than silently
+            // forwarding them to arrow-rs (where they would produce columns
+            // we have not validated against DataFusion's projection and
+            // predicate paths).
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, _data_size) =
+                write_grouped_file(&store, "unsupported.parquet", 1, 1).await;
+
+            // RowGroupIndex is a real arrow-rs virtual type but is not in
+            // SUPPORTED_VIRTUAL_EXTENSION_TYPES until a test is added for it.
+            let rg_field = Arc::new(
+                Field::new("row_group_index", DataType::Int64, false)
+                    .with_extension_type(parquet::arrow::RowGroupIndex),
+            );
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![rg_field])
+                .build();
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            // Validation now happens at morselizer-build time (once per scan
+            // partition), not once per file inside `prepare_open_file`.
+            let err = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .try_build()
+                .unwrap_err();
+            let msg = err.to_string();
+            assert!(
+                msg.contains("parquet.virtual.row_group_index"),
+                "error should name the unsupported extension type, got: {msg}"
+            );
+        }
+
+        /// Build a morselizer + file for a 5-row single-row-group parquet at
+        /// `path`, with a single `row_number` virtual column and the given
+        /// physical predicate applied to
+        /// `table_schema = [value(0), row_number(1)]`.
+        async fn build_pushdown_morselizer(
+            store: &Arc<dyn ObjectStore>,
+            path: &str,
+            predicate_expr: datafusion_expr::Expr,
+            pushdown_filters: bool,
+        ) -> Result<(ParquetMorselizer, PartitionedFile)> {
+            let (file_schema, data_size) = write_grouped_file(store, path, 1, 5).await;
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)])
+                .build();
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+            let predicate =
+                logical2physical(&predicate_expr, table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .with_predicate(predicate)
+                .with_pushdown_filters(pushdown_filters)
+                .try_build()?;
+
+            let file =
+                PartitionedFile::new(path.to_string(), u64::try_from(data_size).unwrap());
+            Ok((morselizer, file))
+        }
+
+        // The predicate-vs-virtual-column check rejects callers that bypass
+        // `ParquetSource::try_pushdown_filters` (which keeps virtual-col
+        // filters above the scan as a `FilterExec`) and set the predicate
+        // directly on the source with pushdown enabled. Without this guard,
+        // arrow-rs's `RowFilter` would silently drop the virtual-col conjunct
+        // and produce wrong results.
+        #[tokio::test]
+        async fn test_row_index_predicate_pushdown_mixed_or_errors() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let expr = col("row_number")
+                .eq(lit(2i64))
+                .or(col("value").eq(lit(4i64)));
+            let err =
+                build_pushdown_morselizer(&store, "pushdown_mixed.parquet", expr, true)
+                    .await
+                    .unwrap_err();
+            assert!(
+                err.to_string().contains("try_pushdown_filters"),
+                "error should mention try_pushdown_filters, got: {err}"
+            );
+        }
+
+        #[tokio::test]
+        async fn test_row_index_predicate_pushdown_virtual_only_errors() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let expr = col("row_number").eq(lit(2i64));
+            let err = build_pushdown_morselizer(
+                &store,
+                "pushdown_virtual_only.parquet",
+                expr,
+                true,
+            )
+            .await
+            .unwrap_err();
+            assert!(
+                err.to_string().contains("try_pushdown_filters"),
+                "error should mention try_pushdown_filters, got: {err}"
+            );
+        }
+
+        #[tokio::test]
+        async fn test_row_index_predicate_allowed_when_pushdown_disabled() {
+            // Guards the `pushdown_filters=false` path: the predicate is only
+            // used for stats pruning (a no-op for row_number) and must not
+            // trip the virtual-column check.
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let expr = col("row_number").eq(lit(2i64));
+            let (morselizer, file) =
+                build_pushdown_morselizer(&store, "pushdown_off.parquet", expr, false)
+                    .await
+                    .unwrap();
+
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let (_batches, rows) = count_batches_and_rows(stream).await;
+            assert_eq!(rows, 5);
+        }
+    }
 }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 8952666491..acba8ff828 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -24,6 +24,7 @@ use crate::DefaultParquetFileReaderFactory;
 use crate::ParquetFileReaderFactory;
 use crate::opener::ParquetMorselizer;
 use crate::opener::build_pruning_predicates;
+use crate::opener::build_virtual_columns_state;
 use crate::row_filter::can_expr_be_pushed_down_with_schemas;
 use datafusion_common::config::ConfigOptions;
 #[cfg(feature = "parquet_encryption")]
@@ -346,7 +347,11 @@ impl ParquetSource {
         self
     }
 
-    /// Set predicate information
+    /// Set predicate information.
+    ///
+    /// Predicates referencing virtual columns must go through
+    /// [`Self::try_pushdown_filters`]. Passing them here with pushdown
+    /// enabled makes morselizer creation return a validation error.
     #[expect(clippy::needless_pass_by_value)]
     pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
         let mut conf = self.clone();
@@ -584,6 +589,22 @@ impl FileSource for ParquetSource {
             );
         }
 
+        // Validate virtual columns (extension-type allowlist) and, when
+        // pushdown is enabled, reject predicates that reference them. Both
+        // checks depend only on morselizer-level state, so we pay their cost
+        // once per scan partition rather than per file.
+        //
+        // Gating predicate validation on `pushdown_filters` is deliberate:
+        // when pushdown is off the predicate stays above the scan as a
+        // `FilterExec` and resolves virtual columns there; the row-filter
+        // ban only applies to the pushdown path.
+        let virtual_state = build_virtual_columns_state(
+            self.table_schema.virtual_columns(),
+            self.table_schema.file_schema(),
+            self.predicate.as_ref(),
+            self.pushdown_filters(),
+        )?;
+
         Ok(Box::new(ParquetMorselizer {
             partition_index: partition,
             projection: self.projection.clone(),
@@ -613,6 +634,7 @@ impl FileSource for ParquetSource {
             max_predicate_cache_size: self.max_predicate_cache_size(),
             reverse_row_groups: self.reverse_row_groups,
             sort_order_for_reorder: self.sort_order_for_reorder.clone(),
+            virtual_state,
         }))
     }
 
@@ -727,7 +749,12 @@ impl FileSource for ParquetSource {
         filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
     ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
-        let table_schema = self.table_schema.table_schema();
+        // Use the schema excluding virtual columns: virtual columns (e.g.
+        // Parquet `row_number`) are produced by the reader itself and cannot
+        // be referenced inside a RowFilter, so predicates that reference them
+        // must not be marked as pushed down — otherwise the scan would
+        // silently drop them and produce wrong results.
+        let pushable_schema = self.table_schema.schema_without_virtual_columns();
         // Determine if based on configs we should push filters down.
         // If either the table / scan itself or the config has pushdown enabled,
         // we will push down the filters.
@@ -743,7 +770,7 @@ impl FileSource for ParquetSource {
         let filters: Vec<PushedDownPredicate> = filters
             .into_iter()
             .map(|filter| {
-                if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
+                if can_expr_be_pushed_down_with_schemas(&filter, &pushable_schema) {
                     PushedDownPredicate::supported(filter)
                 } else {
                     PushedDownPredicate::unsupported(filter)
@@ -1583,4 +1610,67 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_try_pushdown_filters_rejects_virtual_column_refs() {
+        // Virtual columns are produced by the reader and cannot be referenced
+        // inside a RowFilter. `try_pushdown_filters` must report such filters
+        // as `PushedDown::No` so the FilterExec above the scan stays in
+        // place — otherwise the scan would silently drop the predicate and
+        // produce wrong results.
+        use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+        use datafusion_common::config::ConfigOptions;
+        use datafusion_datasource::TableSchema;
+        use datafusion_expr::{col, lit as logical_lit};
+        use datafusion_physical_expr::planner::logical2physical;
+        use datafusion_physical_plan::filter_pushdown::PushedDown;
+        use parquet::arrow::RowNumber;
+
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int64,
+            false,
+        )]));
+        let row_number_field: FieldRef = Arc::new(
+            Field::new("row_number", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::builder(file_schema)
+            .with_virtual_columns(vec![row_number_field])
+            .build();
+
+        let source = ParquetSource::new(table_schema).with_pushdown_filters(true);
+
+        let full_schema = source.table_schema.table_schema();
+
+        let pushable = logical2physical(&col("value").eq(logical_lit(1i64)), full_schema);
+        let virtual_only =
+            logical2physical(&col("row_number").eq(logical_lit(2i64)), full_schema);
+        let mixed = logical2physical(
+            &col("row_number")
+                .eq(logical_lit(2i64))
+                .or(col("value").eq(logical_lit(4i64))),
+            full_schema,
+        );
+
+        let config = ConfigOptions::default();
+        let prop = source
+            .try_pushdown_filters(vec![pushable, virtual_only, mixed], &config)
+            .expect("try_pushdown_filters must not error");
+
+        assert_eq!(prop.filters.len(), 3);
+        assert!(
+            matches!(prop.filters[0], PushedDown::Yes),
+            "file-column filter should be pushable"
+        );
+        assert!(
+            matches!(prop.filters[1], PushedDown::No),
+            "filter referencing only a virtual column must not be pushed down"
+        );
+        assert!(
+            matches!(prop.filters[2], PushedDown::No),
+            "filter mixing a virtual column with a file column must not be \
+             pushed down (row filter would silently drop it)"
+        );
+    }
 }
diff --git a/datafusion/datasource-parquet/src/virtual_column.rs b/datafusion/datasource-parquet/src/virtual_column.rs
new file mode 100644
index 0000000000..2290ad2aea
--- /dev/null
+++ b/datafusion/datasource-parquet/src/virtual_column.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Typed wrapper for parquet virtual columns.
+//!
+//! arrow-rs identifies virtual columns via arrow extension types carried on
+//! the `FieldRef`. [`ParquetVirtualColumn`] lifts that contract into the type
+//! system so callers validate at the boundary (via `TryFrom<&FieldRef>`)
+//! rather than string-comparing extension-type names deep inside the reader.
+
+use arrow::datatypes::FieldRef;
+use arrow_schema::extension::ExtensionType;
+use datafusion_common::{DataFusionError, Result, not_impl_err};
+use parquet::arrow::RowNumber;
+use std::sync::Arc;
+
+/// A parquet virtual column validated to have a supported arrow extension
+/// type.
+///
+/// Construct via [`TryFrom<&FieldRef>`]; add a new variant (and update the
+/// `TryFrom` impl) when DataFusion gains support for another arrow-rs virtual
+/// extension type.
+#[derive(Debug, Clone)]
+pub enum ParquetVirtualColumn {
+    /// Absolute row number within the parquet file. Backed by arrow-rs's
+    /// [`RowNumber`] extension type.
+    RowNumber(FieldRef),
+}
+
+impl ParquetVirtualColumn {
+    pub fn field(&self) -> &FieldRef {
+        match self {
+            Self::RowNumber(field) => field,
+        }
+    }
+}
+
+impl From<ParquetVirtualColumn> for FieldRef {
+    fn from(col: ParquetVirtualColumn) -> Self {
+        match col {
+            ParquetVirtualColumn::RowNumber(field) => field,
+        }
+    }
+}
+
+impl TryFrom<&FieldRef> for ParquetVirtualColumn {
+    type Error = DataFusionError;
+
+    fn try_from(field: &FieldRef) -> Result<Self> {
+        let Some(name) = field.extension_type_name() else {
+            return not_impl_err!(
+                "Virtual column '{}' is missing an Arrow extension type; \
+                 supported extension types: [{}]",
+                field.name(),
+                RowNumber::NAME
+            );
+        };
+        match name {
+            n if n == RowNumber::NAME => Ok(Self::RowNumber(Arc::clone(field))),
+            other => not_impl_err!(
+                "Virtual column '{}' uses unsupported Arrow extension type '{}'; \
+                 supported types: [{}]. Add a ParquetVirtualColumn variant and \
+                 a test for this type before wiring it through.",
+                field.name(),
+                other,
+                RowNumber::NAME
+            ),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field};
+
+    #[test]
+    fn row_number_field_converts() {
+        let field: FieldRef = Arc::new(
+            Field::new("row_number", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let col = ParquetVirtualColumn::try_from(&field).expect("valid row_number");
+        assert!(matches!(col, ParquetVirtualColumn::RowNumber(_)));
+        assert_eq!(col.field().name(), "row_number");
+    }
+
+    #[test]
+    fn missing_extension_type_rejected() {
+        let field: FieldRef = Arc::new(Field::new("plain", DataType::Int64, false));
+        let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
+        assert!(
+            err.to_string().contains("missing an Arrow extension type"),
+            "got: {err}"
+        );
+    }
+
+    #[test]
+    fn unsupported_extension_type_rejected() {
+        // RowGroupIndex is a real arrow-rs virtual type not yet in our enum.
+        let field: FieldRef = Arc::new(
+            Field::new("row_group_index", DataType::Int64, false)
+                .with_extension_type(parquet::arrow::RowGroupIndex),
+        );
+        let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
+        assert!(
+            err.to_string().contains("parquet.virtual.row_group_index"),
+            "error should name the offending extension type, got: {err}"
+        );
+    }
+}
diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs
index 8b6d18b0e5..085040e7de 100644
--- a/datafusion/datasource/src/table_schema.rs
+++ b/datafusion/datasource/src/table_schema.rs
@@ -23,10 +23,17 @@ use std::sync::Arc;
 /// The overall schema for potentially partitioned data sources.
 ///
 /// When reading partitioned data (such as Hive-style partitioning), a [`TableSchema`]
-/// consists of two parts:
+/// consists of up to three parts:
 /// 1. **File schema**: The schema of the actual data files on disk
 /// 2. **Partition columns**: Columns whose values are encoded in the directory structure,
 ///    but not stored in the files themselves
+/// 3. **Virtual columns**: Columns produced by the file reader (e.g. Parquet
+///    `row_number`) that are not stored in the files
+///
+/// The full table schema is composed in that order: file columns, then
+/// partition columns, then virtual columns. Consumers that need a different
+/// output ordering should use a projection on top of
+/// [`TableSchema::table_schema`].
 ///
 /// # Example: Partitioned Table
 ///
@@ -76,10 +83,24 @@ pub struct TableSchema {
     /// with an existing schema.
     table_partition_cols: Fields,
 
-    /// The complete table schema: file_schema columns followed by partition columns.
+    /// Virtual columns that are generated by the reader rather than read from
+    /// the data files or the directory structure.
+    ///
+    /// For example, a Parquet reader may inject a `row_number` column whose
+    /// values are produced per file by the reader. Virtual column fields must
+    /// carry an arrow extension type (e.g. `RowNumber`, `RowGroupIndex`) so the
+    /// file reader can recognize them.
     ///
-    /// This is pre-computed during construction by concatenating `file_schema`
-    /// and `table_partition_cols`, so it can be returned as a cheap reference.
+    /// Virtual columns are appended at the end of the table schema, after the
+    /// file columns and any partition columns (layout: `[file, partition,
+    /// virtual]`).
+    virtual_columns: Fields,
+
+    /// The complete table schema: file_schema columns, followed by partition
+    /// columns, followed by virtual columns.
+    ///
+    /// This is pre-computed during construction by concatenating the three
+    /// parts, so it can be returned as a cheap reference.
     table_schema: SchemaRef,
 }
 
@@ -140,7 +161,7 @@ impl TableSchema {
     }
 
     /// Return a new `TableSchema` with `partition_cols` as its partition columns,
-    /// replacing any existing ones.
+    /// replacing any existing ones. Existing virtual columns are preserved.
     #[deprecated(
         since = "55.0.0",
         note = "use TableSchema::builder(file_schema).with_table_partition_cols(cols).build()"
@@ -148,6 +169,22 @@ impl TableSchema {
     pub fn with_table_partition_cols(self, partition_cols: Vec<FieldRef>) -> Self {
         TableSchemaBuilder::new(self.file_schema)
             .with_table_partition_cols(partition_cols)
+            .with_virtual_columns(self.virtual_columns)
+            .build()
+    }
+
+    /// Return a new `TableSchema` with `virtual_columns` as its virtual columns,
+    /// replacing any existing ones. Existing partition columns are preserved.
+    ///
+    /// Virtual columns are produced by the file reader (e.g. a Parquet
+    /// `row_number` column) rather than stored in the files or derived from
+    /// partition paths. Each field must carry an arrow virtual extension type so
+    /// the reader can recognize it; `ParquetOpener` forwards these fields to
+    /// `parquet::arrow::arrow_reader::ArrowReaderOptions::with_virtual_columns`.
+    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Self {
+        TableSchemaBuilder::new(self.file_schema)
+            .with_table_partition_cols(self.table_partition_cols)
+            .with_virtual_columns(virtual_columns)
             .build()
     }
 
@@ -166,13 +203,43 @@ impl TableSchema {
         &self.table_partition_cols
     }
 
-    /// Get the full table schema (file schema + partition columns).
+    /// Get the virtual columns.
     ///
-    /// This is the complete schema that will be seen by queries, combining
-    /// both the columns from the files and the partition columns.
+    /// Virtual columns are produced by the file reader (e.g. Parquet
+    /// `row_number`) and are not stored in the data files or derived from
+    /// partition paths.
+    pub fn virtual_columns(&self) -> &Fields {
+        &self.virtual_columns
+    }
+
+    /// Get the full table schema (file schema + partition columns + virtual columns).
+    ///
+    /// This is the complete schema that will be seen by queries. Fields appear
+    /// in the order: file columns, partition columns, virtual columns.
     pub fn table_schema(&self) -> &SchemaRef {
         &self.table_schema
     }
+
+    /// Schema of columns that can be referenced by predicates pushed into the
+    /// file reader: file columns plus partition columns, excluding virtual
+    /// columns.
+    ///
+    /// Virtual columns are produced by the reader itself (e.g. Parquet
+    /// `row_number`) and cannot be referenced inside the reader's row filter,
+    /// so predicates that reference them must stay above the scan. Callers
+    /// deciding which filters to push down should check against this schema
+    /// rather than [`Self::table_schema`].
+    ///
+    /// When there are no virtual columns this returns the same schema as
+    /// [`Self::table_schema`].
+    pub fn schema_without_virtual_columns(&self) -> SchemaRef {
+        if self.virtual_columns.is_empty() {
+            return Arc::clone(&self.table_schema);
+        }
+        let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
+        builder.extend(self.table_partition_cols.iter().cloned());
+        Arc::new(builder.finish())
+    }
 }
 
 impl From<SchemaRef> for TableSchema {
@@ -189,9 +256,10 @@ impl From<&SchemaRef> for TableSchema {
 
 /// Builder for [`TableSchema`].
 ///
-/// The file schema is the only required input; partition columns are optional.
-/// Unlike calling [`TableSchema`]'s setters repeatedly, the builder computes the
-/// concatenated table schema exactly once, in [`TableSchemaBuilder::build`].
+/// The file schema is the only required input; partition columns and virtual
+/// columns are optional. Unlike calling [`TableSchema`]'s setters repeatedly,
+/// the builder computes the concatenated table schema exactly once, in
+/// [`TableSchemaBuilder::build`].
 ///
 /// ```
 /// # use std::sync::Arc;
@@ -207,15 +275,17 @@ impl From<&SchemaRef> for TableSchema {
 pub struct TableSchemaBuilder {
     file_schema: SchemaRef,
     table_partition_cols: Fields,
+    virtual_columns: Fields,
 }
 
 impl TableSchemaBuilder {
     /// Create a builder for a `TableSchema` over the given file schema, with no
-    /// partition columns yet.
+    /// partition or virtual columns yet.
     pub fn new(file_schema: SchemaRef) -> Self {
         Self {
             file_schema,
             table_partition_cols: Fields::empty(),
+            virtual_columns: Fields::empty(),
         }
     }
 
@@ -231,13 +301,39 @@ impl TableSchemaBuilder {
         self
     }
 
-    /// Build the [`TableSchema`], computing the full `file + partition` schema once.
+    /// Set the virtual columns, replacing any previously set.
+    ///
+    /// Virtual columns are produced by the file reader (e.g. Parquet
+    /// `row_number`) and appended at the end of the table schema. Each field
+    /// must carry an arrow virtual extension type so the reader can recognize
+    /// it.
+    ///
+    /// Accepts anything convertible into [`Fields`] (e.g. `Vec<FieldRef>`).
+    pub fn with_virtual_columns(mut self, virtual_columns: impl Into<Fields>) -> Self {
+        self.virtual_columns = virtual_columns.into();
+        self
+    }
+
+    /// Build the [`TableSchema`], computing the full
+    /// `file + partition + virtual` schema once.
     pub fn build(self) -> TableSchema {
+        debug_assert!(
+            self.virtual_columns.iter().enumerate().all(|(i, v)| {
+                let name = v.name();
+                !self.file_schema.fields().iter().any(|f| f.name() == name)
+                    && !self.table_partition_cols.iter().any(|p| p.name() == name)
+                    && !self.virtual_columns[..i].iter().any(|w| w.name() == name)
+            }),
+            "virtual column name collides with an existing file, partition, or virtual column"
+        );
+
         let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
         builder.extend(self.table_partition_cols.iter().cloned());
+        builder.extend(self.virtual_columns.iter().cloned());
         TableSchema {
             file_schema: self.file_schema,
             table_partition_cols: self.table_partition_cols,
+            virtual_columns: self.virtual_columns,
             table_schema: Arc::new(builder.finish()),
         }
     }
@@ -393,4 +489,125 @@ mod tests {
         assert_eq!(original.table_partition_cols().len(), 1);
         assert_eq!(original.table_partition_cols()[0].name(), "country");
     }
+
+    #[test]
+    fn test_builder_with_virtual_columns_layout() {
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("user_id", DataType::Int64, false),
+            Field::new("amount", DataType::Float64, false),
+        ]));
+
+        let virtual_cols =
+            vec![Arc::new(Field::new("row_number", DataType::Int64, true))];
+
+        let partition_cols = vec![Arc::new(Field::new("date", DataType::Utf8, false))];
+
+        // Apply virtual columns and partition columns in either order on the
+        // builder; the resulting table schema should always be
+        // [file, partition, virtual].
+        let built_virtual_first = TableSchemaBuilder::new(Arc::clone(&file_schema))
+            .with_virtual_columns(virtual_cols.clone())
+            .with_table_partition_cols(partition_cols.clone())
+            .build();
+
+        let built_partition_first = TableSchemaBuilder::new(Arc::clone(&file_schema))
+            .with_table_partition_cols(partition_cols.clone())
+            .with_virtual_columns(virtual_cols.clone())
+            .build();
+
+        let expected = Schema::new(vec![
+            Field::new("user_id", DataType::Int64, false),
+            Field::new("amount", DataType::Float64, false),
+            Field::new("date", DataType::Utf8, false),
+            Field::new("row_number", DataType::Int64, true),
+        ]);
+
+        for ts in [built_virtual_first, built_partition_first] {
+            assert_eq!(ts.table_schema().as_ref(), &expected);
+            assert_eq!(ts.virtual_columns().len(), 1);
+            assert_eq!(ts.virtual_columns()[0].name(), "row_number");
+            assert_eq!(ts.table_partition_cols().len(), 1);
+            assert_eq!(ts.file_schema().fields().len(), 2);
+        }
+    }
+
+    #[test]
+    #[should_panic(expected = "virtual column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_virtual_column_collides_with_file_schema_panics_in_debug() {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "row_number",
+            DataType::Int64,
+            false,
+        )]));
+        let _ = TableSchemaBuilder::new(file_schema)
+            .with_virtual_columns(vec![Arc::new(Field::new(
+                "row_number",
+                DataType::Int64,
+                true,
+            ))])
+            .build();
+    }
+
+    #[test]
+    #[should_panic(expected = "virtual column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_virtual_column_collides_with_partition_panics_in_debug() {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "user_id",
+            DataType::Int64,
+            false,
+        )]));
+        let partition_cols =
+            vec![Arc::new(Field::new("row_number", DataType::Utf8, false))];
+        let _ = TableSchemaBuilder::new(file_schema)
+            .with_table_partition_cols(partition_cols)
+            .with_virtual_columns(vec![Arc::new(Field::new(
+                "row_number",
+                DataType::Int64,
+                true,
+            ))])
+            .build();
+    }
+
+    #[test]
+    #[should_panic(expected = "virtual column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_duplicate_virtual_columns_panic_in_debug() {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "user_id",
+            DataType::Int64,
+            false,
+        )]));
+        let _ = TableSchemaBuilder::new(file_schema)
+            .with_virtual_columns(vec![
+                Arc::new(Field::new("vc", DataType::Int64, true)),
+                Arc::new(Field::new("vc", DataType::Int64, true)),
+            ])
+            .build();
+    }
+
+    #[test]
+    #[should_panic(expected = "virtual column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_partition_column_added_after_colliding_virtual_panics_in_debug() {
+        // Builder order is irrelevant: collision check runs in build().
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "user_id",
+            DataType::Int64,
+            false,
+        )]));
+        let _ = TableSchemaBuilder::new(file_schema)
+            .with_virtual_columns(vec![Arc::new(Field::new(
+                "row_number",
+                DataType::Int64,
+                true,
+            ))])
+            .with_table_partition_cols(vec![Arc::new(Field::new(
+                "row_number",
+                DataType::Utf8,
+                false,
+            ))])
+            .build();
+    }
 }
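
The layout and collision rules exercised by the tests above can be modelled
without Arrow. The following is only a sketch under stated assumptions, not the
code in this commit: columns are plain names instead of `FieldRef`s, and
`compose_layout` and `pushdown_columns` are hypothetical helpers introduced for
illustration. It also returns an `Err` on a collision, whereas the real
`TableSchemaBuilder::build` only checks via `debug_assert!`.

```rust
use std::collections::HashSet;

/// Simplified model of the `[file, partition, virtual]` composition.
/// Columns are plain names here (an assumption; the real code uses Arrow fields).
///
/// Returns the composed layout, or an error naming the first virtual column
/// that collides with a file, partition, or earlier virtual column.
fn compose_layout(
    file: &[&str],
    partition: &[&str],
    virtual_cols: &[&str],
) -> Result<Vec<String>, String> {
    // Names already taken by file and partition columns.
    let mut seen: HashSet<&str> = HashSet::new();
    seen.extend(file.iter().copied());
    seen.extend(partition.iter().copied());

    // `insert` returns false when the name is already present, which also
    // catches duplicates among the virtual columns themselves.
    for &name in virtual_cols {
        if !seen.insert(name) {
            return Err(format!("virtual column name collides: {name}"));
        }
    }

    // The output order is fixed, independent of the order the parts were set.
    let mut layout = Vec::new();
    for part in [file, partition, virtual_cols] {
        layout.extend(part.iter().map(|name| name.to_string()));
    }
    Ok(layout)
}

/// Model of `schema_without_virtual_columns`: only file and partition
/// columns may be referenced by predicates pushed into the reader.
fn pushdown_columns(file: &[&str], partition: &[&str]) -> Vec<String> {
    let mut cols: Vec<String> = file.iter().map(|name| name.to_string()).collect();
    cols.extend(partition.iter().map(|name| name.to_string()));
    cols
}
```

With this model, a filter that mentions `row_number` would be kept above the
scan because the name does not appear in `pushdown_columns`, which mirrors the
guidance in the `schema_without_virtual_columns` doc comment.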

