This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 6c34c7a59 fix: native shuffle now reports spill metrics correctly (#3197)
6c34c7a59 is described below
commit 6c34c7a592cdea1ac80ba1a602f7b233840babf0
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jan 16 07:31:27 2026 -0700
fix: native shuffle now reports spill metrics correctly (#3197)
---
native/core/src/execution/shuffle/shuffle_writer.rs | 2 +-
.../sql/comet/execution/shuffle/CometNativeShuffleWriter.scala | 8 ++++++++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index c4dcddc54..f21cde2ba 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -700,7 +700,7 @@ impl MultiPartitionShuffleRepartitioner {
}
fn spill(&mut self) -> Result<()> {
- log::debug!(
+ log::info!(
"ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
self.used(),
self.spill_count()
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index 9fd101854..b5d15b41f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -132,6 +132,14 @@ class CometNativeShuffleWriter[K, V](
metricsReporter.incRecordsWritten(metricsOutputRows.value)
metricsReporter.incWriteTime(metricsWriteTime.value)
+ // Report spill metrics to Spark's task metrics so they appear in
+ // Spark UI task summaries (not just SQL metrics)
+ val spilledBytes = nativeSQLMetrics.get("spilled_bytes").map(_.value).getOrElse(0L)
+ if (spilledBytes > 0) {
+ context.taskMetrics().incMemoryBytesSpilled(spilledBytes)
+ context.taskMetrics().incDiskBytesSpilled(spilledBytes)
+ }
+
// commit
shuffleBlockResolver.writeMetadataFileAndCommit(
shuffleId,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]