This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d0ee101 [refactor] (runtime)tidy up the plan_fragment_executor codes
(#8110)
d0ee101 is described below
commit d0ee101c2f89cf39c09b80daa2a67eecd42443e0
Author: zuochunwei <[email protected]>
AuthorDate: Tue Feb 22 09:20:27 2022 +0800
[refactor] (runtime)tidy up the plan_fragment_executor codes (#8110)
Co-authored-by: zuochunwei <[email protected]>
---
be/src/runtime/plan_fragment_executor.cpp | 23 +++++++++++------------
be/src/runtime/plan_fragment_executor.h | 4 ++--
2 files changed, 13 insertions(+), 14 deletions(-)
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 021b05f..3966c8d 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -62,9 +62,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_collect_query_statistics_with_every_batch(false) {}
PlanFragmentExecutor::~PlanFragmentExecutor() {
- // if (_prepared) {
close();
- // }
// at this point, the report thread should have been stopped
DCHECK(!_report_thread_active);
}
@@ -198,7 +196,6 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request,
RETURN_IF_ERROR(_sink->prepare(runtime_state()));
RuntimeProfile* sink_profile = _sink->profile();
-
if (sink_profile != nullptr) {
profile()->add_child(sink_profile, true, nullptr);
}
@@ -279,8 +276,10 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
SCOPED_CPU_TIMER(_fragment_cpu_timer);
RETURN_IF_ERROR(_sink->open(runtime_state()));
}
- doris::vectorized::Block* block = nullptr;
+
while (true) {
+ doris::vectorized::Block* block;
+
{
SCOPED_CPU_TIMER(_fragment_cpu_timer);
RETURN_IF_ERROR(get_vectorized_internal(&block));
@@ -331,11 +330,10 @@ Status
PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
return Status::OK();
}
- auto vexec_node = static_cast<doris::ExecNode*>(_plan);
while (!_done) {
-
_block->clear_column_data(vexec_node->row_desc().num_materialized_slots());
+ _block->clear_column_data(_plan->row_desc().num_materialized_slots());
SCOPED_TIMER(profile()->total_time_counter());
- RETURN_IF_ERROR(vexec_node->get_next(_runtime_state.get(),
_block.get(), &_done));
+ RETURN_IF_ERROR(_plan->get_next(_runtime_state.get(), _block.get(),
&_done));
if (_block->rows() > 0) {
COUNTER_UPDATE(_rows_produced_counter, _block->rows());
@@ -473,8 +471,7 @@ void PlanFragmentExecutor::report_profile() {
// two cases (e.g. there is a race here where the wait timed out
but before grabbing
// the lock, the condition variable was signaled). Instead, we
will use an external
// flag, _report_thread_active, to coordinate this.
- _stop_report_thread_cv.wait_for(l,
-
std::chrono::seconds(config::status_report_interval));
+ _stop_report_thread_cv.wait_for(l,
std::chrono::seconds(config::status_report_interval));
} else {
LOG(WARNING) << "config::status_report_interval is equal to or
less than zero, exiting "
"reporting thread.";
@@ -617,11 +614,13 @@ void PlanFragmentExecutor::cancel() {
_runtime_state->set_is_cancelled(true);
// must close stream_mgr to avoid dead lock in Exchange Node
+ auto env = _runtime_state->exec_env();
+ auto id = _runtime_state->fragment_instance_id();
if (_runtime_state->enable_vectorized_exec()) {
-
_runtime_state->exec_env()->vstream_mgr()->cancel(_runtime_state->fragment_instance_id());
+ env->vstream_mgr()->cancel(id);
} else {
-
_runtime_state->exec_env()->stream_mgr()->cancel(_runtime_state->fragment_instance_id());
-
_runtime_state->exec_env()->result_mgr()->cancel(_runtime_state->fragment_instance_id());
+ env->stream_mgr()->cancel(id);
+ env->result_mgr()->cancel(id);
}
}
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index 3cdb6bb..874daa6 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -138,7 +138,7 @@ public:
const Status& status() const { return _status; }
- DataSink* get_sink() { return _sink.get(); }
+ DataSink* get_sink() const { return _sink.get(); }
void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
@@ -245,7 +245,7 @@ private:
// Idempotent.
void stop_report_thread();
- const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); }
+ const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl();
}
void _collect_query_statistics();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]