This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ae7caba54d2 [fix](group commit) remove non pipeline code (#37018)
ae7caba54d2 is described below
commit ae7caba54d2a9e30a0c6e7e157c04e1d7f0d81f6
Author: meiyi <[email protected]>
AuthorDate: Tue Jul 2 10:29:02 2024 +0800
[fix](group commit) remove non pipeline code (#37018)
Remove non-pipeline code in group_commit_mgr.
---
be/src/runtime/group_commit_mgr.cpp | 67 +++++++++++--------------------------
be/src/runtime/group_commit_mgr.h | 4 +--
2 files changed, 21 insertions(+), 50 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 111780c9a42..8a81c942fd3 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -266,14 +266,12 @@ Status GroupCommitTable::get_first_block_load_queue(
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id,
put_block_dep).ok()) {
load_block_queue = inner_block_queue;
-
return Status::OK();
}
} else {
return Status::DataQualityError<false>(
"schema version not match, maybe a schema change
is in process. "
- "Please "
- "retry this load manually.");
+ "Please retry this load manually.");
}
}
}
@@ -284,13 +282,13 @@ Status GroupCommitTable::get_first_block_load_queue(
if (try_to_get_matched_queue().ok()) {
return Status::OK();
}
+ create_plan_dep->block();
+ _create_plan_deps.push_back(create_plan_dep);
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
- create_plan_dep->block();
RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version,
mem_tracker,
dep = create_plan_dep] {
Defer defer {[&, dep = dep]() {
- dep->set_ready();
std::unique_lock l(_lock);
for (auto it : _create_plan_deps) {
it->set_ready();
@@ -303,9 +301,6 @@ Status GroupCommitTable::get_first_block_load_queue(
LOG(WARNING) << "create group commit load error, st=" <<
st.to_string();
}
}));
- } else {
- create_plan_dep->block();
- _create_plan_deps.push_back(create_plan_dep);
}
return try_to_get_matched_queue();
}
@@ -313,16 +308,14 @@ Status GroupCommitTable::get_first_block_load_queue(
Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
Status st = Status::OK();
- TStreamLoadPutRequest request;
- UniqueId load_id = UniqueId::gen_uid();
- TUniqueId tload_id;
- bool is_pipeline = true;
TStreamLoadPutResult result;
std::string label;
int64_t txn_id;
TUniqueId instance_id;
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
+ UniqueId load_id = UniqueId::gen_uid();
+ TUniqueId tload_id;
tload_id.__set_hi(load_id.hi);
tload_id.__set_lo(load_id.lo);
std::regex reg("-");
@@ -330,6 +323,7 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
std::stringstream ss;
ss << "insert into doris_internal_table_id(" << _table_id << ") WITH
LABEL " << label
<< " select * from group_commit(\"table_id\"=\"" << _table_id <<
"\")";
+ TStreamLoadPutRequest request;
request.__set_load_sql(ss.str());
request.__set_loadId(tload_id);
request.__set_label(label);
@@ -355,51 +349,36 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
return st;
}
st = Status::create<false>(result.status);
+ if (st.ok() && !result.__isset.pipeline_params) {
+ st = Status::InternalError("Non-pipeline is disabled!");
+ }
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" <<
st.to_string();
return st;
}
auto schema_version = result.base_schema_version;
- is_pipeline = result.__isset.pipeline_params;
- auto& params = result.params;
auto& pipeline_params = result.pipeline_params;
- if (!is_pipeline) {
- DCHECK(params.fragment.output_sink.olap_table_sink.db_id ==
_db_id);
- txn_id = params.txn_conf.txn_id;
- instance_id = params.params.fragment_instance_id;
- } else {
- DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id
== _db_id);
- txn_id = pipeline_params.txn_conf.txn_id;
- DCHECK(pipeline_params.local_params.size() == 1);
- instance_id = pipeline_params.local_params[0].fragment_instance_id;
- }
+ DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id ==
_db_id);
+ txn_id = pipeline_params.txn_conf.txn_id;
+ DCHECK(pipeline_params.local_params.size() == 1);
+ instance_id = pipeline_params.local_params[0].fragment_instance_id;
VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table="
<< _table_id
<< ", schema version=" << schema_version << ", label=" <<
label
- << ", txn_id=" << txn_id << ", instance_id=" <<
print_id(instance_id)
- << ", is_pipeline=" << is_pipeline;
+ << ", txn_id=" << txn_id << ", instance_id=" <<
print_id(instance_id);
{
auto load_block_queue = std::make_shared<LoadBlockQueue>(
instance_id, label, txn_id, schema_version,
_all_block_queues_bytes,
result.wait_internal_group_commit_finish,
result.group_commit_interval_ms,
result.group_commit_data_bytes);
std::unique_lock l(_lock);
- //create wal
- if (!is_pipeline) {
- RETURN_IF_ERROR(load_block_queue->create_wal(
- _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
-
params.fragment.output_sink.olap_table_sink.schema.slot_descs,
- be_exe_version));
- } else {
- RETURN_IF_ERROR(load_block_queue->create_wal(
- _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
-
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
- be_exe_version));
- }
+ RETURN_IF_ERROR(load_block_queue->create_wal(
+ _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
+
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
+ be_exe_version));
_load_block_queues.emplace(instance_id, load_block_queue);
}
}
- st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline,
result.params,
- result.pipeline_params);
+ st = _exec_plan_fragment(_db_id, _table_id, label, txn_id,
result.pipeline_params);
if (!st.ok()) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
auto finish_st = _finish_group_commit_load(_db_id, _table_id, label,
txn_id, instance_id,
@@ -533,8 +512,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id,
const std::string& label, int64_t
txn_id,
- bool is_pipeline,
- const TExecPlanFragmentParams&
params,
const TPipelineFragmentParams&
pipeline_params) {
auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState*
state, Status* status) {
DCHECK(state);
@@ -545,11 +522,7 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t
db_id, int64_t table_id,
<< ", st=" << finish_st.to_string();
}
};
- if (is_pipeline) {
- return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params,
finish_cb);
- } else {
- return _exec_env->fragment_mgr()->exec_plan_fragment(params,
finish_cb);
- }
+ return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params,
finish_cb);
}
Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index c668197e8dc..36c51746ef4 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -168,9 +168,7 @@ private:
Status _create_group_commit_load(int be_exe_version,
std::shared_ptr<MemTrackerLimiter>
mem_tracker);
Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const
std::string& label,
- int64_t txn_id, bool is_pipeline,
- const TExecPlanFragmentParams& params,
- const TPipelineFragmentParams& pipeline_params);
+ int64_t txn_id, const TPipelineFragmentParams&
pipeline_params);
Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const
std::string& label,
int64_t txn_id, const TUniqueId&
instance_id, Status& status,
RuntimeState* state);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]