This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new f1b57c4 [Optimize] Avoid repeated sending of common components in Fragments (#4904)
f1b57c4 is described below
commit f1b57c4418e5cdf399a6d73214f5992c2e9ecce7
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Nov 22 20:38:05 2020 +0800
[Optimize] Avoid repeated sending of common components in Fragments (#4904)
This CL mainly changes:
1. Avoid repeated sending of common components in Fragments
In the previous implementation, a query could generate multiple Fragments,
and these Fragments contain some common information, such as the DescriptorTable.
Fragments are sent to the BE in a certain order, so this common information
was sent repeatedly and regenerated repeatedly on the BE side.
For some complex SQL, this common information can be very large,
which increases the execution time of the Fragments.
So I improved this: for multiple Fragments sent to the same BE, only the
first Fragment carries this common information, which is then cached on the
BE side, so subsequent Fragments no longer need to carry it.
In a local test, the execution time of some complex SQL dropped from
3 seconds to 1 second.
2. Add the time consumed by FE-side logic to the Profile
This includes SQL analysis, planning, Fragment scheduling and sending on
the FE side, and the time to fetch the result data.
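
For context, the BE-side caching introduced by this CL reduces to a
get-or-create map keyed by query_id, plus a per-query countdown so that the
last finishing fragment removes the entry. Below is a minimal, self-contained
C++ sketch of that pattern; SharedQueryCtx and CtxCache are illustrative
stand-ins invented here, not the actual Doris classes (QueryFragmentsCtx,
FragmentMgr):

    #include <atomic>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    // Stand-in for QueryFragmentsCtx: holds the components shared by all
    // fragments of one query on this host (e.g. the descriptor table).
    struct SharedQueryCtx {
        explicit SharedQueryCtx(int num_fragments) : remaining(num_fragments) {}
        std::string desc_tbl;        // stand-in for the (large) DescriptorTable
        std::atomic<int> remaining;  // fragments not yet finished
        // Returns true when the last fragment finishes.
        bool countdown() { return remaining.fetch_sub(1) == 1; }
    };

    class CtxCache {
    public:
        // The first fragment of a query creates and registers the shared context;
        // later (simplified) fragments look it up instead of re-deserializing.
        std::shared_ptr<SharedQueryCtx> get_or_create(const std::string& query_id,
                int num_fragments, const std::string& desc_tbl) {
            std::lock_guard<std::mutex> lock(_lock);
            auto it = _map.find(query_id);
            if (it != _map.end()) {
                return it->second;  // another fragment already created it
            }
            auto ctx = std::make_shared<SharedQueryCtx>(num_fragments);
            ctx->desc_tbl = desc_tbl;
            _map.emplace(query_id, ctx);
            return ctx;
        }

        // Called when a fragment finishes; the last one erases the entry.
        void on_fragment_done(const std::string& query_id,
                const std::shared_ptr<SharedQueryCtx>& ctx) {
            if (ctx->countdown()) {
                std::lock_guard<std::mutex> lock(_lock);
                _map.erase(query_id);
            }
        }

    private:
        std::mutex _lock;
        std::map<std::string, std::shared_ptr<SharedQueryCtx>> _map;
    };

    int main() {
        CtxCache cache;
        auto ctx = cache.get_or_create("query-1", /*num_fragments=*/2, "big-desc-tbl");
        auto same = cache.get_or_create("query-1", 2, "ignored");  // cache hit
        std::cout << (ctx == same) << std::endl;                   // prints 1
        cache.on_fragment_done("query-1", ctx);
        cache.on_fragment_done("query-1", same);  // last fragment erases the entry
        return 0;
    }

The real implementation must additionally handle the race where a simplified
fragment arrives after the context has expired on the BE; see the
InternalError path in fragment_mgr.cpp below.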
---
be/src/olap/push_handler.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 193 +++++++++++++++------
be/src/runtime/fragment_mgr.h | 10 +-
be/src/runtime/plan_fragment_executor.cpp | 28 +--
be/src/runtime/plan_fragment_executor.h | 60 ++++++-
be/src/runtime/runtime_state.cpp | 8 +-
be/src/runtime/runtime_state.h | 2 +-
be/src/runtime/test_env.cc | 2 +-
be/src/service/internal_service.cpp | 4 +-
be/test/runtime/buffered_block_mgr2_test.cpp | 2 -
be/test/runtime/fragment_mgr_test.cpp | 4 +-
.../doris/common/util/QueryPlannerProfile.java | 104 +++++++++++
.../apache/doris/common/util/RuntimeProfile.java | 4 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 116 +++++++++----
.../java/org/apache/doris/qe/StmtExecutor.java | 32 +++-
.../org/apache/doris/rpc/BackendServiceProxy.java | 16 +-
.../java/org/apache/doris/rpc/PBackendService.java | 8 +-
gensrc/thrift/PaloInternalService.thrift | 11 ++
18 files changed, 465 insertions(+), 141 deletions(-)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index b1df06a..0ca7235 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -944,7 +944,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
TQueryOptions query_options;
TQueryGlobals query_globals;
- _runtime_state.reset(new RuntimeState(fragment_params, query_options, query_globals,
+ _runtime_state.reset(new RuntimeState(params, query_options, query_globals,
ExecEnv::GetInstance()));
DescriptorTbl* desc_tbl = NULL;
Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c2932e3..fa7ed31 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -71,15 +71,22 @@ using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;
class RuntimeProfile;
-
class FragmentExecState {
public:
+ // Constructor using QueryFragmentsCtx
+ FragmentExecState(
+ const TUniqueId& query_id,
+ const TUniqueId& instance_id,
+ int backend_num,
+ ExecEnv* exec_env,
+ std::shared_ptr<QueryFragmentsCtx> fragments_ctx);
+
FragmentExecState(
const TUniqueId& query_id,
const TUniqueId& instance_id,
int backend_num,
ExecEnv* exec_env,
- const TNetworkAddress& coord_hostport);
+ const TNetworkAddress& coord_addr);
~FragmentExecState();
@@ -135,6 +142,10 @@ public:
int get_timeout_second() const { return _timeout_second; }
+ std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() {
+ return _fragments_ctx;
+ }
+
private:
void coordinator_callback(const Status& status, RuntimeProfile* profile,
bool done);
@@ -160,6 +171,9 @@ private:
int _timeout_second;
std::unique_ptr<std::thread> _exec_thread;
+
+ // This context is shared by all fragments of a query on this host
+ std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
};
FragmentExecState::FragmentExecState(
@@ -167,6 +181,24 @@ FragmentExecState::FragmentExecState(
const TUniqueId& fragment_instance_id,
int backend_num,
ExecEnv* exec_env,
+ std::shared_ptr<QueryFragmentsCtx> fragments_ctx) :
+ _query_id(query_id),
+ _fragment_instance_id(fragment_instance_id),
+ _backend_num(backend_num),
+ _exec_env(exec_env),
+ _executor(exec_env, boost::bind<void>(
+ boost::mem_fn(&FragmentExecState::coordinator_callback), this, _1, _2, _3)),
+ _timeout_second(-1),
+ _fragments_ctx(fragments_ctx) {
+ _start_time = DateTimeValue::local_time();
+ _coord_addr = _fragments_ctx->coord_addr;
+}
+
+FragmentExecState::FragmentExecState(
+ const TUniqueId& query_id,
+ const TUniqueId& fragment_instance_id,
+ int backend_num,
+ ExecEnv* exec_env,
const TNetworkAddress& coord_addr) :
_query_id(query_id),
_fragment_instance_id(fragment_instance_id),
@@ -175,7 +207,6 @@ FragmentExecState::FragmentExecState(
_coord_addr(coord_addr),
_executor(exec_env, boost::bind<void>(
boost::mem_fn(&FragmentExecState::coordinator_callback), this, _1, _2, _3)),
- _set_rsc_info(false),
_timeout_second(-1) {
_start_time = DateTimeValue::local_time();
}
@@ -188,35 +219,24 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
_timeout_second = params.query_options.query_timeout;
}
- if (params.__isset.resource_info) {
- set_group(params.resource_info);
+ if (_fragments_ctx == nullptr) {
+ if (params.__isset.resource_info) {
+ set_group(params.resource_info);
+ }
}
- return _executor.prepare(params);
-}
-
-static void register_cgroups(const std::string& user, const std::string& group) {
- TResourceInfo* new_info = new TResourceInfo();
- new_info->user = user;
- new_info->group = group;
- int ret = ResourceTls::set_resource_tls(new_info);
- if (ret != 0) {
- delete new_info;
- return;
+ if (_fragments_ctx == nullptr) {
+ return _executor.prepare(params);
+ } else {
+ return _executor.prepare(params, _fragments_ctx.get());
}
- CgroupsMgr::apply_cgroup(new_info->user, new_info->group);
}
Status FragmentExecState::execute() {
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
- if (_set_rsc_info) {
- register_cgroups(_user, _group);
- } else {
- CgroupsMgr::apply_system_cgroup();
- }
-
+ CgroupsMgr::apply_system_cgroup();
WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while opening fragment $0",
print_id(_fragment_instance_id)));
_executor.close();
@@ -381,6 +401,7 @@ void FragmentExecState::coordinator_callback(
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env),
_fragment_map(),
+ _fragments_ctx_map(),
_stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
@@ -418,31 +439,37 @@ FragmentMgr::~FragmentMgr() {
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
+ _fragments_ctx_map.clear();
}
}
static void empty_function(PlanFragmentExecutor* exec) {
}
-void FragmentMgr::exec_actual(
+void FragmentMgr::_exec_actual(
std::shared_ptr<FragmentExecState> exec_state,
FinishCallback cb) {
+
exec_state->execute();
+ std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
+ bool all_done = false;
+ if (fragments_ctx != nullptr) {
+ // decrease the number of unfinished fragments
+ all_done = fragments_ctx->countdown();
+ }
+
+ // remove exec state after this fragment finished
{
std::lock_guard<std::mutex> lock(_lock);
- auto iter = _fragment_map.find(exec_state->fragment_instance_id());
- if (iter != _fragment_map.end()) {
- _fragment_map.erase(iter);
- } else {
- // Impossible
- LOG(WARNING) << "missing entry in fragment exec state map: instance_id="
- << exec_state->fragment_instance_id();
+ _fragment_map.erase(exec_state->fragment_instance_id());
+ if (all_done) {
+ _fragments_ctx_map.erase(fragments_ctx->query_id);
}
}
+
// Callback after remove from this id
cb(exec_state->executor());
- // NOTE: 'exec_state' is desconstructed here without lock
}
Status FragmentMgr::exec_plan_fragment(
@@ -454,7 +481,6 @@ Status FragmentMgr::exec_plan_fragment(
const TExecPlanFragmentParams& params,
FinishCallback cb) {
const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
- std::shared_ptr<FragmentExecState> exec_state;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
@@ -463,32 +489,82 @@ Status FragmentMgr::exec_plan_fragment(
return Status::OK();
}
}
- exec_state.reset(new FragmentExecState(
- params.params.query_id,
- fragment_instance_id,
- params.backend_num,
- _exec_env,
- params.coord));
- RETURN_IF_ERROR(exec_state->prepare(params));
+ std::shared_ptr<FragmentExecState> exec_state;
+ if (!params.__isset.is_simplified_param) {
+ // This is an old version of the params: all @Common components are set in TExecPlanFragmentParams.
+ exec_state.reset(new FragmentExecState(
+ params.params.query_id,
+ params.params.fragment_instance_id,
+ params.backend_num,
+ _exec_env,
+ params.coord));
+ } else {
+ std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+ if (params.is_simplified_param) {
+ // Get common components from _fragments_ctx_map
+ std::lock_guard<std::mutex> lock(_lock);
+ auto search = _fragments_ctx_map.find(params.params.query_id);
+ if (search == _fragments_ctx_map.end()) {
+ return Status::InternalError(strings::Substitute(
+ "Failed to get query fragments context. Query may
be timeout or be cancelled. host: ",
+ BackendOptions::get_localhost()));
+ }
+ fragments_ctx = search->second;
+ } else {
+ // This may be the first fragment request of the query.
+ // Create the query fragments context.
+ fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host));
+ fragments_ctx->query_id = params.params.query_id;
+ RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl, &(fragments_ctx->desc_tbl)));
+ fragments_ctx->coord_addr = params.coord;
+ fragments_ctx->query_globals = params.query_globals;
+
+ if (params.__isset.resource_info) {
+ fragments_ctx->user = params.resource_info.user;
+ fragments_ctx->group = params.resource_info.group;
+ fragments_ctx->set_rsc_info = true;
+ }
+
+ if (params.__isset.query_options) {
+ fragments_ctx->timeout_second = params.query_options.query_timeout;
+ }
+
+ {
+ // Find _fragments_ctx_map again, in case some other request has already
+ // created the query fragments context.
+ std::lock_guard<std::mutex> lock(_lock);
+ auto search = _fragments_ctx_map.find(params.params.query_id);
+ if (search == _fragments_ctx_map.end()) {
+ _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
+ } else {
+ // Already has a query fragments context, use it
+ fragments_ctx = search->second;
+ }
+ }
+ }
+
+ exec_state.reset(new FragmentExecState(
+ fragments_ctx->query_id,
+ params.params.fragment_instance_id,
+ params.backend_num,
+ _exec_env,
+ fragments_ctx));
+ }
+
+ RETURN_IF_ERROR(exec_state->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
- auto iter = _fragment_map.find(fragment_instance_id);
- if (iter != _fragment_map.end()) {
- // Duplicated
- return Status::InternalError("Double execute");
- }
- // register exec_state before starting exec thread
- _fragment_map.insert(std::make_pair(fragment_instance_id, exec_state));
+ _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
}
auto st = _thread_pool->submit_func(
- std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb));
+ std::bind<void>(&FragmentMgr::_exec_actual, this, exec_state, cb));
if (!st.ok()) {
{
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
- _fragment_map.erase(fragment_instance_id);
+ _fragment_map.erase(params.params.fragment_instance_id);
}
exec_state->cancel_before_execute();
return Status::InternalError(strings::Substitute(
@@ -498,11 +574,11 @@ Status FragmentMgr::exec_plan_fragment(
return Status::OK();
}
-Status FragmentMgr::cancel(const TUniqueId& id, const PPlanFragmentCancelReason& reason) {
+Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason) {
std::shared_ptr<FragmentExecState> exec_state;
{
std::lock_guard<std::mutex> lock(_lock);
- auto iter = _fragment_map.find(id);
+ auto iter = _fragment_map.find(fragment_id);
if (iter == _fragment_map.end()) {
// No match
return Status::OK();
@@ -517,18 +593,25 @@ Status FragmentMgr::cancel(const TUniqueId& id, const PPlanFragmentCancelReason&
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
- std::vector<TUniqueId> to_delete;
+ std::vector<TUniqueId> to_cancel;
DateTimeValue now = DateTimeValue::local_time();
{
std::lock_guard<std::mutex> lock(_lock);
for (auto& it : _fragment_map) {
if (it.second->is_timeout(now)) {
- to_delete.push_back(it.second->fragment_instance_id());
+ to_cancel.push_back(it.second->fragment_instance_id());
+ }
+ }
+ for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
+ if (it->second->is_timeout(now)) {
+ it = _fragments_ctx_map.erase(it);
+ } else {
+ ++it;
}
}
}
- timeout_canceled_fragment_count->increment(to_delete.size());
- for (auto& id : to_delete) {
+ timeout_canceled_fragment_count->increment(to_cancel.size());
+ for (auto& id : to_cancel) {
cancel(id, PPlanFragmentCancelReason::TIMEOUT);
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout
fragment " << print_id(id);
}
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 6c5a24c..d69f128 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -38,12 +38,14 @@
namespace doris {
+class QueryFragmentsCtx;
class ExecEnv;
class FragmentExecState;
-class TExecPlanFragmentParams;
-class TUniqueId;
class PlanFragmentExecutor;
class ThreadPool;
+class TExecPlanFragmentParams;
+class TExecPlanFragmentParamsList;
+class TUniqueId;
std::string to_load_error_http_path(const std::string& file_name);
@@ -79,7 +81,7 @@ public:
Status exec_external_plan_fragment(const TScanOpenParams& params, const TUniqueId& fragment_instance_id, std::vector<TScanColumnDesc>* selected_columns);
private:
- void exec_actual(std::shared_ptr<FragmentExecState> exec_state,
+ void _exec_actual(std::shared_ptr<FragmentExecState> exec_state,
FinishCallback cb);
// This is input params
@@ -89,6 +91,8 @@ private:
// Make sure that remove this before no data reference FragmentExecState
std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
+ // query id -> QueryFragmentsCtx
+ std::unordered_map<TUniqueId, std::shared_ptr<QueryFragmentsCtx>> _fragments_ctx_map;
CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _cancel_thread;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index dac1c71..23b2cce 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -68,7 +68,9 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
DCHECK(!_report_thread_active);
}
-Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
+Status PlanFragmentExecutor::prepare(
+ const TExecPlanFragmentParams& request,
+ const QueryFragmentsCtx* fragments_ctx) {
const TPlanFragmentExecParams& params = request.params;
_query_id = params.query_id;
@@ -77,8 +79,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
<< " backend_num=" << request.backend_num;
// VLOG(2) << "request:\n" << apache::thrift::ThriftDebugString(request);
+ const TQueryGlobals& query_globals = fragments_ctx == nullptr ? request.query_globals : fragments_ctx->query_globals;
_runtime_state.reset(new RuntimeState(
- request, request.query_options, request.query_globals, _exec_env));
+ params, request.query_options, query_globals, _exec_env));
RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
_runtime_state->set_be_number(request.backend_num);
@@ -139,9 +142,13 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
RETURN_IF_ERROR(_runtime_state->create_block_mgr());
// set up desc tbl
- DescriptorTbl* desc_tbl = NULL;
- DCHECK(request.__isset.desc_tbl);
- RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl));
+ DescriptorTbl* desc_tbl = nullptr;
+ if (fragments_ctx != nullptr) {
+ desc_tbl = fragments_ctx->desc_tbl;
+ } else {
+ DCHECK(request.__isset.desc_tbl);
+ RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl));
+ }
_runtime_state->set_desc_tbl(desc_tbl);
// set up plan
@@ -150,12 +157,12 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
ExecNode::create_tree(_runtime_state.get(), obj_pool(),
request.fragment.plan, *desc_tbl, &_plan));
_runtime_state->set_fragment_root_id(_plan->id());
- if (request.params.__isset.debug_node_id) {
- DCHECK(request.params.__isset.debug_action);
- DCHECK(request.params.__isset.debug_phase);
+ if (params.__isset.debug_node_id) {
+ DCHECK(params.__isset.debug_action);
+ DCHECK(params.__isset.debug_phase);
ExecNode::set_debug_options(
- request.params.debug_node_id, request.params.debug_phase,
- request.params.debug_action, _plan);
+ params.debug_node_id, params.debug_phase,
+ params.debug_action, _plan);
}
// set #senders of exchange nodes before calling Prepare()
@@ -167,7 +174,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
DCHECK_GT(num_senders, 0);
static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
}
-
RETURN_IF_ERROR(_plan->prepare(_runtime_state.get()));
// set scan ranges
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 9e56866..515e95d 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -24,11 +24,14 @@
#include "common/status.h"
#include "common/object_pool.h"
+#include "runtime/datetime_value.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
+#include "util/hash_util.hpp"
namespace doris {
+class QueryFragmentsCtx;
class HdfsFsCache;
class ExecNode;
class RowDescriptor;
@@ -37,6 +40,7 @@ class DataSink;
class DataStreamMgr;
class RuntimeProfile;
class RuntimeState;
+class TNetworkAddress;
class TPlanExecRequest;
class TPlanFragment;
class TPlanFragmentExecParams;
@@ -90,7 +94,10 @@ public:
// If request.query_options.mem_limit > 0, it is used as an approximate limit on the
// number of bytes this query can consume at runtime.
// The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
- Status prepare(const TExecPlanFragmentParams& request);
+ // If fragments_ctx is not null, some components are taken from fragments_ctx.
+ Status prepare(
+ const TExecPlanFragmentParams& request,
+ const QueryFragmentsCtx* fragments_ctx = nullptr);
// Start execution. Call this prior to get_next().
// If this fragment has a sink, open() will send all rows produced
@@ -270,6 +277,57 @@ private:
};
+// Saves the common components of the fragments in a query.
+// Some components, like DescriptorTbl, may be very large, and deserializing
+// them for every fragment would slow down fragment execution.
+class QueryFragmentsCtx {
+
+public:
+ QueryFragmentsCtx(int total_fragment_num)
+ : fragment_num(total_fragment_num),
+ timeout_second(-1) {
+ _start_time = DateTimeValue::local_time();
+ }
+
+ bool countdown() {
+ return fragment_num.fetch_sub(1) == 1;
+ }
+
+ bool is_timeout(const DateTimeValue& now) const {
+ if (timeout_second <= 0) {
+ return false;
+ }
+ if (now.second_diff(_start_time) > timeout_second) {
+ return true;
+ }
+ return false;
+ }
+
+public:
+ TUniqueId query_id;
+ DescriptorTbl* desc_tbl;
+ bool set_rsc_info = false;
+ std::string user;
+ std::string group;
+ TNetworkAddress coord_addr;
+ TQueryGlobals query_globals;
+
+ /// In the current implementation, for multiple fragments executed by a query on the same BE node,
+ /// we store some common components in QueryFragmentsCtx, and save QueryFragmentsCtx in FragmentMgr.
+ /// When all Fragments are executed, QueryFragmentsCtx needs to be deleted from FragmentMgr.
+ /// Here we use a counter to store the number of Fragments that have not yet been completed,
+ /// and after each Fragment is completed, this value will be reduced by one.
+ /// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment
+ /// will clean up QueryFragmentsCtx.
+ std::atomic<int> fragment_num;
+ int timeout_second;
+ ObjectPool obj_pool;
+
+private:
+ DateTimeValue _start_time;
+
+};
+
}
#endif
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 8b4de84..c6fe91a 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -73,15 +73,15 @@ RuntimeState::RuntimeState(
}
RuntimeState::RuntimeState(
- const TExecPlanFragmentParams& fragment_params,
+ const TPlanFragmentExecParams& fragment_exec_params,
const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env) :
_fragment_mem_tracker(nullptr),
- _profile("Fragment " +
print_id(fragment_params.params.fragment_instance_id)),
+ _profile("Fragment " +
print_id(fragment_exec_params.fragment_instance_id)),
_obj_pool(new ObjectPool()),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
- _query_id(fragment_params.params.query_id),
+ _query_id(fragment_exec_params.query_id),
_is_cancelled(false),
_per_fragment_instance_idx(0),
_root_node_id(-1),
@@ -94,7 +94,7 @@ RuntimeState::RuntimeState(
_error_log_file_path(""),
_error_log_file(nullptr),
_instance_buffer_reservation(new ReservationTracker) {
- Status status = init(fragment_params.params.fragment_instance_id, query_options, query_globals, exec_env);
+ Status status = init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env);
DCHECK(status.ok());
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 8fc07e7..980e0af 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -70,7 +70,7 @@ public:
const TQueryGlobals& query_globals, ExecEnv* exec_env);
RuntimeState(
- const TExecPlanFragmentParams& fragment_params,
+ const TPlanFragmentExecParams& fragment_exec_params,
const TQueryOptions& query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);
diff --git a/be/src/runtime/test_env.cc b/be/src/runtime/test_env.cc
index 6698c06..d22a288 100644
--- a/be/src/runtime/test_env.cc
+++ b/be/src/runtime/test_env.cc
@@ -53,7 +53,7 @@ RuntimeState* TestEnv::create_runtime_state(int64_t query_id) {
TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
plan_params.params.query_id.hi = 0;
plan_params.params.query_id.lo = query_id;
- return new RuntimeState(plan_params, TQueryOptions(), TQueryGlobals(), _exec_env.get());
+ return new RuntimeState(plan_params.params, TQueryOptions(), TQueryGlobals(), _exec_env.get());
}
Status TestEnv::create_query_state(int64_t query_id, int max_buffers, int block_size,
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 799b847..61c2340 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -148,8 +148,8 @@ Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
}
- LOG(INFO) << "exec plan fragment, fragment_instance_id=" <<
print_id(t_request.params.fragment_instance_id)
- << ", coord=" << t_request.coord << ", backend=" <<
t_request.backend_num;
+ // LOG(INFO) << "exec plan fragment, fragment_instance_id=" <<
print_id(t_request.params.fragment_instance_id)
+ // << ", coord=" << t_request.coord << ", backend=" <<
t_request.backend_num;
return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
}
diff --git a/be/test/runtime/buffered_block_mgr2_test.cpp b/be/test/runtime/buffered_block_mgr2_test.cpp
index 709413b..94ca7a0 100644
--- a/be/test/runtime/buffered_block_mgr2_test.cpp
+++ b/be/test/runtime/buffered_block_mgr2_test.cpp
@@ -553,8 +553,6 @@ protected:
const int num_threads = 4;
boost::thread_group workers;
// Create a shared RuntimeState with no BufferedBlockMgr2.
- // RuntimeState* shared_state = new RuntimeState(TExecPlanFragmentParams(), "",
- // _test_env->exec_env());
RuntimeState* shared_state = new RuntimeState(TUniqueId(), TQueryOptions(), TQueryGlobals(),
_test_env->exec_env());
for (int i = 0; i < num_threads; ++i) {
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index a8b0ce7..eb23dd9 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -37,7 +37,9 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
PlanFragmentExecutor::~PlanFragmentExecutor() {}
-Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
+Status PlanFragmentExecutor::prepare(
+ const TExecPlanFragmentParams& request,
+ const QueryFragmentsCtx* fragments_ctx) {
return s_prepare_status;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java
new file mode 100644
index 0000000..6ea3efa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.thrift.TUnit;
+
+/**
+ * This profile is mainly used to record the time-consuming situation related to
+ * executing SQL parsing, planning, scheduling, and fetching results on the FE side.
+ * Can be expanded later.
+ *
+ * All timestamps are in nanoseconds.
+ */
+public class QueryPlannerProfile {
+ public static final String KEY_ANALYSIS = "Analysis Time";
+ public static final String KEY_PLAN = "Plan Time";
+ public static final String KEY_SCHEDULE = "Schedule Time";
+ public static final String KEY_FETCH = "Wait and Fetch Result Time";
+
+ // timestamp of query begin
+ private long queryBeginTime = -1;
+ // Analysis end time
+ private long queryAnalysisFinishTime = -1;
+ // Plan end time
+ private long queryPlanFinishTime = -1;
+ // Fragment schedule and send end time
+ private long queryScheduleFinishTime = -1;
+ // Query result fetch end time
+ private long queryFetchResultFinishTime = -1;
+
+ public void setQueryBeginTime() {
+ this.queryBeginTime = TimeUtils.getStartTime();
+ }
+
+ public void setQueryAnalysisFinishTime() {
+ this.queryAnalysisFinishTime = TimeUtils.getStartTime();
+ }
+
+ public void setQueryPlanFinishTime() {
+ this.queryPlanFinishTime = TimeUtils.getStartTime();
+ }
+
+ public void setQueryScheduleFinishTime() {
+ this.queryScheduleFinishTime = TimeUtils.getStartTime();
+ }
+
+ public void setQueryFetchResultFinishTime() {
+ this.queryFetchResultFinishTime = TimeUtils.getStartTime();
+ }
+
+ public long getQueryBeginTime() {
+ return queryBeginTime;
+ }
+
+ private String getPrettyQueryAnalysisFinishTime() {
+ if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) {
+ return "N/A";
+ }
+ return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_NS);
+ }
+
+ private String getPrettyQueryPlanFinishTime() {
+ if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) {
+ return "N/A";
+ }
+ return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_NS);
+ }
+
+ private String getPrettyQueryScheduleFinishTime() {
+ if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) {
+ return "N/A";
+ }
+ return RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_NS);
+ }
+
+ private String getPrettyQueryFetchResultFinishTime() {
+ if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) {
+ return "N/A";
+ }
+ return RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_NS);
+ }
+
+ public void initRuntimeProfile(RuntimeProfile plannerProfile) {
+ plannerProfile.addInfoString(KEY_ANALYSIS, getPrettyQueryAnalysisFinishTime());
+ plannerProfile.addInfoString(KEY_PLAN, getPrettyQueryPlanFinishTime());
+ plannerProfile.addInfoString(KEY_SCHEDULE, getPrettyQueryScheduleFinishTime());
+ plannerProfile.addInfoString(KEY_FETCH, getPrettyQueryFetchResultFinishTime());
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index d462c08..8d10831 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -240,8 +240,8 @@ public class RuntimeProfile {
this.printChildCounters(prefix + " ", childCounterName, builder);
}
}
-
- private String printCounter(long value, TUnit type) {
+
+ public static String printCounter(long value, TUnit type) {
StringBuilder builder = new StringBuilder();
long tmpValue = value;
switch (type) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 4d4f851..ef71425 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -81,18 +81,20 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -134,6 +136,8 @@ public class Coordinator {
// copied from TQueryExecRequest; constant across all fragments
private TDescriptorTable descTable;
+ private Set<Long> alreadySentBackendIds = Sets.newHashSet();
+
// Why we use query global?
// When `NOW()` function is in sql, we need only one now(),
// but, we execute `NOW()` distributed.
@@ -440,22 +444,35 @@ public class Coordinator {
for (TUniqueId instanceId : instanceIds) {
profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */);
}
+
+ sendFragment();
+ }
+
+ private void sendFragment() throws TException, RpcException, UserException {
lock();
try {
- // execute all instances from up to bottom
- int backendId = 0;
+ Multiset<TNetworkAddress> hostCounter = HashMultiset.create();
+ for (FragmentExecParams params : fragmentExecParamsMap.values()) {
+ for (FInstanceExecParam fi : params.instanceExecParams) {
+ hostCounter.add(fi.host);
+ }
+ }
+ // Execute all instances from top to bottom
+ // NOTICE: We must ensure that these fragments are executed sequentially,
+ // otherwise the data dependency between the fragments will be destroyed.
+ int backendIdx = 0;
int profileFragmentId = 0;
long memoryLimit = queryOptions.getMemLimit();
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
-
+
// set up exec states
int instanceNum = params.instanceExecParams.size();
Preconditions.checkState(instanceNum > 0);
- List<TExecPlanFragmentParams> tParams = params.toThrift(backendId);
+ List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
List<Pair<BackendExecState, Future<PExecPlanFragmentResult>>> futures = Lists.newArrayList();
- //update memory limit for colocate join
+ // update memory limit for colocate join
if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
long newmemory = memoryLimit / rate;
@@ -464,7 +481,7 @@ public class Coordinator {
tParam.query_options.setMemLimit(newmemory);
}
}
-
+
boolean needCheckBackendState = false;
if (queryOptions.getQueryType() == TQueryType.LOAD && profileFragmentId == 0) {
// this is a load process, and it is the first fragment.
@@ -475,9 +492,13 @@ public class Coordinator {
int instanceId = 0;
for (TExecPlanFragmentParams tParam : tParams) {
- // TODO: pool of pre-formatted BackendExecStates?
BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++,
- profileFragmentId, tParam, this.addressToBackendID);
+ profileFragmentId, tParam, this.addressToBackendID);
+ execState.unsetFields();
+ // Each tParam will set the total number of Fragments that need to be executed on the same BE,
+ // and the BE will determine whether all Fragments have been executed based on this information.
+ tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
+
backendExecStates.add(execState);
if (needCheckBackendState) {
needCheckBackendExecStates.add(execState);
@@ -486,18 +507,18 @@ public class Coordinator {
fragment.getFragmentId().asInt(), jobId);
}
}
-
futures.add(Pair.create(execState, execState.execRemoteFragmentAsync()));
- backendId++;
+ backendIdx++;
}
+
for (Pair<BackendExecState, Future<PExecPlanFragmentResult>> pair : futures) {
- TStatusCode code = TStatusCode.INTERNAL_ERROR;
+ TStatusCode code;
String errMsg = null;
Exception exception = null;
try {
PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
- TimeUnit.MILLISECONDS);
+ TimeUnit.MILLISECONDS);
code = TStatusCode.findByValue(result.status.status_code);
if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) {
errMsg = result.status.error_msgs.get(0);
@@ -526,22 +547,27 @@ public class Coordinator {
}
queryStatus.setStatus(errMsg);
LOG.warn("exec plan fragment failed, errmsg={}, code:
{}, fragmentId={}, backend={}:{}",
- errMsg, code, fragment.getFragmentId(),
- pair.first.address.hostname,
pair.first.address.port);
+ errMsg, code, fragment.getFragmentId(),
+ pair.first.address.hostname,
pair.first.address.port);
cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
- case TIMEOUT:
- throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
- case THRIFT_RPC_ERROR:
- SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
- throw new RpcException(pair.first.backend.getHost(), "rpc failed");
- default:
- throw new UserException(errMsg);
+ case TIMEOUT:
+ throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
+ case THRIFT_RPC_ERROR:
+ SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
+ throw new RpcException(pair.first.backend.getHost(), "rpc failed");
+ default:
+ throw new UserException(errMsg);
}
}
+
+ // Succeeded in sending the plan fragment; update "alreadySentBackendIds"
+ alreadySentBackendIds.add(pair.first.backend.getId());
}
+
profileFragmentId += 1;
}
+
attachInstanceProfileToFragmentProfile();
} finally {
unlock();
@@ -1663,16 +1689,34 @@ public class Coordinator {
this.rpcParams = rpcParams;
this.initiated = false;
this.done = false;
- this.address = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId).host;
+ FInstanceExecParam fi = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId);
+ this.address = fi.host;
this.backend = idToBackend.get(addressToBackendID.get(address));
- String name = "Instance " +
DebugUtil.printId(fragmentExecParamsMap.get(fragmentId)
- .instanceExecParams.get(instanceId).instanceId) + "
(host=" + address + ")";
+ String name = "Instance " + DebugUtil.printId(fi.instanceId) + "
(host=" + address + ")";
this.profile = new RuntimeProfile(name);
this.hasCanceled = false;
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
}
+ /**
+ * Some information common to all Fragments does not need to be sent repeatedly.
+ * Therefore, once we confirm that a certain BE has received the information,
+ * we remove it from the subsequent Fragments to avoid repeated sending.
+ * The BE can obtain this information from its cache instead.
+ */
+ public void unsetFields() {
+ if (alreadySentBackendIds.contains(backend.getId())) {
+ this.rpcParams.unsetDescTbl();
+ this.rpcParams.unsetCoord();
+ this.rpcParams.unsetQueryGlobals();
+ this.rpcParams.unsetResourceInfo();
+ this.rpcParams.setIsSimplifiedParam(true);
+ } else {
+ this.rpcParams.setIsSimplifiedParam(false);
+ }
+ }
+
// update profile.
// return true if profile is updated. Otherwise, return false.
public synchronized boolean updateProfile(TReportExecStatusParams params) {
@@ -1809,12 +1853,12 @@ public class Coordinator {
// execution parameters for a single fragment,
// per-fragment can have multiple FInstanceExecParam,
- // used to assemble TPlanFragmentExecParas
+ // used to assemble TPlanFragmentExecParas
protected class FragmentExecParams {
public PlanFragment fragment;
public List<TPlanFragmentDestination> destinations = Lists.newArrayList();
public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap();
-
+
public List<PlanFragmentId> inputFragments = Lists.newArrayList();
public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment();
@@ -1929,19 +1973,19 @@ public class Coordinator {
}
}
- // fragment instance exec param, it is used to assemble
- // the per-instance TPlanFragmentExecParas, as a member of
+ // fragment instance exec param, it is used to assemble
+ // the per-instance TPlanFragmentExecParas, as a member of
// FragmentExecParams
static class FInstanceExecParam {
TUniqueId instanceId;
TNetworkAddress host;
Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newHashMap();
-
+
int perFragmentInstanceIdx;
int senderId;
Set<Integer> bucketSeqSet = Sets.newHashSet();
-
+
FragmentExecParams fragmentExecParams;
public void addBucketSeq(int bucketSeq) {
@@ -1955,7 +1999,7 @@ public class Coordinator {
this.perFragmentInstanceIdx = perFragmentInstanceIdx;
this.fragmentExecParams = fragmentExecParams;
}
-
+
public PlanFragment fragment() {
return fragmentExecParams.fragment;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 0b2338f..daaea7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -29,15 +29,15 @@ import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SetStmt;
+import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.ShowStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StmtRewriter;
+import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.UnsupportedStmt;
import org.apache.doris.analysis.UseStmt;
-import org.apache.doris.analysis.SetVar;
-import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -55,6 +55,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileManager;
+import org.apache.doris.common.util.QueryPlannerProfile;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
@@ -125,6 +126,8 @@ public class StmtExecutor {
private PQueryStatistics statisticsForAuditLog;
private boolean isCached;
+ private QueryPlannerProfile plannerProfile = new QueryPlannerProfile();
+
// this constructor is mainly for proxy
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
this.context = context;
@@ -148,7 +151,8 @@ public class StmtExecutor {
}
// At the end of query execution, we begin to add up profile
- public void initProfile(long beginTimeInNanoSecond) {
+ public void initProfile(QueryPlannerProfile plannerProfile) {
+ // Summary profile
profile = new RuntimeProfile("Query");
summaryProfile = new RuntimeProfile("Summary");
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId()));
@@ -167,9 +171,14 @@ public class StmtExecutor {
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");
+ RuntimeProfile plannerRuntimeProfile = new RuntimeProfile("Execution Summary");
+ plannerProfile.initRuntimeProfile(plannerRuntimeProfile);
+ summaryProfile.addChild(plannerRuntimeProfile);
+
profile.addChild(summaryProfile);
+
if (coord != null) {
- coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
+ coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(plannerProfile.getQueryBeginTime()));
coord.endProfile();
profile.addChild(coord.getQueryProfile());
coord = null;
@@ -231,7 +240,7 @@ public class StmtExecutor {
// IOException: talk with client failed.
public void execute() throws Exception {
- long beginTimeInNanoSecond = TimeUtils.getStartTime();
+ plannerProfile.setQueryBeginTime();
context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
// set query id
@@ -272,7 +281,7 @@ public class StmtExecutor {
}
handleQueryStmt();
if (context.getSessionVariable().isReportSucc()) {
- writeProfile(beginTimeInNanoSecond);
+ writeProfile();
}
break;
} catch (RpcException e) {
@@ -300,7 +309,7 @@ public class StmtExecutor {
try {
handleInsertStmt();
if (context.getSessionVariable().isReportSucc()) {
- writeProfile(beginTimeInNanoSecond);
+ writeProfile();
}
} catch (Throwable t) {
LOG.warn("handle insert stmt fail", t);
@@ -366,8 +375,8 @@ public class StmtExecutor {
masterOpExecutor.execute();
}
- private void writeProfile(long beginTimeInNanoSecond) {
- initProfile(beginTimeInNanoSecond);
+ private void writeProfile() {
+ initProfile(plannerProfile);
profile.computeTimeInChildProfile();
StringBuilder builder = new StringBuilder();
profile.prettyPrint(builder, "");
@@ -537,6 +546,7 @@ public class StmtExecutor {
if (isExplain) parsedStmt.setIsExplain(isExplain, isVerbose);
}
}
+ plannerProfile.setQueryAnalysisFinishTime();
// create plan
planner = new Planner();
@@ -548,6 +558,8 @@ public class StmtExecutor {
}
// TODO(zc):
// Preconditions.checkState(!analyzer.hasUnassignedConjuncts());
+
+ plannerProfile.setQueryPlanFinishTime();
}
private void resetAnalyzerAndStmt() {
@@ -751,6 +763,7 @@ public class StmtExecutor {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
coord.exec();
+ plannerProfile.setQueryScheduleFinishTime();
while (true) {
batch = coord.getNext();
// for outfile query, there will be only one empty batch send back with eos flag
@@ -780,6 +793,7 @@ public class StmtExecutor {
} else {
context.getState().setOk(statisticsForAuditLog.returned_rows, 0, "");
}
+ plannerProfile.setQueryFetchResultFinishTime();
}
// Process a select statement.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 7a01b7a..12394f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -19,9 +19,13 @@ package org.apache.doris.rpc;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.JdkUtils;
+import org.apache.doris.proto.PCacheResponse;
import org.apache.doris.proto.PCancelPlanFragmentRequest;
import org.apache.doris.proto.PCancelPlanFragmentResult;
+import org.apache.doris.proto.PClearCacheRequest;
import org.apache.doris.proto.PExecPlanFragmentResult;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
import org.apache.doris.proto.PFetchDataResult;
import org.apache.doris.proto.PPlanFragmentCancelReason;
import org.apache.doris.proto.PProxyRequest;
@@ -29,14 +33,14 @@ import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PTriggerProfileReportResult;
import org.apache.doris.proto.PUniqueId;
import org.apache.doris.proto.PUpdateCacheRequest;
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PClearCacheRequest;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
import com.baidu.bjf.remoting.protobuf.utils.JDKCompilerHelper;
import com.baidu.bjf.remoting.protobuf.utils.compiler.JdkCompiler;
import com.baidu.jprotobuf.pbrpc.client.ProtobufRpcProxy;
@@ -44,10 +48,6 @@ import com.baidu.jprotobuf.pbrpc.transport.RpcClient;
import com.baidu.jprotobuf.pbrpc.transport.RpcClientOptions;
import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Future;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
index 3e68009..6e95fe6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
@@ -17,18 +17,18 @@
package org.apache.doris.rpc;
+import org.apache.doris.proto.PCacheResponse;
import org.apache.doris.proto.PCancelPlanFragmentRequest;
import org.apache.doris.proto.PCancelPlanFragmentResult;
+import org.apache.doris.proto.PClearCacheRequest;
import org.apache.doris.proto.PExecPlanFragmentResult;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
import org.apache.doris.proto.PFetchDataResult;
import org.apache.doris.proto.PProxyRequest;
import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PTriggerProfileReportResult;
import org.apache.doris.proto.PUpdateCacheRequest;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
import com.baidu.jprotobuf.pbrpc.ProtobufRPC;
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 0f5c231..0263934 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -223,6 +223,7 @@ struct TExecPlanFragmentParams {
2: optional Planner.TPlanFragment fragment
// required in V1
+ // @Common components
3: optional Descriptors.TDescriptorTable desc_tbl
// required in V1
@@ -231,6 +232,7 @@ struct TExecPlanFragmentParams {
// Initiating coordinator.
// TODO: determine whether we can get this somehow via the Thrift rpc mechanism.
// required in V1
+ // @Common components
5: optional Types.TNetworkAddress coord
// backend number assigned by coord to identify backend
@@ -239,6 +241,7 @@ struct TExecPlanFragmentParams {
// Global query parameters assigned by coordinator.
// required in V1
+ // @Common components
7: optional TQueryGlobals query_globals
// options for the query
@@ -250,6 +253,7 @@ struct TExecPlanFragmentParams {
9: optional bool is_report_success
// required in V1
+ // @Common components
10: optional Types.TResourceInfo resource_info
// load job related
@@ -257,6 +261,13 @@ struct TExecPlanFragmentParams {
12: optional string db_name
13: optional i64 load_job_id
14: optional TLoadErrorHubInfo load_error_hub_info
+
+ // The total number of fragments on the same BE host
+ 15: optional i32 fragment_num_on_host;
+
+ // If true, all @Common components are unset and should be fetched from the BE's cache.
+ // If this field is unset or set to false, all @Common components are set.
+ 16: optional bool is_simplified_param
}
struct TExecPlanFragmentResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]