This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a77ae27f15e [SPARK-41442][SQL][FOLLOWUP] SQLMetric should not expose 
-1 value as it's invalid
a77ae27f15e is described below

commit a77ae27f15e804a8f14119a6003cecc1a9ff201c
Author: Wenchen Fan <[email protected]>
AuthorDate: Sat Dec 31 09:56:09 2022 +0900

    [SPARK-41442][SQL][FOLLOWUP] SQLMetric should not expose -1 value as it's 
invalid
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/38969 to fix a 
regression.
    
    SQL UI is not the only way for end users to see the SQL metrics. They can 
also access the accumulator values in the physical plan programmatically via 
query execution listener. We should be consistent with the SQL UI and not 
expose the -1 value.
    
    ### Why are the changes needed?
    
    make SQL UI and the accumulator value consistent
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, as end users can access accumulator values directly.
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #39311 from cloud-fan/metric.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala | 7 ++++++-
 .../org/apache/spark/sql/DynamicPartitionPruningSuite.scala      | 4 ++--
 .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala    | 9 +--------
 .../org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  | 6 +-----
 4 files changed, 10 insertions(+), 16 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index b31d0b9989d..6d2578c3576 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -78,7 +78,12 @@ class SQLMetric(val metricType: String, initValue: Long = 
0L) extends Accumulato
 
   def +=(v: Long): Unit = add(v)
 
-  override def value: Long = _value
+  // We may use -1 as initial value of the accumulator, so that the SQL UI can 
filter out
+  // invalid accumulator values (0 is a valid metric value) when calculating 
min, max, etc.
+  // However, users can also access the SQL metrics values programmatically 
via this method.
+  // We should be consistent with the SQL UI and don't expose -1 to users.
+  // See `SQLMetrics.stringValue`. When there is no valid accumulator values, 
0 is the metric value.
+  override def value: Long = if (_value < 0) 0 else _value
 
   // Provide special identifier as metadata so we can tell that this is a 
`SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): 
AccumulableInfo = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f7b51db1c90..ff78af7e636 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1715,7 +1715,7 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
         val allFilesNum = scan1.metrics("numFiles").value
         val allFilesSize = scan1.metrics("filesSize").value
         assert(scan1.metrics("numPartitions").value === numPartitions)
-        assert(scan1.metrics("pruningTime").value === -1)
+        assert(scan1.metrics("pruningTime").value === 0)
 
         // No dynamic partition pruning, so no static metrics
         // Only files from fid = 5 partition are scanned
@@ -1729,7 +1729,7 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
         assert(0 < partFilesNum && partFilesNum < allFilesNum)
         assert(0 < partFilesSize && partFilesSize < allFilesSize)
         assert(scan2.metrics("numPartitions").value === 1)
-        assert(scan2.metrics("pruningTime").value === -1)
+        assert(scan2.metrics("pruningTime").value === 0)
 
         // Dynamic partition pruning is used
         // Static metrics are as-if reading the whole fact table
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1f10ff36acb..988695e2667 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2170,15 +2170,8 @@ class AdaptiveQueryExecSuite
         assert(aqeReads.length == 2)
         aqeReads.foreach { c =>
           val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
-          val rowCount = stats.rowCount.get
+          assert(stats.sizeInBytes >= 0)
           assert(stats.rowCount.get >= 0)
-          if (rowCount == 0) {
-            // For empty relation, the query stage doesn't serialize any bytes.
-            // The SQLMetric keeps initial value.
-            assert(stats.sizeInBytes == -1)
-          } else {
-            assert(stats.sizeInBytes > 0)
-          }
         }
       }
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 424052df289..76b5364164e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -786,11 +786,7 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
 
     testMetricsInSparkPlanOperator(exchanges.head,
       Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
-    // `testData2.filter($"b" === 0)` is an empty relation.
-    // The exchange doesn't serialize any bytes.
-    // The SQLMetric keeps initial value.
-    testMetricsInSparkPlanOperator(exchanges(1),
-      Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
+    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, 
"shuffleRecordsWritten" -> 0))
   }
 
   test("Add numRows to metric of BroadcastExchangeExec") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to