This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cfa9afa58 [GLUTEN-4835][CORE] Match metric names with Spark (#4834)
cfa9afa58 is described below
commit cfa9afa58ce17ff001282a82f632100d941a0322
Author: Chungmin Lee <[email protected]>
AuthorDate: Mon Mar 11 02:21:58 2024 -0700
[GLUTEN-4835][CORE] Match metric names with Spark (#4834)
---
.../backendsapi/clickhouse/CHMetricsApi.scala | 52 +++++++++++-----------
.../metrics/BatchScanMetricsUpdater.scala | 6 +--
.../metrics/ExpandMetricsUpdater.scala | 4 +-
.../metrics/FileSourceScanMetricsUpdater.scala | 6 +--
.../metrics/FilterMetricsUpdater.scala | 4 +-
.../metrics/GenerateMetricsUpdater.scala | 4 +-
.../metrics/HashAggregateMetricsUpdater.scala | 4 +-
.../metrics/HashJoinMetricsUpdater.scala | 4 +-
.../metrics/HiveTableScanMetricsUpdater.scala | 4 +-
.../metrics/InputIteratorMetricsUpdater.scala | 4 +-
.../metrics/ProjectMetricsUpdater.scala | 4 +-
.../glutenproject/metrics/SortMetricsUpdater.scala | 4 +-
.../metrics/WindowMetricsUpdater.scala | 4 +-
.../GlutenClickHouseTPCHBucketSuite.scala | 18 ++++----
...ckHouseTPCHColumnarShuffleParquetAQESuite.scala | 44 +++++++++---------
.../GlutenClickHouseTPCHParquetBucketSuite.scala | 18 ++++----
.../GlutenClickHouseTPCDSMetricsSuite.scala | 12 ++---
.../metrics/GlutenClickHouseTPCHMetricsSuite.scala | 36 +++++++--------
.../benchmarks/CHParquetReadBenchmark.scala | 2 +-
.../backendsapi/velox/MetricsApiImpl.scala | 20 ++++-----
.../execution/BasicScanExecTransformer.scala | 2 +-
.../metrics/BatchScanMetricsUpdater.scala | 4 +-
.../metrics/ExpandMetricsUpdater.scala | 2 +-
.../metrics/FileSourceScanMetricsUpdater.scala | 2 +-
.../metrics/FilterMetricsUpdater.scala | 2 +-
.../metrics/HiveTableScanMetricsUpdater.scala | 2 +-
.../metrics/InputIteratorMetricsUpdater.scala | 4 +-
.../metrics/LimitMetricsUpdater.scala | 2 +-
.../metrics/ProjectMetricsUpdater.scala | 2 +-
.../glutenproject/metrics/SortMetricsUpdater.scala | 2 +-
.../metrics/WindowMetricsUpdater.scala | 2 +-
.../org/apache/spark/sql/GlutenSQLQuerySuite.scala | 26 +++++++++++
32 files changed, 165 insertions(+), 141 deletions(-)
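Context for the diff below: Gluten declares each operator's metrics as a Map[String, SQLMetric], and this patch renames the row-count keys "inputRows"/"outputRows" to "numInputRows"/"numOutputRows", the key names Spark's own operators register, so code that looks metrics up by key sees the names it expects. A minimal sketch of the declaration pattern (the object name is illustrative and not part of this commit; the SQLMetrics factories are Spark's real API):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

    // Sketch only: mirrors the declaration pattern this patch standardizes on.
    object MetricNamingSketch {
      def exampleMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
        Map(
          // "numInputRows"/"numOutputRows" match the keys Spark's operators use.
          "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
          "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
          // Size and timing metrics keep their existing keys and factories.
          "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
          "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
        )
    }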
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index 838612036..488686e93 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -44,8 +44,8 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"iterReadTime" -> SQLMetrics.createTimingMetric(
sparkContext,
"time of reading from iterator"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
sparkContext,
"filling right join side time")
@@ -59,12 +59,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -79,12 +79,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genHiveTableScanTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -107,12 +107,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genFileSourceScanTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -133,10 +133,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -149,10 +149,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -166,10 +166,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -187,10 +187,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genExpandTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -233,10 +233,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genWindowTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -263,10 +263,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -279,10 +279,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genSortTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -319,10 +319,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genHashJoinTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -358,10 +358,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
}
override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
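The MetricsUpdater diffs that follow are the consuming side of the same map: every metrics("...") lookup is renamed in lockstep with the declarations above, since a stale key would fail at runtime with a NoSuchElementException. A hedged sketch of that lookup shape (the class name is illustrative):

    import org.apache.spark.sql.execution.metric.SQLMetric

    // Sketch only: the shape shared by the updater changes below.
    class RowCountUpdaterSketch(metrics: Map[String, SQLMetric]) {
      def update(inputRows: Long, outputRows: Long): Unit = {
        // SQLMetric.+= adds to the underlying accumulator, as the diff does.
        metrics("numInputRows") += inputRows
        metrics("numOutputRows") += outputRows
      }
    }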
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
index 5cd44a508..d173f1715 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
@@ -23,10 +23,10 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
extends MetricsUpdater {
val scanTime: SQLMetric = metrics("scanTime")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
- val inputRows: SQLMetric = metrics("inputRows")
+ val inputRows: SQLMetric = metrics("numInputRows")
val inputBytes: SQLMetric = metrics("inputBytes")
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
@@ -34,7 +34,7 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
- // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value)
+ // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value)
}
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
index 0aa62e875..d43cbcbc6 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
@@ -33,9 +33,9 @@ class ExpandMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
ExpandMetricsUpdater.INCLUDING_PROCESSORS,
ExpandMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index 7985efbf0..1c6da8dad 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -27,10 +27,10 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
extends MetricsUpdater {
val scanTime: SQLMetric = metrics("scanTime")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
- val inputRows: SQLMetric = metrics("inputRows")
+ val inputRows: SQLMetric = metrics("numInputRows")
val inputBytes: SQLMetric = metrics("inputBytes")
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
@@ -38,7 +38,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
- // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value)
+ // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value)
}
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
index 1a2b77f35..b44a9e382 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
@@ -33,9 +33,9 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
FilterMetricsUpdater.INCLUDING_PROCESSORS,
FilterMetricsUpdater.INCLUDING_PROCESSORS
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala
index 166ba5d7e..5f2e3f63c 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala
@@ -33,9 +33,9 @@ class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metric
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
GenerateMetricsUpdater.INCLUDING_PROCESSORS,
GenerateMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala
index ac55b8d17..8f6e07a94 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala
@@ -44,9 +44,9 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric])
MetricsUtil.updateExtraTimeMetric(
aggMetricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
HashAggregateMetricsUpdater.INCLUDING_PROCESSORS,
HashAggregateMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
index 522069aef..180bcc034 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
@@ -83,9 +83,9 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
metrics("extraTime") += (processor.time / 1000L).toLong
}
if (HashJoinMetricsUpdater.CH_PLAN_NODE_NAME.contains(processor.name)) {
- metrics("outputRows") += processor.outputRows
+ metrics("numOutputRows") += processor.outputRows
metrics("outputBytes") += processor.outputBytes
- metrics("inputRows") += processor.inputRows
+ metrics("numInputRows") += processor.inputRows
metrics("inputBytes") += processor.inputBytes
}
})
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
index 4e3682561..89c3198da 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
@@ -23,10 +23,10 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
extends MetricsUpdater {
val scanTime: SQLMetric = metrics("scanTime")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
- val inputRows: SQLMetric = metrics("inputRows")
+ val inputRows: SQLMetric = metrics("numInputRows")
val inputBytes: SQLMetric = metrics("inputBytes")
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
index e608cf1e3..3a3477659 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
@@ -33,8 +33,8 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric]) extends
InputIteratorMetricsUpdater.CH_PLAN_NODE_NAME
.exists(processor.name.startsWith(_))
) {
- metrics("inputRows") += processor.inputRows
- metrics("outputRows") += processor.outputRows
+ metrics("numInputRows") += processor.inputRows
+ metrics("numOutputRows") += processor.outputRows
}
if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
metrics("fillingRightJoinSideTime") += (processor.time /
1000L).toLong
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
index bfdcdfd02..4bd445590 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
@@ -33,9 +33,9 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metrics
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
ProjectMetricsUpdater.INCLUDING_PROCESSORS,
ProjectMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
index 5aba5e290..e53ba6ccf 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
@@ -33,9 +33,9 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
SortMetricsUpdater.INCLUDING_PROCESSORS,
SortMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
index 6943359ae..e36713c34 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
@@ -33,9 +33,9 @@ class WindowMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
WindowMetricsUpdater.INCLUDING_PROCESSORS,
WindowMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala
index 23face9ce..02d5bcb63 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -242,7 +242,7 @@ class GlutenClickHouseTPCHBucketSuite
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
assert(plans(0).metrics("numFiles").value === 2)
assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("outputRows").value === 591673)
+ assert(plans(0).metrics("numOutputRows").value === 591673)
})
}
@@ -301,7 +301,7 @@ class GlutenClickHouseTPCHBucketSuite
assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
assert(plans(11).metrics("numFiles").value === 1)
- assert(plans(11).metrics("outputRows").value === 1000)
+ assert(plans(11).metrics("numOutputRows").value === 1000)
})
}
@@ -337,11 +337,11 @@ class GlutenClickHouseTPCHBucketSuite
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
assert(plans(2).metrics("numFiles").value === 2)
- assert(plans(2).metrics("outputRows").value === 3111)
+ assert(plans(2).metrics("numOutputRows").value === 3111)
assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
assert(plans(3).metrics("numFiles").value === 2)
- assert(plans(3).metrics("outputRows").value === 72678)
+ assert(plans(3).metrics("numOutputRows").value === 72678)
})
withSQLConf(
@@ -383,11 +383,11 @@ class GlutenClickHouseTPCHBucketSuite
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(1).metrics("numFiles").value === 2)
- assert(plans(1).metrics("outputRows").value === 5552)
+ assert(plans(1).metrics("numOutputRows").value === 5552)
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(2).metrics("numFiles").value === 2)
- assert(plans(2).metrics("outputRows").value === 379809)
+ assert(plans(2).metrics("numOutputRows").value === 379809)
})
withSQLConf(
@@ -417,7 +417,7 @@ class GlutenClickHouseTPCHBucketSuite
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
assert(plans(0).metrics("numFiles").value === 2)
assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("outputRows").value === 11618)
+ assert(plans(0).metrics("numOutputRows").value === 11618)
})
}
@@ -442,11 +442,11 @@ class GlutenClickHouseTPCHBucketSuite
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(1).metrics("numFiles").value === 2)
- assert(plans(1).metrics("outputRows").value === 150000)
+ assert(plans(1).metrics("numOutputRows").value === 150000)
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(2).metrics("numFiles").value === 2)
- assert(plans(2).metrics("outputRows").value === 3155)
+ assert(plans(2).metrics("numOutputRows").value === 3155)
})
withSQLConf(
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index f710333d6..29815aff6 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -67,21 +67,21 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans(4).metrics("numFiles").value === 1)
assert(plans(4).metrics("pruningTime").value === -1)
assert(plans(4).metrics("filesSize").value === 19230111)
- assert(plans(4).metrics("outputRows").value === 600572)
+ assert(plans(4).metrics("numOutputRows").value === 600572)
- assert(plans(3).metrics("inputRows").value === 591673)
- assert(plans(3).metrics("outputRows").value === 4)
+ assert(plans(3).metrics("numInputRows").value === 591673)
+ assert(plans(3).metrics("numOutputRows").value === 4)
assert(plans(3).metrics("outputVectors").value === 1)
- assert(plans(2).metrics("inputRows").value === 8)
- assert(plans(2).metrics("outputRows").value === 8)
+ assert(plans(2).metrics("numInputRows").value === 8)
+ assert(plans(2).metrics("numOutputRows").value === 8)
// Execute Sort operator, it will read the data twice.
- assert(plans(1).metrics("outputRows").value === 8)
+ assert(plans(1).metrics("numOutputRows").value === 8)
assert(plans(1).metrics("outputVectors").value === 2)
- assert(plans(0).metrics("inputRows").value === 4)
- assert(plans(0).metrics("outputRows").value === 4)
+ assert(plans(0).metrics("numInputRows").value === 4)
+ assert(plans(0).metrics("numOutputRows").value === 4)
}
}
@@ -100,12 +100,12 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans(2).metrics("pruningTime").value === -1)
assert(plans(2).metrics("filesSize").value === 19230111)
- assert(plans(1).metrics("inputRows").value === 591673)
- assert(plans(1).metrics("outputRows").value === 4)
+ assert(plans(1).metrics("numInputRows").value === 591673)
+ assert(plans(1).metrics("numOutputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)
// Execute Sort operator, it will read the data twice.
- assert(plans(0).metrics("outputRows").value === 8)
+ assert(plans(0).metrics("numOutputRows").value === 8)
assert(plans(0).metrics("outputVectors").value === 2)
}
}
@@ -138,17 +138,17 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(inputIteratorTransformers.size == 4)
- assert(inputIteratorTransformers(3).metrics("inputRows").value === 324322)
- assert(inputIteratorTransformers(3).metrics("outputRows").value === 324322)
+ assert(inputIteratorTransformers(3).metrics("numInputRows").value === 324322)
+ assert(inputIteratorTransformers(3).metrics("numOutputRows").value === 324322)
- assert(inputIteratorTransformers(2).metrics("inputRows").value === 72678)
- assert(inputIteratorTransformers(2).metrics("outputRows").value === 72678)
+ assert(inputIteratorTransformers(2).metrics("numInputRows").value === 72678)
+ assert(inputIteratorTransformers(2).metrics("numOutputRows").value === 72678)
- assert(inputIteratorTransformers(1).metrics("inputRows").value === 3111)
- assert(inputIteratorTransformers(1).metrics("outputRows").value === 3111)
+ assert(inputIteratorTransformers(1).metrics("numInputRows").value === 3111)
+ assert(inputIteratorTransformers(1).metrics("numOutputRows").value === 3111)
- assert(inputIteratorTransformers(0).metrics("inputRows").value === 15224)
- assert(inputIteratorTransformers(0).metrics("outputRows").value === 15224)
+ assert(inputIteratorTransformers(0).metrics("numInputRows").value === 15224)
+ assert(inputIteratorTransformers(0).metrics("numOutputRows").value === 15224)
}
}
}
@@ -280,10 +280,10 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
case scanExec: BasicScanExecTransformer => scanExec
case filterExec: FilterExecTransformerBase => filterExec
}
- assert(plans(2).metrics("inputRows").value === 600572)
- assert(plans(2).metrics("outputRows").value === 379809)
+ assert(plans(2).metrics("numInputRows").value === 600572)
+ assert(plans(2).metrics("numOutputRows").value === 379809)
- assert(plans(3).metrics("outputRows").value === 600572)
+ assert(plans(3).metrics("numOutputRows").value === 600572)
}
}
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
index 427c4a6eb..8c417499e 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
@@ -267,7 +267,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
assert(plans(0).metrics("numFiles").value === 4)
assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("outputRows").value === 600572)
+ assert(plans(0).metrics("numOutputRows").value === 600572)
}
)
}
@@ -329,7 +329,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
assert(plans(11).metrics("numFiles").value === 1)
- assert(plans(11).metrics("outputRows").value === 1000)
+ assert(plans(11).metrics("numOutputRows").value === 1000)
}
)
}
@@ -369,11 +369,11 @@ class GlutenClickHouseTPCHParquetBucketSuite
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
assert(plans(2).metrics("numFiles").value === 4)
- assert(plans(2).metrics("outputRows").value === 15000)
+ assert(plans(2).metrics("numOutputRows").value === 15000)
assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
assert(plans(3).metrics("numFiles").value === 4)
- assert(plans(3).metrics("outputRows").value === 150000)
+ assert(plans(3).metrics("numOutputRows").value === 150000)
}
)
@@ -421,11 +421,11 @@ class GlutenClickHouseTPCHParquetBucketSuite
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(1).metrics("numFiles").value === 4)
- assert(plans(1).metrics("outputRows").value === 150000)
+ assert(plans(1).metrics("numOutputRows").value === 150000)
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(2).metrics("numFiles").value === 4)
- assert(plans(2).metrics("outputRows").value === 600572)
+ assert(plans(2).metrics("numOutputRows").value === 600572)
}
)
@@ -461,7 +461,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
assert(plans(0).metrics("numFiles").value === 4)
assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("outputRows").value === 600572)
+ assert(plans(0).metrics("numOutputRows").value === 600572)
}
)
}
@@ -489,11 +489,11 @@ class GlutenClickHouseTPCHParquetBucketSuite
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(1).metrics("numFiles").value === 4)
- assert(plans(1).metrics("outputRows").value === 150000)
+ assert(plans(1).metrics("numOutputRows").value === 150000)
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(2).metrics("numFiles").value === 4)
- assert(plans(2).metrics("outputRows").value === 600572)
+ assert(plans(2).metrics("numOutputRows").value === 600572)
}
)
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
index 3a4cee9c0..bffd4f9ed 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
@@ -99,9 +99,9 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSui
assert(windowPlan0.metrics("totalTime").value == 2)
assert(windowPlan0.metrics("inputWaitTime").value == 12)
assert(windowPlan0.metrics("outputWaitTime").value == 0)
- assert(windowPlan0.metrics("outputRows").value == 10717)
+ assert(windowPlan0.metrics("numOutputRows").value == 10717)
assert(windowPlan0.metrics("outputBytes").value == 1224479)
- assert(windowPlan0.metrics("inputRows").value == 10717)
+ assert(windowPlan0.metrics("numInputRows").value == 10717)
assert(windowPlan0.metrics("inputBytes").value == 1128026)
val windowPlan1 = allGlutenPlans(5)
@@ -109,18 +109,18 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSui
assert(windowPlan1.metrics("extraTime").value == 1)
assert(windowPlan1.metrics("inputWaitTime").value == 23)
assert(windowPlan1.metrics("outputWaitTime").value == 2)
- assert(windowPlan1.metrics("outputRows").value == 12333)
+ assert(windowPlan1.metrics("numOutputRows").value == 12333)
assert(windowPlan1.metrics("outputBytes").value == 1360484)
- assert(windowPlan1.metrics("inputRows").value == 12333)
+ assert(windowPlan1.metrics("numInputRows").value == 12333)
assert(windowPlan1.metrics("inputBytes").value == 1261820)
val sortPlan = allGlutenPlans(6)
assert(sortPlan.metrics("totalTime").value == 3)
assert(sortPlan.metrics("inputWaitTime").value == 30)
assert(sortPlan.metrics("outputWaitTime").value == 1)
- assert(sortPlan.metrics("outputRows").value == 12333)
+ assert(sortPlan.metrics("numOutputRows").value == 12333)
assert(sortPlan.metrics("outputBytes").value == 1261820)
- assert(sortPlan.metrics("inputRows").value == 12333)
+ assert(sortPlan.metrics("numInputRows").value == 12333)
assert(sortPlan.metrics("inputBytes").value == 1261820)
}
}
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index 7b90a5500..012578317 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -68,11 +68,11 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(plans(2).metrics("pruningTime").value === -1)
assert(plans(2).metrics("filesSize").value === 19230111)
- assert(plans(1).metrics("outputRows").value === 4)
+ assert(plans(1).metrics("numOutputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)
// Execute Sort operator, it will read the data twice.
- assert(plans(0).metrics("outputRows").value === 4)
+ assert(plans(0).metrics("numOutputRows").value === 4)
assert(plans(0).metrics("outputVectors").value === 1)
}
}
@@ -89,8 +89,8 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
case generate: GenerateExecTransformer => generate
}
assert(plans.size == 1)
- assert(plans.head.metrics("inputRows").value == 25)
- assert(plans.head.metrics("outputRows").value == 266)
+ assert(plans.head.metrics("numInputRows").value == 25)
+ assert(plans.head.metrics("numOutputRows").value == 266)
assert(plans.head.metrics("outputVectors").value == 1)
}
}
@@ -109,11 +109,11 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(plans(2).metrics("pruningTime").value === -1)
assert(plans(2).metrics("filesSize").value === 19230111)
- assert(plans(1).metrics("outputRows").value === 4)
+ assert(plans(1).metrics("numOutputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)
// Execute Sort operator, it will read the data twice.
- assert(plans(0).metrics("outputRows").value === 4)
+ assert(plans(0).metrics("numOutputRows").value === 4)
assert(plans(0).metrics("outputVectors").value === 1)
}
}
@@ -193,24 +193,24 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(s.metrics("scanTime").value == 2)
assert(s.metrics("inputWaitTime").value == 4)
assert(s.metrics("outputWaitTime").value == 2)
- assert(s.metrics("outputRows").value == 20000)
+ assert(s.metrics("numOutputRows").value == 20000)
assert(s.metrics("outputBytes").value == 1451663)
case f: FilterExecTransformerBase =>
assert(f.metrics("totalTime").value == 3)
assert(f.metrics("inputWaitTime").value == 14)
assert(f.metrics("outputWaitTime").value == 1)
- assert(f.metrics("outputRows").value == 73)
+ assert(f.metrics("numOutputRows").value == 73)
assert(f.metrics("outputBytes").value == 5304)
- assert(f.metrics("inputRows").value == 20000)
+ assert(f.metrics("numInputRows").value == 20000)
assert(f.metrics("inputBytes").value == 1451663)
assert(f.metrics("extraTime").value == 1)
case p: ProjectExecTransformer =>
assert(p.metrics("totalTime").value == 0)
assert(p.metrics("inputWaitTime").value == 7)
assert(p.metrics("outputWaitTime").value == 0)
- assert(p.metrics("outputRows").value == 73)
+ assert(p.metrics("numOutputRows").value == 73)
assert(p.metrics("outputBytes").value == 2336)
- assert(p.metrics("inputRows").value == 73)
+ assert(p.metrics("numInputRows").value == 73)
assert(p.metrics("inputBytes").value == 5085)
}
}
@@ -230,25 +230,25 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(scanPlan.metrics("scanTime").value == 2)
assert(scanPlan.metrics("inputWaitTime").value == 3)
assert(scanPlan.metrics("outputWaitTime").value == 1)
- assert(scanPlan.metrics("outputRows").value == 80000)
+ assert(scanPlan.metrics("numOutputRows").value == 80000)
assert(scanPlan.metrics("outputBytes").value == 2160000)
val filterPlan = allGlutenPlans(8)
assert(filterPlan.metrics("totalTime").value == 1)
assert(filterPlan.metrics("inputWaitTime").value == 13)
assert(filterPlan.metrics("outputWaitTime").value == 1)
- assert(filterPlan.metrics("outputRows").value == 80000)
+ assert(filterPlan.metrics("numOutputRows").value == 80000)
assert(filterPlan.metrics("outputBytes").value == 2160000)
- assert(filterPlan.metrics("inputRows").value == 80000)
+ assert(filterPlan.metrics("numInputRows").value == 80000)
assert(filterPlan.metrics("inputBytes").value == 2160000)
val joinPlan = allGlutenPlans(2)
assert(joinPlan.metrics("totalTime").value == 1)
assert(joinPlan.metrics("inputWaitTime").value == 6)
assert(joinPlan.metrics("outputWaitTime").value == 0)
- assert(joinPlan.metrics("outputRows").value == 292)
+ assert(joinPlan.metrics("numOutputRows").value == 292)
assert(joinPlan.metrics("outputBytes").value == 16644)
- assert(joinPlan.metrics("inputRows").value == 80000)
+ assert(joinPlan.metrics("numInputRows").value == 80000)
assert(joinPlan.metrics("inputBytes").value == 1920000)
}
@@ -269,9 +269,9 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(shjPlan.metrics("totalTime").value == 6)
assert(shjPlan.metrics("inputWaitTime").value == 5)
assert(shjPlan.metrics("outputWaitTime").value == 0)
- assert(shjPlan.metrics("outputRows").value == 44)
+ assert(shjPlan.metrics("numOutputRows").value == 44)
assert(shjPlan.metrics("outputBytes").value == 3740)
- assert(shjPlan.metrics("inputRows").value == 11985)
+ assert(shjPlan.metrics("numInputRows").value == 11985)
assert(shjPlan.metrics("inputBytes").value == 299625)
}
}
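The suite changes above all follow one pattern: run a query, collect the relevant transformer from the executed plan, and index its metrics map by key, which is why every assertion moved to the new names. A hedged sketch of that pattern as it would appear inside one of these suites (the query is illustrative; spark and FileSourceScanExecTransformer are in scope there):

    // Sketch only: how the updated assertions address a metric by its new key.
    val df = spark.sql("select l_orderkey from lineitem limit 10")
    df.collect()
    val scans = df.queryExecution.executedPlan.collect {
      case scan: FileSourceScanExecTransformer => scan
    }
    scans.foreach(scan => assert(scan.metrics("numOutputRows").value > 0))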
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
index d6e1d314c..d2687bd69 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
@@ -98,7 +98,7 @@ object CHParquetReadBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
.take(readFileCnt)
.map(_.asInstanceOf[FilePartition])
- val numOutputRows = chFileScan.longMetric("outputRows")
+ val numOutputRows = chFileScan.longMetric("numOutputRows")
val numOutputVectors = chFileScan.longMetric("outputVectors")
val scanTime = chFileScan.longMetric("scanTime")
// Generate Substrait plan
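Operator code resolves the same key through SparkPlan.longMetric, as the benchmark above and BasicScanExecTransformer below both do; a short sketch of the call (the increment line is illustrative):

    // Sketch only: longMetric fetches the SQLMetric registered under a key, so
    // it must use the key the MetricsApi declared ("numOutputRows" here).
    val numOutputRows = chFileScan.longMetric("numOutputRows")
    numOutputRows += columnarBatch.numRows() // columnarBatch is illustrative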
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
index 2d1eb1315..71c2642ff 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
@@ -42,7 +42,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
Map(
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time
count"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext,
"totaltime of input iterator"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of
output vectors")
)
}
@@ -54,12 +54,12 @@ class MetricsApiImpl extends MetricsApi with Logging {
override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of batch scan"),
@@ -91,7 +91,6 @@ class MetricsApiImpl extends MetricsApi with Logging {
Map(
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw
input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of
output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
output bytes"),
"scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime
of scan"),
@@ -132,7 +131,6 @@ class MetricsApiImpl extends MetricsApi with Logging {
Map(
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw
input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of
output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
output bytes"),
"scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime
of scan"),
@@ -170,7 +168,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of filter"),
@@ -186,7 +184,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of project"),
@@ -243,7 +241,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
override def genExpandTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of expand"),
@@ -285,7 +283,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
override def genWindowTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of window"),
@@ -315,7 +313,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
override def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of limit"),
@@ -340,7 +338,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
override def genSortTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of sort"),
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
index 3ef1eb75b..625ab6e97 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
@@ -67,7 +67,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
}
def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
- val numOutputRows = longMetric("outputRows")
+ val numOutputRows = longMetric("numOutputRows")
val numOutputVectors = longMetric("outputVectors")
val scanTime = longMetric("scanTime")
val substraitContext = new SubstraitContext
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
index 0eb8141e5..32f8cd880 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
@@ -29,12 +29,12 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("inputRows") += operatorMetrics.inputRows
+ metrics("numInputRows") += operatorMetrics.inputRows
metrics("inputVectors") += operatorMetrics.inputVectors
metrics("inputBytes") += operatorMetrics.inputBytes
metrics("rawInputRows") += operatorMetrics.rawInputRows
metrics("rawInputBytes") += operatorMetrics.rawInputBytes
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
index 1d57fffd2..e254469f8 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
@@ -23,7 +23,7 @@ class ExpandMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index cbdb7b9fc..ff8b1a576 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -28,7 +28,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val rawInputRows: SQLMetric = metrics("rawInputRows")
val rawInputBytes: SQLMetric = metrics("rawInputBytes")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
val wallNanos: SQLMetric = metrics("wallNanos")
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
index f9e95875a..f29931023 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
@@ -23,7 +23,7 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
index 26ad731e6..b7a858d44 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
@@ -23,7 +23,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
extends MetricsUpdater {
val rawInputRows: SQLMetric = metrics("rawInputRows")
val rawInputBytes: SQLMetric = metrics("rawInputBytes")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
val wallNanos: SQLMetric = metrics("wallNanos")
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
index 2b3967ae3..87ca348fa 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
@@ -26,10 +26,10 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric]) extends
      if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) {
// Sometimes, velox does not update metrics for intermediate operator,
// here we try to use the input metrics
- metrics("outputRows") += operatorMetrics.inputRows
+ metrics("numOutputRows") += operatorMetrics.inputRows
metrics("outputVectors") += operatorMetrics.inputVectors
} else {
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
}
}
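
The fallback above exists because Velox sometimes leaves the output counters of intermediate operators at zero; in that case the input-side counters are the best available estimate. The decision logic, isolated as a sketch (OpMetrics is a simplified hypothetical, not the real io.glutenproject.metrics.OperatorMetrics):

    // Hypothetical, simplified mirror of the fields the updater reads.
    final case class OpMetrics(
        inputRows: Long, inputVectors: Long,
        outputRows: Long, outputVectors: Long)

    // Prefer the output counters; fall back to the input counters when the
    // native engine reported no output at all for this operator.
    def outputCounts(m: OpMetrics): (Long, Long) =
      if (m.outputRows == 0 && m.outputVectors == 0) (m.inputRows, m.inputVectors)
      else (m.outputRows, m.outputVectors)
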
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala
index f61f7443f..a3ab24637 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala
@@ -23,7 +23,7 @@ class LimitMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUp
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
index b2bb17961..03b41202e 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
@@ -23,7 +23,7 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metrics
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
index 2351b1c9f..38414002d 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
@@ -23,7 +23,7 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
index 032d7aa1b..7b962294e 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
@@ -23,7 +23,7 @@ class WindowMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("outputRows") += operatorMetrics.outputRows
+ metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
index 7199ddd70..bd9699e40 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
@@ -128,4 +128,30 @@ class GlutenSQLQuerySuite extends SQLQuerySuite with GlutenSQLTestsTrait {
          "Escape character must be followed by '%', '_' or the escape character itself"))
}
}
+
+ testGluten("StreamingQueryProgress.numInputRows should be correct") {
+ withTempDir {
+ dir =>
+ val path = dir.toURI.getPath
+ val numRows = 20
+ val df = spark.range(0, numRows)
+ df.write.mode("overwrite").format("parquet").save(path)
+ val q = spark.readStream
+ .format("parquet")
+ .schema(df.schema)
+ .load(path)
+ .writeStream
+ .format("memory")
+ .queryName("test")
+ .start()
+ q.processAllAvailable
+ val inputOutputPairs = q.recentProgress.map(p => (p.numInputRows, p.sink.numOutputRows))
+
+ // numInputRows and sink.numOutputRows must be the same
+ assert(inputOutputPairs.forall(x => x._1 == x._2))
+
+ // Sum of numInputRows must match the total number of rows of the input
+ assert(inputOutputPairs.map(_._1).sum == numRows)
+ }
+ }
}
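
The new test relies on StreamingQueryProgress.numInputRows and SinkProgress.numOutputRows, both standard Spark structured-streaming APIs populated from the metric names this commit aligns. A self-contained sketch of reading those counters outside the suite, assuming only an active SparkSession named spark (the rate source and memory sink are standard Spark fixtures):

    // Hedged sketch, not part of the commit: a trivial streaming query whose
    // progress counters can be inspected the same way the test above does.
    val q = spark.readStream
      .format("rate") // built-in test source
      .load()
      .writeStream
      .format("memory")
      .queryName("progress_demo")
      .start()

    Thread.sleep(2000) // allow a few micro-batches to run
    q.recentProgress.foreach(
      p => println(s"numInputRows=${p.numInputRows}, sink.numOutputRows=${p.sink.numOutputRows}"))
    q.stop()
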
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]