This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dbf10b5a85e [exceptionsafe](be) only throw exception in get block and
sink method, since open and prepare maybe not exception safe (#34998)
dbf10b5a85e is described below
commit dbf10b5a85e9558c9a6567cafb064ed5db7077b7
Author: yiguolei <[email protected]>
AuthorDate: Fri May 17 22:24:38 2024 +0800
[exceptionsafe](be) only throw exception in get block and sink method,
since open and prepare maybe not exception safe (#34998)
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_task.cpp | 17 +++++++++--------
be/src/pipeline/task_scheduler.cpp | 4 ----
2 files changed, 9 insertions(+), 12 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 82b822edbaa..ada46968a70 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -303,18 +303,12 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_sink->revoke_memory(_state));
continue;
}
-
*eos = _eos;
// Pull block from operator chain
if (!_dry_run) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
- try {
- RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
eos));
- } catch (const Exception& e) {
- return Status::InternalError(e.to_string() +
- " task debug string: " +
debug_string());
- }
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state,
block, eos));
} else {
*eos = true;
_eos = true;
@@ -323,7 +317,14 @@ Status PipelineTask::execute(bool* eos) {
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
Status status = Status::OK();
- status = _sink->sink(_state, block, *eos);
+ // Define a lambda function to catch sink exception, because sink
will check
+ // return error status with EOF, it is special, could not return
directly.
+ auto sink_function = [&]() -> Status {
+ Status internal_st;
+ RETURN_IF_CATCH_EXCEPTION(internal_st = _sink->sink(_state,
block, *eos));
+ return internal_st;
+ };
+ status = sink_function();
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
}
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 4694e752961..c45186190b7 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -128,10 +128,6 @@ void TaskScheduler::_do_work(size_t index) {
auto status = Status::OK();
try {
- // This will enable exception handling logic in allocator.h when
memory allocate
- // failed or system memory is not sufficient.
- doris::enable_thread_catch_bad_alloc++;
- Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
TUniqueId query_id = task->query_context()->query_id();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]