This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 175a55d4c2 [spark] Support variant pushdown via 
SupportsPushDownVariantExtractions for Spark 4.1+ (#7913)
175a55d4c2 is described below

commit 175a55d4c21dfc4d6aac0d39b3e3c0e184ea4fca
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 21 10:32:32 2026 +0800

    [spark] Support variant pushdown via SupportsPushDownVariantExtractions for 
Spark 4.1+ (#7913)
    
    Integrate Spark 4.1+ `SupportsPushDownVariantExtractions` so that
    `variant_get(col, '$.path', type)` is pushed into the Paimon read path
    as a shredded variant struct. Same trait-shadowing pattern as
    `PaimonSupportsRuntimeFiltering` — no-op trait in `paimon-spark-common`,
    4.1 binding overrides at the same FQN. Nested variant columns supported;
    column pruning preserved; full-Variant access on a column rejects
    per-column pushdown.
---
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   2 +
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   2 +
 .../PaimonSupportsPushDownVariantExtractions.scala |  46 +++++++
 .../org/apache/paimon/spark/sql/VariantTest.scala  |  24 +++-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   2 +
 .../apache/paimon/spark/PaimonScanBuilder.scala    |   9 +-
 .../org/apache/paimon/spark/read/BaseScan.scala    |  19 ++-
 ...PaimonSupportsPushDownVariantExtractions.scala} |  21 ++--
 .../paimon/spark/read/VariantExtractionInfo.scala} |  21 ++--
 .../paimon/spark/read/VariantPushDownUtils.scala   | 127 ++++++++++++++++++++
 .../apache/paimon/spark/sql/VariantTestBase.scala  | 133 +++++++++++++++++++++
 11 files changed, 371 insertions(+), 35 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index f5a5c8e95e..9af6c8f8b5 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, 
VectorSearch}
+import org.apache.paimon.spark.read.VariantExtractionInfo
 import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.sql.types.StructType
@@ -33,5 +34,6 @@ case class PaimonScan(
     override val pushedTopN: Option[TopN] = None,
     override val pushedVectorSearch: Option[VectorSearch] = None,
     override val pushedFullTextSearch: Option[FullTextSearch] = None,
+    override val pushedVariantExtractions: Map[Seq[String], 
Seq[VariantExtractionInfo]] = Map.empty,
     bucketedScanDisabled: Boolean = true)
   extends PaimonBaseScan(table) {}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 707ee13459..9bedc9bdf1 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, 
