zhiqiang-hhhh commented on code in PR #23281:
URL: https://github.com/apache/doris/pull/23281#discussion_r1300049147


##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -881,178 +659,89 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
+    auto pre_and_submit = [&](int i) {
+        const auto& local_params = params.local_params[i];
+
+        const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
+        {
+            std::lock_guard<std::mutex> lock(_lock);
+            auto iter = _pipeline_map.find(fragment_instance_id);
+            if (iter != _pipeline_map.end()) {
+                // Duplicated
+                return Status::OK();
+            }
+            query_ctx->fragment_ids.push_back(fragment_instance_id);
+        }
+        START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
+        span->SetAttribute("instance_id", print_id(fragment_instance_id));
 
-    const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
-                                   params.query_options.enable_pipeline_x_engine;
-    if (enable_pipeline_x) {
         int64_t duration_ns = 0;
+        if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
+            query_ctx->set_ready_to_execute_only();
+        }
+        _setup_shared_hashtable_for_broadcast_join(params, local_params, query_ctx.get());
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
-                std::make_shared<pipeline::PipelineXFragmentContext>(
-                        query_ctx->query_id, params.fragment_id, query_ctx, _exec_env, cb,
+                std::make_shared<pipeline::PipelineFragmentContext>(
+                        query_ctx->query_id(), fragment_instance_id, params.fragment_id,
+                        local_params.backend_num, query_ctx, _exec_env, cb,
                         std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
                                         std::placeholders::_1));
         {
             SCOPED_RAW_TIMER(&duration_ns);
-            auto prepare_st = context->prepare(params);
+            auto prepare_st = context->prepare(params, i);
             if (!prepare_st.ok()) {
                 context->close_if_prepare_failed();
                 return prepare_st;
             }
         }
         g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
-        //        std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-        //        _runtimefilter_controller.add_entity(params, local_params, &handler,
-        //                                             context->get_runtime_state());
-        //        context->set_merge_controller_handler(handler);
-
-        for (size_t i = 0; i < params.local_params.size(); i++) {
-            const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id;
-            {
-                std::lock_guard<std::mutex> lock(_lock);
-                auto iter = _pipeline_map.find(fragment_instance_id);
-                if (iter != _pipeline_map.end()) {
-                    // Duplicated
-                    return Status::OK();
-                }
-                query_ctx->fragment_ids.push_back(fragment_instance_id);
-            }
-            START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
-            span->SetAttribute("instance_id", print_id(fragment_instance_id));
-
-            std::shared_ptr<FragmentExecState> exec_state(new FragmentExecState(
-                    query_ctx->query_id, fragment_instance_id, params.local_params[i].backend_num,
-                    _exec_env, query_ctx,
-                    std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
-                                    std::placeholders::_1)));
-            if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
-                // set need_wait_execution_trigger means this instance will not actually being executed
-                // until the execPlanFragmentStart RPC trigger to start it.
-                exec_state->set_need_wait_execution_trigger();
-            }
+        std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+        _runtimefilter_controller.add_entity(params, local_params, &handler,
+                                             context->get_runtime_state());
+        context->set_merge_controller_handler(handler);
 
-            if (!params.__isset.need_wait_execution_trigger ||
-                !params.need_wait_execution_trigger) {
-                query_ctx->set_ready_to_execute_only();
-            }
-            _setup_shared_hashtable_for_broadcast_join(params, params.local_params[i],
-                                                       exec_state->executor()->runtime_state(),
-                                                       query_ctx.get());
-        }
         {
             std::lock_guard<std::mutex> lock(_lock);
-            std::vector<TUniqueId> ins_ids;
-            reinterpret_cast<pipeline::PipelineXFragmentContext*>(context.get())
-                    ->instance_ids(ins_ids);
-            // TODO: simplify this mapping
-            for (const auto& ins_id : ins_ids) {
-                _pipeline_map.insert({ins_id, context});
-            }
-
+            _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
             _cv.notify_all();
         }
 
-        RETURN_IF_ERROR(context->submit());
-        return Status::OK();
-    } else {
-        auto pre_and_submit = [&](int i) {
-            const auto& local_params = params.local_params[i];
-
-            const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
-            {
-                std::lock_guard<std::mutex> lock(_lock);
-                auto iter = _pipeline_map.find(fragment_instance_id);
-                if (iter != _pipeline_map.end()) {
-                    // Duplicated
-                    return Status::OK();
+        return context->submit();
+    };
+
+    int target_size = params.local_params.size();

Review Comment:
   Maybe target_size should be renamed to instance_num?



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -519,11 +316,11 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
 
 static void empty_function(RuntimeState*, Status*) {}
 
-void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
+void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> exec_state,

Review Comment:
   exec_state -> executor



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -809,53 +588,52 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
         }
     }
 
-    std::shared_ptr<FragmentExecState> exec_state;
     std::shared_ptr<QueryContext> query_ctx;
     bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
                                    params.query_options.enable_pipeline_engine;
     RETURN_IF_ERROR(
             _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
     query_ctx->fragment_ids.push_back(fragment_instance_id);
 
-    exec_state.reset(
-            new FragmentExecState(query_ctx->query_id, params.params.fragment_instance_id,
-                                  params.backend_num, _exec_env, query_ctx,
-                                  std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback),
-                                                  this, std::placeholders::_1)));
+    auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
+            _exec_env, query_ctx, params.params.fragment_instance_id, -1, params.backend_num,
+            std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+                            std::placeholders::_1));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
         // set need_wait_execution_trigger means this instance will not actually being executed
         // until the execPlanFragmentStart RPC trigger to start it.
-        exec_state->set_need_wait_execution_trigger();
+        fragment_executor->set_need_wait_execution_trigger();
     }
 
     int64_t duration_ns = 0;
     DCHECK(!pipeline_engine_enabled);
     {
         SCOPED_RAW_TIMER(&duration_ns);
-        RETURN_IF_ERROR(exec_state->prepare(params));
+        RETURN_IF_ERROR(fragment_executor->prepare(params));
     }
     g_fragmentmgr_prepare_latency << (duration_ns / 1000);
     std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-    _runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state());
-    exec_state->set_merge_controller_handler(handler);
+    _runtimefilter_controller.add_entity(params, &handler, fragment_executor->runtime_state());

Review Comment:
   add_entity() returns a Status that is not checked here. We need a TODO for that.
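   One possible shape for that TODO (just a sketch; whether to propagate or merely log the Status is up to the author):

       // TODO: add_entity() returns a Status that is silently dropped here; check it, e.g.
       // RETURN_IF_ERROR(_runtimefilter_controller.add_entity(params, &handler,
       //                                                      fragment_executor->runtime_state()));
       _runtimefilter_controller.add_entity(params, &handler, fragment_executor->runtime_state());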



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -881,178 +659,89 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
+    auto pre_and_submit = [&](int i) {
+        const auto& local_params = params.local_params[i];
+
+        const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
+        {
+            std::lock_guard<std::mutex> lock(_lock);
+            auto iter = _pipeline_map.find(fragment_instance_id);
+            if (iter != _pipeline_map.end()) {
+                // Duplicated
+                return Status::OK();
+            }
+            query_ctx->fragment_ids.push_back(fragment_instance_id);
+        }
+        START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
+        span->SetAttribute("instance_id", print_id(fragment_instance_id));
 
-    const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
-                                   params.query_options.enable_pipeline_x_engine;
-    if (enable_pipeline_x) {
         int64_t duration_ns = 0;
+        if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
+            query_ctx->set_ready_to_execute_only();
+        }
+        _setup_shared_hashtable_for_broadcast_join(params, local_params, query_ctx.get());
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
-                std::make_shared<pipeline::PipelineXFragmentContext>(
-                        query_ctx->query_id, params.fragment_id, query_ctx, _exec_env, cb,
+                std::make_shared<pipeline::PipelineFragmentContext>(
+                        query_ctx->query_id(), fragment_instance_id, params.fragment_id,
+                        local_params.backend_num, query_ctx, _exec_env, cb,
                         std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
                                         std::placeholders::_1));
         {
             SCOPED_RAW_TIMER(&duration_ns);
-            auto prepare_st = context->prepare(params);
+            auto prepare_st = context->prepare(params, i);
             if (!prepare_st.ok()) {
                 context->close_if_prepare_failed();
                 return prepare_st;
             }
         }
         g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
-        //        std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-        //        _runtimefilter_controller.add_entity(params, local_params, &handler,
-        //                                             context->get_runtime_state());
-        //        context->set_merge_controller_handler(handler);
-
-        for (size_t i = 0; i < params.local_params.size(); i++) {
-            const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id;
-            {
-                std::lock_guard<std::mutex> lock(_lock);
-                auto iter = _pipeline_map.find(fragment_instance_id);
-                if (iter != _pipeline_map.end()) {
-                    // Duplicated
-                    return Status::OK();
-                }
-                query_ctx->fragment_ids.push_back(fragment_instance_id);
-            }
-            START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
-            span->SetAttribute("instance_id", print_id(fragment_instance_id));
-
-            std::shared_ptr<FragmentExecState> exec_state(new FragmentExecState(
-                    query_ctx->query_id, fragment_instance_id, params.local_params[i].backend_num,
-                    _exec_env, query_ctx,
-                    std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
-                                    std::placeholders::_1)));
-            if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
-                // set need_wait_execution_trigger means this instance will not actually being executed
-                // until the execPlanFragmentStart RPC trigger to start it.
-                exec_state->set_need_wait_execution_trigger();
-            }
+        std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+        _runtimefilter_controller.add_entity(params, local_params, &handler,

Review Comment:
   TODO: Check return status.
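   A sketch of such a check (using RETURN_IF_ERROR here is an assumption; the pre_and_submit lambda already returns Status, so it would compose):

       std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
       RETURN_IF_ERROR(_runtimefilter_controller.add_entity(params, local_params, &handler,
                                                            context->get_runtime_state()));
       context->set_merge_controller_handler(handler);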



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -881,178 +659,89 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
+    auto pre_and_submit = [&](int i) {
+        const auto& local_params = params.local_params[i];
+
+        const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
+        {
+            std::lock_guard<std::mutex> lock(_lock);
+            auto iter = _pipeline_map.find(fragment_instance_id);
+            if (iter != _pipeline_map.end()) {
+                // Duplicated
+                return Status::OK();
+            }
+            query_ctx->fragment_ids.push_back(fragment_instance_id);
+        }
+        START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
+        span->SetAttribute("instance_id", print_id(fragment_instance_id));
 
-    const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
-                                   params.query_options.enable_pipeline_x_engine;
-    if (enable_pipeline_x) {
         int64_t duration_ns = 0;
+        if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
+            query_ctx->set_ready_to_execute_only();

Review Comment:
   This clause should be moved outside of the pre_and_submit lambda, since it should be called only once per query (see the sketch below).
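   A sketch of the hoisting (the exact placement before the per-instance submission is an assumption):

       // Mark the query ready to execute once, before the per-instance pre_and_submit lambda runs:
       if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
           query_ctx->set_ready_to_execute_only();
       }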



##########
be/src/runtime/plan_fragment_executor.cpp:
##########
@@ -72,9 +76,17 @@ namespace doris {
 using namespace ErrorCode;
 
 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
+                                           std::shared_ptr<QueryContext> query_ctx,
+                                           const TUniqueId& instance_id, int fragment_id,
+                                           int backend_num,
                                            const report_status_callback& report_status_cb)
         : _exec_env(exec_env),
           _plan(nullptr),
+          _query_ctx(query_ctx),
+          _query_id(query_ctx->query_id()),

Review Comment:
   This member variable seems unnecessary; the query id is already reachable through the stored query_ctx.
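   For example, a sketch (the accessor name is illustrative, not part of this PR):

       // Derive the id from the stored context on demand instead of caching a second copy:
       TUniqueId query_id() const { return _query_ctx->query_id(); }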



##########
be/src/runtime/plan_fragment_executor.cpp:
##########
@@ -488,24 +561,39 @@ void PlanFragmentExecutor::stop_report_thread() {
 }
 
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
+    std::lock_guard<std::mutex> l(_status_lock);
     LOG_INFO("PlanFragmentExecutor::cancel")
             .tag("query_id", _query_id)

Review Comment:
   This will print a non-human-readable query_id.
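   A sketch of the fix, reusing the print_id() helper already used elsewhere in this file:

       LOG_INFO("PlanFragmentExecutor::cancel")
               .tag("query_id", print_id(_query_id));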



##########
be/src/runtime/plan_fragment_executor.cpp:
##########
@@ -100,32 +113,31 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
     DCHECK(!_report_thread_active);
 }
 
-Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
-                                     QueryContext* query_ctx) {
+Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
     OpentelemetryTracer tracer = telemetry::get_noop_tracer();
     if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
         tracer = telemetry::get_tracer(print_id(_query_id));
     }
     _span = tracer->StartSpan("Plan_fragment_executor");
     OpentelemetryScope scope {_span};
 
-    const TPlanFragmentExecParams& params = request.params;
-    _query_id = params.query_id;
+    if (request.__isset.query_options) {
+        _timeout_second = request.query_options.execution_timeout;
+    }
 
+    const TPlanFragmentExecParams& params = request.params;
     LOG_INFO("PlanFragmentExecutor::prepare")
             .tag("query_id", _query_id)
             .tag("instance_id", params.fragment_instance_id)

Review Comment:
   fragment_instance_id will not be printed correctly, since it is a Thrift type, not a POD type.
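   A sketch with print_id(), which this file already uses for TUniqueId values:

       LOG_INFO("PlanFragmentExecutor::prepare")
               .tag("query_id", print_id(_query_id))
               .tag("instance_id", print_id(params.fragment_instance_id));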



##########
be/src/runtime/plan_fragment_executor.cpp:
##########
@@ -100,32 +113,31 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
     DCHECK(!_report_thread_active);
 }
 
-Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
-                                     QueryContext* query_ctx) {
+Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
     OpentelemetryTracer tracer = telemetry::get_noop_tracer();
     if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
         tracer = telemetry::get_tracer(print_id(_query_id));
     }
     _span = tracer->StartSpan("Plan_fragment_executor");
     OpentelemetryScope scope {_span};
 
-    const TPlanFragmentExecParams& params = request.params;
-    _query_id = params.query_id;
+    if (request.__isset.query_options) {
+        _timeout_second = request.query_options.execution_timeout;
+    }
 
+    const TPlanFragmentExecParams& params = request.params;
     LOG_INFO("PlanFragmentExecutor::prepare")
             .tag("query_id", _query_id)

Review Comment:
   _query_id -> print_id(query_ctx->query_id())



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -881,178 +659,89 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
+    auto pre_and_submit = [&](int i) {
+        const auto& local_params = params.local_params[i];
+
+        const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
+        {
+            std::lock_guard<std::mutex> lock(_lock);
+            auto iter = _pipeline_map.find(fragment_instance_id);
+            if (iter != _pipeline_map.end()) {
+                // Duplicated
+                return Status::OK();
+            }
+            query_ctx->fragment_ids.push_back(fragment_instance_id);
+        }
+        START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
+        span->SetAttribute("instance_id", print_id(fragment_instance_id));
 
-    const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
-                                   params.query_options.enable_pipeline_x_engine;
-    if (enable_pipeline_x) {

Review Comment:
   Has the code branch for this predicate been removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
