HappenLee commented on code in PR #16383:
URL: https://github.com/apache/doris/pull/16383#discussion_r1107198204


##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -777,6 +782,157 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
     return Status::OK();
 }
 
+Status FragmentMgr::exec_plan_fragment(const TPipelineParams& params, 
FinishCallback cb) {
+    auto tracer = telemetry::is_current_span_valid() ? 
telemetry::get_tracer("tracer")
+                                                     : 
telemetry::get_noop_tracer();
+    VLOG_ROW << "exec_plan_fragment params is "
+             << apache::thrift::ThriftDebugString(params).c_str();
+    // sometimes TExecPlanFragmentParams debug string is too long and glog
+    // will truncate the log line, so print query options seperately for 
debuggin purpose
+    VLOG_ROW << "query options is "
+             << 
apache::thrift::ThriftDebugString(params.query_options).c_str();
+    START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
+
+    std::shared_ptr<FragmentExecState> exec_state;
+    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+    if (params.is_simplified_param) {
+        // Get common components from _fragments_ctx_map
+        std::lock_guard<std::mutex> lock(_lock);
+        auto search = _fragments_ctx_map.find(params.query_id);
+        if (search == _fragments_ctx_map.end()) {
+            return Status::InternalError(
+                    "Failed to get query fragments context. Query may be "
+                    "timeout or be cancelled. host: {}",
+                    BackendOptions::get_localhost());
+        }
+        fragments_ctx = search->second;
+    } else {
+        // This may be a first fragment request of the query.
+        // Create the query fragments context.
+        fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, 
_exec_env));
+        fragments_ctx->query_id = params.query_id;
+        RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), 
params.desc_tbl,
+                                              &(fragments_ctx->desc_tbl)));
+        fragments_ctx->coord_addr = params.coord;
+        LOG(INFO) << "query_id: "
+                  << UniqueId(fragments_ctx->query_id.hi, 
fragments_ctx->query_id.lo)
+                  << " coord_addr " << fragments_ctx->coord_addr
+                  << " total fragment num on current host: " << 
params.fragment_num_on_host;
+        fragments_ctx->query_globals = params.query_globals;
+
+        if (params.__isset.resource_info) {
+            fragments_ctx->user = params.resource_info.user;
+            fragments_ctx->group = params.resource_info.group;
+            fragments_ctx->set_rsc_info = true;
+        }
+
+        
fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(true);
+        fragments_ctx->timeout_second = params.query_options.query_timeout;
+        _set_scan_concurrency(params, fragments_ctx.get());
+
+        bool has_query_mem_tracker =
+                params.query_options.__isset.mem_limit && 
(params.query_options.mem_limit > 0);
+        int64_t bytes_limit = has_query_mem_tracker ? 
params.query_options.mem_limit : -1;
+        if (bytes_limit > MemInfo::mem_limit()) {
+            VLOG_NOTICE << "Query memory limit " << 
PrettyPrinter::print(bytes_limit, TUnit::BYTES)
+                        << " exceeds process memory limit of "
+                        << PrettyPrinter::print(MemInfo::mem_limit(), 
TUnit::BYTES)
+                        << ". Using process memory limit instead";
+            bytes_limit = MemInfo::mem_limit();
+        }
+        if (params.query_options.query_type == TQueryType::SELECT) {
+            fragments_ctx->query_mem_tracker = 
std::make_shared<MemTrackerLimiter>(
+                    MemTrackerLimiter::Type::QUERY,
+                    fmt::format("Query#Id={}", 
print_id(fragments_ctx->query_id)), bytes_limit);
+        } else if (params.query_options.query_type == TQueryType::LOAD) {
+            fragments_ctx->query_mem_tracker = 
std::make_shared<MemTrackerLimiter>(
+                    MemTrackerLimiter::Type::LOAD,
+                    fmt::format("Load#Id={}", 
print_id(fragments_ctx->query_id)), bytes_limit);
+        } else { // EXTERNAL
+            fragments_ctx->query_mem_tracker = 
std::make_shared<MemTrackerLimiter>(
+                    MemTrackerLimiter::Type::LOAD,
+                    fmt::format("External#Id={}", 
print_id(fragments_ctx->query_id)), bytes_limit);
+        }
+        if (params.query_options.__isset.is_report_success &&
+            params.query_options.is_report_success) {
+            fragments_ctx->query_mem_tracker->enable_print_log_usage();
+        }
+        {
+            // Find _fragments_ctx_map again, in case some other request has 
already
+            // create the query fragments context.
+            std::lock_guard<std::mutex> lock(_lock);
+            auto search = _fragments_ctx_map.find(params.query_id);
+            if (search == _fragments_ctx_map.end()) {
+                
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, 
fragments_ctx));
+                LOG(INFO) << "Register query/load memory tracker, query/load 
id: "
+                          << print_id(fragments_ctx->query_id)
+                          << " limit: " << PrettyPrinter::print(bytes_limit, 
TUnit::BYTES);
+            } else {
+                // Already has a query fragments context, use it
+                fragments_ctx = search->second;
+            }
+        }
+    }
+
+    for (size_t i = 0; i < params.local_params.size(); i++) {
+        TPipelineLocalParams local_params = params.local_params[i];

Review Comment:
   same to up



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -777,6 +782,157 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
     return Status::OK();
 }
 
