This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 40867742116 [pick](branch-3.0) pick #42216 #42421 (#42542)
40867742116 is described below
commit 408677421165ab66e7976790789eb257de57e490
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Oct 28 19:23:36 2024 +0800
[pick](branch-3.0) pick #42216 #42421 (#42542)
pick #42216 #42421
---
.../pipeline/exec/memory_scratch_sink_operator.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 44 +++++++++++++---------
be/src/runtime/fragment_mgr.h | 1 +
be/src/runtime/record_batch_queue.cpp | 10 ++++-
be/src/runtime/result_queue_mgr.cpp | 6 ++-
be/src/service/backend_service.cpp | 14 ++++++-
6 files changed, 52 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index 69e30791c13..131f3caf42c 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -100,7 +100,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema,
arrow::default_memory_pool(),
&result, _timezone_obj));
local_state._queue->blocking_put(result);
- if (local_state._queue->size() < 10) {
+ if (local_state._queue->size() > config::max_memory_sink_batch_count) {
local_state._queue_dependency->block();
}
return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7ba73442c90..e683b84e2b4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -299,6 +299,10 @@ Status FragmentMgr::trigger_pipeline_context_report(
// including the final status when execution finishes.
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
DCHECK(req.status.ok() || req.done); // if !status.ok() => done
+ if (req.coord_addr.hostname == "external") {
+ // External query (flink/spark read tablets) not need to report to FE.
+ return;
+ }
Status exec_status = req.status;
Status coord_status;
FrontendServiceConnection coord(_exec_env->frontend_client_cache(),
req.coord_addr,
@@ -836,31 +840,33 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
query_ctx->set_merge_controller_handler(handler);
}
- for (const auto& local_param : params.local_params) {
- const TUniqueId& fragment_instance_id =
local_param.fragment_instance_id;
+ {
+ // (query_id, fragment_id) is executed only on one BE, locks
_pipeline_map.
std::lock_guard<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
- if (iter != _pipeline_map.end()) {
- return Status::InternalError("exec_plan_fragment input duplicated
fragment_id({})",
- params.fragment_id);
+ for (const auto& local_param : params.local_params) {
+ const TUniqueId& fragment_instance_id =
local_param.fragment_instance_id;
+ auto iter = _pipeline_map.find({params.query_id,
params.fragment_id});
+ if (iter != _pipeline_map.end()) {
+ return Status::InternalError(
+ "exec_plan_fragment query_id({}) input duplicated
fragment_id({})",
+ print_id(params.query_id), params.fragment_id);
+ }
+ query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
- query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
- }
- if (!params.__isset.need_wait_execution_trigger ||
!params.need_wait_execution_trigger) {
- query_ctx->set_ready_to_execute_only();
- }
-
- int64 now = duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- {
+ int64 now = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
- std::lock_guard<std::mutex> lock(_lock);
// TODO: simplify this mapping
_pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}
+
+ if (!params.__isset.need_wait_execution_trigger ||
!params.need_wait_execution_trigger) {
+ query_ctx->set_ready_to_execute_only();
+ }
+
query_ctx->set_pipeline_context(params.fragment_id, context);
RETURN_IF_ERROR(context->submit());
@@ -1070,6 +1076,7 @@ void FragmentMgr::debug(std::stringstream& ss) {}
*/
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
const TQueryPlanInfo&
t_query_plan_info,
+ const TUniqueId& query_id,
const TUniqueId&
fragment_instance_id,
std::vector<TScanColumnDesc>*
selected_columns) {
// set up desc tbl
@@ -1110,8 +1117,9 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
// assign the param used for executing of PlanFragment-self
TPipelineInstanceParams fragment_exec_params;
- exec_fragment_params.query_id = t_query_plan_info.query_id;
+ exec_fragment_params.query_id = query_id;
fragment_exec_params.fragment_instance_id = fragment_instance_id;
+ exec_fragment_params.coord.hostname = "external";
std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>>
per_node_scan_ranges;
std::vector<TScanRangeParams> scan_ranges;
std::vector<int64_t> tablet_ids = params.tablet_ids;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 41b63db0b23..20b2fd8cdc2 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -112,6 +112,7 @@ public:
// execute external query, all query info are packed in TScanOpenParams
Status exec_external_plan_fragment(const TScanOpenParams& params,
const TQueryPlanInfo& t_query_plan_info,
+ const TUniqueId& query_id,
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>*
selected_columns);
diff --git a/be/src/runtime/record_batch_queue.cpp b/be/src/runtime/record_batch_queue.cpp
index 83982688880..25db550db3a 100644
--- a/be/src/runtime/record_batch_queue.cpp
+++ b/be/src/runtime/record_batch_queue.cpp
@@ -23,10 +23,16 @@
namespace doris {
bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>*
result) {
- auto res = _queue.blocking_get(result);
- if (_dep && size() <= 10) {
+ if (_dep && size() <= config::max_memory_sink_batch_count) {
_dep->set_ready();
}
+ // Before each get queue, will set sink task dependency ready.
+ // so if the sink task put queue faster than the fetch result get queue,
+ // the queue size will always be 10.
+ // be sure to set sink dependency ready before getting queue.
+ // otherwise, if queue is emptied after sink task put queue and before
block dependency,
+ // get queue will stuck and will never set sink dependency ready.
+ auto res = _queue.blocking_get(result);
return res;
}
diff --git a/be/src/runtime/result_queue_mgr.cpp b/be/src/runtime/result_queue_mgr.cpp
index 8090a3e6ee4..8a6e5b10935 100644
--- a/be/src/runtime/result_queue_mgr.cpp
+++ b/be/src/runtime/result_queue_mgr.cpp
@@ -82,8 +82,10 @@ void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id,
if (iter != _fragment_queue_map.end()) {
*queue = iter->second;
} else {
- // the blocking queue size = 20 (default), in this way, one queue have
20 * 1024 rows at most
- BlockQueueSharedPtr tmp(new
RecordBatchQueue(config::max_memory_sink_batch_count));
+ // max_elements will not take effect, because when queue size reaches
max_memory_sink_batch_count,
+ // MemoryScratchSink will block queue dependency, in this way, one
queue have 20 * 1024 rows at most.
+ // use MemoryScratchSink queue dependency instead of BlockingQueue to
achieve blocking.
+ BlockQueueSharedPtr tmp(new
RecordBatchQueue(config::max_memory_sink_batch_count * 2));
_fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
*queue = tmp;
}
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index d56aa49b19b..e6fdfaa8765 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -802,6 +802,11 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status,
void BaseBackendService::open_scanner(TScanOpenResult& result_, const
TScanOpenParams& params) {
TStatus t_status;
TUniqueId fragment_instance_id = generate_uuid();
+ // A query_id is randomly generated to replace t_query_plan_info.query_id.
+ // external query does not need to report anything to FE, so the query_id
can be changed.
+ // Otherwise, multiple independent concurrent open tablet scanners have
the same query_id.
+ // when one of the scanners ends, the other scanners will be canceled
through FragmentMgr.cancel(query_id).
+ TUniqueId query_id = generate_uuid();
std::shared_ptr<ScanContext> p_context;
static_cast<void>(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context));
p_context->fragment_instance_id = fragment_instance_id;
@@ -838,13 +843,18 @@ void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenP
<< " deserialize error, should not be modified after returned
Doris FE processed";
exec_st = Status::InvalidArgument(msg.str());
}
- p_context->query_id = t_query_plan_info.query_id;
+ p_context->query_id = query_id;
}
std::vector<TScanColumnDesc> selected_columns;
if (exec_st.ok()) {
// start the scan procedure
+ LOG(INFO) << fmt::format(
+ "exec external scanner, old_query_id = {}, new_query_id = {},
fragment_instance_id "
+ "= {}",
+ print_id(t_query_plan_info.query_id), print_id(query_id),
+ print_id(fragment_instance_id));
exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment(
- params, t_query_plan_info, fragment_instance_id,
&selected_columns);
+ params, t_query_plan_info, query_id, fragment_instance_id,
&selected_columns);
}
exec_st.to_thrift(&t_status);
//return status
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]