This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a05003fbe1 [fix](pipeline) fix remove pipeline_x_context from fragment
manager (#24062)
a05003fbe1 is described below
commit a05003fbe1c8b3443fe4b52389cdc93f997e300e
Author: Lijia Liu <[email protected]>
AuthorDate: Sun Sep 10 20:53:26 2023 +0800
[fix](pipeline) fix remove pipeline_x_context from fragment manager (#24062)
---
be/src/pipeline/pipeline_fragment_context.h | 4 ++++
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h | 2 +-
be/src/runtime/fragment_mgr.cpp | 14 +-------------
be/src/runtime/fragment_mgr.h | 3 ---
be/src/runtime/query_context.h | 4 +---
5 files changed, 7 insertions(+), 20 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 47dad12d7c..415015a807 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -135,6 +135,10 @@ public:
}
bool is_group_commit() { return _group_commit; }
+ virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
+ ins_ids.resize(1);
+ ins_ids[0] = _fragment_instance_id;
+ }
protected:
Status _create_sink(int sender_id, const TDataSink& t_data_sink,
RuntimeState* state);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index e21a004c7a..a0970b166e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -69,7 +69,7 @@ public:
~PipelineXFragmentContext() override;
- void instance_ids(std::vector<TUniqueId>& ins_ids) const {
+ void instance_ids(std::vector<TUniqueId>& ins_ids) const override {
ins_ids.resize(_runtime_states.size());
for (size_t i = 0; i < _runtime_states.size(); i++) {
ins_ids[i] = _runtime_states[i]->fragment_instance_id();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fdb807c43d..703019a39b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -344,7 +344,7 @@ void
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
bool all_done = false;
if (query_ctx != nullptr) {
// decrease the number of unfinished fragments
- all_done = query_ctx->countdown();
+ all_done = query_ctx->countdown(1);
}
// remove exec state after this fragment finished
@@ -455,18 +455,6 @@ void FragmentMgr::remove_pipeline_context(
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
auto* q_context = f_context->get_query_context();
- bool all_done = q_context->countdown();
- _pipeline_map.erase(f_context->get_fragment_instance_id());
- if (all_done) {
- _query_ctx_map.erase(query_id);
- }
-}
-
-void FragmentMgr::remove_pipeline_context(
- std::shared_ptr<pipeline::PipelineXFragmentContext> f_context) {
- std::lock_guard<std::mutex> lock(_lock);
- auto query_id = f_context->get_query_id();
- auto* q_context = f_context->get_query_context();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 367661dda4..0cf5cf2d58 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -83,9 +83,6 @@ public:
void remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext>
pipeline_context);
- void remove_pipeline_context(
- std::shared_ptr<pipeline::PipelineXFragmentContext>
pipeline_context);
-
// TODO(zc): report this is over
Status exec_plan_fragment(const TExecPlanFragmentParams& params, const
FinishCallback& cb);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7882b21c8d..c27c517ac1 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -95,9 +95,7 @@ public:
// Notice. For load fragments, the fragment_num sent by FE has a small
probability of 0.
// this may be a bug, but since the check is <= 1, in theory it shouldn't cause
any problems at this stage.
- bool countdown() { return countdown(1); }
-
- bool countdown(int delta) { return fragment_num.fetch_sub(delta) <= 1; }
+ bool countdown(int instance_num) { return
fragment_num.fetch_sub(instance_num) <= 1; }
ExecEnv* exec_env() { return _exec_env; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]