tustvold commented on code in PR #7817:
URL: https://github.com/apache/arrow-datafusion/pull/7817#discussion_r1358365586
##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -38,6 +38,113 @@ use tokio::task::JoinSet;
use super::metrics::BaselineMetrics;
use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
+/// Creates a stream from a collection of producing tasks, routing panics to the stream
+pub(crate) struct ReceiverStreamBuilder<O> {
+ tx: Sender<Result<O>>,
+ rx: Receiver<Result<O>>,
+ join_set: JoinSet<Result<()>>,
+}
+
+impl<O: Send + 'static> ReceiverStreamBuilder<O> {
+ /// create new channels with the specified buffer size
+ pub fn new(capacity: usize) -> Self {
+ let (tx, rx) = tokio::sync::mpsc::channel(capacity);
+
+ Self {
+ tx,
+ rx,
+ join_set: JoinSet::new(),
+ }
+ }
+
+ /// Get a handle for sending [`O`] to the output
+ pub fn tx(&self) -> Sender<Result<O>> {
+ self.tx.clone()
+ }
+
+ /// Spawn task that will be aborted if this builder (or the stream
+ /// built from it) are dropped
+ pub fn spawn<F>(&mut self, task: F)
+ where
+ F: Future<Output = Result<()>>,
+ F: Send + 'static,
+ {
+ self.join_set.spawn(task);
+ }
+
+ /// Spawn a blocking task that will be aborted if this builder (or the stream
+ /// built from it) are dropped
+ ///
+ /// this is often used to spawn tasks that write to the sender
+ /// retrieved from `Self::tx`
+ pub fn spawn_blocking<F>(&mut self, f: F)
+ where
+ F: FnOnce() -> Result<()>,
+ F: Send + 'static,
+ {
+ self.join_set.spawn_blocking(f);
+ }
+
+ /// Create a stream of all [`O`] written to `tx`
+ pub fn build(self) -> BoxStream<'static, Result<O>> {
+ let Self {
+ tx,
+ rx,
+ mut join_set,
+ } = self;
+
+ // don't need tx
+ drop(tx);
+
+ // future that checks the result of the join set, and propagates panic if seen
Review Comment:
This is the interesting logic that needs DRYing
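For reference, a minimal self-contained sketch of that logic (not this PR's code; it uses plain `tokio`/`futures` types and a `String` error in place of DataFusion's `Result`): a generic builder that drains its `JoinSet`, re-raises panics on the consumer, and merges other task failures into the output stream. The `check` future plus the `select` at the end is roughly the part being factored into `ReceiverStreamBuilder::build`.

```rust
use std::future::Future;

use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinSet;

/// Stand-in for DataFusion's `Result` in this sketch
type Result<T> = std::result::Result<T, String>;

/// Generic builder: tasks send `Ok(T)`/`Err` into `tx`; `build()` merges the
/// receiver with a future that surfaces task failures and re-raises panics
pub struct StreamBuilder<T> {
    tx: Sender<Result<T>>,
    rx: Receiver<Result<T>>,
    join_set: JoinSet<Result<()>>,
}

impl<T: Send + 'static> StreamBuilder<T> {
    pub fn new(capacity: usize) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        Self { tx, rx, join_set: JoinSet::new() }
    }

    /// Handle producers use to send output items
    pub fn tx(&self) -> Sender<Result<T>> {
        self.tx.clone()
    }

    /// Tasks are aborted when the builder (or the built stream) is dropped
    pub fn spawn<F>(&mut self, task: F)
    where
        F: Future<Output = Result<()>> + Send + 'static,
    {
        self.join_set.spawn(task);
    }

    pub fn build(self) -> BoxStream<'static, Result<T>> {
        let Self { tx, rx, mut join_set } = self;
        // drop the builder's own sender so the channel closes once all
        // producer-held senders are gone
        drop(tx);

        // the shared logic: inspect each finished task, resume panics on the
        // polling thread, report other failures as stream errors
        let check = async move {
            while let Some(result) = join_set.join_next().await {
                match result {
                    // task ran and succeeded
                    Ok(Ok(())) => continue,
                    // task ran and returned an error
                    Ok(Err(e)) => return Some(Err(format!("Spawned task error: {e}"))),
                    // task panicked: re-raise the panic for the consumer
                    Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
                    // task was cancelled (JoinSet aborted)
                    Err(e) => return Some(Err(format!("Non-panic task error: {e}"))),
                }
            }
            None
        };
        let check_stream = futures::stream::once(check)
            // unwrap the Option, keeping only an error if one was produced
            .filter_map(|item| async move { item });

        // expose the receiver as a stream of produced items
        let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
            rx.recv().await.map(|item| (item, rx))
        });

        // whichever side is ready first yields the next item
        futures::stream::select(rx_stream, check_stream).boxed()
    }
}
```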
##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -47,28 +154,22 @@ use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
///
/// This also handles propagating panic`s and canceling the tasks.
pub struct RecordBatchReceiverStreamBuilder {
- tx: Sender<Result<RecordBatch>>,
- rx: Receiver<Result<RecordBatch>>,
schema: SchemaRef,
- join_set: JoinSet<Result<()>>,
+ inner: ReceiverStreamBuilder<RecordBatch>,
}
impl RecordBatchReceiverStreamBuilder {
/// create new channels with the specified buffer size
pub fn new(schema: SchemaRef, capacity: usize) -> Self {
- let (tx, rx) = tokio::sync::mpsc::channel(capacity);
-
Self {
- tx,
- rx,
schema,
- join_set: JoinSet::new(),
+ inner: ReceiverStreamBuilder::new(capacity),
Review Comment:
It seemed unnecessary / undesirable to burden ReceiverStreamBuilder with a notion of Schema
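To show what keeping the schema out of the generic layer buys, a hedged example reusing the hypothetical `StreamBuilder` sketch from the earlier comment: the same machinery can drive a stream of plain `String`s, while only the `RecordBatch` wrapper needs to carry a `SchemaRef`.

```rust
use futures::StreamExt;

// Assumes the `StreamBuilder<T>` sketch from the earlier comment
async fn string_stream_demo() {
    let mut builder: StreamBuilder<String> = StreamBuilder::new(4);

    // producer task writes into the channel obtained from `tx()`
    let tx = builder.tx();
    builder.spawn(async move {
        tx.send(Ok("hello".to_string())).await.ok();
        tx.send(Ok("world".to_string())).await.ok();
        Ok(())
    });

    // consume the merged stream; a panic in the producer would surface here
    let mut stream = builder.build();
    while let Some(item) = stream.next().await {
        println!("{item:?}");
    }
}
```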
##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -110,7 +211,7 @@ impl RecordBatchReceiverStreamBuilder {
) {
let output = self.tx();
- self.spawn(async move {
+ self.inner.spawn(async move {
Review Comment:
This logic, by comparison, is rather ExecutionPlan specific, and I don't think it is valuable to DRY
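For context, a hedged sketch of the sort of task spawned here (hypothetical `forward_partition` helper, written against the public `datafusion` crate paths rather than the in-crate ones): execute one partition of the child plan and forward its batches to the builder's sender, stopping once the receiver is gone.

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;
use tokio::sync::mpsc::Sender;

/// Hypothetical helper mirroring the shape of the spawned task: pull batches
/// from one partition of `input` and forward them to `output`
async fn forward_partition(
    input: Arc<dyn ExecutionPlan>,
    partition: usize,
    context: Arc<TaskContext>,
    output: Sender<Result<RecordBatch>>,
) -> Result<()> {
    let mut stream = input.execute(partition, context)?;
    while let Some(item) = stream.next().await {
        // `send` only fails when the receiver is dropped; treat that as
        // "nobody is listening any more" and stop producing
        if output.send(item).await.is_err() {
            break;
        }
    }
    Ok(())
}
```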
##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -38,6 +38,113 @@ use tokio::task::JoinSet;
use super::metrics::BaselineMetrics;
use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
+/// Creates a stream from a collection of producing tasks, routing panics to the stream
+pub(crate) struct ReceiverStreamBuilder<O> {
Review Comment:
I made this pub(crate) but it could easily be made public should we wish to do so
##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -155,80 +256,14 @@ impl RecordBatchReceiverStreamBuilder {
});
}
- /// Create a stream of all `RecordBatch`es written to `tx`
+ /// Create a stream of all [`O`] written to `tx`
pub fn build(self) -> SendableRecordBatchStream {
- let Self {
- tx,
- rx,
- schema,
- mut join_set,
- } = self;
-
- // don't need tx
- drop(tx);
-
- // future that checks the result of the join set, and propagates panic if seen
- let check = async move {
- while let Some(result) = join_set.join_next().await {
- match result {
- Ok(task_result) => {
- match task_result {
- // nothing to report
- Ok(_) => continue,
- // This means a blocking task error
- Err(e) => {
- return Some(exec_err!("Spawned Task error: {e}"));
- }
- }
- }
- // This means a tokio task error, likely a panic
- Err(e) => {
- if e.is_panic() {
- // resume on the main thread
- std::panic::resume_unwind(e.into_panic());
- } else {
- // This should only occur if the task is
- // cancelled, which would only occur if
- // the JoinSet were aborted, which in turn
- // would imply that the receiver has been
- // dropped and this code is not running
- return Some(internal_err!("Non Panic Task error: {e}"));
- }
- }
- }
- }
- None
- };
-
- let check_stream = futures::stream::once(check)
- // unwrap Option / only return the error
- .filter_map(|item| async move { item });
-
- // Convert the receiver into a stream
- let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
- let next_item = rx.recv().await;
- next_item.map(|next_item| (next_item, rx))
- });
-
- // Merge the streams together so whichever is ready first
- // produces the batch
- let inner = futures::stream::select(rx_stream, check_stream).boxed();
-
- Box::pin(RecordBatchReceiverStream { schema, inner })
+ Box::pin(RecordBatchStreamAdapter::new(self.schema, self.inner.build()))
}
}
-/// A [`SendableRecordBatchStream`] that combines [`RecordBatch`]es from multiple inputs,
-/// on new tokio Tasks, increasing the potential parallelism.
-///
-/// This structure also handles propagating panics and cancelling the
-/// underlying tasks correctly.
-///
-/// Use [`Self::builder`] to construct one.
-pub struct RecordBatchReceiverStream {
Review Comment:
This type doesn't really make a lot of sense, given it isn't actually what the builder returns
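A hedged usage sketch (hypothetical `merge_partitions` helper, public `datafusion` crate paths) of what callers see after this change: `build` hands back a plain `SendableRecordBatchStream`, so no dedicated stream type needs to be exposed.

```rust
use std::sync::Arc;

use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

/// Hypothetical helper: merge `partition_count` partitions of `plan` into a
/// single stream, with each partition pulled on its own tokio task
fn merge_partitions(
    plan: Arc<dyn ExecutionPlan>,
    partition_count: usize,
    context: Arc<TaskContext>,
) -> SendableRecordBatchStream {
    let mut builder = RecordBatchReceiverStreamBuilder::new(plan.schema(), partition_count);
    for partition in 0..partition_count {
        builder.run_input(Arc::clone(&plan), partition, Arc::clone(&context));
    }
    // `build` returns a SendableRecordBatchStream directly; callers never see
    // a concrete receiver-stream type
    builder.build()
}
```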
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]