This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e26452ab3 [MINOR] Add debug logging to plan teardown (#3350)
e26452ab3 is described below
commit e26452ab374ad59effa785aae6235e741973569a
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Sep 3 08:07:45 2022 -0400
[MINOR] Add debug logging to plan teardown (#3350)
* [MINOR] Add debug logging to plan teardown
* clippy
---
datafusion/core/src/physical_plan/common.rs | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs
b/datafusion/core/src/physical_plan/common.rs
index dd194263d..79535497d 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -21,7 +21,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::MemTrackingMetrics;
-use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan,
Statistics};
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
@@ -29,6 +29,7 @@ use arrow::error::Result as ArrowResult;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;
use futures::{Future, Stream, StreamExt, TryStreamExt};
+use log::debug;
use pin_project_lite::pin_project;
use std::fs;
use std::fs::{metadata, File};
@@ -185,6 +186,10 @@ pub(crate) fn spawn_execution(
// there is no place to send the error.
let arrow_error = ArrowError::ExternalError(Box::new(e));
output.send(Err(arrow_error)).await.ok();
+ debug!(
+ "Stopping execution: error executing input: {}",
+ displayable(input.as_ref()).one_line()
+ );
return;
}
Ok(stream) => stream,
@@ -193,7 +198,11 @@ pub(crate) fn spawn_execution(
while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error.
- if let Err(_) = output.send(item).await {
+ if output.send(item).await.is_err() {
+ debug!(
+ "Stopping execution: output is gone, plan cancelling: {}",
+ displayable(input.as_ref()).one_line()
+ );
return;
}
}