This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c34c7a59 fix: native shuffle now reports spill metrics correctly (#3197)
6c34c7a59 is described below

commit 6c34c7a592cdea1ac80ba1a602f7b233840babf0
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jan 16 07:31:27 2026 -0700

    fix: native shuffle now reports spill metrics correctly (#3197)
---
 native/core/src/execution/shuffle/shuffle_writer.rs               | 2 +-
 .../sql/comet/execution/shuffle/CometNativeShuffleWriter.scala    | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index c4dcddc54..f21cde2ba 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -700,7 +700,7 @@ impl MultiPartitionShuffleRepartitioner {
     }
 
     fn spill(&mut self) -> Result<()> {
-        log::debug!(
+        log::info!(
             "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
             self.used(),
             self.spill_count()
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index 9fd101854..b5d15b41f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -132,6 +132,14 @@ class CometNativeShuffleWriter[K, V](
     metricsReporter.incRecordsWritten(metricsOutputRows.value)
     metricsReporter.incWriteTime(metricsWriteTime.value)
 
+    // Report spill metrics to Spark's task metrics so they appear in
+    // Spark UI task summaries (not just SQL metrics)
+    val spilledBytes = nativeSQLMetrics.get("spilled_bytes").map(_.value).getOrElse(0L)
+    if (spilledBytes > 0) {
+      context.taskMetrics().incMemoryBytesSpilled(spilledBytes)
+      context.taskMetrics().incDiskBytesSpilled(spilledBytes)
+    }
+
     // commit
     shuffleBlockResolver.writeMetadataFileAndCommit(
       shuffleId,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to