This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ff5fb36bfe2 [fix](memory) Refactor memory allocated failure processing 
(#36090)
ff5fb36bfe2 is described below

commit ff5fb36bfe2369918a7964645cc76aed5d1a5dc2
Author: Gabriel <[email protected]>
AuthorDate: Wed Jun 12 15:18:54 2024 +0800

    [fix](memory) Refactor memory allocated failure processing (#36090)
    
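    In this PR:
    1. Stop catching exceptions inside individual operators.
    2. Catch exceptions at the outer entry points where they can actually surface (task scheduler, fragment prepare, scanner scheduler, fold-constant RPC).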
---
 be/src/common/exception.h                          |  17 +++
 be/src/pipeline/exec/aggregation_sink_operator.cpp |   2 +-
 .../distinct_streaming_aggregation_operator.cpp    |   3 +-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  58 +++++-----
 .../exec/join/process_hash_table_probe_impl.h      |   4 +-
 .../exec/nested_loop_join_probe_operator.cpp       |  15 +--
 be/src/pipeline/exec/sort_source_operator.cpp      |   3 +-
 .../exec/streaming_aggregation_operator.cpp        |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  13 +--
 be/src/pipeline/pipeline_task.cpp                  |   4 +-
 be/src/pipeline/task_scheduler.cpp                 |  41 ++++---
 be/src/runtime/fragment_mgr.cpp                    |   3 +-
 be/src/service/internal_service.cpp                |  25 ++--
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 126 +++++++++++----------
 14 files changed, 170 insertions(+), 146 deletions(-)

diff --git a/be/src/common/exception.h b/be/src/common/exception.h
index 0ec8d334e8b..ce44e658749 100644
--- a/be/src/common/exception.h
+++ b/be/src/common/exception.h
@@ -122,3 +122,20 @@ inline const std::string& Exception::to_string() const {
             return Status::Error<false>(e.code(), e.to_string());              
                  \
         }                                                                      
                  \
     } while (0)
+
+#define ASSIGN_STATUS_IF_CATCH_EXCEPTION(stmt, status_)                        
                  \
+    do {                                                                       
                  \
+        try {                                                                  
                  \
+            doris::enable_thread_catch_bad_alloc++;                            
                  \
+            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};   
                  \
+            { stmt; }                                                          
                  \
+        } catch (const doris::Exception& e) {                                  
                  \
+            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {              
                  \
+                status_ = Status::MemoryLimitExceeded(fmt::format(             
                  \
+                        "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, 
__FUNCTION__:{}", \
+                        e.code(), e.to_string(), __FILE__, __LINE__, 
__PRETTY_FUNCTION__));      \
+            } else {                                                           
                  \
+                status_ = e.to_status();                                       
                  \
+            }                                                                  
                  \
+        }                                                                      
                  \
+    } while (0);
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 7a4a9d9c951..8e34de9bf98 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -151,7 +151,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
     // this could cause unable to get JVM
     if (Base::_shared_state->probe_expr_ctxs.empty()) {
         // _create_agg_status may acquire a lot of memory, may allocate failed 
when memory is very few
-        
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
+        RETURN_IF_ERROR(_create_agg_status(_agg_data->without_key));
         _shared_state->agg_data_created_without_key = true;
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 73ce8ce5fb4..4390bebcfdd 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -203,8 +203,7 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
     _distinct_row.reserve(rows);
 
     if (!_stop_emplace_flag) {
-        RETURN_IF_CATCH_EXCEPTION(
-                _emplace_into_hash_table_to_distinct(_distinct_row, 
key_columns, rows));
+        _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
     }
 
     bool mem_reuse = 
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index dc2df872bd5..b4d511fe2dd 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -294,37 +294,35 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
     Status st;
     if (local_state._probe_index < local_state._probe_block.rows()) {
         DCHECK(local_state._has_set_need_null_map_for_probe);
-        RETURN_IF_CATCH_EXCEPTION({
-            std::visit(
-                    [&](auto&& arg, auto&& process_hashtable_ctx, auto 
need_null_map_for_probe,
-                        auto ignore_null) {
-                        using HashTableProbeType = 
std::decay_t<decltype(process_hashtable_ctx)>;
-                        if constexpr (!std::is_same_v<HashTableProbeType, 
std::monostate>) {
-                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                                st = process_hashtable_ctx.template 
process<need_null_map_for_probe,
-                                                                            
ignore_null>(
-                                        arg,
-                                        need_null_map_for_probe
-                                                ? 
&local_state._null_map_column->get_data()
-                                                : nullptr,
-                                        mutable_join_block, &temp_block,
-                                        local_state._probe_block.rows(), 
_is_mark_join,
-                                        _have_other_join_conjunct);
-                                local_state._mem_tracker->set_consumption(
-                                        arg.serialized_keys_size(false));
-                            } else {
-                                st = Status::InternalError("uninited hash 
table");
-                            }
+        std::visit(
+                [&](auto&& arg, auto&& process_hashtable_ctx, auto 
need_null_map_for_probe,
+                    auto ignore_null) {
+                    using HashTableProbeType = 
std::decay_t<decltype(process_hashtable_ctx)>;
+                    if constexpr (!std::is_same_v<HashTableProbeType, 
std::monostate>) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                            st = process_hashtable_ctx
+                                         .template 
process<need_null_map_for_probe, ignore_null>(
+                                                 arg,
+                                                 need_null_map_for_probe
+                                                         ? 
&local_state._null_map_column->get_data()
+                                                         : nullptr,
+                                                 mutable_join_block, 
&temp_block,
+                                                 
local_state._probe_block.rows(), _is_mark_join,
+                                                 _have_other_join_conjunct);
+                            local_state._mem_tracker->set_consumption(
+                                    arg.serialized_keys_size(false));
                         } else {
-                            st = Status::InternalError("uninited hash table 
probe");
+                            st = Status::InternalError("uninited hash table");
                         }
-                    },
-                    *local_state._shared_state->hash_table_variants,
-                    *local_state._process_hashtable_ctx_variants,
-                    
vectorized::make_bool_variant(local_state._need_null_map_for_probe),
-                    
vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
-        });
+                    } else {
+                        st = Status::InternalError("uninited hash table 
probe");
+                    }
+                },
+                *local_state._shared_state->hash_table_variants,
+                *local_state._process_hashtable_ctx_variants,
+                
vectorized::make_bool_variant(local_state._need_null_map_for_probe),
+                
vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
     } else if (local_state._probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != 
TJoinOp::LEFT_OUTER_JOIN)) {
             std::visit(
@@ -457,7 +455,7 @@ Status 
HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
                                                                
temp_block->columns()));
     }
 
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_output_block(temp_block, 
output_block, false));
+    RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
     _reset_tuple_is_null_column();
     reached_limit(output_block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 5e023f2c861..3ffdb9cb990 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -283,10 +283,10 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
             return true;
         };
 
-        RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
+        probe_side_output_column(
                 mcol, *_left_output_slot_flags, current_offset, 
last_probe_index,
                 check_all_match_one(_probe_indexs, last_probe_index, 
current_offset),
-                with_other_conjuncts));
+                with_other_conjuncts);
     }
 
     output_block->swap(mutable_block.to_block());
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 7dc31cabddb..84112151e63 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -165,7 +165,7 @@ Status 
NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
         }
 
         if constexpr (set_probe_side_flag) {
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+            RETURN_IF_ERROR(
                     
(_do_filtering_and_update_visited_flags<set_build_side_flag,
                                                             
set_probe_side_flag, ignore_null>(
                             &_join_block, !p._is_left_semi_anti)));
@@ -185,10 +185,9 @@ Status 
NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
     }
 
     if constexpr (!set_probe_side_flag) {
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                (_do_filtering_and_update_visited_flags<set_build_side_flag, 
set_probe_side_flag,
-                                                        
ignore_null>(&_join_block,
-                                                                     
!p._is_right_semi_anti)));
+        
RETURN_IF_ERROR((_do_filtering_and_update_visited_flags<set_build_side_flag,
+                                                                
set_probe_side_flag, ignore_null>(
+                &_join_block, !p._is_right_semi_anti)));
         _update_additional_flags(&_join_block);
     }
 
@@ -499,8 +498,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* 
state, vectorized::Block
                                           bool* eos) const {
     auto& local_state = get_local_state(state);
     if (_is_output_left_side_only) {
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                
local_state._build_output_block(local_state._child_block.get(), block));
+        
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), 
block));
         *eos = local_state._shared_state->left_side_eos;
         local_state._need_more_input_data = 
!local_state._shared_state->left_side_eos;
     } else {
@@ -522,8 +520,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* 
state, vectorized::Block
                 RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
                         local_state._conjuncts, &tmp_block, 
tmp_block.columns()));
             }
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                    local_state._build_output_block(&tmp_block, block, false));
+            RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, 
false));
             local_state._reset_tuple_is_null_column();
         }
         local_state._join_block.clear_column_data();
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 89262828708..fa891196151 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -61,8 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-            local_state._shared_state->sorter->get_next(state, block, eos));
+    RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, 
eos));
     local_state.reached_limit(block, eos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 40b63783c12..85cf8487575 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -716,7 +716,7 @@ Status 
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
             _agg_data->method_variant));
 
     if (!ret_flag) {
-        RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), 
key_columns, rows));
+        _emplace_into_hash_table(_places.data(), key_columns, rows);
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 72c06721d89..7a78c255170 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -306,18 +306,17 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
         SCOPED_TIMER(_build_pipelines_timer);
         // 2. Build pipelines with operators in this fragment.
         auto root_pipeline = add_pipeline();
-        
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(), 
request,
-                                                            
*_query_ctx->desc_tbl, &_root_op,
-                                                            root_pipeline));
+        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), request, 
*_query_ctx->desc_tbl,
+                                         &_root_op, root_pipeline));
 
         // 3. Create sink operator
         if (!request.fragment.__isset.output_sink) {
             return Status::InternalError("No output sink in this fragment!");
         }
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
-                _runtime_state->obj_pool(), request.fragment.output_sink,
-                request.fragment.output_exprs, request, 
root_pipeline->output_row_desc(),
-                _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), 
request.fragment.output_sink,
+                                          request.fragment.output_exprs, 
request,
+                                          root_pipeline->output_row_desc(), 
_runtime_state.get(),
+                                          *_desc_tbl, root_pipeline->id()));
         RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
         RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
 
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index c43410e68a4..09f32d9d23e 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -343,7 +343,7 @@ Status PipelineTask::execute(bool* eos) {
         } else {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
-            
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state, 
block, eos));
+            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
eos));
         }
 
         if (_block->rows() != 0 || *eos) {
@@ -353,7 +353,7 @@ Status PipelineTask::execute(bool* eos) {
             // return error status with EOF, it is special, could not return 
directly.
             auto sink_function = [&]() -> Status {
                 Status internal_st;
-                RETURN_IF_CATCH_EXCEPTION(internal_st = _sink->sink(_state, 
block, *eos));
+                internal_st = _sink->sink(_state, block, *eos);
                 return internal_st;
             };
             status = sink_function();
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index c45186190b7..3b846b60fa8 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -127,31 +127,28 @@ void TaskScheduler::_do_work(size_t index) {
         bool eos = false;
         auto status = Status::OK();
 
-        try {
-            //TODO: use a better enclose to abstracting these
-            if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
-                TUniqueId query_id = task->query_context()->query_id();
-                std::string task_name = task->task_name();
 #ifdef __APPLE__
-                uint32_t core_id = 0;
+        uint32_t core_id = 0;
 #else
-                uint32_t core_id = sched_getcpu();
+        uint32_t core_id = sched_getcpu();
 #endif
-                std::thread::id tid = std::this_thread::get_id();
-                uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
-                uint64_t start_time = MonotonicMicros();
-
-                status = task->execute(&eos);
-
-                uint64_t end_time = MonotonicMicros();
-                ExecEnv::GetInstance()->pipeline_tracer_context()->record(
-                        {query_id, task_name, core_id, thread_id, start_time, 
end_time});
-            } else {
-                status = task->execute(&eos);
-            }
-        } catch (const Exception& e) {
-            status = e.to_status();
-        }
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+                //TODO: use a better enclose to abstracting these
+                if 
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+                    TUniqueId query_id = task->query_context()->query_id();
+                    std::string task_name = task->task_name();
+
+                    std::thread::id tid = std::this_thread::get_id();
+                    uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
+                    uint64_t start_time = MonotonicMicros();
+
+                    status = task->execute(&eos);
+
+                    uint64_t end_time = MonotonicMicros();
+                    ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+                            {query_id, task_name, core_id, thread_id, 
start_time, end_time});
+                } else { status = task->execute(&eos); },
+                status);
 
         task->set_previous_core_id(index);
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2b0625207e2..ead6922ae6b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -718,7 +718,8 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
                                       this, std::placeholders::_1, 
std::placeholders::_2));
     {
         SCOPED_RAW_TIMER(&duration_ns);
-        auto prepare_st = context->prepare(params);
+        Status prepare_st = Status::OK();
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = 
context->prepare(params), prepare_st);
         if (!prepare_st.ok()) {
             query_ctx->cancel(prepare_st, params.fragment_id);
             query_ctx->set_execution_dependency_ready();
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 668397eff3b..1cfa0ff0965 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1463,7 +1463,22 @@ void 
PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
                                           google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
-        Status st = _fold_constant_expr(request->request(), response);
+        TFoldConstantParams t_request;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const uint8_t*)request->request().data();
+            uint32_t len = request->request().size();
+            st = deserialize_thrift_msg(buf, &len, false, &t_request);
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+                         << " .and query_id_is: " << t_request.query_id;
+        }
+        st = _fold_constant_expr(request->request(), response);
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+                         << " .and query_id_is: " << t_request.query_id;
+        }
         st.to_protobuf(response->mutable_status());
     });
     if (!ret) {
@@ -1481,12 +1496,8 @@ Status PInternalService::_fold_constant_expr(const 
std::string& ser_request,
         RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
     }
     std::unique_ptr<FoldConstantExecutor> fold_executor = 
std::make_unique<FoldConstantExecutor>();
-    Status st = fold_executor->fold_constant_vexpr(t_request, response);
-    if (!st.ok()) {
-        LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
-                     << " .and query_id_is: " << 
fold_executor->query_id_string();
-    }
-    return st;
+    
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(fold_executor->fold_constant_vexpr(t_request,
 response));
+    return Status::OK();
 }
 
 void PInternalService::transmit_block(google::protobuf::RpcController* 
controller,
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e13ebf7c209..6c1f02530b2 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -246,70 +246,76 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
     scanner->start_scan_cpu_timer();
     Status status = Status::OK();
     bool eos = false;
-    RuntimeState* state = ctx->state();
-    DCHECK(nullptr != state);
-    if (!scanner->is_init()) {
-        status = scanner->init();
-        if (!status.ok()) {
-            eos = true;
-        }
-    }
-
-    if (!eos && !scanner->is_open()) {
-        status = scanner->open(state);
-        if (!status.ok()) {
-            eos = true;
-        }
-        scanner->set_opened();
-    }
+    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
+            if (!scanner->is_init()) {
+                status = scanner->init();
+                if (!status.ok()) {
+                    eos = true;
+                }
+            }
 
-    Status rf_status = scanner->try_append_late_arrival_runtime_filter();
-    if (!rf_status.ok()) {
-        LOG(WARNING) << "Failed to append late arrival runtime filter: " << 
rf_status.to_string();
-    }
+            if (!eos && !scanner->is_open()) {
+                status = scanner->open(state);
+                if (!status.ok()) {
+                    eos = true;
+                }
+                scanner->set_opened();
+            }
 
-    size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-    size_t raw_bytes_read = 0;
-    bool first_read = true;
-    while (!eos && raw_bytes_read < raw_bytes_threshold) {
-        if (UNLIKELY(ctx->done())) {
-            eos = true;
-            break;
-        }
-        BlockUPtr free_block = ctx->get_free_block(first_read);
-        if (free_block == nullptr) {
-            break;
-        }
-        status = scanner->get_block_after_projects(state, free_block.get(), 
&eos);
-        first_read = false;
-        if (!status.ok()) {
-            LOG(WARNING) << "Scan thread read VScanner failed: " << 
status.to_string();
-            break;
-        }
-        auto free_block_bytes = free_block->allocated_bytes();
-        raw_bytes_read += free_block_bytes;
-        if (!scan_task->cached_blocks.empty() &&
-            scan_task->cached_blocks.back().first->rows() + free_block->rows() 
<=
-                    ctx->batch_size()) {
-            size_t block_size = 
scan_task->cached_blocks.back().first->allocated_bytes();
-            vectorized::MutableBlock 
mutable_block(scan_task->cached_blocks.back().first.get());
-            status = mutable_block.merge(*free_block);
-            if (!status.ok()) {
-                LOG(WARNING) << "Block merge failed: " << status.to_string();
-                break;
+            Status rf_status = 
scanner->try_append_late_arrival_runtime_filter();
+            if (!rf_status.ok()) {
+                LOG(WARNING) << "Failed to append late arrival runtime filter: 
"
+                             << rf_status.to_string();
             }
-            scan_task->cached_blocks.back().first.get()->set_columns(
-                    std::move(mutable_block.mutable_columns()));
-            ctx->return_free_block(std::move(free_block));
-            
ctx->inc_free_block_usage(scan_task->cached_blocks.back().first->allocated_bytes()
 -
-                                      block_size);
-        } else {
-            ctx->inc_free_block_usage(free_block->allocated_bytes());
-            scan_task->cached_blocks.emplace_back(std::move(free_block), 
free_block_bytes);
-        }
-    } // end for while
 
-    if (UNLIKELY(!status.ok())) {
+            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+            size_t raw_bytes_read = 0; bool first_read = true;
+            while (!eos && raw_bytes_read < raw_bytes_threshold) {
+                if (UNLIKELY(ctx->done())) {
+                    eos = true;
+                    break;
+                }
+                BlockUPtr free_block = ctx->get_free_block(first_read);
+                if (free_block == nullptr) {
+                    break;
+                }
+                status = scanner->get_block_after_projects(state, 
free_block.get(), &eos);
+                first_read = false;
+                if (!status.ok()) {
+                    LOG(WARNING) << "Scan thread read VScanner failed: " << 
status.to_string();
+                    break;
+                }
+                auto free_block_bytes = free_block->allocated_bytes();
+                raw_bytes_read += free_block_bytes;
+                if (!scan_task->cached_blocks.empty() &&
+                    scan_task->cached_blocks.back().first->rows() + 
free_block->rows() <=
+                            ctx->batch_size()) {
+                    size_t block_size = 
scan_task->cached_blocks.back().first->allocated_bytes();
+                    vectorized::MutableBlock mutable_block(
+                            scan_task->cached_blocks.back().first.get());
+                    status = mutable_block.merge(*free_block);
+                    if (!status.ok()) {
+                        LOG(WARNING) << "Block merge failed: " << 
status.to_string();
+                        break;
+                    }
+                    scan_task->cached_blocks.back().first.get()->set_columns(
+                            std::move(mutable_block.mutable_columns()));
+                    ctx->return_free_block(std::move(free_block));
+                    ctx->inc_free_block_usage(
+                            
scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
+                } else {
+                    ctx->inc_free_block_usage(free_block->allocated_bytes());
+                    
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
+                }
+            } // end for while
+
+            if (UNLIKELY(!status.ok())) {
+                scan_task->set_status(status);
+                eos = true;
+            },
+            status);
+    if (status.is<doris::ErrorCode::MEM_ALLOC_FAILED>()) {
         scan_task->set_status(status);
         eos = true;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to