This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d0ee101  [refactor] (runtime)tidy up the plan_fragment_executor codes 
(#8110)
d0ee101 is described below

commit d0ee101c2f89cf39c09b80daa2a67eecd42443e0
Author: zuochunwei <[email protected]>
AuthorDate: Tue Feb 22 09:20:27 2022 +0800

    [refactor] (runtime)tidy up the plan_fragment_executor codes (#8110)
    
    Co-authored-by: zuochunwei <[email protected]>
---
 be/src/runtime/plan_fragment_executor.cpp | 23 +++++++++++------------
 be/src/runtime/plan_fragment_executor.h   |  4 ++--
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 021b05f..3966c8d 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -62,9 +62,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _collect_query_statistics_with_every_batch(false) {}
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
-    // if (_prepared) {
     close();
-    // }
     // at this point, the report thread should have been stopped
     DCHECK(!_report_thread_active);
 }
@@ -198,7 +196,6 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
         RETURN_IF_ERROR(_sink->prepare(runtime_state()));
 
         RuntimeProfile* sink_profile = _sink->profile();
-
         if (sink_profile != nullptr) {
             profile()->add_child(sink_profile, true, nullptr);
         }
@@ -279,8 +276,10 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         SCOPED_CPU_TIMER(_fragment_cpu_timer);
         RETURN_IF_ERROR(_sink->open(runtime_state()));
     }
-    doris::vectorized::Block* block = nullptr;
+
     while (true) {
+        doris::vectorized::Block* block;
+
         {
             SCOPED_CPU_TIMER(_fragment_cpu_timer);
             RETURN_IF_ERROR(get_vectorized_internal(&block));
@@ -331,11 +330,10 @@ Status 
PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
         return Status::OK();
     }
 
-    auto vexec_node = static_cast<doris::ExecNode*>(_plan);
     while (!_done) {
-        
_block->clear_column_data(vexec_node->row_desc().num_materialized_slots());
+        _block->clear_column_data(_plan->row_desc().num_materialized_slots());
         SCOPED_TIMER(profile()->total_time_counter());
-        RETURN_IF_ERROR(vexec_node->get_next(_runtime_state.get(), 
_block.get(), &_done));
+        RETURN_IF_ERROR(_plan->get_next(_runtime_state.get(), _block.get(), 
&_done));
 
         if (_block->rows() > 0) {
             COUNTER_UPDATE(_rows_produced_counter, _block->rows());
@@ -473,8 +471,7 @@ void PlanFragmentExecutor::report_profile() {
             // two cases (e.g. there is a race here where the wait timed out 
but before grabbing
             // the lock, the condition variable was signaled).  Instead, we 
will use an external
             // flag, _report_thread_active, to coordinate this.
-            _stop_report_thread_cv.wait_for(l,
-                                            
std::chrono::seconds(config::status_report_interval));
+            _stop_report_thread_cv.wait_for(l, 
std::chrono::seconds(config::status_report_interval));
         } else {
             LOG(WARNING) << "config::status_report_interval is equal to or 
less than zero, exiting "
                             "reporting thread.";
@@ -617,11 +614,13 @@ void PlanFragmentExecutor::cancel() {
     _runtime_state->set_is_cancelled(true);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
+    auto env = _runtime_state->exec_env();
+    auto id = _runtime_state->fragment_instance_id();
     if (_runtime_state->enable_vectorized_exec()) {
-        
_runtime_state->exec_env()->vstream_mgr()->cancel(_runtime_state->fragment_instance_id());
+        env->vstream_mgr()->cancel(id);
     } else {
-        
_runtime_state->exec_env()->stream_mgr()->cancel(_runtime_state->fragment_instance_id());
-        
_runtime_state->exec_env()->result_mgr()->cancel(_runtime_state->fragment_instance_id());
+        env->stream_mgr()->cancel(id);
+        env->result_mgr()->cancel(id);
     }
 }
 
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index 3cdb6bb..874daa6 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -138,7 +138,7 @@ public:
 
     const Status& status() const { return _status; }
 
-    DataSink* get_sink() { return _sink.get(); }
+    DataSink* get_sink() const { return _sink.get(); }
 
     void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
 
@@ -245,7 +245,7 @@ private:
     // Idempotent.
     void stop_report_thread();
 
-    const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); }
+    const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl(); 
}
 
     void _collect_query_statistics();
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to