This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ba4a30d5241 [SPARK-54656][SQL] Refactor SupportsPushDownVariants to 
be a ScanBuilder mix-in
0ba4a30d5241 is described below

commit 0ba4a30d52410de550f674ad2a5ab4eba478b132
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Dec 10 13:34:45 2025 -0800

    [SPARK-54656][SQL] Refactor SupportsPushDownVariants to be a ScanBuilder 
mix-in
    
    ### What changes were proposed in this pull request?
    
    SupportsPushDownVariants was a Scan mix-in in #52578. This patch changes it 
to be a ScanBuilder mix-in to follow the established patterns in the codebase.
    
    ### Why are the changes needed?
    
    SupportsPushDownVariants was a Scan mix-in in #52578. This patch changes it 
to be a ScanBuilder mix-in to follow the established patterns in the codebase, 
e.g., join pushdown, aggregate pushdown, etc.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code v2.0.14
    
    Closes #53276 from viirya/pushvariantdsv2-pr-refactor.
    
    Lead-authored-by: Liang-Chi Hsieh <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../read/SupportsPushDownVariantExtractions.java   |  63 +++++++
 .../connector/read/SupportsPushDownVariants.java   |  77 ---------
 .../sql/connector/read/VariantAccessInfo.java      | 105 -----------
 .../sql/connector/read/VariantExtraction.java      |  64 +++++++
 .../internal/connector/VariantExtractionImpl.scala |  45 +++++
 .../datasources/PushVariantIntoScan.scala          | 116 +------------
 .../datasources/v2/V2ScanRelationPushDown.scala    | 191 ++++++++++++++++++++-
 .../datasources/v2/parquet/ParquetScan.scala       | 110 ++++++------
 .../v2/parquet/ParquetScanBuilder.scala            |  17 +-
 9 files changed, 440 insertions(+), 348 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariantExtractions.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariantExtractions.java