VectorSearch}
+import org.apache.paimon.spark.read.VariantExtractionInfo
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
@@ -37,6 +38,7 @@ case class PaimonScan(
     override val pushedTopN: Option[TopN],
     override val pushedVectorSearch: Option[VectorSearch],
     override val pushedFullTextSearch: Option[FullTextSearch] = None,
+    override val pushedVariantExtractions: Map[Seq[String], 
Seq[VariantExtractionInfo]] = Map.empty,
     bucketedScanDisabled: Boolean = false)
   extends PaimonBaseScan(table)
   with SupportsReportPartitioning {
diff --git 
a/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
 
b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
new file mode 100644
index 0000000000..ee27ddf34b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.spark.SparkTypeUtils
+
+import 
org.apache.spark.sql.connector.read.{SupportsPushDownVariantExtractions, 
VariantExtraction}
+import org.apache.spark.sql.execution.datasources.VariantMetadata
+import org.apache.spark.sql.types.VariantType
+
+/** Spark 4.1+ binding; shadows the no-op `paimon-spark-common` trait by FQN. 
*/
+trait PaimonSupportsPushDownVariantExtractions extends 
SupportsPushDownVariantExtractions {
+  protected var acceptedVariantExtractions: Map[Seq[String], 
Seq[VariantExtractionInfo]] = Map.empty
+
+  override def pushVariantExtractions(extractions: Array[VariantExtraction]): 
Array[Boolean] = {
+    val decoded = extractions.iterator.map {
+      ex =>
+        val vm = VariantMetadata.fromMetadata(ex.metadata())
+        val info = VariantExtractionInfo(
+          paimonType = SparkTypeUtils.toPaimonType(ex.expectedDataType()),
+          path = vm.path,
+          failOnError = vm.failOnError,
+          timeZoneId = vm.timeZoneId)
+        (ex.columnName().toSeq, info, ex.expectedDataType() == VariantType)
+    }.toIndexedSeq
+    val (newMap, accepted) = VariantPushDownUtils.acceptByPath(decoded)
+    acceptedVariantExtractions = newMap
+    accepted
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
 
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
index 94e9ac683f..368020422c 100644
--- 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
+++ 
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
@@ -22,12 +22,32 @@ import org.apache.spark.SparkConf
 
 class VariantTest extends VariantTestBase {
   override protected def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "false")
+    super.sparkConf
+      .set("spark.paimon.variant.inferShreddingSchema", "false")
+      .set("spark.sql.variant.pushVariantIntoScan", "false")
   }
 }
 
 class VariantInferShreddingTest extends VariantTestBase {
   override protected def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true")
+    super.sparkConf
+      .set("spark.paimon.variant.inferShreddingSchema", "true")
+      .set("spark.sql.variant.pushVariantIntoScan", "false")
+  }
+}
+
+class VariantWithPushDownTest extends VariantTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.paimon.variant.inferShreddingSchema", "false")
+      .set("spark.sql.variant.pushVariantIntoScan", "true")
+  }
+}
+
+class VariantInferShreddingWithPushDownTest extends VariantTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.paimon.variant.inferShreddingSchema", "true")
+      .set("spark.sql.variant.pushVariantIntoScan", "true")
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 0fa4d73054..a2b04c3ac4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.BucketFunctionType
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, 
VectorSearch}
 import org.apache.paimon.spark.commands.BucketExpression.quote
+import org.apache.paimon.spark.read.VariantExtractionInfo
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
@@ -43,6 +44,7 @@ case class PaimonScan(
     override val pushedTopN: Option[TopN],
     override val pushedVectorSearch: Option[VectorSearch],
     override val pushedFullTextSearch: Option[FullTextSearch] = None,
+    override val pushedVariantExtractions: Map[Seq[String], 
Seq[VariantExtractionInfo]] = Map.empty,
     bucketedScanDisabled: Boolean = false)
   extends PaimonBaseScan(table)
   with SupportsReportPartitioning
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 7e1989283b..e6f98363a0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate._
 import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
 import 
org.apache.paimon.spark.aggregate.AggregatePushDownUtils.tryPushdownAggregation
-import org.apache.paimon.spark.read.PaimonLocalScan
+import org.apache.paimon.spark.read.{PaimonLocalScan, 
PaimonSupportsPushDownVariantExtractions}
 import org.apache.paimon.table.{FileStoreTable, InnerTable}
 
 import org.apache.spark.sql.connector.expressions
@@ -35,7 +35,8 @@ import scala.collection.JavaConverters._
 class PaimonScanBuilder(val table: InnerTable)
   extends PaimonBaseScanBuilder
   with SupportsPushDownAggregates
-  with SupportsPushDownTopN {
+  with SupportsPushDownTopN
+  with PaimonSupportsPushDownVariantExtractions {
 
   private var localScan: Option[Scan] = None
 
@@ -150,7 +151,9 @@ class PaimonScanBuilder(val table: InnerTable)
           pushedLimit,
           pushedTopN,
           vectorSearch,
-          fullTextSearch)
+          fullTextSearch,
+          acceptedVariantExtractions
+        )
     }
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
index 3237475d69..33b5d4e176 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
@@ -52,6 +52,7 @@ trait BaseScan extends Scan with SupportsReportStatistics 
with Logging {
   def pushedTopN: Option[TopN] = None
   def pushedVectorSearch: Option[VectorSearch] = None
   def pushedFullTextSearch: Option[FullTextSearch] = None
+  def pushedVariantExtractions: Map[Seq[String], Seq[VariantExtractionInfo]] = 
Map.empty
 
   // Runtime push down
   val pushedRuntimePartitionFilters: ListBuffer[PartitionPredicate] = 
ListBuffer.empty
@@ -78,13 +79,17 @@ trait BaseScan extends Scan with SupportsReportStatistics 
with Logging {
     }
   }
 
