This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 315d6a789 fix: metrics tests for native_datafusion experimental native scan (#1445)
315d6a789 is described below

commit 315d6a789d0a36cde677c5e818935663fc398e39
Author: Matt Butrovich <[email protected]>
AuthorDate: Wed Feb 26 18:15:52 2025 -0500

    fix: metrics tests for native_datafusion experimental native scan (#1445)
    
    * Fix ParquetReadSuite scan metrics for native_datafusion.
    
    * Fix CometExecSuite native metrics: scan for native_datafusion.
---
 .../org/apache/comet/exec/CometExecSuite.scala     | 94 +++++++++++++---------
 .../apache/comet/parquet/ParquetReadSuite.scala    | 17 +++-
 2 files changed, 73 insertions(+), 38 deletions(-)
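
Before the diff itself, a quick sketch of the shape of the CometExecSuite fix: the old test unconditionally read Comet's scanTime/cast_time metrics, which do not exist on the experimental native_datafusion scan, so the assertions now branch on the concrete scan operator. A minimal, self-contained illustration of that dispatch pattern (the *Like case classes and Long-valued metrics below are simplified stand-ins, not the real Spark plan classes or SQLMetrics):

    // Standalone sketch; CometScanLike / CometNativeScanLike are hypothetical
    // stand-ins for CometScanExec / CometNativeScanExec, with plain Longs in
    // place of Spark SQLMetrics.
    object MetricCheckSketch {
      sealed trait ScanNode { def metrics: Map[String, Long] }
      final case class CometScanLike(metrics: Map[String, Long]) extends ScanNode
      final case class CometNativeScanLike(metrics: Map[String, Long]) extends ScanNode

      def checkScanMetrics(scan: ScanNode): Unit = scan match {
        case CometScanLike(m) =>
          // The JVM-side scan reports Comet's own timers.
          Seq("scanTime", "cast_time").foreach { name =>
            assert(m.contains(name) && m(name) > 0, s"metric $name missing or zero")
          }
        case CometNativeScanLike(m) =>
          // native_datafusion surfaces DataFusion's metric names instead.
          // (output_rows is checked separately in the real test, which
          // asserts it is 0 for that particular query.)
          Seq(
            "time_elapsed_scanning_total",
            "bytes_scanned",
            "time_elapsed_opening",
            "time_elapsed_processing",
            "time_elapsed_scanning_until_data").foreach { name =>
            assert(m.contains(name) && m(name) > 0, s"metric $name missing or zero")
          }
      }

      def main(args: Array[String]): Unit = {
        checkScanMetrics(CometScanLike(Map("scanTime" -> 5L, "cast_time" -> 1L)))
        println("ok")
      }
    }

Keying the assertions off the operator class lets a single test body cover whichever scan implementation the planner selected.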

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index d91631532..1c5395d92 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -28,12 +28,12 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{AnalysisException, Column, CometTestBase, DataFrame, DataFrameWriter, Row, SaveMode}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Hex}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, BloomFilterAggregate}
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometNativeScanExec, CometProjectExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometSparkToColumnarExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, Cartes
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions.{col, count, date_add, expr, lead, sum}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.unsafe.types.UTF8String
@@ -53,6 +53,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plu
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 
 class CometExecSuite extends CometTestBase {
+
   import testImplicits._
 
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
@@ -216,18 +217,18 @@ class CometExecSuite extends CometTestBase {
 
     withTable("t1") {
       sql("""
-            |CREATE TABLE t1 USING PARQUET
-            |AS SELECT * FROM VALUES
-            |(1, "a"),
-            |(2, "a"),
-            |(3, "a") t(id, value)
-            |""".stripMargin)
+          |CREATE TABLE t1 USING PARQUET
+          |AS SELECT * FROM VALUES
+          |(1, "a"),
+          |(2, "a"),
+          |(3, "a") t(id, value)
+          |""".stripMargin)
       val df = sql("""
-                     |WITH t2 AS (
-                     |  SELECT * FROM t1 ORDER BY id
-                     |)
-                     |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
-                     |""".stripMargin)
+          |WITH t2 AS (
+          |  SELECT * FROM t1 ORDER BY id
+          |)
+          |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
+          |""".stripMargin)
       checkSparkAnswer(df)
     }
   }
@@ -532,15 +533,34 @@ class CometExecSuite extends CometTestBase {
           val df = sql("SELECT * FROM tbl WHERE _2 > _3")
           df.collect()
 
-          val metrics = find(df.queryExecution.executedPlan)(s =>
+          find(df.queryExecution.executedPlan)(s =>
             s.isInstanceOf[CometScanExec] || s.isInstanceOf[CometNativeScanExec])
-            .map(_.metrics)
-            .get
+            .foreach(scan => {
+              val metrics = scan.metrics
+              scan match {
+                case _: CometScanExec => {
+                  assert(metrics.contains("scanTime"))
+                  assert(metrics.contains("cast_time"))
+                  assert(metrics("scanTime").value > 0)
+                  assert(metrics("cast_time").value > 0)
+                }
+                case _: CometNativeScanExec => {
+                  assert(metrics.contains("time_elapsed_scanning_total"))
+                  assert(metrics.contains("bytes_scanned"))
+                  assert(metrics.contains("output_rows"))
+                  assert(metrics.contains("time_elapsed_opening"))
+                  assert(metrics.contains("time_elapsed_processing"))
+                  assert(metrics.contains("time_elapsed_scanning_until_data"))
+                  assert(metrics("time_elapsed_scanning_total").value > 0)
+                  assert(metrics("bytes_scanned").value > 0)
+                  assert(metrics("output_rows").value == 0)
+                  assert(metrics("time_elapsed_opening").value > 0)
+                  assert(metrics("time_elapsed_processing").value > 0)
+                  assert(metrics("time_elapsed_scanning_until_data").value > 0)
+                }
+              }
+            })
 
-          assert(metrics.contains("scanTime"))
-          assert(metrics.contains("cast_time"))
-          assert(metrics("scanTime").value > 0)
-          assert(metrics("cast_time").value > 0)
         }
       }
     }
@@ -1265,16 +1285,16 @@ class CometExecSuite extends CometTestBase {
              | ) VALUES('a')
         """.stripMargin)
         checkSparkAnswerAndOperator(sql("""
-              | SELECT
-              |   name,
-              |   CAST(part1 AS STRING),
-              |   CAST(part2 as STRING),
-              |   CAST(part3 as STRING),
-              |   part4,
-              |   part5,
-              |   part6,
-              |   part7
-              | FROM t1
+            | SELECT
+            |   name,
+            |   CAST(part1 AS STRING),
+            |   CAST(part2 as STRING),
+            |   CAST(part3 as STRING),
+            |   part4,
+            |   part5,
+            |   part6,
+            |   part7
+            | FROM t1
         """.stripMargin))
 
         val e = intercept[AnalysisException] {
@@ -1504,17 +1524,17 @@ class CometExecSuite extends CometTestBase {
             .saveAsTable("t1")
 
           val df1 = spark.sql("""
-                                |SELECT a, b, ROW_NUMBER() OVER(ORDER BY a, b) AS rn
-                                |FROM t1 LIMIT 3
-                                |""".stripMargin)
+              |SELECT a, b, ROW_NUMBER() OVER(ORDER BY a, b) AS rn
+              |FROM t1 LIMIT 3
+              |""".stripMargin)
 
           assert(df1.rdd.getNumPartitions == 1)
           checkSparkAnswerAndOperator(df1, classOf[WindowExec])
 
           val df2 = spark.sql("""
-                                |SELECT b, RANK() OVER(ORDER BY a, b) AS rk, DENSE_RANK(b) OVER(ORDER BY a, b) AS s
-                                |FROM t1 LIMIT 2
-                                |""".stripMargin)
+              |SELECT b, RANK() OVER(ORDER BY a, b) AS rk, DENSE_RANK(b) OVER(ORDER BY a, b) AS s
+              |FROM t1 LIMIT 2
+              |""".stripMargin)
           assert(df2.rdd.getNumPartitions == 1)
           checkSparkAnswerAndOperator(df2, classOf[WindowExec], classOf[ProjectExec])
 
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 431847839..b4c09a4a5 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1026,7 +1026,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   }
 
   test("scan metrics") {
-    val metricNames = Seq(
+    val cometScanMetricNames = Seq(
       "ParquetRowGroups",
       "ParquetNativeDecodeTime",
       "ParquetNativeLoadTime",
@@ -1035,14 +1035,29 @@ abstract class ParquetReadSuite extends CometTestBase {
       "ParquetInputFileReadSize",
       "ParquetInputFileReadThroughput")
 
+    val cometNativeScanMetricNames = Seq(
+      "time_elapsed_scanning_total",
+      "bytes_scanned",
+      "output_rows",
+      "time_elapsed_opening",
+      "time_elapsed_processing",
+      "time_elapsed_scanning_until_data")
+
     withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") {
       val df = sql("SELECT * FROM tbl WHERE _1 > 0")
       val scans = df.queryExecution.executedPlan collect {
         case s: CometScanExec => s
         case s: CometBatchScanExec => s
+        case s: CometNativeScanExec => s
       }
       assert(scans.size == 1, s"Expect one scan node but found ${scans.size}")
       val metrics = scans.head.metrics
+
+      val metricNames = scans.head match {
+        case _: CometNativeScanExec => cometNativeScanMetricNames
+        case _ => cometScanMetricNames
+      }
+
       metricNames.foreach { metricName =>
        assert(metrics.contains(metricName), s"metric $metricName was not found")
       }
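
The ParquetReadSuite change applies the same idea to the expected metric-name list: collect whichever scan node the plan produced, then match on its class to decide which names to look for. A self-contained sketch under the same simplifications (the ScanNode objects are hypothetical stand-ins for the Spark plan classes; the name lists echo the ones in the diff above):

    // Standalone sketch; ScanLike / BatchScanLike / NativeScanLike stand in
    // for CometScanExec / CometBatchScanExec / CometNativeScanExec.
    object ExpectedMetricNames {
      sealed trait ScanNode
      case object ScanLike extends ScanNode
      case object BatchScanLike extends ScanNode
      case object NativeScanLike extends ScanNode

      private val cometScanMetricNames = Seq(
        "ParquetRowGroups",
        "ParquetNativeDecodeTime",
        "ParquetNativeLoadTime")

      private val cometNativeScanMetricNames = Seq(
        "time_elapsed_scanning_total",
        "bytes_scanned",
        "output_rows")

      // The expected names depend on which scan implementation planned the
      // query, so the list is chosen by matching on the node type.
      def forScan(scan: ScanNode): Seq[String] = scan match {
        case NativeScanLike => cometNativeScanMetricNames
        case _              => cometScanMetricNames
      }

      def main(args: Array[String]): Unit = {
        assert(forScan(NativeScanLike).contains("bytes_scanned"))
        assert(forScan(ScanLike) == forScan(BatchScanLike)) // JVM scans share names
        println("ok")
      }
    }

Both JVM-side scans (CometScanExec and CometBatchScanExec) keep the original Comet metric names; only CometNativeScanExec reports DataFusion's names.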

