This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 175a55d4c2 [spark] Support variant pushdown via SupportsPushDownVariantExtractions for Spark 4.1+ (#7913)
175a55d4c2 is described below
commit 175a55d4c21dfc4d6aac0d39b3e3c0e184ea4fca
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 21 10:32:32 2026 +0800
[spark] Support variant pushdown via SupportsPushDownVariantExtractions for Spark 4.1+ (#7913)
Integrate Spark 4.1+ `SupportsPushDownVariantExtractions` so that
`variant_get(col, '$.path', type)` is pushed into the Paimon read path
as a shredded variant struct. This follows the same trait-shadowing
pattern as `PaimonSupportsRuntimeFiltering`: a no-op trait lives in
`paimon-spark-common`, and the Spark 4.1 module provides a trait with
the same fully qualified name that overrides it. Variant columns nested
inside structs are supported, and column pruning is preserved. If any
extraction on a column requires the full Variant value, every extraction
on that column is rejected rather than pushed down.
---
.../scala/org/apache/paimon/spark/PaimonScan.scala | 2 +
.../scala/org/apache/paimon/spark/PaimonScan.scala | 2 +
.../PaimonSupportsPushDownVariantExtractions.scala | 46 +++++++
.../org/apache/paimon/spark/sql/VariantTest.scala | 24 +++-
.../scala/org/apache/paimon/spark/PaimonScan.scala | 2 +
.../apache/paimon/spark/PaimonScanBuilder.scala | 9 +-
.../org/apache/paimon/spark/read/BaseScan.scala | 19 ++-
...PaimonSupportsPushDownVariantExtractions.scala} | 21 ++--
.../paimon/spark/read/VariantExtractionInfo.scala} | 21 ++--
.../paimon/spark/read/VariantPushDownUtils.scala | 127 ++++++++++++++++++++
.../apache/paimon/spark/sql/VariantTestBase.scala | 133 +++++++++++++++++++++
11 files changed, 371 insertions(+), 35 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index f5a5c8e95e..9af6c8f8b5 100644
---
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN,
VectorSearch}
+import org.apache.paimon.spark.read.VariantExtractionInfo
import org.apache.paimon.table.InnerTable
import org.apache.spark.sql.types.StructType
@@ -33,5 +34,6 @@ case class PaimonScan(
override val pushedTopN: Option[TopN] = None,
override val pushedVectorSearch: Option[VectorSearch] = None,
override val pushedFullTextSearch: Option[FullTextSearch] = None,
+ override val pushedVariantExtractions: Map[Seq[String],
Seq[VariantExtractionInfo]] = Map.empty,
bucketedScanDisabled: Boolean = true)
extends PaimonBaseScan(table) {}
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 707ee13459..9bedc9bdf1 100644
---
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN,
VectorSearch}
+import org.apache.paimon.spark.read.VariantExtractionInfo
import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -37,6 +38,7 @@ case class PaimonScan(
override val pushedTopN: Option[TopN],
override val pushedVectorSearch: Option[VectorSearch],
override val pushedFullTextSearch: Option[FullTextSearch] = None,
+ override val pushedVariantExtractions: Map[Seq[String],
Seq[VariantExtractionInfo]] = Map.empty,
bucketedScanDisabled: Boolean = false)
extends PaimonBaseScan(table)
with SupportsReportPartitioning {
diff --git
a/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
new file mode 100644
index 0000000000..ee27ddf34b
--- /dev/null
+++
b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.spark.SparkTypeUtils
+
+import
org.apache.spark.sql.connector.read.{SupportsPushDownVariantExtractions,
VariantExtraction}
+import org.apache.spark.sql.execution.datasources.VariantMetadata
+import org.apache.spark.sql.types.VariantType
+
+/** Spark 4.1+ binding; shadows the no-op `paimon-spark-common` trait by FQN.
*/
+trait PaimonSupportsPushDownVariantExtractions extends
SupportsPushDownVariantExtractions {
+ protected var acceptedVariantExtractions: Map[Seq[String],
Seq[VariantExtractionInfo]] = Map.empty
+
+ override def pushVariantExtractions(extractions: Array[VariantExtraction]):
Array[Boolean] = {
+ val decoded = extractions.iterator.map {
+ ex =>
+ val vm = VariantMetadata.fromMetadata(ex.metadata())
+ val info = VariantExtractionInfo(
+ paimonType = SparkTypeUtils.toPaimonType(ex.expectedDataType()),
+ path = vm.path,
+ failOnError = vm.failOnError,
+ timeZoneId = vm.timeZoneId)
+ (ex.columnName().toSeq, info, ex.expectedDataType() == VariantType)
+ }.toIndexedSeq
+ val (newMap, accepted) = VariantPushDownUtils.acceptByPath(decoded)
+ acceptedVariantExtractions = newMap
+ accepted
+ }
+}
diff --git
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
index 94e9ac683f..368020422c 100644
---
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
+++
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
@@ -22,12 +22,32 @@ import org.apache.spark.SparkConf
class VariantTest extends VariantTestBase {
override protected def sparkConf: SparkConf = {
- super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "false")
+ super.sparkConf
+ .set("spark.paimon.variant.inferShreddingSchema", "false")
+ .set("spark.sql.variant.pushVariantIntoScan", "false")
}
}
class VariantInferShreddingTest extends VariantTestBase {
override protected def sparkConf: SparkConf = {
- super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true")
+ super.sparkConf
+ .set("spark.paimon.variant.inferShreddingSchema", "true")
+ .set("spark.sql.variant.pushVariantIntoScan", "false")
+ }
+}
+
+class VariantWithPushDownTest extends VariantTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.paimon.variant.inferShreddingSchema", "false")
+ .set("spark.sql.variant.pushVariantIntoScan", "true")
+ }
+}
+
+class VariantInferShreddingWithPushDownTest extends VariantTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.paimon.variant.inferShreddingSchema", "true")
+ .set("spark.sql.variant.pushVariantIntoScan", "true")
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 0fa4d73054..a2b04c3ac4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN,
VectorSearch}
import org.apache.paimon.spark.commands.BucketExpression.quote
+import org.apache.paimon.spark.read.VariantExtractionInfo
import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -43,6 +44,7 @@ case class PaimonScan(
override val pushedTopN: Option[TopN],
override val pushedVectorSearch: Option[VectorSearch],
override val pushedFullTextSearch: Option[FullTextSearch] = None,
+ override val pushedVariantExtractions: Map[Seq[String],
Seq[VariantExtractionInfo]] = Map.empty,
bucketedScanDisabled: Boolean = false)
extends PaimonBaseScan(table)
with SupportsReportPartitioning
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 7e1989283b..e6f98363a0 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate._
import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
import
org.apache.paimon.spark.aggregate.AggregatePushDownUtils.tryPushdownAggregation
-import org.apache.paimon.spark.read.PaimonLocalScan
+import org.apache.paimon.spark.read.{PaimonLocalScan,
PaimonSupportsPushDownVariantExtractions}
import org.apache.paimon.table.{FileStoreTable, InnerTable}
import org.apache.spark.sql.connector.expressions
@@ -35,7 +35,8 @@ import scala.collection.JavaConverters._
class PaimonScanBuilder(val table: InnerTable)
extends PaimonBaseScanBuilder
with SupportsPushDownAggregates
- with SupportsPushDownTopN {
+ with SupportsPushDownTopN
+ with PaimonSupportsPushDownVariantExtractions {
private var localScan: Option[Scan] = None
@@ -150,7 +151,9 @@ class PaimonScanBuilder(val table: InnerTable)
pushedLimit,
pushedTopN,
vectorSearch,
- fullTextSearch)
+ fullTextSearch,
+ acceptedVariantExtractions
+ )
}
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
index 3237475d69..33b5d4e176 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
@@ -52,6 +52,7 @@ trait BaseScan extends Scan with SupportsReportStatistics
with Logging {
def pushedTopN: Option[TopN] = None
def pushedVectorSearch: Option[VectorSearch] = None
def pushedFullTextSearch: Option[FullTextSearch] = None
+ def pushedVariantExtractions: Map[Seq[String], Seq[VariantExtractionInfo]] =
Map.empty
// Runtime push down
val pushedRuntimePartitionFilters: ListBuffer[PartitionPredicate] =
ListBuffer.empty
@@ -78,13 +79,17 @@ trait BaseScan extends Scan with SupportsReportStatistics
with Logging {
}
}
+ /** Pruned read RowType, with variant fields rewritten if variant pushdown
was accepted. */
private[paimon] val (readTableRowType, metadataFields) = {
requiredSchema.fields.foreach(f => checkMetadataColumn(f.name))
val (_requiredTableFields, _metadataFields) =
requiredSchema.fields.partition(field =>
tableRowType.containsField(field.name))
- val _readTableRowType =
+ val pruned =
SparkTypeUtils.prunePaimonRowType(StructType(_requiredTableFields),
tableRowType)
- (_readTableRowType, _metadataFields)
+ val withVariants =
+ if (pushedVariantExtractions.isEmpty) pruned
+ else VariantPushDownUtils.rewriteRowType(pruned,
pushedVariantExtractions)
+ (withVariants, _metadataFields)
}
private def checkMetadataColumn(fieldName: String): Unit = {
@@ -185,6 +190,13 @@ trait BaseScan extends Scan with SupportsReportStatistics
with Logging {
} else {
""
}
+ val pushedVariantsStr =
+ if (pushedVariantExtractions.isEmpty) ""
+ else
+ VariantPushDownUtils
+ .describeRewrittenRowType(readTableRowType)
+ .map(s => s", PushedVariants: [$s]")
+ .getOrElse("")
s"${getClass.getSimpleName}: [${table.name}]" +
pushedPartitionFiltersStr +
pushedRuntimePartitionFiltersStr +
@@ -192,6 +204,7 @@ trait BaseScan extends Scan with SupportsReportStatistics
with Logging {
pushedTopN.map(topN => s", TopN: [$topN]").getOrElse("") +
pushedLimit.map(limit => s", Limit: [$limit]").getOrElse("") +
pushedVectorSearch.map(vs => s", VectorSearch: [$vs]").getOrElse("") +
- pushedFullTextSearch.map(fts => s", FullTextSearch:
[$fts]").getOrElse("")
+ pushedFullTextSearch.map(fts => s", FullTextSearch:
[$fts]").getOrElse("") +
+ pushedVariantsStr
}
}
diff --git
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
similarity index 64%
copy from
paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
index 94e9ac683f..04ff78c5cf 100644
---
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSupportsPushDownVariantExtractions.scala
@@ -16,18 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark.read
-import org.apache.spark.SparkConf
-
-class VariantTest extends VariantTestBase {
- override protected def sparkConf: SparkConf = {
- super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "false")
- }
-}
-
-class VariantInferShreddingTest extends VariantTestBase {
- override protected def sparkConf: SparkConf = {
- super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true")
- }
+/**
+ * No-op default for Spark versions without
`SupportsPushDownVariantExtractions` (3.x / 4.0). The
+ * 4.1 module overrides this trait at the same FQN to mix in the Spark
interface. See
+ * [[PaimonSupportsRuntimeFiltering]] for the same shadowing pattern.
+ */
+trait PaimonSupportsPushDownVariantExtractions {
+ protected var acceptedVariantExtractions: Map[Seq[String],
Seq[VariantExtractionInfo]] = Map.empty
}
diff --git
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantExtractionInfo.scala
similarity index 64%
copy from
paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantExtractionInfo.scala
index 94e9ac683f..285c7dd204 100644
---
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantExtractionInfo.scala
@@ -16,18 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark.read
-import org.apache.spark.SparkConf
-
-class VariantTest extends VariantTestBase {
- override protected def sparkConf: SparkConf = {
- super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "false")
- }
-}
-
-class VariantInferShreddingTest extends VariantTestBase {
- override protected def sparkConf: SparkConf = {
- super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true")
- }
-}
+/** One accepted variant extraction; one field of the projected variant
struct. */
+case class VariantExtractionInfo(
+ paimonType: org.apache.paimon.types.DataType,
+ path: String,
+ failOnError: Boolean,
+ timeZoneId: String)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantPushDownUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantPushDownUtils.scala
new file mode 100644
index 0000000000..16abb0e2d9
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/VariantPushDownUtils.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.data.variant.VariantMetadataUtils
+import org.apache.paimon.types.{DataField, RowType}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Variant pushdown business logic, shared across Spark profiles. */
+object VariantPushDownUtils {
+
+ /**
+ * Group extractions by column path. Whole-column rule: if any extraction on
a path targets the
+ * full Variant type, reject the whole path — Paimon can't reconstruct a
Variant from shredded
+ * fields, and Spark expects per-column acceptance to be all-or-nothing.
Within-path order is
+ * preserved (it maps to the projected struct's field index).
+ */
+ def acceptByPath(extractions: IndexedSeq[(Seq[String],
VariantExtractionInfo, Boolean)])
+ : (Map[Seq[String], Seq[VariantExtractionInfo]], Array[Boolean]) = {
+ val byPath = mutable.LinkedHashMap.empty[Seq[String],
mutable.ArrayBuffer[Int]]
+ val rejected = mutable.HashSet.empty[Seq[String]]
+
+ var i = 0
+ while (i < extractions.length) {
+ val (path, _, isVariantTarget) = extractions(i)
+ if (path.isEmpty || isVariantTarget) {
+ if (path.nonEmpty) {
+ rejected += path
+ }
+ } else {
+ byPath.getOrElseUpdate(path, mutable.ArrayBuffer.empty).append(i)
+ }
+ i += 1
+ }
+
+ val accepted = new Array[Boolean](extractions.length)
+ val out = mutable.LinkedHashMap.empty[Seq[String],
Seq[VariantExtractionInfo]]
+ byPath.foreach {
+ case (path, indices) =>
+ if (!rejected.contains(path)) {
+ val infos = indices.iterator.map {
+ idx =>
+ accepted(idx) = true
+ extractions(idx)._2
+ }.toVector
+ out(path) = infos
+ }
+ }
+ (out.toMap, accepted)
+ }
+
+ /**
+ * Replace each variant field at a path in `accepted` with a Paimon variant
`RowType`; recurse
+ * into nested structs.
+ */
+ def rewriteRowType(rt: RowType, accepted: Map[Seq[String],
Seq[VariantExtractionInfo]]): RowType =
+ rewriteRowType(rt, accepted, Vector.empty)
+
+ private def rewriteRowType(
+ rt: RowType,
+ accepted: Map[Seq[String], Seq[VariantExtractionInfo]],
+ prefix: Seq[String]): RowType = {
+ val newFields = rt.getFields.asScala.map(f => rewriteField(f, accepted,
prefix)).asJava
+ new RowType(rt.isNullable, newFields)
+ }
+
+ private def rewriteField(
+ f: DataField,
+ accepted: Map[Seq[String], Seq[VariantExtractionInfo]],
+ prefix: Seq[String]): DataField = {
+ val path = prefix :+ f.name()
+ accepted.get(path) match {
+ case Some(infos) =>
+ val builder =
VariantMetadataUtils.VariantRowTypeBuilder.builder(f.`type`().isNullable)
+ infos.foreach(i => builder.field(i.paimonType, i.path, i.failOnError,
i.timeZoneId))
+ f.newType(builder.build())
+ case None =>
+ f.`type`() match {
+ case nested: RowType => f.newType(rewriteRowType(nested, accepted,
path))
+ case _ => f
+ }
+ }
+ }
+
+ /** "col=[paths], nested.col=[paths]" rendering for `Scan.description()`. */
+ def describeRewrittenRowType(rt: RowType): Option[String] = {
+ val parts = mutable.ArrayBuffer.empty[String]
+ collectVariantPaths(rt, Vector.empty, parts)
+ if (parts.isEmpty) None else Some(parts.mkString(", "))
+ }
+
+ private def collectVariantPaths(
+ rt: RowType,
+ prefix: Seq[String],
+ out: mutable.ArrayBuffer[String]): Unit = {
+ rt.getFields.asScala.foreach {
+ f =>
+ val path = prefix :+ f.name()
+ f.`type`() match {
+ case child: RowType if VariantMetadataUtils.isVariantRowType(child)
=>
+ val paths = child.getFields.asScala.map(c =>
VariantMetadataUtils.path(c.description()))
+ out.append(s"${path.mkString(".")}=[${paths.mkString(",")}]")
+ case child: RowType =>
+ collectVariantPaths(child, path, out)
+ case _ =>
+ }
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index e7c3e7a2f4..34c48a5ba4 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -21,9 +21,53 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
abstract class VariantTestBase extends PaimonSparkTestBase {
+ private def variantPushDownEnabled: Boolean =
+
spark.conf.getOption("spark.sql.variant.pushVariantIntoScan").contains("true")
+
+ private def scanReadSchemaOf(df: org.apache.spark.sql.DataFrame): StructType
= {
+ df.queryExecution.optimizedPlan
+ .collectFirst { case DataSourceV2ScanRelation(_, scan, _, _, _) =>
scan.readSchema() }
+ .getOrElse(fail("expected a DataSourceV2ScanRelation in the optimized
plan"))
+ }
+
+ private def fieldByPath(schema: StructType, path: Seq[String]): StructField
= {
+ var current: StructType = schema
+ var field: StructField = null
+ path.zipWithIndex.foreach {
+ case (name, i) =>
+ field = current.find(_.name == name).getOrElse(fail(s"missing
${path.take(i + 1)}"))
+ if (i < path.length - 1) {
+ current = field.dataType match {
+ case s: StructType => s
+ case other => fail(s"${path.take(i + 1)} should be a struct, got
$other")
+ }
+ }
+ }
+ field
+ }
+
+ // Compare via `typeName` so `paimon-spark-ut` builds under Spark 3.x (no
VariantType class).
+ private def isVariantType(t: DataType): Boolean = t.typeName == "variant"
+
+ private def assertVariantStruct(field: StructField, expectedFieldCount:
Int): Unit = {
+ val s = field.dataType match {
+ case s: StructType => s
+ case other =>
+ fail(s"${field.name} should be a struct under variant pushdown, got
$other")
+ }
+ assert(
+ s.length == expectedFieldCount,
+ s"${field.name} should expose $expectedFieldCount extracted field(s),
got ${s.fields.toSeq}")
+ assert(
+ !s.fields.exists(f => isVariantType(f.dataType)),
+ s"no extracted field on ${field.name} should still be of Variant type
when pushdown succeeds")
+ }
+
test("Paimon Variant: read and write variant") {
sql("CREATE TABLE T (id INT, v VARIANT)")
sql("""
@@ -982,4 +1026,93 @@ abstract class VariantTestBase extends
PaimonSparkTestBase {
Seq(Row(2, 2))
)
}
+
+ test("Paimon Variant pushdown: top-level variant column rewrites readSchema
iff conf is on") {
+ assume(gteqSpark4_1)
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""INSERT INTO T VALUES (1,
parse_json('{"age":26,"city":"Beijing"}'))""")
+
+ val df =
+ sql("SELECT id, variant_get(v, '$.age', 'int'), variant_get(v, '$.city',
'string') FROM T")
+ val v = fieldByPath(scanReadSchemaOf(df), Seq("v"))
+ if (variantPushDownEnabled) assertVariantStruct(v, expectedFieldCount = 2)
+ else assert(isVariantType(v.dataType))
+ checkAnswer(df, Seq(Row(1, 26, "Beijing")))
+ }
+
+ test("Paimon Variant pushdown: nested variant column inside a struct") {
+ assume(gteqSpark4_1)
+ sql("CREATE TABLE T (id INT, nested STRUCT<v: VARIANT, x: INT>)")
+ sql("""INSERT INTO T VALUES (1, named_struct('v',
parse_json('{"age":26}'), 'x', 100))""")
+
+ val df = sql("SELECT id, variant_get(nested.v, '$.age', 'int') FROM T")
+ val v = fieldByPath(scanReadSchemaOf(df), Seq("nested", "v"))
+ if (variantPushDownEnabled) assertVariantStruct(v, expectedFieldCount = 1)
+ else assert(isVariantType(v.dataType))
+ checkAnswer(df, Seq(Row(1, 26)))
+ }
+
+ test("Paimon Variant pushdown: full variant access (SELECT v) blocks
per-column pushdown") {
+ assume(gteqSpark4_1)
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""INSERT INTO T VALUES (1, parse_json('{"age":26}'))""")
+
+ // SELECT v + variant_get(v, ...) requires the full Variant value
alongside extractions —
+ // Paimon can't reconstruct a Variant from shredded fields, so the entire
column's
+ // extractions must be rejected, regardless of the pushdown conf.
+ val df = sql("SELECT v, variant_get(v, '$.age', 'int') FROM T")
+ val v = fieldByPath(scanReadSchemaOf(df), Seq("v"))
+ assert(isVariantType(v.dataType))
+ checkAnswer(df, sql("""SELECT parse_json('{"age":26}'), 26"""))
+ }
+
+ test("Paimon Variant pushdown: multiple variant columns rewrite
independently") {
+ assume(gteqSpark4_1)
+ sql("CREATE TABLE T (id INT, v1 VARIANT, v2 VARIANT)")
+ sql("""INSERT INTO T VALUES (1, parse_json('{"a":1}'),
parse_json('{"x":"hi","y":"bye"}'))""")
+
+ val df = sql(
+ "SELECT variant_get(v1, '$.a', 'int'), variant_get(v2, '$.x', 'string'),
variant_get(v2, '$.y', 'string') FROM T")
+ val readSchema = scanReadSchemaOf(df)
+ val v1 = fieldByPath(readSchema, Seq("v1"))
+ val v2 = fieldByPath(readSchema, Seq("v2"))
+ if (variantPushDownEnabled) {
+ assertVariantStruct(v1, expectedFieldCount = 1)
+ assertVariantStruct(v2, expectedFieldCount = 2)
+ } else {
+ assert(isVariantType(v1.dataType))
+ assert(isVariantType(v2.dataType))
+ }
+ checkAnswer(df, Seq(Row(1, "hi", "bye")))
+ }
+
+ test("Paimon Variant pushdown: column pruning still applies for non-variant
queries") {
+ assume(gteqSpark4_1)
+ sql("CREATE TABLE T (id INT, x INT, y INT, v VARIANT)")
+ sql("""INSERT INTO T VALUES (1, 10, 100, parse_json('{"age":26}'))""")
+
+ // No variant_get → Spark's `pushDownVariants` rule does NOT short-circuit
`pruneColumns`,
+ // so the scan must report only the projected columns.
+ val df = sql("SELECT id, x FROM T")
+ val readSchema = scanReadSchemaOf(df)
+ assert(readSchema.fieldNames.toSet == Set("id", "x"), s"got
${readSchema.fieldNames.toSeq}")
+ checkAnswer(df, Seq(Row(1, 10)))
+ }
+
+ test("Paimon Variant pushdown: scan description exposes PushedVariants only
when active") {
+ assume(gteqSpark4_1)
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""INSERT INTO T VALUES (1, parse_json('{"age":26}'))""")
+
+ val df = sql("SELECT variant_get(v, '$.age', 'int') FROM T")
+ val desc = df.queryExecution.optimizedPlan
+ .collectFirst { case DataSourceV2ScanRelation(_, scan, _, _, _) =>
scan.description() }
+ .getOrElse(fail("expected a DataSourceV2ScanRelation in the plan"))
+
+ if (variantPushDownEnabled) {
+ assert(desc.contains("PushedVariants: [v=[$.age]]"), s"got: $desc")
+ } else {
+ assert(!desc.contains("PushedVariants"), s"got: $desc")
+ }
+ }
}