alamb commented on code in PR #8482:
URL: https://github.com/apache/arrow-datafusion/pull/8482#discussion_r1424629371
##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -207,28 +241,28 @@ impl UnnestStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
- let start = Instant::now();
+ let timer = self.metrics.elapsed_compute.timer();
let result =
build_batch(&batch, &self.schema, &self.column,
&self.options);
- self.num_input_batches += 1;
- self.num_input_rows += batch.num_rows();
+ self.metrics.input_batches.add(1);
+ self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
- self.unnest_time += start.elapsed().as_millis() as
usize;
- self.num_output_batches += 1;
- self.num_output_rows += batch.num_rows();
+ timer.done();
+ self.metrics.output_batches.add(1);
+ self.metrics.output_rows.add(batch.num_rows());
}
Some(result)
}
other => {
trace!(
"Processed {} probe-side input batches containing {}
rows and \
- produced {} output batches containing {} rows in {}
ms",
- self.num_input_batches,
- self.num_input_rows,
- self.num_output_batches,
- self.num_output_rows,
- self.unnest_time,
+ produced {} output batches containing {} rows in {}",
Review Comment:
👍
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -1408,6 +1408,28 @@ async fn unnest_with_redundant_columns() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn unnest_analyze_metrics() -> Result<()> {
+ const NUM_ROWS: usize = 5;
+
+ let df = table_with_nested_types(NUM_ROWS).await?;
+ let results = df
+ .unnest_column("tags")?
+ .explain(false, true)?
+ .collect()
+ .await?;
+ let formatted = arrow::util::pretty::pretty_format_batches(&results)
+ .unwrap()
+ .to_string();
+ assert_contains!(&formatted, "elapsed_compute=");
Review Comment:
👍
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -1408,6 +1408,28 @@ async fn unnest_with_redundant_columns() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn unnest_analyze_metrics() -> Result<()> {
+ const NUM_ROWS: usize = 5;
+
+ let df = table_with_nested_types(NUM_ROWS).await?;
+ let results = df
+ .unnest_column("tags")?
+ .explain(false, true)?
+ .collect()
+ .await?;
+ let formatted = arrow::util::pretty::pretty_format_batches(&results)
+ .unwrap()
+ .to_string();
+ assert_contains!(&formatted, "elapsed_compute=");
Review Comment:
👍
##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -141,19 +144,58 @@ impl ExecutionPlan for UnnestExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
+ let metrics = UnnestMetrics::new(partition, &self.metrics);
Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
column: self.column.clone(),
options: self.options.clone(),
- num_input_batches: 0,
- num_input_rows: 0,
- num_output_batches: 0,
- num_output_rows: 0,
- unnest_time: 0,
+ metrics,
}))
}
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+}
+
+#[derive(Clone, Debug)]
+struct UnnestMetrics {
Review Comment:
This has some overlap with `BaselineMetrics`; the two could probably be
combined if you want to reduce the code size somewhat:
https://docs.rs/datafusion/latest/datafusion/physical_plan/metrics/struct.BaselineMetrics.html
##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -207,28 +241,28 @@ impl UnnestStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
- let start = Instant::now();
+ let timer = self.metrics.elapsed_compute.timer();
let result =
build_batch(&batch, &self.schema, &self.column,
&self.options);
- self.num_input_batches += 1;
- self.num_input_rows += batch.num_rows();
+ self.metrics.input_batches.add(1);
+ self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
- self.unnest_time += start.elapsed().as_millis() as
usize;
- self.num_output_batches += 1;
- self.num_output_rows += batch.num_rows();
+ timer.done();
+ self.metrics.output_batches.add(1);
+ self.metrics.output_rows.add(batch.num_rows());
}
Some(result)
}
other => {
trace!(
"Processed {} probe-side input batches containing {}
rows and \
- produced {} output batches containing {} rows in {}
ms",
- self.num_input_batches,
- self.num_input_rows,
- self.num_output_batches,
- self.num_output_rows,
- self.unnest_time,
+ produced {} output batches containing {} rows in {}",
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]