alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219727470
##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,206 @@
//! Stream wrappers for physical operators
+use std::sync::Arc;
+
use crate::error::Result;
+use crate::physical_plan::displayable;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panic's correctly.
+///
+/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
+/// that produce `RecordBatch`es and send them to a single
+/// `Receiver` which can improve parallelism.
+///
+/// This also handles propagating panic`s and canceling the tasks.
+pub struct RecordBatchReceiverStreamBuilder {
+ tx: Sender<Result<RecordBatch>>,
+ rx: Receiver<Result<RecordBatch>>,
schema: SchemaRef,
+ join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+ /// create new channels with the specified buffer size
+ pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+ let (tx, rx) = tokio::sync::mpsc::channel(capacity);
- inner: ReceiverStream<Result<RecordBatch>>,
+ Self {
+ tx,
+ rx,
+ schema,
+ join_set: JoinSet::new(),
+ }
+ }
+
+ /// Get a handle for sending [`RecordBatch`]es to the output
+ pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+ self.tx.clone()
+ }
+
+ /// Spawn task that will be aborted if this builder (or the stream
+ /// built from it) are dropped
+ ///
+ /// this is often used to spawn tasks that write to the sender
+ /// retrieved from `Self::tx`
+ pub fn spawn<F>(&mut self, task: F)
+ where
+ F: Future<Output = ()>,
+ F: Send + 'static,
+ {
+ self.join_set.spawn(task);
+ }
+
+ /// Spawn a blocking task that will be aborted if this builder (or the stream
+ /// built from it) are dropped
+ ///
+ /// this is often used to spawn tasks that write to the sender
+ /// retrieved from `Self::tx`
+ pub fn spawn_blocking<F>(&mut self, f: F)
+ where
+ F: FnOnce(),
+ F: Send + 'static,
+ {
+ self.join_set.spawn_blocking(f);
+ }
+
+ /// runs the input_partition of the `input` ExecutionPlan on the
+ /// tokio threadpool and writes its outputs to this stream
+ ///
+ /// If the input partition produces an error, the error will be
+ /// sent to the output stream and no further results are sent.
+ pub(crate) fn run_input(
+ &mut self,
+ input: Arc<dyn ExecutionPlan>,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) {
+ let output = self.tx();
+
+ self.spawn(async move {
+ let mut stream = match input.execute(partition, context) {
+ Err(e) => {
+ // If send fails, the plan being torn down, there
+ // is no place to send the error and no reason to continue.
+ output.send(Err(e)).await.ok();
+ debug!(
+ "Stopping execution: error executing input: {}",
+ displayable(input.as_ref()).one_line()
+ );
+ return;
+ }
+ Ok(stream) => stream,
+ };
+
+ // Transfer batches from inner stream to the output tx
+ // immediately.
+ while let Some(item) = stream.next().await {
+ let is_err = item.is_err();
+
+ // If send fails, plan being torn down, there is no
+ // place to send the error and no reason to continue.
+ if output.send(item).await.is_err() {
+ debug!(
+ "Stopping execution: output is gone, plan cancelling: {}",
+ displayable(input.as_ref()).one_line()
+ );
+ return;
+ }
+
+ // stop after the first error is encontered (don't
+ // drive all streams to completion)
+ if is_err {
+ debug!(
+ "Stopping execution: plan returned error: {}",
+ displayable(input.as_ref()).one_line()
+ );
+ return;
+ }
+ }
+ });
+ }
+
+ /// Create a stream of all `RecordBatch`es written to `tx`
+ pub fn build(self) -> SendableRecordBatchStream {
+ let Self {
+ tx,
+ rx,
+ schema,
+ mut join_set,
+ } = self;
- #[allow(dead_code)]
- drop_helper: AbortOnDropSingle<()>,
+ // don't need tx
+ drop(tx);
+
+ // future that checks the result of the join set, and propagates panic if seen
+ let check = async move {
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok(()) => continue, // nothing to report
+ // This means a tokio task error, likely a panic
+ Err(e) => {
+ if e.is_panic() {
+ // resume on the main thread
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ // This should only occur if the task is
+ // cancelled, which would only occur if
+ // the JoinSet were aborted, which in turn
+ // would imply that the receiver has been
+ // dropped and this code is not running
+ return Some(Err(DataFusionError::Internal(format!(
Review Comment:
I haven't studied the tokio `JoinHandle` code, or the conditions under which it
might return an error, now or in the future (for example, will it return an
error if the task is canceled in some way?).
Given that the API returns an error, I think handling and propagating the
error is the most future-proof thing to do.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]