+Status FragmentMgr::exec_plan_fragment(const TPipelineParams& params, 
FinishCallback cb) {
+    auto tracer = telemetry::is_current_span_valid() ? 
telemetry::get_tracer("tracer")
+                                                     : 
telemetry::get_noop_tracer();
+    VLOG_ROW << "exec_plan_fragment params is "
+             << apache::thrift::ThriftDebugString(params).c_str();
+    // sometimes TExecPlanFragmentParams debug string is too long and glog
+    // will truncate the log line, so print query options seperately for 
debuggin purpose
+    VLOG_ROW << "query options is "
+             << 
apache::thrift::ThriftDebugString(params.query_options).c_str();
+    START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
+
+    std::shared_ptr<FragmentExecState> exec_state;
+    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+    if (params.is_simplified_param) {
+        // Get common components from _fragments_ctx_map
+        std::lock_guard<std::mutex> lock(_lock);
+        auto search = _fragments_ctx_map.find(params.query_id);
+        if (search == _fragments_ctx_map.end()) {
+            return Status::InternalError(
+                    "Failed to get query fragments context. Query may be "
+                    "timeout or be cancelled. host: {}",
+                    BackendOptions::get_localhost());
+        }
+        fragments_ctx = search->second;
+    } else {
+        // This may be a first fragment request of the query.
+        // Create the query fragments context.
+        fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, 
_exec_env));
+        fragments_ctx->query_id = params.query_id;
+        RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), 
params.desc_tbl,
+                                              &(fragments_ctx->desc_tbl)));
+        fragments_ctx->coord_addr = params.coord;
+        LOG(INFO) << "query_id: "
+                  << UniqueId(fragments_ctx->query_id.hi, 
fragments_ctx->query_id.lo)
+                  << " coord_addr " << fragments_ctx->coord_addr
+                  << " total fragment num on current host: " << 
params.fragment_num_on_host;
+        fragments_ctx->query_globals = params.query_globals;
+
+        if (params.__isset.resource_info) {
+            fragments_ctx->user = params.resource_info.user;
+            fragments_ctx->group = params.resource_info.group;
+            fragments_ctx->set_rsc_info = true;
+        }
+
+        
fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(true);
+        fragments_ctx->timeout_second = params.query_options.query_timeout;
+        _set_scan_concurrency(params, fragments_ctx.get());
+
+        bool has_query_mem_tracker =
+                params.query_options.__isset.mem_limit && 
(params.query_options.mem_limit > 0);
+        int64_t bytes_limit = has_query_mem_tracker ? 
params.query_options.mem_limit : -1;
+        if (bytes_limit > MemInfo::mem_limit()) {
+            VLOG_NOTICE << "Query memory limit " << 
PrettyPrinter::print(bytes_limit, TUnit::BYTES)
+                        << " exceeds process memory limit of "
+                        << PrettyPrinter::print(MemInfo::mem_limit(), 
TUnit::BYTES)
+                        << ". Using process memory limit instead";
+            bytes_limit = MemInfo::mem_limit();
+        }
+        if (params.query_options.query_type == TQueryType::SELECT) {
+            fragments_ctx->query_mem_tracker = 
std::make_shared<MemTrackerLimiter>(
+                    MemTrackerLimiter::Type::QUERY,
+                    fmt::format("Query#Id={}", 
print_id(fragments_ctx->query_id)), bytes_limit);
+        } else if (params.query_options.query_type == TQueryType::LOAD) {
+            fragments_ctx->query_mem_tracker = 
std::make_shared<MemTrackerLimiter>(
+                    MemTrackerLimiter::Type::LOAD,
+                    fmt::format("Load#Id={}", 
print_id(fragments_ctx->query_id)), bytes_limit);
+        } else { // EXTERNAL
+            fragments_ctx->query_mem_tracker = 
std::make_shared<MemTrackerLimiter>(
+                    MemTrackerLimiter::Type::LOAD,
+                    fmt::format("External#Id={}", 
print_id(fragments_ctx->query_id)), bytes_limit);
+        }
+        if (params.query_options.__isset.is_report_success &&
+            params.query_options.is_report_success) {
+            fragments_ctx->query_mem_tracker->enable_print_log_usage();
+        }
+        {
+            // Find _fragments_ctx_map again, in case some other request has 
already
+            // create the query fragments context.
+            std::lock_guard<std::mutex> lock(_lock);
+            auto search = _fragments_ctx_map.find(params.query_id);
+            if (search == _fragments_ctx_map.end()) {
+                
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, 
fragments_ctx));
+                LOG(INFO) << "Register query/load memory tracker, query/load 
id: "
+                          << print_id(fragments_ctx->query_id)
+                          << " limit: " << PrettyPrinter::print(bytes_limit, 
TUnit::BYTES);
+            } else {
+                // Already has a query fragments context, use it
+                fragments_ctx = search->second;
+            }
+        }
+    }
+
+    for (size_t i = 0; i < params.local_params.size(); i++) {
+        TPipelineLocalParams local_params = params.local_params[i];

Review Comment:
   same to up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to