This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 71f22f502 fix: enable more Spark SQL tests for `native_datafusion` (`DynamicPartitionPruningSuite` / `ExplainSuite`) (#3694)
71f22f502 is described below

commit 71f22f502ca9f80eb34a5cc0fbd1d1b875c7846c
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 16 21:43:29 2026 -0600

    fix: enable more Spark SQL tests for `native_datafusion` (`DynamicPartitionPruningSuite` / `ExplainSuite`) (#3694)
---
 dev/diffs/3.5.8.diff                               | 41 +++++++---------------
 .../spark/sql/comet/CometNativeScanExec.scala      | 40 ++++++++++++++++++++-
 2 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 3ed7d9ce1..619898e19 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -93,22 +93,23 @@ index 27ae10b3d59..78e69902dfd 100644
 +  }
  }
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
-index db587dd9868..aac7295a53d 100644
+index db587dd9868..33802f29253 100644
 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
 +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
 @@ -18,6 +18,7 @@
  package org.apache.spark.sql.execution
  
  import org.apache.spark.annotation.DeveloperApi
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
  import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
  import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
-@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
+@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo {
      // dump the file scan metadata (e.g file path) to event log
      val metadata = plan match {
        case fileScan: FileSourceScanExec => fileScan.metadata
 +      case cometScan: CometScanExec => cometScan.metadata
++      case nativeScan: CometNativeScanExec => nativeScan.metadata
        case _ => Map[String, String]()
      }
      new SparkPlanInfo(
@@ -396,14 +397,14 @@ index c4fb4fa943c..a04b23870a8 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..42eb9fd1cb7 100644
+index f33432ddb6f..4acdf7e9cfb 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
  import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, 
Expression}
  import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
  import org.apache.spark.sql.catalyst.plans.ExistenceJoin
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, 
InMemoryTableWithV2FilterCatalog}
  import org.apache.spark.sql.execution._
  import org.apache.spark.sql.execution.adaptive._
@@ -447,40 +448,22 @@ index f33432ddb6f..42eb9fd1cb7 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        val df = sql(
          """ WITH v as (
-@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
-    * Check the static scan metrics with and without DPP
-    */
-   test("static scan metrics",
--    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
-+    DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
-     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
-@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
 +              case s: CometScanExec =>
++                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("fid")))
++              case s: CometNativeScanExec =>
 +                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("fid")))
                case _ => false
              }
            assert(scanOption.isDefined)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-index a206e97c353..79813d8e259 100644
+index a206e97c353..fea1149b67d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
-     }
-   }
- 
--  test("explain formatted - check presence of subquery in case of DPP") {
-+  test("explain formatted - check presence of subquery in case of DPP",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
-     withTable("df1", "df2") {
-       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-         SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
      }
    }
  
@@ -490,7 +473,7 @@ index a206e97c353..79813d8e259 100644
      withTempDir { dir =>
        Seq("parquet", "orc", "csv", "json").foreach { fmt =>
          val basePath = dir.getCanonicalPath + "/" + fmt
-@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
    }
  }
  
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 909384c09..dcb975ac7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -78,6 +78,33 @@ case class CometNativeScanExec(
   override val nodeName: String =
     s"CometNativeScan $relation 
${tableIdentifier.map(_.unquotedString).getOrElse("")}"
 
+  override def verboseStringWithOperatorId(): String = {
+    val metadataStr = metadata.toSeq.sorted
+      .filterNot {
+        case (_, value) if (value.isEmpty || value.equals("[]")) => true
+        case (key, _) if (key.equals("DataFilters") || key.equals("Format")) 
=> true
+        case (_, _) => false
+      }
+      .map {
+        case (key, _) if (key.equals("Location")) =>
+          val location = relation.location
+          val numPaths = location.rootPaths.length
+          val abbreviatedLocation = if (numPaths <= 1) {
+            location.rootPaths.mkString("[", ", ", "]")
+          } else {
+            "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
+          }
+          s"$key: ${location.getClass.getSimpleName} 
${redact(abbreviatedLocation)}"
+        case (key, value) => s"$key: ${redact(value)}"
+      }
+
+    s"""
+       |$formattedNodeName
+       |${ExplainUtils.generateFieldString("Output", output)}
+       |${metadataStr.mkString("\n")}
+       |""".stripMargin
+  }
+
   // exposed for testing
   lazy val bucketedScan: Boolean = originalPlan.bucketedScan && 
!disableBucketedScan
 
@@ -202,13 +229,24 @@ case class CometNativeScanExec(
 
   override def hashCode(): Int = Objects.hashCode(originalPlan, 
serializedPlanOpt)
 
+  private val driverMetricKeys =
+    Set(
+      "numFiles",
+      "filesSize",
+      "numPartitions",
+      "metadataTime",
+      "staticFilesNum",
+      "staticFilesSize",
+      "pruningTime")
+
   override lazy val metrics: Map[String, SQLMetric] = {
     val nativeMetrics = CometMetricNode.nativeScanMetrics(session.sparkContext)
     // Map native metric names to Spark metric names
-    nativeMetrics.get("output_rows") match {
+    val withAlias = nativeMetrics.get("output_rows") match {
       case Some(metric) => nativeMetrics + ("numOutputRows" -> metric)
       case None => nativeMetrics
     }
+    withAlias ++ scan.metrics.filterKeys(driverMetricKeys)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to