geoffreyclaude commented on code in PR #2521:
URL: https://github.com/apache/iceberg-rust/pull/2521#discussion_r3332717784
##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -237,6 +260,19 @@ async fn get_batch_stream(
Ok(Box::pin(stream))
}
+fn stream_with_baseline_metrics(
+ mut stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
+ baseline_metrics: BaselineMetrics,
+) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
+ futures::stream::poll_fn(move |cx| {
+ let baseline_metrics = baseline_metrics.clone();
Review Comment:
Good catch. Removed the per-poll `clone()` in
[c2ba67aa](https://github.com/apache/iceberg-rust/pull/2521/commits/c2ba67aa1ae7e9e1922d645be083386c8a6959e1).
This is cleaner and also avoids dropping a cloned `BaselineMetrics` on every
poll, which could record `end_timestamp` earlier than intended.
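
To illustrate the hazard described above, here is a minimal, std-only sketch. `FakeMetrics` and `end_stamp_after` are hypothetical stand-ins, not DataFusion types. The sketch assumes a metrics handle whose clones share state and whose `Drop` stamps the end marker if it is still unset. It counts polls instead of reading a clock so the result is deterministic.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a metrics handle (not a DataFusion type).
/// Clones share state, and dropping any clone stamps the end marker
/// if it has not been stamped yet.
#[derive(Clone)]
struct FakeMetrics {
    polls: Rc<Cell<u32>>,
    ended_at_poll: Rc<Cell<Option<u32>>>,
}

impl FakeMetrics {
    fn new() -> Self {
        FakeMetrics {
            polls: Rc::new(Cell::new(0)),
            ended_at_poll: Rc::new(Cell::new(None)),
        }
    }

    fn record_poll(&self) {
        self.polls.set(self.polls.get() + 1);
    }
}

impl Drop for FakeMetrics {
    fn drop(&mut self) {
        // Only the first drop stamps the end marker.
        if self.ended_at_poll.get().is_none() {
            self.ended_at_poll.set(Some(self.polls.get()));
        }
    }
}

/// Runs `n` polls and returns the poll count at which the end marker was stamped.
fn end_stamp_after(n: u32, clone_per_poll: bool) -> Option<u32> {
    let metrics = FakeMetrics::new();
    let ended = Rc::clone(&metrics.ended_at_poll);
    let poll = move || {
        if clone_per_poll {
            let local = metrics.clone();
            local.record_poll();
            // `local` is dropped here, stamping the end marker too early.
        } else {
            metrics.record_poll();
        }
    };
    for _ in 0..n {
        poll();
    }
    // Dropping the closure drops the captured handle, like finishing the stream.
    drop(poll);
    ended.get()
}
```

In this sketch, `end_stamp_after(3, true)` stamps the end marker after the first poll, while `end_stamp_after(3, false)` stamps it only once the wrapper itself is dropped after the third poll.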
##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -247,3 +283,58 @@ fn get_column_names(
.collect::<Vec<String>>()
})
}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use datafusion::arrow::array::Int64Array;
+ use datafusion::arrow::datatypes::{
+ DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
+ };
+ use datafusion::arrow::record_batch::RecordBatch;
+ use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
+ use futures::StreamExt;
+
+ use super::stream_with_baseline_metrics;
+
+ #[test]
+ fn stream_with_baseline_metrics_records_rows_and_compute() {
+ let metrics = ExecutionPlanMetricsSet::new();
+ let baseline_metrics = BaselineMetrics::new(&metrics, 0);
+ let batch = make_batch();
+ let stream = Box::pin(futures::stream::iter([Ok(batch)]));
+ let mut stream = stream_with_baseline_metrics(stream, baseline_metrics);
+
+ futures::executor::block_on(async {
+ let batch = stream
+ .next()
+ .await
+ .expect("stream should return one item")
+ .expect("stream item should be valid");
+ assert_eq!(batch.num_rows(), 3);
+ assert!(stream.next().await.is_none());
+ });
+
+ let metrics = metrics.clone_inner();
+ assert_eq!(metrics.output_rows(), Some(3));
+ assert!(
+ metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0),
Review Comment:
Agreed on covering the rest of the baseline metrics. I added assertions for
`output_batches`, `output_bytes`, `start_timestamp`, and `end_timestamp` in the
focused `stream_with_baseline_metrics` test in
[7ef46a33](https://github.com/apache/iceberg-rust/pull/2521/commits/7ef46a33f4f10439dd989d0f58a80f87b39b438a).
I read these values through `MetricsSet::sum(...)` plus `MetricValue` matching
because DataFusion 53.1 does not expose `MetricsSet::output_batches()` /
`output_bytes()` convenience methods. I kept the provider-level test narrower:
it covers only end-to-end metrics exposure and `reset_state()` behavior, which
avoids duplicating the low-level baseline assertions.
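
The general shape of the "sum over a predicate, then match the variant" approach can be sketched with std-only stand-ins. `FakeMetricValue`, `FakeMetricsSet`, and the helper functions below are hypothetical and are not DataFusion's API. As a simplification, `sum` here returns a plain count rather than a metric value.

```rust
/// Hypothetical stand-in for an enum of metric values (not DataFusion's type).
enum FakeMetricValue {
    OutputRows(usize),
    OutputBatches(usize),
    OutputBytes(usize),
}

impl FakeMetricValue {
    fn count(&self) -> usize {
        match self {
            FakeMetricValue::OutputRows(n)
            | FakeMetricValue::OutputBatches(n)
            | FakeMetricValue::OutputBytes(n) => *n,
        }
    }
}

/// Hypothetical stand-in for a set of metrics collected across partitions.
struct FakeMetricsSet {
    values: Vec<FakeMetricValue>,
}

impl FakeMetricsSet {
    /// Sums the metrics selected by `pred`; returns `None` when nothing matches.
    fn sum<F>(&self, pred: F) -> Option<usize>
    where
        F: Fn(&FakeMetricValue) -> bool,
    {
        self.values
            .iter()
            .filter(|v| pred(*v))
            .fold(None, |acc: Option<usize>, v| {
                Some(acc.unwrap_or(0) + v.count())
            })
    }
}

/// Convenience helpers built on `sum` plus variant matching.
fn output_batches(set: &FakeMetricsSet) -> Option<usize> {
    set.sum(|v| matches!(v, FakeMetricValue::OutputBatches(_)))
}

fn output_bytes(set: &FakeMetricsSet) -> Option<usize> {
    set.sum(|v| matches!(v, FakeMetricValue::OutputBytes(_)))
}

/// Builds a small example set: two partitions reporting batches, one reporting bytes.
fn example_set() -> FakeMetricsSet {
    FakeMetricsSet {
        values: vec![
            FakeMetricValue::OutputRows(3),
            FakeMetricValue::OutputBatches(1),
            FakeMetricValue::OutputBatches(2),
            FakeMetricValue::OutputBytes(24),
        ],
    }
}
```

Returning `None` for an empty selection keeps "metric never recorded" distinct from "metric recorded as zero", which matters when a test asserts that a metric was actually populated.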
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]