new file mode 100644
index 000000000000..750e0479e542
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariantExtractions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this 
interface to
+ * support pushing down variant field extraction operations to the data source.
+ * <p>
+ * When variant columns are accessed with specific field extractions (e.g., 
variant_get,
+ * try_variant_get), the optimizer can push these extractions down to the data 
source.
+ * The data source can then read only the required fields from variant 
columns, reducing
+ * I/O and improving performance.
+ * <p>
+ * Each {@link VariantExtraction} in the input array represents one field 
extraction operation.
+ * Data sources should examine each extraction and determine which ones can be 
handled efficiently.
+ * The return value is a boolean array of the same length, where each element 
indicates whether
+ * the corresponding extraction was accepted.
+ *
+ * @since 4.1.0
+ */
+@Experimental
+public interface SupportsPushDownVariantExtractions extends ScanBuilder {
+
+  /**
+   * Pushes down variant field extractions to the data source.
+   * <p>
+   * Each element in the input array represents one field extraction operation 
from a variant
+   * column. Data sources should examine each extraction and determine whether 
it can be
+   * pushed down based on the data source's capabilities (e.g., supported data 
types,
+   * path complexity, etc.).
+   * <p>
+   * The return value is a boolean array of the same length as the input 
array, where each
+   * element indicates whether the corresponding extraction was accepted:
+   * <ul>
+   *   <li>true: The extraction will be handled by the data source</li>
+   *   <li>false: The extraction will be handled by Spark after reading</li>
+   * </ul>
+   * <p>
+   * Data sources can choose to accept all, some, or none of the extractions. 
Spark will
+   * handle any extractions that are not pushed down.
+   *
+   * @param extractions Array of variant extractions, one per field extraction 
operation
+   * @return Boolean array indicating which extractions were accepted (same 
length as input)
+   */
+  boolean[] pushVariantExtractions(VariantExtraction[] extractions);
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariants.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariants.java
deleted file mode 100644
index ff82e71bfd58..000000000000
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariants.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.read;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * A mix-in interface for {@link Scan}. Data sources can implement this 
interface to
- * support pushing down variant field access operations to the data source.
- * <p>
- * When variant columns are accessed with specific field extractions (e.g., 
variant_get),
- * the optimizer can push these accesses down to the data source. The data 
source can then
- * read only the required fields from variant columns, reducing I/O and 
improving performance.
- * <p>
- * The typical workflow is:
- * <ol>
- *   <li>Optimizer analyzes the query plan and identifies variant field 
accesses</li>
- *   <li>Optimizer calls {@link #pushVariantAccess} with the access 
information</li>
- *   <li>Data source validates and stores the variant access information</li>
- *   <li>Optimizer retrieves pushed information via {@link 
#pushedVariantAccess}</li>
- *   <li>Data source uses the information to optimize reading in {@link 
#readSchema()}
- *   and readers</li>
- * </ol>
- *
- * @since 4.1.0
- */
-@Evolving
-public interface SupportsPushDownVariants extends Scan {
-
-  /**
-   * Pushes down variant field access information to the data source.
-   * <p>
-   * Implementations should validate if the variant accesses can be pushed 
down based on
-   * the data source's capabilities. If some accesses cannot be pushed down, 
the implementation
-   * can choose to:
-   * <ul>
-   *   <li>Push down only the supported accesses and return true</li>
-   *   <li>Reject all pushdown and return false</li>
-   * </ul>
-   * <p>
-   * The implementation should store the variant access information that can 
be pushed down.
-   * The stored information will be retrieved later via {@link 
#pushedVariantAccess()}.
-   *
-   * @param variantAccessInfo Array of variant access information, one per 
variant column
-   * @return true if at least some variant accesses were pushed down, false if 
none were pushed
-   */
-  boolean pushVariantAccess(VariantAccessInfo[] variantAccessInfo);
-
-  /**
-   * Returns the variant access information that has been pushed down to this 
scan.
-   * <p>
-   * This method is called by the optimizer after {@link #pushVariantAccess} 
to retrieve
-   * what variant accesses were actually accepted by the data source. The 
optimizer uses
-   * this information to rewrite the query plan.
-   * <p>
-   * If {@link #pushVariantAccess} was not called or returned false, this 
should return
-   * an empty array.
-   *
-   * @return Array of pushed down variant access information
-   */
-  VariantAccessInfo[] pushedVariantAccess();
-}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantAccessInfo.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantAccessInfo.java
deleted file mode 100644
index 4f61a42d0519..000000000000
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantAccessInfo.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.read;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * Variant access information that describes how variant fields are accessed 
in a query.
- * <p>
- * This class captures the information needed by data sources to optimize 
reading variant columns.
- * Instead of reading the entire variant value, the data source can read only 
the fields that
- * are actually accessed, represented as a structured schema.
- * <p>
- * For example, if a query accesses `variant_get(v, '$.a', 'int')` and
- * `variant_get(v, '$.b', 'string')`, the extracted schema would be
- * `struct&lt;0:int, 1:string&gt;` where field ordinals correspond to the 
access order.
- *
- * @since 4.1.0
- */
-@Evolving
-public final class VariantAccessInfo implements Serializable {
-  private final String columnName;
-  private final StructType extractedSchema;
-
-  /**
-   * Creates variant access information for a variant column.
-   *
-   * @param columnName The name of the variant column
-   * @param extractedSchema The schema representing extracted fields from the 
variant.
-   *                       Each field represents one variant field access, 
with field names
-   *                       typically being ordinals (e.g., "0", "1", "2") and 
metadata
-   *                       containing variant-specific information like JSON 
path.
-   */
-  public VariantAccessInfo(String columnName, StructType extractedSchema) {
-    this.columnName = Objects.requireNonNull(columnName, "columnName cannot be 
null");
-    this.extractedSchema =
-            Objects.requireNonNull(extractedSchema, "extractedSchema cannot be 
null");
-  }
-
-  /**
-   * Returns the name of the variant column.
-   */
-  public String columnName() {
-    return columnName;
-  }
-
-  /**
-   * Returns the schema representing fields extracted from the variant column.
-   * <p>
-   * The schema structure is:
-   * <ul>
-   *   <li>Field names: Typically ordinals ("0", "1", "2", ...) representing 
access order</li>
-   *   <li>Field types: The target data type for each field extraction</li>
-   *   <li>Field metadata: Contains variant-specific information such as JSON 
path,
-   *       timezone, and error handling mode</li>
-   * </ul>
-   * <p>
-   * Data sources should use this schema to determine what fields to extract 
from the variant
-   * and what types they should be converted to.
-   */
-  public StructType extractedSchema() {
-    return extractedSchema;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    VariantAccessInfo that = (VariantAccessInfo) o;
-    return columnName.equals(that.columnName) &&
-           extractedSchema.equals(that.extractedSchema);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(columnName, extractedSchema);
-  }
-
-  @Override
-  public String toString() {
-    return "VariantAccessInfo{" +
-           "columnName='" + columnName + '\'' +
-           ", extractedSchema=" + extractedSchema +
-           '}';
-  }
-}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantExtraction.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantExtraction.java
new file mode 100644
index 000000000000..64987299934a
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantExtraction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+
+/**
+ * Variant extraction information that describes a single field extraction 
from a variant column.
+ * <p>
+ * This interface captures the information needed by data sources to optimize 
reading variant
+ * columns. Each instance represents one field extraction operation (e.g., 
from variant_get or
+ * try_variant_get).
+ * <p>
+ * For example, if a query contains `variant_get(v, '$.a', 'int')`, this would 
be represented
+ * as a VariantExtraction with columnName=["v"], path="$.a", and 
expectedDataType=IntegerType.
+ *
+ * @since 4.1.0
+ */
+@Experimental
+public interface VariantExtraction extends Serializable {
+  /**
+   * Returns the path to the variant column. For top-level variant columns, 
this is a single
+   * element array containing the column name. For nested variant columns 
within structs,
+   * this is an array representing the path (e.g., ["structCol", 
"innerStruct", "variantCol"]).
+   */
+  String[] columnName();
+
+  /**
+   * Returns the expected data type for the extracted value.
+   * This is the target type specified in variant_get (e.g., IntegerType, 
StringType).
+   */
+  DataType expectedDataType();
+
+  /**
+   * Returns the metadata associated with this variant extraction.
+   * This may include additional information needed by the data source:
+   * - "path": the extraction path from variant_get or try_variant_get.
+   *           This follows JSON path syntax (e.g., "$.a", "$.b.c", "$[0]").
+   * - "failOnError": whether the extraction to expected data type should 
throw an exception
+   *                  or return null if the cast fails.
+   * - "timeZoneId": a string identifier of a time zone. It is required by 
timestamp-related casts.
+   *
+   */
+  Metadata metadata();
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/VariantExtractionImpl.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/VariantExtractionImpl.scala
new file mode 100644
index 000000000000..87db41a3217e
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/VariantExtractionImpl.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.read.VariantExtraction
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+/**
+ * Implementation of [[VariantExtraction]].
+ *
+ * @param columnName Path to the variant column (e.g., Array("v") for 
top-level,
+ *                   Array("struct1", "v") for nested)
+ * @param metadata The metadata for extraction including JSON path, 
failOnError, and timeZoneId
+ * @param expectedDataType The expected data type for the extracted value
+ */
+case class VariantExtractionImpl(
+    columnName: Array[String],
+    metadata: Metadata,
+    expectedDataType: DataType) extends VariantExtraction {
+
+  require(columnName != null, "columnName cannot be null")
+  require(metadata != null, "metadata cannot be null")
+  require(expectedDataType != null, "expectedDataType cannot be null")
+  require(columnName.nonEmpty, "columnName cannot be empty")
+
+  override def toString: String = {
+    s"VariantExtraction{columnName=${columnName.mkString("[", ", ", "]")}, " +
+      s"metadata='$metadata', expectedDataType=$expectedDataType}"
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
index 2cf1a5e9b8cd..b0b20d08dccb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
@@ -26,10 +26,8 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project, Subquery}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.connector.read.{SupportsPushDownVariants, 
VariantAccessInfo}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -225,8 +223,11 @@ class VariantInRelation {
         case Some(variants) =>
           variants.get(path) match {
             case Some(fields) =>
+              // Accessing the full variant value
               addField(fields, RequestedVariantField.fullVariant)
             case _ =>
+              // Accessing the struct containing a variant.
+              // This variant is not eligible for push down.
               // Remove non-eligible variants.
               variants.filterInPlace { case (key, _) => !key.startsWith(path) }
           }
@@ -281,11 +282,6 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
       relation @ LogicalRelationWithTable(
       hadoopFsRelation@HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _), 
_)) =>
         rewritePlan(p, projectList, filters, relation, hadoopFsRelation)
-
-      case p@PhysicalOperation(projectList, filters,
-        scanRelation @ DataSourceV2ScanRelation(
-          relation, scan: SupportsPushDownVariants, output, _, _)) =>
-        rewritePlanV2(p, projectList, filters, scanRelation, scan)
     }
   }
 
@@ -333,112 +329,10 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
       hadoopFsRelation.sparkSession)
     val newRelation = relation.copy(relation = newHadoopFsRelation, output = 
newOutput.toIndexedSeq)
 
-    buildFilterAndProject(newRelation, projectList, filters, variants, 
attributeMap)
-  }
-
-  // DataSource V2 rewrite method using SupportsPushDownVariants API
-  // Key differences from V1 implementation:
-  // 1. V2 uses DataSourceV2ScanRelation instead of LogicalRelation
-  // 2. Uses SupportsPushDownVariants API instead of directly manipulating scan
-  // 3. Schema is already resolved in scanRelation.output (no need for 
relation.resolve())
-  // 4. Scan rebuilding is handled by the scan implementation via the API
-  // Data sources like Delta and Iceberg can implement this API to support 
variant pushdown.
-  private def rewritePlanV2(
-      originalPlan: LogicalPlan,
-      projectList: Seq[NamedExpression],
-      filters: Seq[Expression],
-      scanRelation: DataSourceV2ScanRelation,
-      scan: SupportsPushDownVariants): LogicalPlan = {
-    val variants = new VariantInRelation
-
-    // Extract schema attributes from V2 scan relation
-    val schemaAttributes = scanRelation.output
-
-    // Construct schema for default value resolution
-    val structSchema = StructType(schemaAttributes.map(a =>
-      StructField(a.name, a.dataType, a.nullable, a.metadata)))
-
-    val defaultValues = 
ResolveDefaultColumns.existenceDefaultValues(structSchema)
-
-    // Add variant fields from the V2 scan schema
-    for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
-      variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
-    }
-    if (variants.mapping.isEmpty) return originalPlan
-
-    // Collect requested fields from project list and filters
-    projectList.foreach(variants.collectRequestedFields)
-    filters.foreach(variants.collectRequestedFields)
-
-    // If no variant columns remain after collection, return original plan
-    if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
-
-    // Build VariantAccessInfo array for the API
-    val variantAccessInfoArray = schemaAttributes.flatMap { attr =>
-      variants.mapping.get(attr.exprId).flatMap(_.get(Nil)).map { fields =>
-        // Build extracted schema for this variant column
-        val extractedFields = fields.toArray.sortBy(_._2).map { case (field, 
ordinal) =>
-          StructField(ordinal.toString, field.targetType, metadata = 
field.path.toMetadata)
-        }
-        val extractedSchema = if (extractedFields.isEmpty) {
-          // Add placeholder field to avoid empty struct
-          val placeholder = VariantMetadata("$.__placeholder_field__",
-            failOnError = false, timeZoneId = "UTC")
-          StructType(Array(StructField("0", BooleanType, metadata = 
placeholder.toMetadata)))
-        } else {
-          StructType(extractedFields)
-        }
-        new VariantAccessInfo(attr.name, extractedSchema)
-      }
-    }.toArray
-
-    // Call the API to push down variant access
-    if (variantAccessInfoArray.isEmpty) return originalPlan
-
-    val pushed = scan.pushVariantAccess(variantAccessInfoArray)
-    if (!pushed) return originalPlan
-
-    // Get what was actually pushed
-    val pushedVariantAccess = scan.pushedVariantAccess()
-    if (pushedVariantAccess.isEmpty) return originalPlan
-
-    // Build new attribute mapping based on pushed variant access
-    val pushedColumnNames = pushedVariantAccess.map(_.columnName()).toSet
-    val attributeMap = schemaAttributes.map { a =>
-      if (pushedColumnNames.contains(a.name) && 
variants.mapping.get(a.exprId).exists(_.nonEmpty)) {
-        val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
-        val newAttr = AttributeReference(a.name, newType, a.nullable, 
a.metadata)(
-          qualifier = a.qualifier)
-        (a.exprId, newAttr)
-      } else {
-        (a.exprId, a)
-      }
-    }.toMap
-
-    val newOutput = scanRelation.output.map(a => 
attributeMap.getOrElse(a.exprId, a))
-
-    // The scan implementation should have updated its readSchema() based on 
the pushed info
-    // We just need to create a new scan relation with the updated output
-    val newScanRelation = scanRelation.copy(
-      output = newOutput
-    )
-
-    buildFilterAndProject(newScanRelation, projectList, filters, variants, 
attributeMap)
-  }
-
-  /**
-   * Build the final Project(Filter(relation)) plan with rewritten expressions.
-   */
-  private def buildFilterAndProject(
-      relation: LogicalPlan,
-      projectList: Seq[NamedExpression],
-      filters: Seq[Expression],
-      variants: VariantInRelation,
-      attributeMap: Map[ExprId, AttributeReference]): LogicalPlan = {
     val withFilter = if (filters.nonEmpty) {
-      Filter(filters.map(variants.rewriteExpr(_, attributeMap)).reduce(And), 
relation)
+      Filter(filters.map(variants.rewriteExpr(_, attributeMap)).reduce(And), 
newRelation)
     } else {
-      relation
+      newRelation
     }
 
     val newProjectList = projectList.map { e =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 31a98e1ff96c..adfd5ceacd67 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -21,8 +21,9 @@ import java.util.Locale
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES, 
GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS, 
RELATION_NAME, RELATION_OUTPUT}
-import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, 
Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, 
IntegerLiteral, Literal, NamedExpression, PredicateHelper, 
ProjectionOverSchema, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, 
Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, 
ExprId, IntegerLiteral, Literal, NamedExpression, PredicateHelper, 
ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ScanOperation}
@@ -32,10 +33,11 @@ import 
org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg, 
Count, CountStar, Max, Min, Sum}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownJoin, 
V1Scan}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownJoin, 
SupportsPushDownVariantExtractions, V1Scan, VariantExtraction}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, 
VariantInRelation}
+import org.apache.spark.sql.internal.connector.VariantExtractionImpl
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType, 
StructType}
+import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType, 
StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils._
 import org.apache.spark.util.ArrayImplicits._
 