+  /** Pruned read RowType, with variant fields rewritten if variant pushdown 
was accepted. */
   private[paimon] val (readTableRowType, metadataFields) = {
     requiredSchema.fields.foreach(f => checkMetadataColumn(f.name))
     val (_requiredTableFields, _metadataFields) =
       requiredSchema.fields.partition(field => 
tableRowType.containsField(field.name))
-    val _readTableRowType =
+    val pruned =
       SparkTypeUtils.prunePaimonRowType(StructType(_requiredTableFields), 
tableRowType)
-    (_readTableRowType, _metadataFields)
+    val withVariants =
+      if (pushedVariantExtractions.isEmpty) pruned
+      else VariantPushDownUtils.rewriteRowType(pruned, 
pushedVariantExtractions)
+    (withVariants, _metadataFields)
   }
 
   private def checkMetadataColumn(fieldName: String): Unit = {
@@ -185,6 +190,13 @@ trait BaseScan extends Scan with SupportsReportStatistics 
with Logging {
     } else {
       ""
     }
+    val pushedVariantsStr =
+      if (pushedVariantExtractions.isEmpty) ""
+      else
+        VariantPushDownUtils
+          .describeRewrittenRowType(readTableRowType)
+          .map(s => s", PushedVariants: [$s]")
+          .getOrElse("")
     s"${getClass.getSimpleName}: [${table.name}]" +
       pushedPartitionFiltersStr +
       pushedRuntimePartitionFiltersStr +
@@ -192,6 +204,7 @@ trait BaseScan extends Scan with SupportsReportStatistics 
with Logging {
       pushedTopN.map(topN => s", TopN: [$topN]").getOrElse("") +
       pushedLimit.map(limit => s", Limit: [$limit]").getOrElse("") +
       pushedVectorSearch.map(vs => s", VectorSearch: [$vs]").getOrElse("") +
-      pushedFullTextSearch.map(fts => s", FullTextSearch: 
[$fts]").getOrElse("")
+      pushedFullTextSearch.map(fts => s", FullTextSearch: 
[$fts]").getOrElse("") +
+      pushedVariantsStr
   }
 }
diff --git 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
similarity index 64%
copy from 
paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
index 94e9ac683f..04ff78c5cf 100644
--- 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
@@ -16,18 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark.read
 
-import org.apache.spark.SparkConf
-
-class VariantTest extends VariantTestBase {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "false")
-  }
-}
-
-class VariantInferShreddingTest extends VariantTestBase {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true")
-  }
+/**
+ * No-op default for Spark versions without 
`SupportsPushDownVariantExtractions` (3.x / 4.0). The
+ * 4.1 module overrides this trait at the same FQN to mix in the Spark 
interface. See
+ * [[PaimonSupportsRuntimeFiltering]] for the same shadowing pattern.
+ */
+trait PaimonSupportsPushDownVariantExtractions {
+  protected var acceptedVariantExtractions: Map[Seq[String], 
Seq[VariantExtractionInfo]] = Map.empty
 }
diff --git 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantExtractionInfo.scala
similarity index 64%
copy from 
paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantExtractionInfo.scala
index 94e9ac683f..285c7dd204 100644
--- 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantExtractionInfo.scala
@@ -16,18 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark.read
 
