This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 74ffc8ae return task status on panic (#67)
74ffc8ae is described below
commit 74ffc8aec269fd4e7890c6b5fb6deab2e286befb
Author: Remco Verhoef <[email protected]>
AuthorDate: Tue Jun 14 16:43:44 2022 +0200
return task status on panic (#67)
---
ballista/rust/executor/src/execution_loop.rs | 48 ++++++++++++++++++++++------
1 file changed, 38 insertions(+), 10 deletions(-)
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index e1a55a3e..1e382b02 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -28,8 +28,11 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
+use futures::FutureExt;
use log::{debug, error, info, warn};
+use std::any::Any;
use std::collections::HashMap;
+use std::error::Error;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
@@ -110,6 +113,19 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}
+/// Extracts a readable message from a panic payload (`&str`, `String` or `Box<dyn Error + Send>`), else "Unknown error occurred".
+pub(crate) fn any_to_string(any: &Box<dyn Any + Send>) -> String {
+ if let Some(s) = any.downcast_ref::<&str>() {
+ (*s).to_string()
+ } else if let Some(s) = any.downcast_ref::<String>() {
+ s.clone()
+ } else if let Some(error) = any.downcast_ref::<Box<dyn Error + Send>>() {
+ error.to_string()
+ } else {
+ "Unknown error occurred".to_string()
+ }
+}
+
async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
executor: Arc<Executor>,
available_tasks_slots: Arc<AtomicUsize>,
@@ -165,19 +181,31 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
)?;
tokio::spawn(async move {
- let execution_result = executor
- .execute_shuffle_write(
- task_id.job_id.clone(),
- task_id.stage_id as usize,
- task_id.partition_id as usize,
- plan,
- task_context,
- shuffle_output_partitioning,
- )
- .await;
+ use std::panic::AssertUnwindSafe;
+
+ let execution_result = match
AssertUnwindSafe(executor.execute_shuffle_write(
+ task_id.job_id.clone(),
+ task_id.stage_id as usize,
+ task_id.partition_id as usize,
+ plan,
+ task_context,
+ shuffle_output_partitioning,
+ ))
+ .catch_unwind()
+ .await
+ {
+ Ok(Ok(r)) => Ok(r),
+ Ok(Err(r)) => Err(r),
+ Err(r) => {
+ error!("Error executing task: {:?}", any_to_string(&r));
+ Err(BallistaError::Internal(format!("{:#?}",
any_to_string(&r))))
+ }
+ };
+
info!("Done with task {}", task_id_log);
debug!("Statistics: {:?}", execution_result);
available_tasks_slots.fetch_add(1, Ordering::SeqCst);
+
let _ = task_status_sender.send(as_task_status(
execution_result,
executor.metadata.id.clone(),