@@ -49,9 +51,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with 
PredicateHelper {
       pushDownFilters,
       pushDownJoin,
       pushDownAggregates,
+      pushDownVariants,
       pushDownLimitAndOffset,
       buildScanWithPushedAggregate,
       buildScanWithPushedJoin,
+      buildScanWithPushedVariants,
       pruneColumns)
 
     pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
@@ -318,6 +322,139 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
     case agg: Aggregate => rewriteAggregate(agg)
   }
 
+  /**
+   * Offers variant field extractions to every scan builder that implements
+   * [[SupportsPushDownVariantExtractions]]. Nodes are only considered when the
+   * `PUSH_VARIANT_INTO_SCAN` conf is enabled; the actual work is delegated to
+   * `pushVariantExtractions`.
+   */
+  def pushDownVariants(plan: LogicalPlan): LogicalPlan = {
+    // A `def`, not a `val`: the conf is consulted only after the structural pattern matched.
+    def variantPushDownEnabled: Boolean =
+      conf.getConf(org.apache.spark.sql.internal.SQLConf.PUSH_VARIANT_INTO_SCAN)
+
+    plan.transformDown {
+      case op @ PhysicalOperation(projections, conditions,
+          holder @ ScanBuilderHolder(_, _, variantBuilder: SupportsPushDownVariantExtractions))
+          if variantPushDownEnabled =>
+        pushVariantExtractions(op, projections, conditions, holder, variantBuilder)
+    }
+  }
+
+  /**
+   * Translates a path of field ordinals into the matching path of field names.
+   *
+   * @param structType The struct type that the first ordinal indexes into
+   * @param ordinals The ordinal path (e.g., [1, 1] for nested.field)
+   * @return The field name path (e.g., ["nested", "field"]); empty when `ordinals` is empty
+   */
+  private def getColumnName(structType: StructType, ordinals: Seq[Int]): Seq[String] = {
+    ordinals.headOption match {
+      case None =>
+        Seq.empty
+      case Some(ordinal) =>
+        val field = structType.fields(ordinal)
+        val remaining = ordinals.tail
+        field.dataType match {
+          // The final ordinal may point at a field of any type; only its name is needed.
+          case _ if remaining.isEmpty =>
+            Seq(field.name)
+          // Every intermediate step has to descend into a struct.
+          case nestedStruct: StructType =>
+            field.name +: getColumnName(nestedStruct, remaining)
+          case _ =>
+            throw SparkException.internalError(
+              s"Expected StructType at field '${field.name}' but got ${field.dataType}")
+        }
+    }
+  }
+
+  /**
+   * Collects the variant fields that `projectList` and `filters` request from the scan output
+   * and offers them to `builder` as individual [[VariantExtraction]]s.
+   *
+   * When at least one extraction is accepted, the rewritten output attributes and the
+   * [[VariantInRelation]] bookkeeping are recorded on `sHolder`. The plan itself is not
+   * rewritten here; that happens in `buildScanWithPushedVariants`.
+   *
+   * @param originalPlan the matched plan; returned as-is on every code path
+   * @param projectList project expressions on top of the scan
+   * @param filters filter conditions on top of the scan
+   * @param sHolder holder whose output and pushed-variant state are updated on success
+   * @param builder scan builder that receives the extractions
+   * @return `originalPlan`, unchanged
+   */
+  private def pushVariantExtractions(
+      originalPlan: LogicalPlan,
+      projectList: Seq[NamedExpression],
+      filters: Seq[Expression],
+      sHolder: ScanBuilderHolder,
+      builder: SupportsPushDownVariantExtractions): LogicalPlan = {
+    val variants = new VariantInRelation
+
+    // Extract schema attributes from scan builder holder
+    val schemaAttributes = sHolder.output
+
+    // Construct schema for default value resolution
+    val structSchema = StructType(schemaAttributes.map(a =>
+      StructField(a.name, a.dataType, a.nullable, a.metadata)))
+
+    val defaultValues = org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+      .existenceDefaultValues(structSchema)
+
+    // Add variant fields from the V2 scan schema
+    for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
+      variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
+    }
+    if (variants.mapping.isEmpty) return originalPlan
+
+    // Collect requested fields from project list and filters
+    projectList.foreach(variants.collectRequestedFields)
+    filters.foreach(variants.collectRequestedFields)
+
+    // If no variant columns remain after collection, return original plan
+    if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
+
+    // Build one VariantExtraction per requested field, paired with the attribute it reads from.
+    val extractionInfo = schemaAttributes.flatMap { topAttr =>
+      // `Option#toSeq` turns "no entry for this attribute" into an empty result without `.get`.
+      variants.mapping.get(topAttr.exprId).toSeq.flatMap { variantFields =>
+        variantFields.toSeq.flatMap { case (pathToVariant, fields) =>
+          val columnName = if (pathToVariant.isEmpty) {
+            Seq(topAttr.name)
+          } else {
+            Seq(topAttr.name) ++
+              getColumnName(topAttr.dataType.asInstanceOf[StructType], pathToVariant)
+          }
+          // Keep the extractions of one variant in ordinal order.
+          fields.toSeq.sortBy(_._2).map { case (field, _) =>
+            val extraction: VariantExtraction = new VariantExtractionImpl(
+              columnName.toArray,
+              field.path.toMetadata,
+              field.targetType)
+            (extraction, topAttr)
+          }
+        }
+      }
+    }
+
+    // Call the API to push down variant extractions
+    if (extractionInfo.isEmpty) return originalPlan
+
+    val extractions: Array[VariantExtraction] = extractionInfo.map(_._1).toArray
+    val pushedResults = builder.pushVariantExtractions(extractions)
+
+    // Attributes with at least one accepted extraction. They are identified by expression ID
+    // rather than by name, because every other piece of bookkeeping in this method is keyed by
+    // ExprId and a name alone does not identify an attribute.
+    val pushedExprIds = extractionInfo.zip(pushedResults)
+      .collect { case ((_, attr), true) => attr.exprId }
+      .toSet
+    if (pushedExprIds.isEmpty) return originalPlan
+
+    // NOTE(review): if the builder accepts only some of an attribute's extractions, the rejected
+    // ones are still recorded in `variants`; confirm that `rewriteType` / `rewriteExpr` cope
+    // with that case.
+
+    // Build new attribute mapping based on pushed variant extractions. Membership in
+    // `pushedExprIds` already implies that `variants.mapping` has fields for the attribute.
+    val attributeMap = schemaAttributes.map { a =>
+      if (pushedExprIds.contains(a.exprId)) {
+        val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
+        val newAttr = AttributeReference(a.name, newType, a.nullable, a.metadata)(
+          qualifier = a.qualifier)
+        (a.exprId, newAttr)
+      } else {
+        (a.exprId, a.asInstanceOf[AttributeReference])
+      }
+    }.toMap
+
+    // Store the transformation info on the holder for later use
+    sHolder.pushedVariants = Some(variants)
+    sHolder.pushedVariantAttributeMap = attributeMap
+    sHolder.output = schemaAttributes.map(a => attributeMap.getOrElse(a.exprId, a))
+
+    // Return the original plan unchanged - transformation happens in buildScanWithPushedVariants
+    originalPlan
+  }
+
+
   private def rewriteAggregate(agg: Aggregate): LogicalPlan = agg.child match {
     case PhysicalOperation(project, Nil, holder @ ScanBuilderHolder(_, _,
         r: SupportsPushDownAggregates)) if 
CollapseProject.canCollapseExpressions(
@@ -589,6 +726,48 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       Project(projectList, scanRelation)
   }
 
+  /**
+   * Builds the scan for every [[ScanBuilderHolder]] that has `pushedVariants` set and rewrites
+   * the project list and filters on top of it to read from the transformed output.
+   *
+   * The resulting plan is
+   * `Project(rewritten project list) -> [Filter] -> Project(output aliases) -> scan relation`.
+   */
+  def buildScanWithPushedVariants(plan: LogicalPlan): LogicalPlan = plan.transform {
+    case PhysicalOperation(projectList, filters, holder: ScanBuilderHolder)
+        if holder.pushedVariants.isDefined =>
+      val variants = holder.pushedVariants.get
+      val attributeMap = holder.pushedVariantAttributeMap
+
+      // Build the scan
+      val scan = holder.builder.build()
+      val realOutput = toAttributes(scan.readSchema())
+      // The `zip` below pairs columns by position and would silently drop the surplus ones on
+      // a length mismatch, so fail loudly instead of producing a plan with missing columns.
+      assert(realOutput.length == holder.output.length,
+        "The data source returns unexpected number of columns")
+      val wrappedScan = getWrappedScan(scan, holder)
+      val scanRelation = DataSourceV2ScanRelation(holder.relation, wrappedScan, realOutput)
+
+      // Create projection to map real output to expected output (with transformed types)
+      val outputProjection = realOutput.zip(holder.output).map { case (realAttr, expectedAttr) =>
+        Alias(realAttr, expectedAttr.name)(expectedAttr.exprId)
+      }
+
+      // Rewrite filter expressions using the variant transformation and AND them together.
+      // `reduceOption` yields None when there are no filters.
+      val rewrittenFilters = filters
+        .map(variants.rewriteExpr(_, attributeMap))
+        .reduceOption(And)
+
+      // Rewrite project list expressions using the variant transformation
+      val rewrittenProjectList = projectList.map { e =>
+        variants.rewriteExpr(e, attributeMap) match {
+          case n: NamedExpression => n
+          // When the variant column is directly selected, we replace the attribute
+          // reference with a struct access, which is not a NamedExpression. Wrap it with Alias.
+          case other => Alias(other, e.name)(e.exprId, e.qualifier)
+        }
+      }
+
+      // Build the plan: Project(outputProjection) -> [Filter?] -> scanRelation
+      val withProjection = Project(outputProjection, scanRelation)
+      val withFilter = rewrittenFilters.map(Filter(_, withProjection)).getOrElse(withProjection)
+      Project(rewrittenProjectList, withFilter)
+  }
+
   def pruneColumns(plan: LogicalPlan): LogicalPlan = plan.transform {
     case ScanOperation(project, filtersStayUp, filtersPushDown, sHolder: 
ScanBuilderHolder) =>
       // column pruning
@@ -834,6 +1013,10 @@ case class ScanBuilderHolder(
   var joinedRelationsPushedDownOperators: Seq[PushedDownOperators] = 
Seq.empty[PushedDownOperators]
 
   var pushedJoinOutputMap: AttributeMap[Expression] = 
AttributeMap.empty[Expression]
+
+  // Maps the expression ID of each original output attribute to the attribute that replaces
+  // it after variant pushdown. Written by `pushVariantExtractions` and read back by
+  // `buildScanWithPushedVariants` when rewriting filters and the project list.
+  var pushedVariantAttributeMap: Map[ExprId, AttributeReference] = Map.empty
+
+  // Variant bookkeeping recorded by `pushVariantExtractions`. `buildScanWithPushedVariants`
+  // only applies to holders where this is defined.
+  var pushedVariants: Option[VariantInRelation] = None
 }
 
 // A wrapper for v1 scan to carry the translated filters and the handled ones, 
along with
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index d347cb04f0bc..5a427aad5f89 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -25,13 +25,13 @@ import org.apache.parquet.hadoop.ParquetInputFormat
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.{PartitionReaderFactory, 
SupportsPushDownVariants, VariantAccessInfo}
-import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, 
PartitioningAwareFileIndex}
+import org.apache.spark.sql.connector.read.{PartitionReaderFactory, 
VariantExtraction}
+import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, 
PartitioningAwareFileIndex, VariantMetadata}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetReadSupport, ParquetWriteSupport}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{BooleanType, DataType, StructField, 
StructType, VariantType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.SerializableConfiguration
@@ -48,8 +48,7 @@ case class ParquetScan(
     pushedAggregate: Option[Aggregation] = None,
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty,
-    pushedVariantAccessInfo: Array[VariantAccessInfo] = Array.empty) extends 
FileScan
-    with SupportsPushDownVariants {
+    pushedVariantExtractions: Array[VariantExtraction] = Array.empty) extends 
FileScan {
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
     // so file should not be split across multiple tasks.
@@ -58,20 +57,57 @@ case class ParquetScan(
 
   // Build transformed schema if variant pushdown is active
   private def effectiveReadDataSchema: StructType = {
-    if (_pushedVariantAccess.isEmpty) {
+    if (pushedVariantExtractions.isEmpty) {
       readDataSchema
     } else {
-      // Build a mapping from column name to extracted schema
-      val variantSchemaMap = _pushedVariantAccess.map(info =>
-        info.columnName() -> info.extractedSchema()).toMap
-
-      // Transform the read data schema by replacing variant columns with 
their extracted schemas
-      StructType(readDataSchema.map { field =>
-        variantSchemaMap.get(field.name) match {
-          case Some(extractedSchema) => field.copy(dataType = extractedSchema)
-          case None => field
+      rewriteVariantPushdownSchema(readDataSchema)
+    }
+  }
+
+  private def rewriteVariantPushdownSchema(schema: StructType): StructType = {
+    // Group extractions by column name and build extracted schemas
+    val variantSchemaMap: Map[Seq[String], StructType] = 
pushedVariantExtractions
+      .groupBy(e => e.columnName().toSeq)
+      .map { case (colName, extractions) =>
+        // Build struct schema with ordinal-named fields for each extraction
+        var fields = extractions.zipWithIndex.map { case (extraction, idx) =>
+          // Attach VariantMetadata so Parquet reader knows this is a variant 
extraction
+          StructField(idx.toString, extraction.expectedDataType(), nullable = 
true,
+            extraction.metadata())
+        }
+
+        // Avoid producing an empty struct of requested fields. This happens
+        // if the variant is not used, or only used in `IsNotNull/IsNull` 
expressions.
+        // The value of the placeholder field doesn't matter.
+        if (fields.size == 1 && 
fields.head.dataType.isInstanceOf[VariantType]) {
+          val placeholder = VariantMetadata("$.__placeholder_field__",
+            failOnError = false, timeZoneId = "UTC")
+          fields = Array(StructField("0", BooleanType,
+            metadata = placeholder.toMetadata))
         }
-      })
+
+        colName -> StructType(fields)
+      }.toMap
+
+    rewriteType(schema, Seq.empty, variantSchemaMap).asInstanceOf[StructType]
+  }
+
+  private def rewriteType(
+      dataType: DataType,
+      path: Seq[String],
+      mapping: Map[Seq[String], StructType]): DataType = {
+    dataType match {
+      case structType: StructType if 
!VariantMetadata.isVariantStruct(structType) =>
+        val fields = structType.fields.map { field =>
+          mapping.get(path :+ field.name) match {
+            case Some(extractedSchema) =>
+              field.copy(dataType = extractedSchema)
+            case None =>
+              field.copy(dataType = rewriteType(field.dataType, path :+ 
field.name, mapping))
+          }
+        }
+        StructType(fields)
+      case otherType => otherType
     }
   }
 
@@ -84,38 +120,14 @@ case class ParquetScan(
       // super.readSchema() combines readDataSchema + readPartitionSchema
       // Apply variant transformation if variant pushdown is active
       val baseSchema = super.readSchema()
-      if (_pushedVariantAccess.isEmpty) {
+      if (pushedVariantExtractions.isEmpty) {
         baseSchema
       } else {
-        val variantSchemaMap = _pushedVariantAccess.map(info =>
-          info.columnName() -> info.extractedSchema()).toMap
-        StructType(baseSchema.map { field =>
-          variantSchemaMap.get(field.name) match {
-            case Some(extractedSchema) => field.copy(dataType = 
extractedSchema)
-            case None => field
-          }
-        })
+        rewriteVariantPushdownSchema(baseSchema)
       }
     }
   }
 
-  // SupportsPushDownVariants API implementation
-  private var _pushedVariantAccess: Array[VariantAccessInfo] = 
pushedVariantAccessInfo
-
-  override def pushVariantAccess(variantAccessInfo: Array[VariantAccessInfo]): 
Boolean = {
-    // Parquet supports variant pushdown for all variant accesses
-    if (variantAccessInfo.nonEmpty) {
-      _pushedVariantAccess = variantAccessInfo
-      true
-    } else {
-      false
-    }
-  }
-
-  override def pushedVariantAccess(): Array[VariantAccessInfo] = {
-    _pushedVariantAccess
-  }
-
   override def createReaderFactory(): PartitionReaderFactory = {
     val effectiveSchema = effectiveReadDataSchema
     val readDataSchemaAsJson = effectiveSchema.json
@@ -171,8 +183,8 @@ case class ParquetScan(
         pushedAggregate.isEmpty && p.pushedAggregate.isEmpty
       }
       val pushedVariantEqual =
-        
java.util.Arrays.equals(_pushedVariantAccess.asInstanceOf[Array[Object]],
-          p._pushedVariantAccess.asInstanceOf[Array[Object]])
+        
java.util.Arrays.equals(pushedVariantExtractions.asInstanceOf[Array[Object]],
+          p.pushedVariantExtractions.asInstanceOf[Array[Object]])
       super.equals(p) && dataSchema == p.dataSchema && options == p.options &&
         equivalentFilters(pushedFilters, p.pushedFilters) && 
pushedDownAggEqual &&
         pushedVariantEqual
@@ -189,15 +201,17 @@ case class ParquetScan(
   }
 
   override def getMetaData(): Map[String, String] = {
-    val variantAccessStr = if (_pushedVariantAccess.nonEmpty) {
-      _pushedVariantAccess.map(info =>
-        s"${info.columnName()}->${info.extractedSchema()}").mkString("[", ", 
", "]")
+    val variantExtractionStr = if (pushedVariantExtractions.nonEmpty) {
+      pushedVariantExtractions.map { extraction =>
+        val colName = extraction.columnName().mkString(".")
+        s"$colName:${extraction.metadata()}:${extraction.expectedDataType()}"
+      }.mkString("[", ", ", "]")
     } else {
       "[]"
     }
     super.getMetaData() ++ Map("PushedFilters" -> 
seqToString(pushedFilters.toImmutableArraySeq)) ++
       Map("PushedAggregation" -> pushedAggregationsStr) ++
       Map("PushedGroupBy" -> pushedGroupByStr) ++
-      Map("PushedVariantAccess" -> variantAccessStr)
+      Map("PushedVariantExtractions" -> variantExtractionStr)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 01367675e65b..94da53f22934 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.SupportsPushDownAggregates
+import org.apache.spark.sql.connector.read.{SupportsPushDownAggregates, 
SupportsPushDownVariantExtractions, VariantExtraction}
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, 
PartitioningAwareFileIndex}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, 
SparkToParquetSchemaConverter}
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
@@ -39,7 +39,8 @@ case class ParquetScanBuilder(
     dataSchema: StructType,
     options: CaseInsensitiveStringMap)
   extends FileScanBuilder(sparkSession, fileIndex, dataSchema)
-    with SupportsPushDownAggregates {
+    with SupportsPushDownAggregates
+    with SupportsPushDownVariantExtractions {
   lazy val hadoopConf = {
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
@@ -50,6 +51,8 @@ case class ParquetScanBuilder(
 
   private var pushedAggregations = Option.empty[Aggregation]
 
+  private var pushedVariantExtractions = Array.empty[VariantExtraction]
+
   override protected val supportsNestedSchemaPruning: Boolean = true
 
   override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = {
@@ -99,6 +102,14 @@ case class ParquetScanBuilder(
     }
   }
 
+  /**
+   * SupportsPushDownVariantExtractions API implementation.
+   *
+   * Remembers the offered extractions so that `build()` can hand them to the [[ParquetScan]],
+   * and reports every one of them as accepted.
+   */
+  override def pushVariantExtractions(extractions: Array[VariantExtraction]): Array[Boolean] = {
+    pushedVariantExtractions = extractions
+    // One `true` per offered extraction, in the same order.
+    extractions.map(_ => true)
+  }
+
   override def build(): ParquetScan = {
     // the `finalSchema` is either pruned in pushAggregation (if aggregates are
     // pushed down), or pruned in readDataSchema() (in regular column 
pruning). These
@@ -108,6 +119,6 @@ case class ParquetScanBuilder(
     }
     ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, finalSchema,
       readPartitionSchema(), pushedDataFilters, options, pushedAggregations,
-      partitionFilters, dataFilters)
+      partitionFilters, dataFilters, pushedVariantExtractions)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to