This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0208755751 Minor: Add documentation about stream cancellation (#8747)
0208755751 is described below
commit 0208755751d5a389a55e9bf505d3b42c85ebedba
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 5 15:37:51 2024 -0500
Minor: Add documentation about stream cancellation (#8747)
---
datafusion/core/src/dataframe/mod.rs | 11 +++++++++++
datafusion/physical-plan/src/lib.rs | 33 +++++++++++++++++++++++++++++++--
2 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 5a8c706e32..33e198d6d5 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -802,6 +802,7 @@ impl DataFrame {
/// Executes this DataFrame and returns a stream over a single partition
///
+ /// # Example
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
@@ -813,6 +814,11 @@ impl DataFrame {
/// # Ok(())
/// # }
/// ```
+ ///
+ /// # Aborting Execution
+ ///
+ /// Dropping the stream will abort the execution of the query, and free up
+ /// any allocated resources
pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
let task_ctx = Arc::new(self.task_ctx());
let plan = self.create_physical_plan().await?;
@@ -841,6 +847,7 @@ impl DataFrame {
/// Executes this DataFrame and returns one stream per partition.
///
+ /// # Example
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
@@ -852,6 +859,10 @@ impl DataFrame {
/// # Ok(())
/// # }
/// ```
+ /// # Aborting Execution
+ ///
+ /// Dropping the stream will abort the execution of the query, and free up
+ /// any allocated resources
pub async fn execute_stream_partitioned(
self,
) -> Result<Vec<SendableRecordBatchStream>> {
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 6c9e97e03c..1dd1392b9d 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -288,6 +288,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// [`TryStreamExt`]: futures::stream::TryStreamExt
/// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
///
+ /// # Cancellation / Aborting Execution
+ ///
+ /// The [`Stream`] that is returned must ensure that any allocated resources
+ /// are freed when the stream itself is dropped. This is particularly
+ /// important for [`spawn`]ed tasks or threads. Unless care is taken to
+ /// "abort" such tasks, they may continue to consume resources even after
+ /// the plan is dropped, generating intermediate results that are never
+ /// used.
+ ///
+ /// See [`AbortOnDropSingle`], [`AbortOnDropMany`] and
+ /// [`RecordBatchReceiverStreamBuilder`] for structures to help ensure all
+ /// background tasks are cancelled.
+ ///
+ /// [`spawn`]: tokio::task::spawn
+ /// [`AbortOnDropSingle`]: crate::common::AbortOnDropSingle
+ /// [`AbortOnDropMany`]: crate::common::AbortOnDropMany
+ /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
+ ///
/// # Implementation Examples
///
/// While `async` `Stream`s have a non trivial learning curve, the
@@ -491,7 +509,12 @@ pub async fn collect(
common::collect(stream).await
}
-/// Execute the [ExecutionPlan] and return a single stream of results
+/// Execute the [ExecutionPlan] and return a single stream of results.
+///
+/// # Aborting Execution
+///
+/// Dropping the stream will abort the execution of the query, and free up
+/// any allocated resources
pub fn execute_stream(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
@@ -549,7 +572,13 @@ pub async fn collect_partitioned(
Ok(batches)
}
-/// Execute the [ExecutionPlan] and return a vec with one stream per output partition
+/// Execute the [ExecutionPlan] and return a vec with one stream per output
+/// partition
+///
+/// # Aborting Execution
+///
+/// Dropping the stream will abort the execution of the query, and free up
+/// any allocated resources
pub fn execute_stream_partitioned(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,