This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e26452ab3 [MINOR] Add debug logging to plan teardown (#3350)
e26452ab3 is described below

commit e26452ab374ad59effa785aae6235e741973569a
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Sep 3 08:07:45 2022 -0400

    [MINOR] Add debug logging to plan teardown (#3350)
    
    * [MINOR] Add debug logging to plan teardown
    
    * clippy
---
 datafusion/core/src/physical_plan/common.rs | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs 
b/datafusion/core/src/physical_plan/common.rs
index dd194263d..79535497d 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -21,7 +21,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::metrics::MemTrackingMetrics;
-use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, 
Statistics};
 use arrow::compute::concat;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::ArrowError;
@@ -29,6 +29,7 @@ use arrow::error::Result as ArrowResult;
 use arrow::ipc::writer::FileWriter;
 use arrow::record_batch::RecordBatch;
 use futures::{Future, Stream, StreamExt, TryStreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
 use std::fs;
 use std::fs::{metadata, File};
@@ -185,6 +186,10 @@ pub(crate) fn spawn_execution(
                 // there is no place to send the error.
                 let arrow_error = ArrowError::ExternalError(Box::new(e));
                 output.send(Err(arrow_error)).await.ok();
+                debug!(
+                    "Stopping execution: error executing input: {}",
+                    displayable(input.as_ref()).one_line()
+                );
                 return;
             }
             Ok(stream) => stream,
@@ -193,7 +198,11 @@ pub(crate) fn spawn_execution(
         while let Some(item) = stream.next().await {
             // If send fails, plan being torn down,
             // there is no place to send the error.
-            if let Err(_) = output.send(item).await {
+            if output.send(item).await.is_err() {
+                debug!(
+                    "Stopping execution: output is gone, plan cancelling: {}",
+                    displayable(input.as_ref()).one_line()
+                );
                 return;
             }
         }

Reply via email to