This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 315d6a789 fix: metrics tests for native_datafusion experimental native scan (#1445)
315d6a789 is described below
commit 315d6a789d0a36cde677c5e818935663fc398e39
Author: Matt Butrovich <[email protected]>
AuthorDate: Wed Feb 26 18:15:52 2025 -0500
fix: metrics tests for native_datafusion experimental native scan (#1445)
* Fix ParquetReadSuite scan metrics for native_datafusion.
* Fix CometExecSuite native metrics: scan for native_datafusion.
---
.../org/apache/comet/exec/CometExecSuite.scala | 94 +++++++++++++---------
.../apache/comet/parquet/ParquetReadSuite.scala | 17 +++-
2 files changed, 73 insertions(+), 38 deletions(-)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index d91631532..1c5395d92 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -28,12 +28,12 @@ import org.scalactic.source.Position
import org.scalatest.Tag
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{AnalysisException, Column, CometTestBase,
DataFrame, DataFrameWriter, Row, SaveMode}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics,
CatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo,
Hex}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode,
BloomFilterAggregate}
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec,
CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec,
CometHashAggregateExec, CometHashJoinExec, CometNativeScanExec,
CometProjectExec, CometScanExec, CometSortExec, CometSortMergeJoinExec,
CometSparkToColumnarExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle,
CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec,
SQLExecution, UnionExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -43,7 +43,7 @@ import
org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, Cartes
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions.{col, count, date_add, expr, lead, sum}
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
import org.apache.spark.unsafe.types.UTF8String
@@ -53,6 +53,7 @@ import
org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plu
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
class CometExecSuite extends CometTestBase {
+
import testImplicits._
override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)(implicit
@@ -216,18 +217,18 @@ class CometExecSuite extends CometTestBase {
withTable("t1") {
sql("""
- |CREATE TABLE t1 USING PARQUET
- |AS SELECT * FROM VALUES
- |(1, "a"),
- |(2, "a"),
- |(3, "a") t(id, value)
- |""".stripMargin)
+ |CREATE TABLE t1 USING PARQUET
+ |AS SELECT * FROM VALUES
+ |(1, "a"),
+ |(2, "a"),
+ |(3, "a") t(id, value)
+ |""".stripMargin)
val df = sql("""
- |WITH t2 AS (
- | SELECT * FROM t1 ORDER BY id
- |)
- |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
- |""".stripMargin)
+ |WITH t2 AS (
+ | SELECT * FROM t1 ORDER BY id
+ |)
+ |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
+ |""".stripMargin)
checkSparkAnswer(df)
}
}
@@ -532,15 +533,34 @@ class CometExecSuite extends CometTestBase {
val df = sql("SELECT * FROM tbl WHERE _2 > _3")
df.collect()
- val metrics = find(df.queryExecution.executedPlan)(s =>
+ find(df.queryExecution.executedPlan)(s =>
s.isInstanceOf[CometScanExec] ||
s.isInstanceOf[CometNativeScanExec])
- .map(_.metrics)
- .get
+ .foreach(scan => {
+ val metrics = scan.metrics
+ scan match {
+ case _: CometScanExec => {
+ assert(metrics.contains("scanTime"))
+ assert(metrics.contains("cast_time"))
+ assert(metrics("scanTime").value > 0)
+ assert(metrics("cast_time").value > 0)
+ }
+ case _: CometNativeScanExec => {
+ assert(metrics.contains("time_elapsed_scanning_total"))
+ assert(metrics.contains("bytes_scanned"))
+ assert(metrics.contains("output_rows"))
+ assert(metrics.contains("time_elapsed_opening"))
+ assert(metrics.contains("time_elapsed_processing"))
+ assert(metrics.contains("time_elapsed_scanning_until_data"))
+ assert(metrics("time_elapsed_scanning_total").value > 0)
+ assert(metrics("bytes_scanned").value > 0)
+ assert(metrics("output_rows").value == 0)
+ assert(metrics("time_elapsed_opening").value > 0)
+ assert(metrics("time_elapsed_processing").value > 0)
+ assert(metrics("time_elapsed_scanning_until_data").value > 0)
+ }
+ }
+ })
- assert(metrics.contains("scanTime"))
- assert(metrics.contains("cast_time"))
- assert(metrics("scanTime").value > 0)
- assert(metrics("cast_time").value > 0)
}
}
}
@@ -1265,16 +1285,16 @@ class CometExecSuite extends CometTestBase {
| ) VALUES('a')
""".stripMargin)
checkSparkAnswerAndOperator(sql("""
- | SELECT
- | name,
- | CAST(part1 AS STRING),
- | CAST(part2 as STRING),
- | CAST(part3 as STRING),
- | part4,
- | part5,
- | part6,
- | part7
- | FROM t1
+ | SELECT
+ | name,
+ | CAST(part1 AS STRING),
+ | CAST(part2 as STRING),
+ | CAST(part3 as STRING),
+ | part4,
+ | part5,
+ | part6,
+ | part7
+ | FROM t1
""".stripMargin))
val e = intercept[AnalysisException] {
@@ -1504,17 +1524,17 @@ class CometExecSuite extends CometTestBase {
.saveAsTable("t1")
val df1 = spark.sql("""
- |SELECT a, b, ROW_NUMBER() OVER(ORDER BY a, b)
AS rn
- |FROM t1 LIMIT 3
- |""".stripMargin)
+ |SELECT a, b, ROW_NUMBER() OVER(ORDER BY a, b) AS rn
+ |FROM t1 LIMIT 3
+ |""".stripMargin)
assert(df1.rdd.getNumPartitions == 1)
checkSparkAnswerAndOperator(df1, classOf[WindowExec])
val df2 = spark.sql("""
- |SELECT b, RANK() OVER(ORDER BY a, b) AS rk,
DENSE_RANK(b) OVER(ORDER BY a, b) AS s
- |FROM t1 LIMIT 2
- |""".stripMargin)
+ |SELECT b, RANK() OVER(ORDER BY a, b) AS rk, DENSE_RANK(b)
OVER(ORDER BY a, b) AS s
+ |FROM t1 LIMIT 2
+ |""".stripMargin)
assert(df2.rdd.getNumPartitions == 1)
checkSparkAnswerAndOperator(df2, classOf[WindowExec],
classOf[ProjectExec])
diff --git
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 431847839..b4c09a4a5 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1026,7 +1026,7 @@ abstract class ParquetReadSuite extends CometTestBase {
}
test("scan metrics") {
- val metricNames = Seq(
+ val cometScanMetricNames = Seq(
"ParquetRowGroups",
"ParquetNativeDecodeTime",
"ParquetNativeLoadTime",
@@ -1035,14 +1035,29 @@ abstract class ParquetReadSuite extends CometTestBase {
"ParquetInputFileReadSize",
"ParquetInputFileReadThroughput")
+ val cometNativeScanMetricNames = Seq(
+ "time_elapsed_scanning_total",
+ "bytes_scanned",
+ "output_rows",
+ "time_elapsed_opening",
+ "time_elapsed_processing",
+ "time_elapsed_scanning_until_data")
+
withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") {
val df = sql("SELECT * FROM tbl WHERE _1 > 0")
val scans = df.queryExecution.executedPlan collect {
case s: CometScanExec => s
case s: CometBatchScanExec => s
+ case s: CometNativeScanExec => s
}
assert(scans.size == 1, s"Expect one scan node but found ${scans.size}")
val metrics = scans.head.metrics
+
+ val metricNames = scans.head match {
+ case _: CometNativeScanExec => cometNativeScanMetricNames
+ case _ => cometScanMetricNames
+ }
+
metricNames.foreach { metricName =>
assert(metrics.contains(metricName), s"metric $metricName was not
found")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]