This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e297d23b feat: Improve shuffle metrics (second attempt) (#1175)
e297d23b is described below

commit e297d23bd38bc306c90ed21a154d1495f985683e
Author: Andy Grove <[email protected]>
AuthorDate: Wed Dec 18 10:50:07 2024 -0700

    feat: Improve shuffle metrics (second attempt) (#1175)
    
    * improve shuffle metrics
    
    * docs
    
    * more metrics
    
    * refactor
    
    * address feedback
---
 docs/source/index.rst                              |   1 +
 docs/source/user-guide/metrics.md                  |  66 +++++
 docs/source/user-guide/tuning.md                   |  25 --
 native/core/src/execution/shuffle/row.rs           |   6 +-
 .../core/src/execution/shuffle/shuffle_writer.rs   | 271 ++++++++++++---------
 .../spark/sql/comet/CometCollectLimitExec.scala    |   3 +-
 .../apache/spark/sql/comet/CometMetricNode.scala   |  11 +
 .../sql/comet/CometTakeOrderedAndProjectExec.scala |   3 +-
 .../shuffle/CometShuffleExchangeExec.scala         |  15 +-
 9 files changed, 261 insertions(+), 140 deletions(-)

diff --git a/docs/source/index.rst b/docs/source/index.rst
index 39ad27a5..21ec36ca 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -51,6 +51,7 @@ as a native runtime to achieve improvement in terms of query 
efficiency and quer
    Configuration Settings <user-guide/configs>
    Compatibility Guide <user-guide/compatibility>
    Tuning Guide <user-guide/tuning>
+   Metrics Guide <user-guide/metrics>
 
 .. _toc.contributor-guide-links:
 .. toctree::
diff --git a/docs/source/user-guide/metrics.md 
b/docs/source/user-guide/metrics.md
new file mode 100644
index 00000000..509d0ae8
--- /dev/null
+++ b/docs/source/user-guide/metrics.md
@@ -0,0 +1,66 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Comet Metrics
+
+## Spark SQL Metrics
+
+Set `spark.comet.metrics.detailed=true` to see all available Comet metrics.
+
+### CometScanExec
+
+| Metric      | Description                                                    
                                                                                
                                                                                
                                                    |
+| ----------- | 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |
+| `scan time` | Total time to scan a Parquet file. This is not comparable to 
the same metric in Spark because Comet's scan metric is more accurate. Although 
both Comet and Spark measure the time in nanoseconds, Spark rounds this time to 
the nearest millisecond per batch and Comet does not. |
+
+### Exchange
+
+Comet adds some additional metrics:
+
+| Metric                          | Description                                
                   |
+| ------------------------------- | 
------------------------------------------------------------- |
+| `native shuffle time`           | Total time in native code excluding any 
child operators.      |
+| `repartition time`              | Time to repartition batches.               
                   |
+| `memory pool time`              | Time interacting with memory pool.         
                   |
+| `encoding and compression time` | Time to encode batches in IPC format and 
compress using ZSTD. |
+
+## Native Metrics
+
+Setting `spark.comet.explain.native.enabled=true` will cause native plans to 
be logged in each executor. Metrics are
+logged for each native plan (and there is one plan per task, so this is very 
verbose).
+
+Here is a guide to some of the native metrics.
+
+### ScanExec
+
+| Metric            | Description                                              
                                           |
+| ----------------- | 
---------------------------------------------------------------------------------------------------
 |
+| `elapsed_compute` | Total time spent in this operator, fetching batches from 
a JVM iterator.                            |
+| `jvm_fetch_time`  | Time spent in the JVM fetching input batches to be read 
by this `ScanExec` instance.                |
+| `arrow_ffi_time`  | Time spent using Arrow FFI to create Arrow batches from 
the memory addresses returned from the JVM. |
+
+### ShuffleWriterExec
+
+| Metric            | Description                                              
     |
+| ----------------- | 
------------------------------------------------------------- |
+| `elapsed_compute` | Total time excluding any child operators.                
     |
+| `repart_time`     | Time to repartition batches.                             
     |
+| `ipc_time`        | Time to encode batches in IPC format and compress using 
ZSTD. |
+| `mempool_time`    | Time interacting with memory pool.                       
     |
+| `write_time`      | Time spent writing bytes to disk.                        
     |
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index f10a0dde..d68481d1 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -103,31 +103,6 @@ native shuffle currently only supports `HashPartitioning` 
and `SinglePartitionin
 To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If 
this mode is explicitly set,
 then any shuffle operations that cannot be supported in this mode will fall 
back to Spark.
 
-##  Metrics
-
-### Spark SQL Metrics
-
-Some Comet metrics are not directly comparable to Spark metrics in some cases:
-
-- `CometScanExec` uses nanoseconds for total scan time. Spark also measures 
scan time in nanoseconds but converts to
-  milliseconds _per batch_ which can result in a large loss of precision, 
making it difficult to compare scan times
-  between Spark and Comet.
-
-### Native Metrics
-
-Setting `spark.comet.explain.native.enabled=true` will cause native plans to 
be logged in each executor. Metrics are
-logged for each native plan (and there is one plan per task, so this is very 
verbose).
-
-Here is a guide to some of the native metrics.
-
-### ScanExec
-
-| Metric            | Description                                              
                                           |
-| ----------------- | 
---------------------------------------------------------------------------------------------------
 |
-| `elapsed_compute` | Total time spent in this operator, fetching batches from 
a JVM iterator.                            |
-| `jvm_fetch_time`  | Time spent in the JVM fetching input batches to be read 
by this `ScanExec` instance.                |
-| `arrow_ffi_time`  | Time spent using Arrow FFI to create Arrow batches from 
the memory addresses returned from the JVM. |
-
 ## Explain Plan
 ### Extended Explain
 With Spark 4.0.0 and newer, Comet can provide extended explain plan 
information in the Spark UI. Currently this lists
diff --git a/native/core/src/execution/shuffle/row.rs 
b/native/core/src/execution/shuffle/row.rs
index ce752e68..ecab77d9 100644
--- a/native/core/src/execution/shuffle/row.rs
+++ b/native/core/src/execution/shuffle/row.rs
@@ -40,6 +40,7 @@ use arrow_array::{
     Array, ArrayRef, RecordBatch, RecordBatchOptions,
 };
 use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
+use datafusion::physical_plan::metrics::Time;
 use jni::sys::{jint, jlong};
 use std::{
     fs::OpenOptions,
@@ -3354,7 +3355,10 @@ pub fn process_sorted_row_partition(
         let mut frozen: Vec<u8> = vec![];
         let mut cursor = Cursor::new(&mut frozen);
         cursor.seek(SeekFrom::End(0))?;
-        written += write_ipc_compressed(&batch, &mut cursor)?;
+
+        // we do not collect metrics in Native_writeSortedFileNative
+        let ipc_time = Time::default();
+        written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
 
         if let Some(checksum) = &mut current_checksum {
             checksum.update(&mut cursor)?;
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs 
b/native/core/src/execution/shuffle/shuffle_writer.rs
index 7587ff06..fcc8c51f 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -17,22 +17,14 @@
 
 //! Defines the External shuffle repartition plan.
 
-use std::{
-    any::Any,
-    fmt,
-    fmt::{Debug, Formatter},
-    fs::{File, OpenOptions},
-    io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write},
-    path::Path,
-    sync::Arc,
-    task::{Context, Poll},
+use crate::{
+    common::bit::ceil,
+    errors::{CometError, CometResult},
 };
-
 use arrow::{datatypes::*, ipc::writer::StreamWriter};
 use async_trait::async_trait;
 use bytes::Buf;
 use crc32fast::Hasher;
-use datafusion::physical_plan::metrics::Time;
 use datafusion::{
     arrow::{
         array::*,
@@ -48,23 +40,32 @@ use datafusion::{
         runtime_env::RuntimeEnv,
     },
     physical_plan::{
-        metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, 
MetricBuilder, MetricsSet},
+        metrics::{
+            BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, 
MetricsSet, Time,
+        },
         stream::RecordBatchStreamAdapter,
         DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, 
Partitioning, PlanProperties,
         RecordBatchStream, SendableRecordBatchStream, Statistics,
     },
 };
+use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
 use datafusion_physical_expr::EquivalenceProperties;
 use futures::executor::block_on;
 use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use simd_adler32::Adler32;
-
-use crate::{
-    common::bit::ceil,
-    errors::{CometError, CometResult},
+use std::io::Error;
+use std::{
+    any::Any,
+    fmt,
+    fmt::{Debug, Formatter},
+    fs::{File, OpenOptions},
+    io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write},
+    path::Path,
+    sync::Arc,
+    task::{Context, Poll},
 };
-use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
+use tokio::time::Instant;
 
 /// The status of appending rows to a partition buffer.
 enum AppendRowStatus {
@@ -242,7 +243,7 @@ impl PartitionBuffer {
 
     /// Initializes active builders if necessary.
     /// Returns error if memory reservation fails.
-    fn init_active_if_necessary(&mut self) -> Result<isize> {
+    fn init_active_if_necessary(&mut self, metrics: 
&ShuffleRepartitionerMetrics) -> Result<isize> {
         let mut mem_diff = 0;
 
         if self.active.is_empty() {
@@ -256,9 +257,13 @@ impl PartitionBuffer {
                     .sum::<usize>();
             }
 
+            let mut mempool_timer = metrics.mempool_time.timer();
             self.reservation.try_grow(self.active_slots_mem_size)?;
+            mempool_timer.stop();
 
+            let mut repart_timer = metrics.repart_time.timer();
             self.active = new_array_builders(&self.schema, self.batch_size);
+            repart_timer.stop();
 
             mem_diff += self.active_slots_mem_size as isize;
         }
@@ -271,13 +276,13 @@ impl PartitionBuffer {
         columns: &[ArrayRef],
         indices: &[usize],
         start_index: usize,
-        time_metric: &Time,
+        metrics: &ShuffleRepartitionerMetrics,
     ) -> AppendRowStatus {
         let mut mem_diff = 0;
         let mut start = start_index;
 
         // lazy init because some partition may be empty
-        let init = self.init_active_if_necessary();
+        let init = self.init_active_if_necessary(metrics);
         if init.is_err() {
             return AppendRowStatus::StartIndex(start);
         }
@@ -285,6 +290,8 @@ impl PartitionBuffer {
 
         while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
+
+            let mut repart_timer = metrics.repart_time.timer();
             self.active
                 .iter_mut()
                 .zip(columns)
@@ -292,16 +299,16 @@ impl PartitionBuffer {
                     append_columns(builder, column, &indices[start..end], 
column.data_type());
                 });
             self.num_active_rows += end - start;
+            repart_timer.stop();
+
             if self.num_active_rows >= self.batch_size {
-                let mut timer = time_metric.timer();
-                let flush = self.flush();
+                let flush = self.flush(&metrics.ipc_time);
                 if let Err(e) = flush {
                     return AppendRowStatus::MemDiff(Err(e));
                 }
                 mem_diff += flush.unwrap();
-                timer.stop();
 
-                let init = self.init_active_if_necessary();
+                let init = self.init_active_if_necessary(metrics);
                 if init.is_err() {
                     return AppendRowStatus::StartIndex(end);
                 }
@@ -313,7 +320,7 @@ impl PartitionBuffer {
     }
 
     /// flush active data into frozen bytes
-    fn flush(&mut self) -> Result<isize> {
+    fn flush(&mut self, ipc_time: &Time) -> Result<isize> {
         if self.num_active_rows == 0 {
             return Ok(0);
         }
@@ -330,7 +337,7 @@ impl PartitionBuffer {
         let frozen_capacity_old = self.frozen.capacity();
         let mut cursor = Cursor::new(&mut self.frozen);
         cursor.seek(SeekFrom::End(0))?;
-        write_ipc_compressed(&frozen_batch, &mut cursor)?;
+        write_ipc_compressed(&frozen_batch, &mut cursor, ipc_time)?;
 
         mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
         Ok(mem_diff)
@@ -628,6 +635,21 @@ struct ShuffleRepartitionerMetrics {
     /// metrics
     baseline: BaselineMetrics,
 
+    /// Time to perform repartitioning
+    repart_time: Time,
+
+    /// Time interacting with memory pool
+    mempool_time: Time,
+
+    /// Time encoding batches to IPC format
+    ipc_time: Time,
+
+    /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL 
Metrics.
+    write_time: Time,
+
+    /// Number of input batches
+    input_batches: Count,
+
     /// count of spills during the execution of the operator
     spill_count: Count,
 
@@ -642,6 +664,11 @@ impl ShuffleRepartitionerMetrics {
     fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
         Self {
             baseline: BaselineMetrics::new(metrics, partition),
+            repart_time: 
MetricBuilder::new(metrics).subset_time("repart_time", partition),
+            mempool_time: 
MetricBuilder::new(metrics).subset_time("mempool_time", partition),
+            ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", 
partition),
+            write_time: MetricBuilder::new(metrics).subset_time("write_time", 
partition),
+            input_batches: 
MetricBuilder::new(metrics).counter("input_batches", partition),
             spill_count: MetricBuilder::new(metrics).spill_count(partition),
             spilled_bytes: 
MetricBuilder::new(metrics).spilled_bytes(partition),
             data_size: MetricBuilder::new(metrics).counter("data_size", 
partition),
@@ -701,6 +728,7 @@ impl ShuffleRepartitioner {
     /// This function will slice input batch according to configured batch 
size and then
     /// shuffle rows into corresponding partition buffer.
     async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        let start_time = Instant::now();
         let mut start = 0;
         while start < batch.num_rows() {
             let end = (start + self.batch_size).min(batch.num_rows());
@@ -708,6 +736,11 @@ impl ShuffleRepartitioner {
             self.partitioning_batch(batch).await?;
             start = end;
         }
+        self.metrics.input_batches.add(1);
+        self.metrics
+            .baseline
+            .elapsed_compute()
+            .add_duration(start_time.elapsed());
         Ok(())
     }
 
@@ -738,53 +771,61 @@ impl ShuffleRepartitioner {
         let num_output_partitions = self.num_output_partitions;
         match &self.partitioning {
             Partitioning::Hash(exprs, _) => {
-                let arrays = exprs
-                    .iter()
-                    .map(|expr| 
expr.evaluate(&input)?.into_array(input.num_rows()))
-                    .collect::<Result<Vec<_>>>()?;
-
-                // use identical seed as spark hash partition
-                let hashes_buf = &mut self.hashes_buf[..arrays[0].len()];
-                hashes_buf.fill(42_u32);
-
-                // Hash arrays and compute buckets based on number of 
partitions
-                let partition_ids = &mut self.partition_ids[..arrays[0].len()];
-                create_murmur3_hashes(&arrays, hashes_buf)?
-                    .iter()
-                    .enumerate()
-                    .for_each(|(idx, hash)| {
-                        partition_ids[idx] = pmod(*hash, 
num_output_partitions) as u64
+                let (partition_starts, shuffled_partition_ids): (Vec<usize>, 
Vec<usize>) = {
+                    let mut timer = self.metrics.repart_time.timer();
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| 
expr.evaluate(&input)?.into_array(input.num_rows()))
+                        .collect::<Result<Vec<_>>>()?;
+
+                    // use identical seed as spark hash partition
+                    let hashes_buf = &mut self.hashes_buf[..arrays[0].len()];
+                    hashes_buf.fill(42_u32);
+
+                    // Hash arrays and compute buckets based on number of 
partitions
+                    let partition_ids = &mut 
self.partition_ids[..arrays[0].len()];
+                    create_murmur3_hashes(&arrays, hashes_buf)?
+                        .iter()
+                        .enumerate()
+                        .for_each(|(idx, hash)| {
+                            partition_ids[idx] = pmod(*hash, 
num_output_partitions) as u64
+                        });
+
+                    // count each partition size
+                    let mut partition_counters = vec![0usize; 
num_output_partitions];
+                    partition_ids
+                        .iter()
+                        .for_each(|partition_id| 
partition_counters[*partition_id as usize] += 1);
+
+                    // accumulate partition counters into partition ends
+                    // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7]
+                    let mut partition_ends = partition_counters;
+                    let mut accum = 0;
+                    partition_ends.iter_mut().for_each(|v| {
+                        *v += accum;
+                        accum = *v;
                     });
 
-                // count each partition size
-                let mut partition_counters = vec![0usize; 
num_output_partitions];
-                partition_ids
-                    .iter()
-                    .for_each(|partition_id| partition_counters[*partition_id 
as usize] += 1);
-
-                // accumulate partition counters into partition ends
-                // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7]
-                let mut partition_ends = partition_counters;
-                let mut accum = 0;
-                partition_ends.iter_mut().for_each(|v| {
-                    *v += accum;
-                    accum = *v;
-                });
-
-                // calculate shuffled partition ids
-                // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] => [6, 1, 2, 3, 
4, 5, 0] which is the
-                // row indices for rows ordered by their partition id. For 
example, first partition
-                // 0 has one row index [6], partition 1 has row indices [1, 2, 
3], etc.
-                let mut shuffled_partition_ids = vec![0usize; 
input.num_rows()];
-                for (index, partition_id) in 
partition_ids.iter().enumerate().rev() {
-                    partition_ends[*partition_id as usize] -= 1;
-                    let end = partition_ends[*partition_id as usize];
-                    shuffled_partition_ids[end] = index;
-                }
+                    // calculate shuffled partition ids
+                    // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] => [6, 1, 2, 
3, 4, 5, 0] which is the
+                    // row indices for rows ordered by their partition id. For 
example, first partition
+                    // 0 has one row index [6], partition 1 has row indices 
[1, 2, 3], etc.
+                    let mut shuffled_partition_ids = vec![0usize; 
input.num_rows()];
+                    for (index, partition_id) in 
partition_ids.iter().enumerate().rev() {
+                        partition_ends[*partition_id as usize] -= 1;
+                        let end = partition_ends[*partition_id as usize];
+                        shuffled_partition_ids[end] = index;
+                    }
 
-                // after calculating, partition ends become partition starts
-                let mut partition_starts = partition_ends;
-                partition_starts.push(input.num_rows());
+                    // after calculating, partition ends become partition 
starts
+                    let mut partition_starts = partition_ends;
+                    partition_starts.push(input.num_rows());
+                    timer.stop();
+                    Ok::<(Vec<usize>, Vec<usize>), DataFusionError>((
+                        partition_starts,
+                        shuffled_partition_ids,
+                    ))
+                }?;
 
                 // For each interval of row indices of partition, taking rows 
from input batch and
                 // appending into output buffer.
@@ -804,11 +845,20 @@ impl ShuffleRepartitioner {
 
                     if mem_diff > 0 {
                         let mem_increase = mem_diff as usize;
-                        if self.reservation.try_grow(mem_increase).is_err() {
+
+                        let try_grow = {
+                            let mut mempool_timer = 
self.metrics.mempool_time.timer();
+                            let result = 
self.reservation.try_grow(mem_increase);
+                            mempool_timer.stop();
+                            result
+                        };
+
+                        if try_grow.is_err() {
                             self.spill().await?;
+                            let mut mempool_timer = 
self.metrics.mempool_time.timer();
                             self.reservation.free();
                             self.reservation.try_grow(mem_increase)?;
-
+                            mempool_timer.stop();
                             mem_diff = 0;
                         }
                     }
@@ -816,7 +866,9 @@ impl ShuffleRepartitioner {
                     if mem_diff < 0 {
                         let mem_used = self.reservation.size();
                         let mem_decrease = mem_used.min(-mem_diff as usize);
+                        let mut mempool_timer = 
self.metrics.mempool_time.timer();
                         self.reservation.shrink(mem_decrease);
+                        mempool_timer.stop();
                     }
                 }
             }
@@ -848,12 +900,13 @@ impl ShuffleRepartitioner {
 
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
+        let mut elapsed_compute = 
self.metrics.baseline.elapsed_compute().timer();
         let num_output_partitions = self.num_output_partitions;
         let buffered_partitions = &mut self.buffered_partitions;
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; 
num_output_partitions];
-
+        let mut offsets = vec![0; num_output_partitions + 1];
         for i in 0..num_output_partitions {
-            buffered_partitions[i].flush()?;
+            buffered_partitions[i].flush(&self.metrics.ipc_time)?;
             output_batches[i] = std::mem::take(&mut 
buffered_partitions[i].frozen);
         }
 
@@ -863,53 +916,38 @@ impl ShuffleRepartitioner {
         let data_file = self.output_data_file.clone();
         let index_file = self.output_index_file.clone();
 
-        let mut offsets = vec![0; num_output_partitions + 1];
-        let mut output_data = OpenOptions::new()
+        let mut write_time = self.metrics.write_time.timer();
+
+        let output_data = OpenOptions::new()
             .write(true)
             .create(true)
             .truncate(true)
             .open(data_file)
             .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {:?}", e)))?;
 
-        for i in 0..num_output_partitions {
-            let mut timer = self.metrics.baseline.elapsed_compute().timer();
+        let mut output_data = BufWriter::new(output_data);
 
+        for i in 0..num_output_partitions {
             offsets[i] = output_data.stream_position()?;
             output_data.write_all(&output_batches[i])?;
-
-            timer.stop();
-
             output_batches[i].clear();
 
             // append partition in each spills
             for spill in &output_spills {
                 let length = spill.offsets[i + 1] - spill.offsets[i];
                 if length > 0 {
-                    let mut timer = 
self.metrics.baseline.elapsed_compute().timer();
-
                     let mut spill_file =
-                        
BufReader::new(File::open(spill.file.path()).map_err(|e| {
-                            DataFusionError::Execution(format!("shuffle write 
error: {:?}", e))
-                        })?);
+                        
BufReader::new(File::open(spill.file.path()).map_err(Self::to_df_err)?);
                     spill_file.seek(SeekFrom::Start(spill.offsets[i]))?;
-                    std::io::copy(&mut spill_file.take(length), &mut 
output_data).map_err(|e| {
-                        DataFusionError::Execution(format!("shuffle write 
error: {:?}", e))
-                    })?;
-
-                    timer.stop();
+                    std::io::copy(&mut spill_file.take(length), &mut 
output_data)
+                        .map_err(Self::to_df_err)?;
                 }
             }
         }
-        let mut timer = self.metrics.baseline.elapsed_compute().timer();
         output_data.flush()?;
-        timer.stop();
 
         // add one extra offset at last to ease partition length computation
-        offsets[num_output_partitions] = output_data
-            .stream_position()
-            .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {:?}", e)))?;
-
-        let mut timer = self.metrics.baseline.elapsed_compute().timer();
+        offsets[num_output_partitions] = 
output_data.stream_position().map_err(Self::to_df_err)?;
 
         let mut output_index =
             BufWriter::new(File::create(index_file).map_err(|e| {
@@ -918,19 +956,27 @@ impl ShuffleRepartitioner {
         for offset in offsets {
             output_index
                 .write_all(&(offset as i64).to_le_bytes()[..])
-                .map_err(|e| DataFusionError::Execution(format!("shuffle write 
error: {:?}", e)))?;
+                .map_err(Self::to_df_err)?;
         }
         output_index.flush()?;
 
-        timer.stop();
+        write_time.stop();
 
+        let mut mempool_timer = self.metrics.mempool_time.timer();
         let used = self.reservation.size();
         self.reservation.shrink(used);
+        mempool_timer.stop();
+
+        elapsed_compute.stop();
 
         // shuffle writer always has empty output
         Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?))
     }
 
+    fn to_df_err(e: Error) -> DataFusionError {
+        DataFusionError::Execution(format!("shuffle write error: {:?}", e))
+    }
+
     fn used(&self) -> usize {
         self.reservation.size()
     }
@@ -959,7 +1005,7 @@ impl ShuffleRepartitioner {
             return Ok(0);
         }
 
-        let mut timer = self.metrics.baseline.elapsed_compute().timer();
+        let mut timer = self.metrics.write_time.timer();
 
         let spillfile = self
             .runtime
@@ -969,6 +1015,7 @@ impl ShuffleRepartitioner {
             &mut self.buffered_partitions,
             spillfile.path(),
             self.num_output_partitions,
+            &self.metrics.ipc_time,
         )?;
 
         timer.stop();
@@ -995,12 +1042,10 @@ impl ShuffleRepartitioner {
 
         let output = &mut self.buffered_partitions[partition_id];
 
-        let time_metric = self.metrics.baseline.elapsed_compute();
-
         // If the range of indices is not big enough, just appending the rows 
into
         // active array builders instead of directly adding them as a record 
batch.
         let mut start_index: usize = 0;
-        let mut output_ret = output.append_rows(columns, indices, start_index, 
time_metric);
+        let mut output_ret = output.append_rows(columns, indices, start_index, 
&self.metrics);
 
         loop {
             match output_ret {
@@ -1012,15 +1057,15 @@ impl ShuffleRepartitioner {
                     // Cannot allocate enough memory for the array builders in 
the partition,
                     // spill partitions and retry.
                     self.spill().await?;
-                    self.reservation.free();
 
+                    let mut mempool_timer = self.metrics.mempool_time.timer();
+                    self.reservation.free();
                     let output = &mut self.buffered_partitions[partition_id];
                     output.reservation.free();
-
-                    let time_metric = self.metrics.baseline.elapsed_compute();
+                    mempool_timer.stop();
 
                     start_index = new_start;
-                    output_ret = output.append_rows(columns, indices, 
start_index, time_metric);
+                    output_ret = output.append_rows(columns, indices, 
start_index, &self.metrics);
 
                     if let AppendRowStatus::StartIndex(new_start) = output_ret 
{
                         if new_start == start_index {
@@ -1045,11 +1090,12 @@ fn spill_into(
     buffered_partitions: &mut [PartitionBuffer],
     path: &Path,
     num_output_partitions: usize,
+    ipc_time: &Time,
 ) -> Result<Vec<u64>> {
     let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
 
     for i in 0..num_output_partitions {
-        buffered_partitions[i].flush()?;
+        buffered_partitions[i].flush(ipc_time)?;
         output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen);
     }
     let path = path.to_owned();
@@ -1485,10 +1531,13 @@ impl Checksum {
 pub(crate) fn write_ipc_compressed<W: Write + Seek>(
     batch: &RecordBatch,
     output: &mut W,
+    ipc_time: &Time,
 ) -> Result<usize> {
     if batch.num_rows() == 0 {
         return Ok(0);
     }
+
+    let mut timer = ipc_time.timer();
     let start_pos = output.stream_position()?;
 
     // write ipc_length placeholder
@@ -1508,8 +1557,10 @@ pub(crate) fn write_ipc_compressed<W: Write + Seek>(
     // fill ipc length
     output.seek(SeekFrom::Start(start_pos))?;
     output.write_all(&ipc_length.to_le_bytes()[..])?;
-
     output.seek(SeekFrom::Start(end_pos))?;
+
+    timer.stop();
+
     Ok((end_pos - start_pos) as usize)
 }
 
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
index 8ea0b176..f75af507 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
@@ -57,7 +57,8 @@ case class CometCollectLimitExec(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
-      "number of partitions")) ++ readMetrics ++ writeMetrics
+      "number of partitions")) ++ readMetrics ++ writeMetrics ++ 
CometMetricNode.shuffleMetrics(
+    sparkContext)
 
   private lazy val serializer: Serializer =
     new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
index 47c89d94..a26fa28c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
@@ -130,6 +130,17 @@ object CometMetricNode {
       "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows"))
   }
 
+  def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = {
+    Map(
+      "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native 
shuffle time"),
+      "mempool_time" -> SQLMetrics.createNanoTimingMetric(sc, "memory pool 
time"),
+      "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition 
time"),
+      "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and 
compression time"),
+      "spill_count" -> SQLMetrics.createMetric(sc, "number of spills"),
+      "spilled_bytes" -> SQLMetrics.createMetric(sc, "spilled bytes"),
+      "input_batches" -> SQLMetrics.createMetric(sc, "number of input 
batches"))
+  }
+
   /**
    * Creates a [[CometMetricNode]] from a [[CometPlan]].
    */
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 5582f4d6..19586628 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -57,7 +57,8 @@ case class CometTakeOrderedAndProjectExec(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
-      "number of partitions")) ++ readMetrics ++ writeMetrics
+      "number of partitions")) ++ readMetrics ++ writeMetrics ++ 
CometMetricNode.shuffleMetrics(
+    sparkContext)
 
   private lazy val serializer: Serializer =
     new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index b1dd9ac8..0cd8a9ce 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -79,7 +79,8 @@ case class CometShuffleExchangeExec(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
-      "number of partitions")) ++ readMetrics ++ writeMetrics
+      "number of partitions")) ++ readMetrics ++ writeMetrics ++ 
CometMetricNode.shuffleMetrics(
+    sparkContext)
 
   override def nodeName: String = if (shuffleType == CometNativeShuffle) {
     "CometExchange"
@@ -477,11 +478,21 @@ class CometShuffleWriteProcessor(
     // Call native shuffle write
     val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename)
 
+    val detailedMetrics = Seq(
+      "elapsed_compute",
+      "ipc_time",
+      "repart_time",
+      "mempool_time",
+      "input_batches",
+      "spill_count",
+      "spilled_bytes")
+
     // Maps native metrics to SQL metrics
     val nativeSQLMetrics = Map(
       "output_rows" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
       "data_size" -> metrics("dataSize"),
-      "elapsed_compute" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))
+      "write_time" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++
+      metrics.filterKeys(detailedMetrics.contains)
     val nativeMetrics = CometMetricNode(nativeSQLMetrics)
 
     // Getting rid of the fake partitionId


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to