alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212356119
##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -126,3 +256,97 @@ where
self.schema.clone()
}
}
+
+/// Stream wrapper that records `BaselineMetrics` for a particular
+/// partition
+pub(crate) struct ObservedStream {
+ inner: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
+}
+
+impl ObservedStream {
+ pub fn new(
+ inner: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
+ ) -> Self {
+ Self {
+ inner,
+ baseline_metrics,
+ }
+ }
+}
+
+impl RecordBatchStream for ObservedStream {
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ self.inner.schema()
+ }
+}
+
+impl futures::Stream for ObservedStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ let poll = self.inner.poll_next_unpin(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use arrow_schema::{DataType, Field, Schema};
+
+ use crate::{execution::context::SessionContext, test::exec::PanicingExec};
+
+ #[tokio::test]
+ #[should_panic(expected = "PanickingStream did panic")]
+ async fn record_batch_receiver_stream_propagates_panics() {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
+
+ let num_partitions = 10;
+ let input = PanicingExec::new(schema.clone(), num_partitions);
+ consume(input).await
+ }
+
+ #[tokio::test]
+ #[should_panic(expected = "PanickingStream did panic: 1")]
+ async fn record_batch_receiver_stream_propagates_panics_one() {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
+
+ // make 2 partitions, second panics before the first
+ let num_partitions = 2;
+ let input = PanicingExec::new(schema.clone(), num_partitions)
+ .with_partition_panic(0, 10)
+ .with_partition_panic(1, 3); // partition 1 should panic first
(after 3 )
Review Comment:
Here is a test showing that when the second partition panics, it is properly
reported.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]