This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 74ffc8ae return task status on panic (#67)
74ffc8ae is described below

commit 74ffc8aec269fd4e7890c6b5fb6deab2e286befb
Author: Remco Verhoef <[email protected]>
AuthorDate: Tue Jun 14 16:43:44 2022 +0200

    return task status on panic (#67)
---
 ballista/rust/executor/src/execution_loop.rs | 48 ++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 10 deletions(-)

diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index e1a55a3e..1e382b02 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -28,8 +28,11 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
+use futures::FutureExt;
 use log::{debug, error, info, warn};
+use std::any::Any;
 use std::collections::HashMap;
+use std::error::Error;
 use std::ops::Deref;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::{Receiver, Sender, TryRecvError};
@@ -110,6 +113,19 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     }
 }
 
+/// Tries to get meaningful description from panic-error.
+pub(crate) fn any_to_string(any: &Box<dyn Any + Send>) -> String {
+    if let Some(s) = any.downcast_ref::<&str>() {
+        (*s).to_string()
+    } else if let Some(s) = any.downcast_ref::<String>() {
+        s.clone()
+    } else if let Some(error) = any.downcast_ref::<Box<dyn Error + Send>>() {
+        error.to_string()
+    } else {
+        "Unknown error occurred".to_string()
+    }
+}
+
 async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     executor: Arc<Executor>,
     available_tasks_slots: Arc<AtomicUsize>,
@@ -165,19 +181,31 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
     )?;
 
     tokio::spawn(async move {
-        let execution_result = executor
-            .execute_shuffle_write(
-                task_id.job_id.clone(),
-                task_id.stage_id as usize,
-                task_id.partition_id as usize,
-                plan,
-                task_context,
-                shuffle_output_partitioning,
-            )
-            .await;
+        use std::panic::AssertUnwindSafe;
+
+        let execution_result = match AssertUnwindSafe(executor.execute_shuffle_write(
+            task_id.job_id.clone(),
+            task_id.stage_id as usize,
+            task_id.partition_id as usize,
+            plan,
+            task_context,
+            shuffle_output_partitioning,
+        ))
+        .catch_unwind()
+        .await
+        {
+            Ok(Ok(r)) => Ok(r),
+            Ok(Err(r)) => Err(r),
+            Err(r) => {
+                error!("Error executing task: {:?}", any_to_string(&r));
+                Err(BallistaError::Internal(format!("{:#?}", any_to_string(&r))))
+            }
+        };
+
         info!("Done with task {}", task_id_log);
         debug!("Statistics: {:?}", execution_result);
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
+
         let _ = task_status_sender.send(as_task_status(
             execution_result,
             executor.metadata.id.clone(),

Reply via email to