alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219958272
##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,206 @@
//! Stream wrappers for physical operators
+use std::sync::Arc;
+
use crate::error::Result;
+use crate::physical_plan::displayable;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
+/// that produce `RecordBatch`es and send them to a single
+/// `Receiver` which can improve parallelism.
+///
+/// This also handles propagating panics and canceling the tasks.
+pub struct RecordBatchReceiverStreamBuilder {
+ tx: Sender<Result<RecordBatch>>,
+ rx: Receiver<Result<RecordBatch>>,
schema: SchemaRef,
+ join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+ /// create new channels with the specified buffer size
+ pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+ let (tx, rx) = tokio::sync::mpsc::channel(capacity);
- inner: ReceiverStream<Result<RecordBatch>>,
+ Self {
+ tx,
+ rx,
+ schema,
+ join_set: JoinSet::new(),
+ }
+ }
+
+ /// Get a handle for sending [`RecordBatch`]es to the output
+ pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+ self.tx.clone()
+ }
+
+ /// Spawn a task that will be aborted if this builder (or the stream
+ /// built from it) is dropped
+ ///
+ /// this is often used to spawn tasks that write to the sender
+ /// retrieved from `Self::tx`
+ pub fn spawn<F>(&mut self, task: F)
+ where
+ F: Future<Output = ()>,
+ F: Send + 'static,
+ {
+ self.join_set.spawn(task);
+ }
+
+ /// Spawn a blocking task that will be aborted if this builder (or the stream
+ /// built from it) are dropped
+ ///
+ /// this is often used to spawn tasks that write to the sender
+ /// retrieved from `Self::tx`
+ pub fn spawn_blocking<F>(&mut self, f: F)
+ where
+ F: FnOnce(),
+ F: Send + 'static,
+ {
+ self.join_set.spawn_blocking(f);
+ }
+
+ /// runs the input_partition of the `input` ExecutionPlan on the
+ /// tokio threadpool and writes its outputs to this stream
+ ///
+ /// If the input partition produces an error, the error will be
+ /// sent to the output stream and no further results are sent.
+ pub(crate) fn run_input(
+ &mut self,
+ input: Arc<dyn ExecutionPlan>,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) {
+ let output = self.tx();
+
+ self.spawn(async move {
+ let mut stream = match input.execute(partition, context) {
+ Err(e) => {
+ // If send fails, the plan is being torn down: there
+ // is no place to send the error and no reason to continue.
+ output.send(Err(e)).await.ok();
+ debug!(
+ "Stopping execution: error executing input: {}",
+ displayable(input.as_ref()).one_line()
+ );
+ return;
+ }
+ Ok(stream) => stream,
+ };
+
+ // Transfer batches from inner stream to the output tx
+ // immediately.
+ while let Some(item) = stream.next().await {
+ let is_err = item.is_err();
+
+ // If send fails, the plan is being torn down: there is no
+ // place to send the error and no reason to continue.
+ if output.send(item).await.is_err() {
+ debug!(
+ "Stopping execution: output is gone, plan cancelling: {}",
+ displayable(input.as_ref()).one_line()
+ );
+ return;
+ }
+
+ // stop after the first error is encountered (don't
+ // drive all streams to completion)
+ if is_err {
+ debug!(
+ "Stopping execution: plan returned error: {}",
+ displayable(input.as_ref()).one_line()
+ );
+ return;
+ }
+ }
+ });
+ }
+
+ /// Create a stream of all `RecordBatch`es written to `tx`
+ pub fn build(self) -> SendableRecordBatchStream {
+ let Self {
+ tx,
+ rx,
+ schema,
+ mut join_set,
+ } = self;
- #[allow(dead_code)]
- drop_helper: AbortOnDropSingle<()>,
+ // don't need tx
+ drop(tx);
+
+ // future that checks the result of the join set, and propagates panic if seen
+ let check = async move {
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok(()) => continue, // nothing to report
+ // This means a tokio task error, likely a panic
+ Err(e) => {
+ if e.is_panic() {
+ // resume on the main thread
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ // This should only occur if the task is
+ // cancelled, which would only occur if
+ // the JoinSet were aborted, which in turn
+ // would imply that the receiver has been
+ // dropped and this code is not running
+ return Some(Err(DataFusionError::Internal(format!(
+ "Non Panic Task error: {e}"
+ ))));
+ }
+ }
+ }
+ }
+ None
+ };
+
+ let check_stream = futures::stream::once(check)
+ // unwrap Option / only return the error
+ .filter_map(|item| async move { item });
+
+ // Merge the streams together so whichever is ready first
+ // produces the batch (since futures::stream::StreamExt is
+ // already in scope, need to call it explicitly)
Review Comment:
(it also inspired me to do
https://github.com/apache/arrow-datafusion/pull/6565)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]