This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a77ae27f15e [SPARK-41442][SQL][FOLLOWUP] SQLMetric should not expose -1 value as it's invalid
a77ae27f15e is described below
commit a77ae27f15e804a8f14119a6003cecc1a9ff201c
Author: Wenchen Fan <[email protected]>
AuthorDate: Sat Dec 31 09:56:09 2022 +0900
[SPARK-41442][SQL][FOLLOWUP] SQLMetric should not expose -1 value as it's invalid
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/38969 to fix a regression.
The SQL UI is not the only way for end users to see the SQL metrics: they can also access the accumulator values in the physical plan programmatically via a query execution listener. We should be consistent with the SQL UI and not expose the -1 value.
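For illustration, a minimal sketch of that access path (the listener class and its registration are hypothetical; `QueryExecutionListener`, `QueryExecution`, and `SparkPlan.metrics` are the Spark APIs involved):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical listener that dumps every SQLMetric in the executed plan.
// With this fix, `metric.value` reports 0 instead of leaking the internal
// -1 placeholder for metrics that were never updated.
class MetricDumpingListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    qe.executedPlan.foreach { node =>
      node.metrics.foreach { case (name, metric) =>
        println(s"${node.nodeName}.$name = ${metric.value}")
      }
    }
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// Registered via e.g. spark.listenerManager.register(new MetricDumpingListener)
```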
### Why are the changes needed?
To make the SQL UI and the programmatically accessed accumulator values consistent.
### Does this PR introduce _any_ user-facing change?
Yes. End users who read accumulator values directly will now see 0 instead of -1 for metrics that were never updated.
### How was this patch tested?
Existing tests.
Closes #39311 from cloud-fan/metric.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala | 7 ++++++-
.../org/apache/spark/sql/DynamicPartitionPruningSuite.scala | 4 ++--
.../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 9 +--------
.../org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 6 +-----
4 files changed, 10 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index b31d0b9989d..6d2578c3576 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -78,7 +78,12 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
def +=(v: Long): Unit = add(v)
- override def value: Long = _value
+  // We may use -1 as the initial value of the accumulator, so that the SQL UI can filter out
+  // invalid accumulator values (0 is a valid metric value) when calculating min, max, etc.
+  // However, users can also access the SQL metric values programmatically via this method.
+  // We should be consistent with the SQL UI and not expose -1 to users.
+  // See `SQLMetrics.stringValue`. When there are no valid accumulator values, 0 is the metric value.
+ override def value: Long = if (_value < 0) 0 else _value
// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
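A hedged sketch of the behavioral change above (a bare `SQLMetric` constructed with the -1 initial value, as the size/timing metric factories in `SQLMetrics` use internally; the `"size"` metric type string is illustrative):

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Size/timing metrics start at -1 so the UI can tell "never updated"
// apart from a legitimate 0 (see SQLMetrics.stringValue).
val metric = new SQLMetric("size", initValue = -1)

// Before this patch, `value` returned the raw -1 placeholder; now it is masked.
assert(metric.value == 0)

metric.set(0)               // a genuine update whose value happens to be 0
assert(metric.value == 0)   // externally indistinguishable, by design
```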
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f7b51db1c90..ff78af7e636 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1715,7 +1715,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
val allFilesNum = scan1.metrics("numFiles").value
val allFilesSize = scan1.metrics("filesSize").value
assert(scan1.metrics("numPartitions").value === numPartitions)
- assert(scan1.metrics("pruningTime").value === -1)
+ assert(scan1.metrics("pruningTime").value === 0)
// No dynamic partition pruning, so no static metrics
// Only files from fid = 5 partition are scanned
@@ -1729,7 +1729,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
assert(0 < partFilesNum && partFilesNum < allFilesNum)
assert(0 < partFilesSize && partFilesSize < allFilesSize)
assert(scan2.metrics("numPartitions").value === 1)
- assert(scan2.metrics("pruningTime").value === -1)
+ assert(scan2.metrics("pruningTime").value === 0)
// Dynamic partition pruning is used
// Static metrics are as-if reading the whole fact table
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1f10ff36acb..988695e2667 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2170,15 +2170,8 @@ class AdaptiveQueryExecSuite
assert(aqeReads.length == 2)
aqeReads.foreach { c =>
val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
- val rowCount = stats.rowCount.get
+ assert(stats.sizeInBytes >= 0)
assert(stats.rowCount.get >= 0)
- if (rowCount == 0) {
- // For empty relation, the query stage doesn't serialize any bytes.
- // The SQLMetric keeps initial value.
- assert(stats.sizeInBytes == -1)
- } else {
- assert(stats.sizeInBytes > 0)
- }
}
}
}
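With the placeholder masked at the `SQLMetric.value` level, runtime statistics derived from shuffle metrics are non-negative even for empty query stages, which is why the empty-relation branch above could be dropped. A hedged restatement of the invariant the simplified test now checks (`stage` stands for some `QueryStageExec` taken from an executed adaptive plan; illustrative only):

```scala
val stats = stage.getRuntimeStatistics
assert(stats.sizeInBytes >= 0)           // previously -1 when no bytes were serialized
assert(stats.rowCount.forall(_ >= 0))
```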
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 424052df289..76b5364164e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -786,11 +786,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
testMetricsInSparkPlanOperator(exchanges.head,
Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
- // `testData2.filter($"b" === 0)` is an empty relation.
- // The exchange doesn't serialize any bytes.
- // The SQLMetric keeps initial value.
- testMetricsInSparkPlanOperator(exchanges(1),
- Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
+    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
}
test("Add numRows to metric of BroadcastExchangeExec") {