This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ff5fb36bfe2 [fix](memory) Refactor memory allocated failure processing
(#36090)
ff5fb36bfe2 is described below
commit ff5fb36bfe2369918a7964645cc76aed5d1a5dc2
Author: Gabriel <[email protected]>
AuthorDate: Wed Jun 12 15:18:54 2024 +0800
[fix](memory) Refactor memory allocated failure processing (#36090)
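In this PR:
1. Do not catch exceptions inside individual operators.
2. Catch exceptions at the outer call sites where they can actually surface (task scheduler, fragment prepare, fold-constant RPC, scanner scheduler).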
---
be/src/common/exception.h | 17 +++
be/src/pipeline/exec/aggregation_sink_operator.cpp | 2 +-
.../distinct_streaming_aggregation_operator.cpp | 3 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 58 +++++-----
.../exec/join/process_hash_table_probe_impl.h | 4 +-
.../exec/nested_loop_join_probe_operator.cpp | 15 +--
be/src/pipeline/exec/sort_source_operator.cpp | 3 +-
.../exec/streaming_aggregation_operator.cpp | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 13 +--
be/src/pipeline/pipeline_task.cpp | 4 +-
be/src/pipeline/task_scheduler.cpp | 41 ++++---
be/src/runtime/fragment_mgr.cpp | 3 +-
be/src/service/internal_service.cpp | 25 ++--
be/src/vec/exec/scan/scanner_scheduler.cpp | 126 +++++++++++----------
14 files changed, 170 insertions(+), 146 deletions(-)
diff --git a/be/src/common/exception.h b/be/src/common/exception.h
index 0ec8d334e8b..ce44e658749 100644
--- a/be/src/common/exception.h
+++ b/be/src/common/exception.h
@@ -122,3 +122,20 @@ inline const std::string& Exception::to_string() const {
return Status::Error<false>(e.code(), e.to_string());
\
}
\
} while (0)
+
+#define ASSIGN_STATUS_IF_CATCH_EXCEPTION(stmt, status_)
\
+ do {
\
+ try {
\
+ doris::enable_thread_catch_bad_alloc++;
\
+ Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
\
+ { stmt; }
\
+ } catch (const doris::Exception& e) {
\
+ if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
\
+ status_ = Status::MemoryLimitExceeded(fmt::format(
\
+ "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{},
__FUNCTION__:{}", \
+ e.code(), e.to_string(), __FILE__, __LINE__,
__PRETTY_FUNCTION__)); \
+ } else {
\
+ status_ = e.to_status();
\
+ }
\
+ }
\
+ } while (0);
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 7a4a9d9c951..8e34de9bf98 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -151,7 +151,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
// this could cause unable to get JVM
if (Base::_shared_state->probe_expr_ctxs.empty()) {
// _create_agg_status may acquire a lot of memory, may allocate failed
when memory is very few
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
+ RETURN_IF_ERROR(_create_agg_status(_agg_data->without_key));
_shared_state->agg_data_created_without_key = true;
}
return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 73ce8ce5fb4..4390bebcfdd 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -203,8 +203,7 @@ Status
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
_distinct_row.reserve(rows);
if (!_stop_emplace_flag) {
- RETURN_IF_CATCH_EXCEPTION(
- _emplace_into_hash_table_to_distinct(_distinct_row,
key_columns, rows));
+ _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
}
bool mem_reuse =
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index dc2df872bd5..b4d511fe2dd 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -294,37 +294,35 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
Status st;
if (local_state._probe_index < local_state._probe_block.rows()) {
DCHECK(local_state._has_set_need_null_map_for_probe);
- RETURN_IF_CATCH_EXCEPTION({
- std::visit(
- [&](auto&& arg, auto&& process_hashtable_ctx, auto
need_null_map_for_probe,
- auto ignore_null) {
- using HashTableProbeType =
std::decay_t<decltype(process_hashtable_ctx)>;
- if constexpr (!std::is_same_v<HashTableProbeType,
std::monostate>) {
- using HashTableCtxType =
std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- st = process_hashtable_ctx.template
process<need_null_map_for_probe,
-
ignore_null>(
- arg,
- need_null_map_for_probe
- ?
&local_state._null_map_column->get_data()
- : nullptr,
- mutable_join_block, &temp_block,
- local_state._probe_block.rows(),
_is_mark_join,
- _have_other_join_conjunct);
- local_state._mem_tracker->set_consumption(
- arg.serialized_keys_size(false));
- } else {
- st = Status::InternalError("uninited hash
table");
- }
+ std::visit(
+ [&](auto&& arg, auto&& process_hashtable_ctx, auto
need_null_map_for_probe,
+ auto ignore_null) {
+ using HashTableProbeType =
std::decay_t<decltype(process_hashtable_ctx)>;
+ if constexpr (!std::is_same_v<HashTableProbeType,
std::monostate>) {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ st = process_hashtable_ctx
+ .template
process<need_null_map_for_probe, ignore_null>(
+ arg,
+ need_null_map_for_probe
+ ?
&local_state._null_map_column->get_data()
+ : nullptr,
+ mutable_join_block,
&temp_block,
+
local_state._probe_block.rows(), _is_mark_join,
+ _have_other_join_conjunct);
+ local_state._mem_tracker->set_consumption(
+ arg.serialized_keys_size(false));
} else {
- st = Status::InternalError("uninited hash table
probe");
+ st = Status::InternalError("uninited hash table");
}
- },
- *local_state._shared_state->hash_table_variants,
- *local_state._process_hashtable_ctx_variants,
-
vectorized::make_bool_variant(local_state._need_null_map_for_probe),
-
vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
- });
+ } else {
+ st = Status::InternalError("uninited hash table
probe");
+ }
+ },
+ *local_state._shared_state->hash_table_variants,
+ *local_state._process_hashtable_ctx_variants,
+
vectorized::make_bool_variant(local_state._need_null_map_for_probe),
+
vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null));
} else if (local_state._probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op !=
TJoinOp::LEFT_OUTER_JOIN)) {
std::visit(
@@ -457,7 +455,7 @@ Status
HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
temp_block->columns()));
}
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_output_block(temp_block,
output_block, false));
+ RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
_reset_tuple_is_null_column();
reached_limit(output_block, eos);
return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 5e023f2c861..3ffdb9cb990 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -283,10 +283,10 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
return true;
};
- RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
+ probe_side_output_column(
mcol, *_left_output_slot_flags, current_offset,
last_probe_index,
check_all_match_one(_probe_indexs, last_probe_index,
current_offset),
- with_other_conjuncts));
+ with_other_conjuncts);
}
output_block->swap(mutable_block.to_block());
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 7dc31cabddb..84112151e63 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -165,7 +165,7 @@ Status
NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
}
if constexpr (set_probe_side_flag) {
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+ RETURN_IF_ERROR(
(_do_filtering_and_update_visited_flags<set_build_side_flag,
set_probe_side_flag, ignore_null>(
&_join_block, !p._is_left_semi_anti)));
@@ -185,10 +185,9 @@ Status
NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
}
if constexpr (!set_probe_side_flag) {
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
- (_do_filtering_and_update_visited_flags<set_build_side_flag,
set_probe_side_flag,
-
ignore_null>(&_join_block,
-
!p._is_right_semi_anti)));
+
RETURN_IF_ERROR((_do_filtering_and_update_visited_flags<set_build_side_flag,
+
set_probe_side_flag, ignore_null>(
+ &_join_block, !p._is_right_semi_anti)));
_update_additional_flags(&_join_block);
}
@@ -499,8 +498,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState*
state, vectorized::Block
bool* eos) const {
auto& local_state = get_local_state(state);
if (_is_output_left_side_only) {
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-
local_state._build_output_block(local_state._child_block.get(), block));
+
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(),
block));
*eos = local_state._shared_state->left_side_eos;
local_state._need_more_input_data =
!local_state._shared_state->left_side_eos;
} else {
@@ -522,8 +520,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState*
state, vectorized::Block
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &tmp_block,
tmp_block.columns()));
}
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
- local_state._build_output_block(&tmp_block, block, false));
+ RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block,
false));
local_state._reset_tuple_is_null_column();
}
local_state._join_block.clear_column_data();
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp
b/be/src/pipeline/exec/sort_source_operator.cpp
index 89262828708..fa891196151 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -61,8 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
- local_state._shared_state->sorter->get_next(state, block, eos));
+ RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block,
eos));
local_state.reached_limit(block, eos);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 40b63783c12..85cf8487575 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -716,7 +716,7 @@ Status
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
_agg_data->method_variant));
if (!ret_flag) {
- RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(),
key_columns, rows));
+ _emplace_into_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 72c06721d89..7a78c255170 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -306,18 +306,17 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
SCOPED_TIMER(_build_pipelines_timer);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(),
request,
-
*_query_ctx->desc_tbl, &_root_op,
- root_pipeline));
+ RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), request,
*_query_ctx->desc_tbl,
+ &_root_op, root_pipeline));
// 3. Create sink operator
if (!request.fragment.__isset.output_sink) {
return Status::InternalError("No output sink in this fragment!");
}
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
- _runtime_state->obj_pool(), request.fragment.output_sink,
- request.fragment.output_exprs, request,
root_pipeline->output_row_desc(),
- _runtime_state.get(), *_desc_tbl, root_pipeline->id()));
+ RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(),
request.fragment.output_sink,
+ request.fragment.output_exprs,
request,
+ root_pipeline->output_row_desc(),
_runtime_state.get(),
+ *_desc_tbl, root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index c43410e68a4..09f32d9d23e 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -343,7 +343,7 @@ Status PipelineTask::execute(bool* eos) {
} else {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state,
block, eos));
+ RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
eos));
}
if (_block->rows() != 0 || *eos) {
@@ -353,7 +353,7 @@ Status PipelineTask::execute(bool* eos) {
// return error status with EOF, it is special, could not return
directly.
auto sink_function = [&]() -> Status {
Status internal_st;
- RETURN_IF_CATCH_EXCEPTION(internal_st = _sink->sink(_state,
block, *eos));
+ internal_st = _sink->sink(_state, block, *eos);
return internal_st;
};
status = sink_function();
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index c45186190b7..3b846b60fa8 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -127,31 +127,28 @@ void TaskScheduler::_do_work(size_t index) {
bool eos = false;
auto status = Status::OK();
- try {
- //TODO: use a better enclose to abstracting these
- if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
- TUniqueId query_id = task->query_context()->query_id();
- std::string task_name = task->task_name();
#ifdef __APPLE__
- uint32_t core_id = 0;
+ uint32_t core_id = 0;
#else
- uint32_t core_id = sched_getcpu();
+ uint32_t core_id = sched_getcpu();
#endif
- std::thread::id tid = std::this_thread::get_id();
- uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
- uint64_t start_time = MonotonicMicros();
-
- status = task->execute(&eos);
-
- uint64_t end_time = MonotonicMicros();
- ExecEnv::GetInstance()->pipeline_tracer_context()->record(
- {query_id, task_name, core_id, thread_id, start_time,
end_time});
- } else {
- status = task->execute(&eos);
- }
- } catch (const Exception& e) {
- status = e.to_status();
- }
+ ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+ //TODO: use a better enclose to abstracting these
+ if
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+ TUniqueId query_id = task->query_context()->query_id();
+ std::string task_name = task->task_name();
+
+ std::thread::id tid = std::this_thread::get_id();
+ uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
+ uint64_t start_time = MonotonicMicros();
+
+ status = task->execute(&eos);
+
+ uint64_t end_time = MonotonicMicros();
+ ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+ {query_id, task_name, core_id, thread_id,
start_time, end_time});
+ } else { status = task->execute(&eos); },
+ status);
task->set_previous_core_id(index);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2b0625207e2..ead6922ae6b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -718,7 +718,8 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
this, std::placeholders::_1,
std::placeholders::_2));
{
SCOPED_RAW_TIMER(&duration_ns);
- auto prepare_st = context->prepare(params);
+ Status prepare_st = Status::OK();
+ ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st =
context->prepare(params), prepare_st);
if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 668397eff3b..1cfa0ff0965 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1463,7 +1463,22 @@ void
PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
- Status st = _fold_constant_expr(request->request(), response);
+ TFoldConstantParams t_request;
+ Status st = Status::OK();
+ {
+ const uint8_t* buf = (const uint8_t*)request->request().data();
+ uint32_t len = request->request().size();
+ st = deserialize_thrift_msg(buf, &len, false, &t_request);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+ << " .and query_id_is: " << t_request.query_id;
+ }
+ st = _fold_constant_expr(request->request(), response);
+ if (!st.ok()) {
+ LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
+ << " .and query_id_is: " << t_request.query_id;
+ }
st.to_protobuf(response->mutable_status());
});
if (!ret) {
@@ -1481,12 +1496,8 @@ Status PInternalService::_fold_constant_expr(const
std::string& ser_request,
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
}
std::unique_ptr<FoldConstantExecutor> fold_executor =
std::make_unique<FoldConstantExecutor>();
- Status st = fold_executor->fold_constant_vexpr(t_request, response);
- if (!st.ok()) {
- LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st
- << " .and query_id_is: " <<
fold_executor->query_id_string();
- }
- return st;
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(fold_executor->fold_constant_vexpr(t_request,
response));
+ return Status::OK();
}
void PInternalService::transmit_block(google::protobuf::RpcController*
controller,
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e13ebf7c209..6c1f02530b2 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -246,70 +246,76 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scanner->start_scan_cpu_timer();
Status status = Status::OK();
bool eos = false;
- RuntimeState* state = ctx->state();
- DCHECK(nullptr != state);
- if (!scanner->is_init()) {
- status = scanner->init();
- if (!status.ok()) {
- eos = true;
- }
- }
-
- if (!eos && !scanner->is_open()) {
- status = scanner->open(state);
- if (!status.ok()) {
- eos = true;
- }
- scanner->set_opened();
- }
+ ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+ RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
+ if (!scanner->is_init()) {
+ status = scanner->init();
+ if (!status.ok()) {
+ eos = true;
+ }
+ }
- Status rf_status = scanner->try_append_late_arrival_runtime_filter();
- if (!rf_status.ok()) {
- LOG(WARNING) << "Failed to append late arrival runtime filter: " <<
rf_status.to_string();
- }
+ if (!eos && !scanner->is_open()) {
+ status = scanner->open(state);
+ if (!status.ok()) {
+ eos = true;
+ }
+ scanner->set_opened();
+ }
- size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
- size_t raw_bytes_read = 0;
- bool first_read = true;
- while (!eos && raw_bytes_read < raw_bytes_threshold) {
- if (UNLIKELY(ctx->done())) {
- eos = true;
- break;
- }
- BlockUPtr free_block = ctx->get_free_block(first_read);
- if (free_block == nullptr) {
- break;
- }
- status = scanner->get_block_after_projects(state, free_block.get(),
&eos);
- first_read = false;
- if (!status.ok()) {
- LOG(WARNING) << "Scan thread read VScanner failed: " <<
status.to_string();
- break;
- }
- auto free_block_bytes = free_block->allocated_bytes();
- raw_bytes_read += free_block_bytes;
- if (!scan_task->cached_blocks.empty() &&
- scan_task->cached_blocks.back().first->rows() + free_block->rows()
<=
- ctx->batch_size()) {
- size_t block_size =
scan_task->cached_blocks.back().first->allocated_bytes();
- vectorized::MutableBlock
mutable_block(scan_task->cached_blocks.back().first.get());
- status = mutable_block.merge(*free_block);
- if (!status.ok()) {
- LOG(WARNING) << "Block merge failed: " << status.to_string();
- break;
+ Status rf_status =
scanner->try_append_late_arrival_runtime_filter();
+ if (!rf_status.ok()) {
+ LOG(WARNING) << "Failed to append late arrival runtime filter:
"
+ << rf_status.to_string();
}
- scan_task->cached_blocks.back().first.get()->set_columns(
- std::move(mutable_block.mutable_columns()));
- ctx->return_free_block(std::move(free_block));
-
ctx->inc_free_block_usage(scan_task->cached_blocks.back().first->allocated_bytes()
-
- block_size);
- } else {
- ctx->inc_free_block_usage(free_block->allocated_bytes());
- scan_task->cached_blocks.emplace_back(std::move(free_block),
free_block_bytes);
- }
- } // end for while
- if (UNLIKELY(!status.ok())) {
+ size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+ size_t raw_bytes_read = 0; bool first_read = true;
+ while (!eos && raw_bytes_read < raw_bytes_threshold) {
+ if (UNLIKELY(ctx->done())) {
+ eos = true;
+ break;
+ }
+ BlockUPtr free_block = ctx->get_free_block(first_read);
+ if (free_block == nullptr) {
+ break;
+ }
+ status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
+ first_read = false;
+ if (!status.ok()) {
+ LOG(WARNING) << "Scan thread read VScanner failed: " <<
status.to_string();
+ break;
+ }
+ auto free_block_bytes = free_block->allocated_bytes();
+ raw_bytes_read += free_block_bytes;
+ if (!scan_task->cached_blocks.empty() &&
+ scan_task->cached_blocks.back().first->rows() +
free_block->rows() <=
+ ctx->batch_size()) {
+ size_t block_size =
scan_task->cached_blocks.back().first->allocated_bytes();
+ vectorized::MutableBlock mutable_block(
+ scan_task->cached_blocks.back().first.get());
+ status = mutable_block.merge(*free_block);
+ if (!status.ok()) {
+ LOG(WARNING) << "Block merge failed: " <<
status.to_string();
+ break;
+ }
+ scan_task->cached_blocks.back().first.get()->set_columns(
+ std::move(mutable_block.mutable_columns()));
+ ctx->return_free_block(std::move(free_block));
+ ctx->inc_free_block_usage(
+
scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
+ } else {
+ ctx->inc_free_block_usage(free_block->allocated_bytes());
+
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
+ }
+ } // end for while
+
+ if (UNLIKELY(!status.ok())) {
+ scan_task->set_status(status);
+ eos = true;
+ },
+ status);
+ if (status.is<doris::ErrorCode::MEM_ALLOC_FAILED>()) {
scan_task->set_status(status);
eos = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]