This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 87912de93f3 [fix](scan) catch exceptions thrown in scanner (#36101) (#37408)
87912de93f3 is described below
commit 87912de93f36d17aeac9c63f66e60628627ff96b
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Jul 12 08:49:39 2024 +0800
[fix](scan) catch exceptions thrown in scanner (#36101) (#37408)
## Proposed changes
Cherry-pick of #36101.
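Uncaught exceptions thrown in the scanner cause the BE to crash. This change
wraps each submitted scan task so that such exceptions are caught, converted
to an error status, set on the scan task, and reported back through the
scanner context's block queue.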
---
be/src/vec/exec/scan/scanner_scheduler.cpp | 37 +++++++++++++++++++++++++-----
1 file changed, 31 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 0df501f6919..351912f5b17 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -136,8 +136,17 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
}
scanner_delegate->_scanner->start_wait_worker_timer();
- auto s = ctx->thread_token->submit_func(
- [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref); });
+ auto s = ctx->thread_token->submit_func([scanner_ref = scan_task,
ctx]() {
+ auto status = [&] {
+ RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
+ return Status::OK();
+ }();
+
+ if (!status.ok()) {
+ scanner_ref->set_status(status);
+ ctx->append_block_to_queue(scanner_ref);
+ }
+ });
if (!s.ok()) {
scan_task->set_status(s);
ctx->append_block_to_queue(scan_task);
@@ -157,16 +166,32 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
is_local ? ctx->get_simple_scan_scheduler() :
ctx->get_remote_scan_scheduler();
auto& thread_pool = is_local ? _local_scan_thread_pool :
_remote_scan_thread_pool;
if (scan_sched) {
- auto work_func = [this, scanner_ref = scan_task, ctx]() {
- this->_scanner_scan(ctx, scanner_ref);
+ auto work_func = [scanner_ref = scan_task, ctx]() {
+ auto status = [&] {
+ RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx,
scanner_ref));
+ return Status::OK();
+ }();
+
+ if (!status.ok()) {
+ scanner_ref->set_status(status);
+ ctx->append_block_to_queue(scanner_ref);
+ }
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
return scan_sched->submit_scan_task(simple_scan_task);
}
PriorityThreadPool::Task task;
- task.work_function = [this, scanner_ref = scan_task, ctx]() {
- this->_scanner_scan(ctx, scanner_ref);
+ task.work_function = [scanner_ref = scan_task, ctx]() {
+ auto status = [&] {
+ RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
+ return Status::OK();
+ }();
+
+ if (!status.ok()) {
+ scanner_ref->set_status(status);
+ ctx->append_block_to_queue(scanner_ref);
+ }
};
task.priority = nice;
return thread_pool->offer(task)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]