This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b8c5711d67 [VL] Add metric abandonedPartialAggregation (#12106)
b8c5711d67 is described below
commit b8c5711d672c40ef3cc0717646074fa1bdeac46e
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon May 25 16:27:40 2026 +0100
[VL] Add metric abandonedPartialAggregation (#12106)
---
.../java/org/apache/gluten/metrics/Metrics.java | 4 ++++
.../org/apache/gluten/metrics/OperatorMetrics.java | 3 +++
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 3 +++
.../metrics/HashAggregateMetricsUpdater.scala | 2 ++
.../org/apache/gluten/metrics/MetricsUtil.scala | 3 +++
.../gluten/execution/VeloxMetricsSuite.scala | 27 +++++++++++++++++++++-
cpp/core/jni/JniWrapper.cc | 3 ++-
cpp/core/utils/Metrics.h | 1 +
cpp/velox/compute/WholeStageResultIterator.cc | 3 +++
9 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index 1bcfd67392..12f38d84e5 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -42,6 +42,7 @@ public class Metrics implements IMetrics {
public long[] numReplacedWithDynamicFilterRows;
public long[] numDynamicFilterInputRows;
public long[] flushRowCount;
+ public long[] abandonedPartialAggregationRows;
public long[] loadedToValueHook;
public long[] bloomFilterBlocksByteSize;
public long[] skippedSplits;
@@ -94,6 +95,7 @@ public class Metrics implements IMetrics {
long[] numReplacedWithDynamicFilterRows,
long[] numDynamicFilterInputRows,
long[] flushRowCount,
+ long[] abandonedPartialAggregationRows,
long[] loadedToValueHook,
long[] bloomFilterBlocksByteSize,
long[] scanTime,
@@ -140,6 +142,7 @@ public class Metrics implements IMetrics {
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.numDynamicFilterInputRows = numDynamicFilterInputRows;
this.flushRowCount = flushRowCount;
+ this.abandonedPartialAggregationRows = abandonedPartialAggregationRows;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
this.skippedSplits = skippedSplits;
@@ -192,6 +195,7 @@ public class Metrics implements IMetrics {
numReplacedWithDynamicFilterRows[index],
numDynamicFilterInputRows[index],
flushRowCount[index],
+ abandonedPartialAggregationRows[index],
loadedToValueHook[index],
bloomFilterBlocksByteSize[index],
scanTime[index],
diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index d52245a334..18feed0b9e 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -40,6 +40,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long numReplacedWithDynamicFilterRows;
public long numDynamicFilterInputRows;
public long flushRowCount;
+ public long abandonedPartialAggregationRows;
public long loadedToValueHook;
public long bloomFilterBlocksByteSize;
public long skippedSplits;
@@ -87,6 +88,7 @@ public class OperatorMetrics implements IOperatorMetrics {
long numReplacedWithDynamicFilterRows,
long numDynamicFilterInputRows,
long flushRowCount,
+ long abandonedPartialAggregationRows,
long loadedToValueHook,
long bloomFilterBlocksByteSize,
long scanTime,
@@ -131,6 +133,7 @@ public class OperatorMetrics implements IOperatorMetrics {
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.numDynamicFilterInputRows = numDynamicFilterInputRows;
this.flushRowCount = flushRowCount;
+ this.abandonedPartialAggregationRows = abandonedPartialAggregationRows;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
this.skippedSplits = skippedSplits;
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 69ed2f09a9..800fd758e8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -311,6 +311,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of spilled partitions"),
"aggSpilledFiles" -> SQLMetrics.createMetric(sparkContext, "number of
spilled files"),
"flushRowCount" -> SQLMetrics.createMetric(sparkContext, "number of
flushed rows"),
+ "abandonedPartialAggregationRows" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of rows after partial aggregation abandonment"),
"loadedToValueHook" -> SQLMetrics.createMetric(
sparkContext,
"number of pushdown aggregations"),
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
index 3843a34b83..dcfa0d7cf3 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
@@ -44,6 +44,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
val aggSpilledPartitions: SQLMetric = metrics("aggSpilledPartitions")
val aggSpilledFiles: SQLMetric = metrics("aggSpilledFiles")
val flushRowCount: SQLMetric = metrics("flushRowCount")
+ val abandonedPartialAggregationRows: SQLMetric =
metrics("abandonedPartialAggregationRows")
val loadedToValueHook: SQLMetric = metrics("loadedToValueHook")
val rowConstructionCpuCount: SQLMetric = metrics("rowConstructionCpuCount")
@@ -81,6 +82,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
aggSpilledPartitions += aggMetrics.spilledPartitions
aggSpilledFiles += aggMetrics.spilledFiles
flushRowCount += aggMetrics.flushRowCount
+ abandonedPartialAggregationRows +=
aggMetrics.abandonedPartialAggregationRows
loadedToValueHook += aggMetrics.loadedToValueHook
idx += 1
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index f2c9214b06..24930ea153 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -122,6 +122,7 @@ object MetricsUtil extends Logging {
var numReplacedWithDynamicFilterRows: Long = 0
var numDynamicFilterInputRows: Long = 0
var flushRowCount: Long = 0
+ var abandonedPartialAggregationRows: Long = 0
var loadedToValueHook: Long = 0
var bloomFilterBlocksByteSize: Long = 0
var scanTime: Long = 0
@@ -159,6 +160,7 @@ object MetricsUtil extends Logging {
numReplacedWithDynamicFilterRows +=
metrics.numReplacedWithDynamicFilterRows
numDynamicFilterInputRows += metrics.numDynamicFilterInputRows
flushRowCount += metrics.flushRowCount
+ abandonedPartialAggregationRows +=
metrics.abandonedPartialAggregationRows
loadedToValueHook += metrics.loadedToValueHook
bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
scanTime += metrics.scanTime
@@ -203,6 +205,7 @@ object MetricsUtil extends Logging {
numReplacedWithDynamicFilterRows,
numDynamicFilterInputRows,
flushRowCount,
+ abandonedPartialAggregationRows,
loadedToValueHook,
bloomFilterBlocksByteSize,
scanTime,
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 35edc4fa6e..861729e349 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
@@ -189,6 +189,31 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
+ test("Hash aggregate metrics include abandoned partial aggregation rows") {
+ withSQLConf(
+ GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key -> "10",
+ VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_ROWS.key -> "0",
+ VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_PCT.key -> "0"
+ ) {
+ runQueryAndCompare("SELECT c2, sum(c1) FROM metrics_t1 GROUP BY c2") {
+ df =>
+ val aggregates = collect(df.queryExecution.executedPlan) {
+ case agg: HashAggregateExecBaseTransformer => agg
+ }
+ assert(aggregates.nonEmpty)
+ val numTotalAbandonedPartialAggregationRows = aggregates.map {
+ agg =>
+ val metrics = agg.metrics
+ assert(metrics.contains("abandonedPartialAggregationRows"))
+ val num = metrics("abandonedPartialAggregationRows").value
+ assert(num >= 0)
+ num
+ }.sum
+ assert(numTotalAbandonedPartialAggregationRows > 0)
+ }
+ }
+ }
+
test("Metrics of noop filter's children") {
runQueryAndCompare("SELECT c1, c2 FROM metrics_t1 where c1 < 50") {
df =>
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 46b9d7603c..a726d3be91 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -277,7 +277,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
-
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -595,6 +595,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kNumReplacedWithDynamicFilterRows],
longArray[Metrics::kNumDynamicFilterInputRows],
longArray[Metrics::kFlushRowCount],
+ longArray[Metrics::kAbandonedPartialAggregationRows],
longArray[Metrics::kLoadedToValueHook],
longArray[Metrics::kBloomFilterBlocksByteSize],
longArray[Metrics::kScanTime],
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index 55f2bb4ef5..67c0b485c7 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -69,6 +69,7 @@ struct Metrics {
kNumReplacedWithDynamicFilterRows,
kNumDynamicFilterInputRows,
kFlushRowCount,
+ kAbandonedPartialAggregationRows,
kLoadedToValueHook,
kBloomFilterBlocksByteSize,
kScanTime,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index ccc1917f41..2b957ce54d 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -44,6 +44,7 @@ const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
const std::string kReplacedWithDynamicFilterRows =
"replacedWithDynamicFilterRows";
const std::string kDynamicFilterInputRows = "dynamicFilterInputRows";
const std::string kFlushRowCount = "flushRowCount";
+const std::string kAbandonedPartialAggregationRows =
"abandonedPartialAggregationRows";
const std::string kLoadedToValueHook = "loadedToValueHook";
const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
const std::string kTotalScanTime = "totalScanTime";
@@ -504,6 +505,8 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kNumDynamicFilterInputRows)[metricIndex] =
runtimeMetric("sum", second->customStats, kDynamicFilterInputRows);
metrics_->get(Metrics::kFlushRowCount)[metricIndex] =
runtimeMetric("sum", second->customStats, kFlushRowCount);
+ metrics_->get(Metrics::kAbandonedPartialAggregationRows)[metricIndex] =
+ runtimeMetric("sum", second->customStats,
kAbandonedPartialAggregationRows);
metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
runtimeMetric("sum", second->customStats, kLoadedToValueHook);
metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]