This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0208755751 Minor: Add documentation about stream cancellation (#8747)
0208755751 is described below

commit 0208755751d5a389a55e9bf505d3b42c85ebedba
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 5 15:37:51 2024 -0500

    Minor: Add documentation about stream cancellation (#8747)
---
 datafusion/core/src/dataframe/mod.rs | 11 +++++++++++
 datafusion/physical-plan/src/lib.rs  | 33 +++++++++++++++++++++++++++++++--
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/dataframe/mod.rs 
b/datafusion/core/src/dataframe/mod.rs
index 5a8c706e32..33e198d6d5 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -802,6 +802,7 @@ impl DataFrame {
 
     /// Executes this DataFrame and returns a stream over a single partition
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -813,6 +814,11 @@ impl DataFrame {
     /// # Ok(())
     /// # }
     /// ```
+    ///
+    /// # Aborting Execution
+    ///
+    /// Dropping the stream will abort the execution of the query, and free up
+    /// any allocated resources
     pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
         let task_ctx = Arc::new(self.task_ctx());
         let plan = self.create_physical_plan().await?;
@@ -841,6 +847,7 @@ impl DataFrame {
 
     /// Executes this DataFrame and returns one stream per partition.
     ///
+    /// # Example
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -852,6 +859,10 @@ impl DataFrame {
     /// # Ok(())
     /// # }
     /// ```
+    /// # Aborting Execution
+    ///
+    /// Dropping the stream will abort the execution of the query, and free up
+    /// any allocated resources
     pub async fn execute_stream_partitioned(
         self,
     ) -> Result<Vec<SendableRecordBatchStream>> {
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index 6c9e97e03c..1dd1392b9d 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -288,6 +288,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// [`TryStreamExt`]: futures::stream::TryStreamExt
     /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
     ///
+    /// # Cancellation / Aborting Execution
+    ///
+    /// The [`Stream`] that is returned must ensure that any allocated 
resources
+    /// are freed when the stream itself is dropped. This is particularly
+    /// important for [`spawn`]ed tasks or threads. Unless care is taken to
+    /// "abort" such tasks, they may continue to consume resources even after
+    /// the plan is dropped, generating intermediate results that are never
+    /// used.
+    ///
+    /// See [`AbortOnDropSingle`], [`AbortOnDropMany`] and
+    /// [`RecordBatchReceiverStreamBuilder`] for structures to help ensure all
+    /// background tasks are cancelled.
+    ///
+    /// [`spawn`]: tokio::task::spawn
+    /// [`AbortOnDropSingle`]: crate::common::AbortOnDropSingle
+    /// [`AbortOnDropMany`]: crate::common::AbortOnDropMany
+    /// [`RecordBatchReceiverStreamBuilder`]: 
crate::stream::RecordBatchReceiverStreamBuilder
+    ///
     /// # Implementation Examples
     ///
     /// While `async` `Stream`s have a non trivial learning curve, the
@@ -491,7 +509,12 @@ pub async fn collect(
     common::collect(stream).await
 }
 
-/// Execute the [ExecutionPlan] and return a single stream of results
+/// Execute the [ExecutionPlan] and return a single stream of results.
+///
+/// # Aborting Execution
+///
+/// Dropping the stream will abort the execution of the query, and free up
+/// any allocated resources
 pub fn execute_stream(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
@@ -549,7 +572,13 @@ pub async fn collect_partitioned(
     Ok(batches)
 }
 
-/// Execute the [ExecutionPlan] and return a vec with one stream per output 
partition
+/// Execute the [ExecutionPlan] and return a vec with one stream per output
+/// partition
+///
+/// # Aborting Execution
+///
+/// Dropping the stream will abort the execution of the query, and free up
+/// any allocated resources
 pub fn execute_stream_partitioned(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,

Reply via email to