alamb commented on code in PR #7817:
URL: https://github.com/apache/arrow-datafusion/pull/7817#discussion_r1358707688
##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -155,80 +256,17 @@ impl RecordBatchReceiverStreamBuilder {
         });
     }
 
-    /// Create a stream of all `RecordBatch`es written to `tx`
+    /// Create a stream of all [`RecordBatch`] written to `tx`
     pub fn build(self) -> SendableRecordBatchStream {
-        let Self {
-            tx,
-            rx,
-            schema,
-            mut join_set,
-        } = self;
-
-        // don't need tx
-        drop(tx);
-
-        // future that checks the result of the join set, and propagates panic if seen
-        let check = async move {
-            while let Some(result) = join_set.join_next().await {
-                match result {
-                    Ok(task_result) => {
-                        match task_result {
-                            // nothing to report
-                            Ok(_) => continue,
-                            // This means a blocking task error
-                            Err(e) => {
-                                return Some(exec_err!("Spawned Task error: {e}"));
-                            }
-                        }
-                    }
-                    // This means a tokio task error, likely a panic
-                    Err(e) => {
-                        if e.is_panic() {
-                            // resume on the main thread
-                            std::panic::resume_unwind(e.into_panic());
-                        } else {
-                            // This should only occur if the task is
-                            // cancelled, which would only occur if
-                            // the JoinSet were aborted, which in turn
-                            // would imply that the receiver has been
-                            // dropped and this code is not running
-                            return Some(internal_err!("Non Panic Task error: {e}"));
-                        }
-                    }
-                }
-            }
-            None
-        };
-
-        let check_stream = futures::stream::once(check)
-            // unwrap Option / only return the error
-            .filter_map(|item| async move { item });
-
-        // Convert the receiver into a stream
-        let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
-            let next_item = rx.recv().await;
-            next_item.map(|next_item| (next_item, rx))
-        });
-
-        // Merge the streams together so whichever is ready first
-        // produces the batch
-        let inner = futures::stream::select(rx_stream, check_stream).boxed();
-
-        Box::pin(RecordBatchReceiverStream { schema, inner })
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema,
+            self.inner.build(),
+        ))
     }
 }
 
-/// A [`SendableRecordBatchStream`] that combines [`RecordBatch`]es from multiple inputs,
-/// on new tokio Tasks, increasing the potential parallelism.
-///
-/// This structure also handles propagating panics and cancelling the
-/// underlying tasks correctly.
-///
-/// Use [`Self::builder`] to construct one.
-pub struct RecordBatchReceiverStream {
-    schema: SchemaRef,
-    inner: BoxStream<'static, Result<RecordBatch>>,
-}
+#[doc(hidden)]
+pub struct RecordBatchReceiverStream {}
 
 impl RecordBatchReceiverStream {
     /// Create a builder with an internal buffer of capacity batches.
Review Comment:
Yeah, this method documentation also seems incorrect at this time.
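For anyone reading along, here is roughly how the builder is used today (a sketch, not code from this PR; the module paths and the `builder` / `tx` / `spawn` / `build` signatures are assumed from the current `stream.rs`):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use datafusion::physical_plan::SendableRecordBatchStream;

fn example_stream() -> SendableRecordBatchStream {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // the second argument is the capacity of the internal channel buffer
    // that the method documentation refers to
    let mut builder = RecordBatchReceiverStream::builder(Arc::clone(&schema), 2);

    // each producer takes a clone of the sender and runs on its own tokio
    // task; panics in those tasks are propagated to the stream's consumer
    let tx = builder.tx();
    builder.spawn(async move {
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        // a send error only means the receiver was dropped, so ignore it
        tx.send(Ok(batch)).await.ok();
        Ok(())
    });

    builder.build()
}
```

As far as I can tell the panic propagation and cancellation behaviour is unchanged by this PR; only the plumbing moves into the generic builder.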
##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -47,28 +154,22 @@ use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 ///
 /// This also handles propagating panic`s and canceling the tasks.
 pub struct RecordBatchReceiverStreamBuilder {
-    tx: Sender<Result<RecordBatch>>,
-    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
-    join_set: JoinSet<Result<()>>,
+    inner: ReceiverStreamBuilder<RecordBatch>,
 }
 
 impl RecordBatchReceiverStreamBuilder {
     /// create new channels with the specified buffer size
     pub fn new(schema: SchemaRef, capacity: usize) -> Self {
-        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
-
         Self {
-            tx,
-            rx,
             schema,
-            join_set: JoinSet::new(),
+            inner: ReceiverStreamBuilder::new(capacity),
Review Comment:
I agree
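For reference, a rough sketch of the generic `ReceiverStreamBuilder` that the channel + `JoinSet` plumbing is factored into (not the PR's exact code; the error type below is a stand-in for `datafusion_common::Result`, and the method names simply mirror the existing builder), showing the same machinery as the removed `build` above:

```rust
use futures::stream::{BoxStream, StreamExt};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinSet;

// placeholder error type so the sketch is self-contained; the real code
// uses datafusion_common::Result and its error macros
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// Generic channel + JoinSet plumbing; the RecordBatch builder only adds a schema.
struct ReceiverStreamBuilder<O> {
    tx: Sender<Result<O>>,
    rx: Receiver<Result<O>>,
    join_set: JoinSet<Result<()>>,
}

impl<O: Send + 'static> ReceiverStreamBuilder<O> {
    fn new(capacity: usize) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        Self {
            tx,
            rx,
            join_set: JoinSet::new(),
        }
    }

    fn tx(&self) -> Sender<Result<O>> {
        self.tx.clone()
    }

    fn spawn<F>(&mut self, task: F)
    where
        F: std::future::Future<Output = Result<()>> + Send + 'static,
    {
        self.join_set.spawn(task);
    }

    fn build(self) -> BoxStream<'static, Result<O>> {
        let Self {
            tx,
            rx,
            mut join_set,
        } = self;
        // the builder keeps no sender of its own
        drop(tx);

        // same shape as the removed code above: a future that surfaces task
        // errors, resumes panics, and is merged with the receiver stream
        let check = async move {
            while let Some(result) = join_set.join_next().await {
                match result {
                    // task finished cleanly, nothing to report
                    Ok(Ok(())) => continue,
                    // task returned an error
                    Ok(Err(e)) => return Some(Err(e)),
                    // task panicked: resume the panic on the consumer's side
                    Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
                    // task was cancelled, i.e. the JoinSet was aborted
                    Err(e) => return Some(Err(e.into())),
                }
            }
            None
        };
        let check_stream = futures::stream::once(check)
            // unwrap the Option / only yield the error
            .filter_map(|item| async move { item });

        // convert the receiver into a stream
        let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
            rx.recv().await.map(|item| (item, rx))
        });

        // whichever stream is ready first produces the next item
        futures::stream::select(rx_stream, check_stream).boxed()
    }
}
```

With something like this in place, `RecordBatchReceiverStreamBuilder` only has to hold the schema and wrap the generic stream in a `RecordBatchStreamAdapter`, which is exactly what the new `build` in the first hunk does.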