martin-g commented on code in PR #1793:
URL:
https://github.com/apache/datafusion-ballista/pull/1793#discussion_r3330131250
##########
ballista/executor/src/executor.rs:
##########
@@ -197,7 +201,19 @@ impl Executor {
self.abort_handles
.insert((task_id, partition.clone()), abort_handle);
- let partitions = task.await??;
+ let partitions = match std::panic::AssertUnwindSafe(task).catch_unwind().await {
+ Ok(Ok(result)) => result,
+ Ok(Err(_)) => {
+ let error_msg = "Task has been aborted!";
+ error!("{}", error_msg);
+ Err(DataFusionError::Internal(error_msg.to_string()))
+ }
+ Err(p) => {
+ let error_msg = format!("Task panicked: {}", any_to_string(&p));
+ error!("{}", error_msg);
+ Err(DataFusionError::Internal(error_msg))
Review Comment:
Is `DataFusionError` the right error type here?
`error_msg` might contain details about a DataFusionError, but the error
type here should be `BallistaError::Internal`.
If accepted, then also remove the wrapping at line 227.
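To illustrate the distinction, here is a minimal sketch with stand-in types. `EngineError` and `ExecutorError` are hypothetical placeholders, not the real `DataFusionError` and `BallistaError` definitions. Failures that originate in the executor (a panic, an abort) are built directly as the executor-level `Internal` variant, and only genuine engine errors get wrapped:

```rust
// Hypothetical stand-in for an engine-level error such as DataFusionError.
#[derive(Debug, PartialEq)]
pub struct EngineError(pub String);

// Hypothetical stand-in for an executor-level error such as BallistaError.
#[derive(Debug, PartialEq)]
pub enum ExecutorError {
    // Failure raised by the executor itself.
    Internal(String),
    // Failure reported by the query engine, wrapped unchanged.
    Engine(Box<EngineError>),
}

/// A panicking task is an executor-level failure, so it is reported
/// with the executor's own `Internal` variant.
pub fn panic_error(details: &str) -> ExecutorError {
    ExecutorError::Internal(format!("Task panicked: {details}"))
}

/// Only errors that really come from the engine are wrapped.
pub fn engine_error(e: EngineError) -> ExecutorError {
    ExecutorError::Engine(Box::new(e))
}
```

With this shape, the panic and abort branches already produce the outer error type, so no extra wrapping step is needed for them afterwards.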
##########
ballista/executor/src/executor.rs:
##########
@@ -197,7 +201,19 @@ impl Executor {
self.abort_handles
.insert((task_id, partition.clone()), abort_handle);
- let partitions = task.await??;
+ let partitions = match std::panic::AssertUnwindSafe(task).catch_unwind().await {
+ Ok(Ok(result)) => result,
+ Ok(Err(_)) => {
+ let error_msg = "Task has been aborted!";
+ error!("{}", error_msg);
+ Err(DataFusionError::Internal(error_msg.to_string()))
+ }
+ Err(p) => {
+ let error_msg = format!("Task panicked: {}", any_to_string(&p));
+ error!("{}", error_msg);
Review Comment:
```suggestion
error!("{error_msg}");
```
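For context, a small sketch of why the two spellings are equivalent. It uses `format!` from the standard library as a stand-in for the logging macro, on the assumption that `error!` accepts the same format-string syntax. Inline capture works for plain identifiers only, so an expression such as `any_to_string(&p)` still has to be passed as a positional argument or bound to a variable first.

```rust
/// Positional form: the argument is passed after the format string.
pub fn positional(error_msg: &str) -> String {
    format!("{}", error_msg)
}

/// Inline form: the identifier is captured directly inside the braces.
pub fn inline(error_msg: &str) -> String {
    format!("{error_msg}")
}
```

Both functions return the same string; the inline form is simply shorter, and it is the form that Clippy's `uninlined_format_args` lint suggests.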
##########
ballista/executor/src/executor.rs:
##########
@@ -197,7 +201,19 @@ impl Executor {
self.abort_handles
.insert((task_id, partition.clone()), abort_handle);
- let partitions = task.await??;
+ let partitions = match std::panic::AssertUnwindSafe(task).catch_unwind().await {
+ Ok(Ok(result)) => result,
+ Ok(Err(_)) => {
+ let error_msg = "Task has been aborted!";
+ error!("{}", error_msg);
Review Comment:
Does a normal `CancelJob` operation lead to this code path?
If yes, then it should be logged at WARN or DEBUG, not ERROR.
What else could abort a task?
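A rough sketch of the idea, assuming the answer is yes and an abort is an expected outcome. `TaskEnd`, `Level`, and `level_for` are hypothetical names used only for illustration; they are not part of Ballista or of any logging crate.

```rust
// Hypothetical log levels, mirroring the usual DEBUG/WARN/ERROR names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Level {
    Debug,
    Warn,
    Error,
}

// Hypothetical summary of how a task ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskEnd {
    Finished,
    Aborted,
    Panicked,
}

/// Picks a log level by how surprising the outcome is:
/// an abort requested by the user is expected, a panic is not.
pub fn level_for(end: TaskEnd) -> Level {
    match end {
        TaskEnd::Finished => Level::Debug,
        TaskEnd::Aborted => Level::Warn,
        TaskEnd::Panicked => Level::Error,
    }
}
```

If there are other abort sources that do indicate a fault, they would need their own variant so that they can still be logged at ERROR.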
##########
ballista/executor/src/executor.rs:
##########
@@ -197,7 +201,19 @@ impl Executor {
self.abort_handles
.insert((task_id, partition.clone()), abort_handle);
- let partitions = task.await??;
+ let partitions = match std::panic::AssertUnwindSafe(task).catch_unwind().await {
+ Ok(Ok(result)) => result,
+ Ok(Err(_)) => {
+ let error_msg = "Task has been aborted!";
+ error!("{}", error_msg);
Review Comment:
```suggestion
error!("{error_msg}");
```
##########
ballista/executor/src/executor.rs:
##########
@@ -197,7 +201,19 @@ impl Executor {
self.abort_handles
.insert((task_id, partition.clone()), abort_handle);
- let partitions = task.await??;
+ let partitions = match std::panic::AssertUnwindSafe(task).catch_unwind().await {
+ Ok(Ok(result)) => result,
+ Ok(Err(_)) => {
+ let error_msg = "Task has been aborted!";
+ error!("{}", error_msg);
+ Err(DataFusionError::Internal(error_msg.to_string()))
Review Comment:
```suggestion
Err(BallistaError::Cancelled)
```
?
##########
ballista/executor/src/executor.rs:
##########
@@ -197,7 +201,19 @@ impl Executor {
self.abort_handles
.insert((task_id, partition.clone()), abort_handle);
- let partitions = task.await??;
+ let partitions = match std::panic::AssertUnwindSafe(task).catch_unwind().await {
+ Ok(Ok(result)) => result,
+ Ok(Err(_)) => {
+ let error_msg = "Task has been aborted!";
+ error!("{}", error_msg);
+ Err(DataFusionError::Internal(error_msg.to_string()))
+ }
+ Err(p) => {
+ let error_msg = format!("Task panicked: {}", any_to_string(&p));
Review Comment:
https://github.com/apache/datafusion-ballista/blob/1b1d72c2922b6c2347dfaac76f66a161ed091dc8/ballista/executor/src/execution_loop.rs#L306
uses Debug formatting. Maybe both places should use the same formatting style?
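To show how much the two styles can differ for a panic payload, here is a standard-library-only sketch. `payload_to_string` is a hypothetical helper written in the spirit of `any_to_string`; the real helper may behave differently, and the value formatted at the linked line may not be a raw panic payload at all.

```rust
use std::any::Any;
use std::panic;

/// Hypothetical helper: extracts the message from a panic payload by
/// downcasting to the two types that `panic!` normally produces.
pub fn payload_to_string(payload: &(dyn Any + Send)) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        (*s).to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic payload".to_string()
    }
}

/// Catches a panic and returns (downcast message, Debug rendering).
pub fn catch_and_format() -> (String, String) {
    let payload = panic::catch_unwind(|| {
        panic!("boom");
    })
    .unwrap_err();
    // `&*payload` reborrows the boxed trait object, not the Box itself.
    let message = payload_to_string(&*payload);
    // Debug on the type-erased payload cannot see the message inside it.
    let debug = format!("{payload:?}");
    (message, debug)
}
```

The downcast version recovers the message, while the Debug rendering of the type-erased payload does not include it, so the two log lines would look quite different for the same panic.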
##########
ballista/executor/src/executor.rs:
##########
@@ -208,7 +224,7 @@ impl Executor {
query_stage_exec,
);
- Ok(partitions)
+ partitions.map_err(|e| BallistaError::DataFusionError(Box::new(e)))
Review Comment:
If `map_err()` is needed, then consider:
```suggestion
partitions.map_err(BallistaError::from)
```
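A sketch of why the shorter form is equivalent, using stand-in types. `EngineError` and `ExecutorError` are hypothetical placeholders, and the `From` impl below is an assumption that mirrors what the suggestion relies on: a conversion from the inner error that boxes it into the wrapping variant.

```rust
// Hypothetical stand-in for the inner (engine-level) error.
#[derive(Debug, PartialEq)]
pub struct EngineError(pub String);

// Hypothetical stand-in for the outer (executor-level) error.
#[derive(Debug, PartialEq)]
pub enum ExecutorError {
    Engine(Box<EngineError>),
}

// Assumed conversion: boxes the inner error into the wrapping variant.
impl From<EngineError> for ExecutorError {
    fn from(e: EngineError) -> Self {
        ExecutorError::Engine(Box::new(e))
    }
}

/// Explicit closure, as in the current diff.
pub fn with_closure(r: Result<u32, EngineError>) -> Result<u32, ExecutorError> {
    r.map_err(|e| ExecutorError::Engine(Box::new(e)))
}

/// Function path, as in the suggestion.
pub fn with_from(r: Result<u32, EngineError>) -> Result<u32, ExecutorError> {
    r.map_err(ExecutorError::from)
}

/// The `?` operator applies the same `From` conversion implicitly.
pub fn with_question_mark(r: Result<u32, EngineError>) -> Result<u32, ExecutorError> {
    Ok(r?)
}
```

All three functions produce the same result for the same input, so the choice is about readability and about keeping the boxing detail in one place.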
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]