-import org.apache.spark.SparkConf
-
-class VariantTest extends VariantTestBase {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "false")
-  }
-}
-
-class VariantInferShreddingTest extends VariantTestBase {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true")
-  }
-}
+/** One accepted variant extraction; one field of the projected variant 
struct. */
+case class VariantExtractionInfo(
+    paimonType: org.apache.paimon.types.DataType,
+    path: String,
+    failOnError: Boolean,
+    timeZoneId: String)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantPushDownUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantPushDownUtils.scala
new file mode 100644
index 0000000000..16abb0e2d9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantPushDownUtils.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.data.variant.VariantMetadataUtils
+import org.apache.paimon.types.{DataField, RowType}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Variant pushdown business logic, shared across Spark profiles. */
+object VariantPushDownUtils {
+
+  /**
+   * Group extractions by column path. Whole-column rule: if any extraction on 
a path targets the
+   * full Variant type, reject the whole path — Paimon can't reconstruct a 
Variant from shredded
+   * fields, and Spark expects per-column acceptance to be all-or-nothing. 
Within-path order is
+   * preserved (it maps to the projected struct's field index).
+   */
+  def acceptByPath(extractions: IndexedSeq[(Seq[String], 
VariantExtractionInfo, Boolean)])
+      : (Map[Seq[String], Seq[VariantExtractionInfo]], Array[Boolean]) = {
+    val byPath = mutable.LinkedHashMap.empty[Seq[String], 
mutable.ArrayBuffer[Int]]
+    val rejected = mutable.HashSet.empty[Seq[String]]
+
+    var i = 0
+    while (i < extractions.length) {
+      val (path, _, isVariantTarget) = extractions(i)
+      if (path.isEmpty || isVariantTarget) {
+        if (path.nonEmpty) {
+          rejected += path
+        }
+      } else {
+        byPath.getOrElseUpdate(path, mutable.ArrayBuffer.empty).append(i)
+      }
+      i += 1
+    }
+
+    val accepted = new Array[Boolean](extractions.length)
+    val out = mutable.LinkedHashMap.empty[Seq[String], 
Seq[VariantExtractionInfo]]
+    byPath.foreach {
+      case (path, indices) =>
+        if (!rejected.contains(path)) {
+          val infos = indices.iterator.map {
+            idx =>
+              accepted(idx) = true
+              extractions(idx)._2
+          }.toVector
+          out(path) = infos
+        }
+    }
+    (out.toMap, accepted)
+  }
+
+  /**
+   * Replace each variant field at a path in `accepted` with a Paimon variant 
`RowType`; recurse
+   * into nested structs.
+   */
+  def rewriteRowType(rt: RowType, accepted: Map[Seq[String], 
Seq[VariantExtractionInfo]]): RowType =
+    rewriteRowType(rt, accepted, Vector.empty)
+
+  private def rewriteRowType(
+      rt: RowType,
+      accepted: Map[Seq[String], Seq[VariantExtractionInfo]],
+      prefix: Seq[String]): RowType = {
+    val newFields = rt.getFields.asScala.map(f => rewriteField(f, accepted, 
prefix)).asJava
+    new RowType(rt.isNullable, newFields)
+  }
+
+  private def rewriteField(
+      f: DataField,
+      accepted: Map[Seq[String], Seq[VariantExtractionInfo]],
+      prefix: Seq[String]): DataField = {
+    val path = prefix :+ f.name()
+    accepted.get(path) match {
+      case Some(infos) =>
+        val builder = 
VariantMetadataUtils.VariantRowTypeBuilder.builder(f.`type`().isNullable)
+        infos.foreach(i => builder.field(i.paimonType, i.path, i.failOnError, 
i.timeZoneId))
+        f.newType(builder.build())
+      case None =>
+        f.`type`() match {
+          case nested: RowType => f.newType(rewriteRowType(nested, accepted, 
path))
+          case _ => f
+        }
+    }
+  }
+
+  /** "col=[paths], nested.col=[paths]" rendering for `Scan.description()`. */
+  def describeRewrittenRowType(rt: RowType): Option[String] = {
+    val parts = mutable.ArrayBuffer.empty[String]
+    collectVariantPaths(rt, Vector.empty, parts)
+    if (parts.isEmpty) None else Some(parts.mkString(", "))
+  }
+
+  private def collectVariantPaths(
+      rt: RowType,
+      prefix: Seq[String],
+      out: mutable.ArrayBuffer[String]): Unit = {
+    rt.getFields.asScala.foreach {
+      f =>
+        val path = prefix :+ f.name()
+        f.`type`() match {
+          case child: RowType if VariantMetadataUtils.isVariantRowType(child) 
=>
+            val paths = child.getFields.asScala.map(c => 
VariantMetadataUtils.path(c.description()))
+            out.append(s"${path.mkString(".")}=[${paths.mkString(",")}]")
+          case child: RowType =>
+            collectVariantPaths(child, path, out)
+          case _ =>
+        }
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index e7c3e7a2f4..34c48a5ba4 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -21,9 +21,53 @@ package org.apache.paimon.spark.sql
 import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 abstract class VariantTestBase extends PaimonSparkTestBase {
 
+  private def variantPushDownEnabled: Boolean =
+    
spark.conf.getOption("spark.sql.variant.pushVariantIntoScan").contains("true")
+
+  private def scanReadSchemaOf(df: org.apache.spark.sql.DataFrame): StructType 
= {
+    df.queryExecution.optimizedPlan
+      .collectFirst { case DataSourceV2ScanRelation(_, scan, _, _, _) => 
scan.readSchema() }
+      .getOrElse(fail("expected a DataSourceV2ScanRelation in the optimized 
plan"))
+  }
+
+  private def fieldByPath(schema: StructType, path: Seq[String]): StructField 
= {
+    var current: StructType = schema
+    var field: StructField = null
+    path.zipWithIndex.foreach {
+      case (name, i) =>
+        field = current.find(_.name == name).getOrElse(fail(s"missing 
${path.take(i + 1)}"))
+        if (i < path.length - 1) {
+          current = field.dataType match {
+            case s: StructType => s
+            case other => fail(s"${path.take(i + 1)} should be a struct, got 
$other")
+          }
+        }
+    }
+    field
+  }
+
+  // Compare via `typeName` so `paimon-spark-ut` builds under Spark 3.x (no 
VariantType class).
+  private def isVariantType(t: DataType): Boolean = t.typeName == "variant"
+
+  private def assertVariantStruct(field: StructField, expectedFieldCount: 
Int): Unit = {
+    val s = field.dataType match {
+      case s: StructType => s
+      case other =>
+        fail(s"${field.name} should be a struct under variant pushdown, got 
$other")
+    }
+    assert(
+      s.length == expectedFieldCount,
+      s"${field.name} should expose $expectedFieldCount extracted field(s), 
got ${s.fields.toSeq}")
+    assert(
+      !s.fields.exists(f => isVariantType(f.dataType)),
+      s"no extracted field on ${field.name} should still be of Variant type 
when pushdown succeeds")
+  }
+
   test("Paimon Variant: read and write variant") {
     sql("CREATE TABLE T (id INT, v VARIANT)")
     sql("""
@@ -982,4 +1026,93 @@ abstract class VariantTestBase extends 
PaimonSparkTestBase {
       Seq(Row(2, 2))
     )
   }
+
+  test("Paimon Variant pushdown: top-level variant column rewrites readSchema 
iff conf is on") {
+    assume(gteqSpark4_1)
+    sql("CREATE TABLE T (id INT, v VARIANT)")
+    sql("""INSERT INTO T VALUES (1, 
parse_json('{"age":26,"city":"Beijing"}'))""")
+
+    val df =
+      sql("SELECT id, variant_get(v, '$.age', 'int'), variant_get(v, '$.city', 
'string') FROM T")
+    val v = fieldByPath(scanReadSchemaOf(df), Seq("v"))
+    if (variantPushDownEnabled) assertVariantStruct(v, expectedFieldCount = 2)
+    else assert(isVariantType(v.dataType))
+    checkAnswer(df, Seq(Row(1, 26, "Beijing")))
+  }
+
+  test("Paimon Variant pushdown: nested variant column inside a struct") {
+    assume(gteqSpark4_1)
+    sql("CREATE TABLE T (id INT, nested STRUCT<v: VARIANT, x: INT>)")
+    sql("""INSERT INTO T VALUES (1, named_struct('v', 
parse_json('{"age":26}'), 'x', 100))""")
+
+    val df = sql("SELECT id, variant_get(nested.v, '$.age', 'int') FROM T")
+    val v = fieldByPath(scanReadSchemaOf(df), Seq("nested", "v"))
+    if (variantPushDownEnabled) assertVariantStruct(v, expectedFieldCount = 1)
+    else assert(isVariantType(v.dataType))
+    checkAnswer(df, Seq(Row(1, 26)))
+  }
+
+  test("Paimon Variant pushdown: full variant access (SELECT v) blocks 
per-column pushdown") {
+    assume(gteqSpark4_1)
+    sql("CREATE TABLE T (id INT, v VARIANT)")
+    sql("""INSERT INTO T VALUES (1, parse_json('{"age":26}'))""")
+
+    // SELECT v + variant_get(v, ...) requires the full Variant value 
alongside extractions —
+    // Paimon can't reconstruct a Variant from shredded fields, so the entire 
column's
+    // extractions must be rejected, regardless of the pushdown conf.
+    val df = sql("SELECT v, variant_get(v, '$.age', 'int') FROM T")
+    val v = fieldByPath(scanReadSchemaOf(df), Seq("v"))
+    assert(isVariantType(v.dataType))
+    checkAnswer(df, sql("""SELECT parse_json('{"age":26}'), 26"""))
+  }
+
+  test("Paimon Variant pushdown: multiple variant columns rewrite 
independently") {
+    assume(gteqSpark4_1)
+    sql("CREATE TABLE T (id INT, v1 VARIANT, v2 VARIANT)")
+    sql("""INSERT INTO T VALUES (1, parse_json('{"a":1}'), 
parse_json('{"x":"hi","y":"bye"}'))""")
+
+    val df = sql(
+      "SELECT variant_get(v1, '$.a', 'int'), variant_get(v2, '$.x', 'string'), 
variant_get(v2, '$.y', 'string') FROM T")
+    val readSchema = scanReadSchemaOf(df)
+    val v1 = fieldByPath(readSchema, Seq("v1"))
+    val v2 = fieldByPath(readSchema, Seq("v2"))
+    if (variantPushDownEnabled) {
+      assertVariantStruct(v1, expectedFieldCount = 1)
+      assertVariantStruct(v2, expectedFieldCount = 2)
+    } else {
+      assert(isVariantType(v1.dataType))
+      assert(isVariantType(v2.dataType))
+    }
+    checkAnswer(df, Seq(Row(1, "hi", "bye")))
+  }
+
+  test("Paimon Variant pushdown: column pruning still applies for non-variant 
queries") {
+    assume(gteqSpark4_1)
+    sql("CREATE TABLE T (id INT, x INT, y INT, v VARIANT)")
+    sql("""INSERT INTO T VALUES (1, 10, 100, parse_json('{"age":26}'))""")
+
+    // No variant_get → Spark's `pushDownVariants` rule does NOT short-circuit 
`pruneColumns`,
+    // so the scan must report only the projected columns.
+    val df = sql("SELECT id, x FROM T")
+    val readSchema = scanReadSchemaOf(df)
+    assert(readSchema.fieldNames.toSet == Set("id", "x"), s"got 
${readSchema.fieldNames.toSeq}")
+    checkAnswer(df, Seq(Row(1, 10)))
+  }
+
+  test("Paimon Variant pushdown: scan description exposes PushedVariants only 
when active") {
+    assume(gteqSpark4_1)
+    sql("CREATE TABLE T (id INT, v VARIANT)")
+    sql("""INSERT INTO T VALUES (1, parse_json('{"age":26}'))""")
+
+    val df = sql("SELECT variant_get(v, '$.age', 'int') FROM T")
+    val desc = df.queryExecution.optimizedPlan
+      .collectFirst { case DataSourceV2ScanRelation(_, scan, _, _, _) => 
scan.description() }
+      .getOrElse(fail("expected a DataSourceV2ScanRelation in the plan"))
+
+    if (variantPushDownEnabled) {
+      assert(desc.contains("PushedVariants: [v=[$.age]]"), s"got: $desc")
+    } else {
+      assert(!desc.contains("PushedVariants"), s"got: $desc")
+    }
+  }
 }

Reply via email to