This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 441125652c5 [Chore](statistic) do not use memory order relaxed on
QueryStatistics and add sync on te… (#38048)
441125652c5 is described below
commit 441125652c56a1e9caac48240309464f765191e1
Author: Pxl <[email protected]>
AuthorDate: Mon Jul 22 10:56:56 2024 +0800
[Chore](statistic) do not use memory order relaxed on QueryStatistics and
add sync on te… (#38048)
## Proposed changes
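1. Do not use `std::memory_order_relaxed` when reading or updating the QueryStatistics atomic counters; use the default (sequentially consistent) atomic operations instead.
2. Remove unused code, such as the `QueryStatisticsRecvr` class, the `QueryStatistics::merge(QueryStatisticsRecvr*)` overloads, `clear()` and `set_collected()`.
3. Add a `sync` statement to the `dry_run` regression test, after the inserts and before `dry_run_query` is enabled.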
---
be/src/runtime/query_statistics.cpp | 75 +++++-------------
be/src/runtime/query_statistics.h | 90 ++++------------------
.../suites/query_p0/dry_run/dry_run.groovy | 2 +
3 files changed, 33 insertions(+), 134 deletions(-)
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index 126fb10af5b..110efef5ab9 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -27,22 +27,20 @@
namespace doris {
void QueryStatistics::merge(const QueryStatistics& other) {
- scan_rows += other.scan_rows.load(std::memory_order_relaxed);
- scan_bytes += other.scan_bytes.load(std::memory_order_relaxed);
- cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
- shuffle_send_bytes +=
other.shuffle_send_bytes.load(std::memory_order_relaxed);
- shuffle_send_rows +=
other.shuffle_send_rows.load(std::memory_order_relaxed);
- _scan_bytes_from_local_storage +=
-
other._scan_bytes_from_local_storage.load(std::memory_order_relaxed);
- _scan_bytes_from_remote_storage +=
-
other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed);
-
- int64_t other_peak_mem =
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
+ scan_rows += other.scan_rows;
+ scan_bytes += other.scan_bytes;
+ cpu_nanos += other.cpu_nanos;
+ shuffle_send_bytes += other.shuffle_send_bytes;
+ shuffle_send_rows += other.shuffle_send_rows;
+ _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
+ _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;
+
+ int64_t other_peak_mem = other.max_peak_memory_bytes;
if (other_peak_mem > this->max_peak_memory_bytes) {
this->max_peak_memory_bytes = other_peak_mem;
}
- int64_t other_memory_used =
other.current_used_memory_bytes.load(std::memory_order_relaxed);
+ int64_t other_memory_used = other.current_used_memory_bytes;
if (other_memory_used > 0) {
this->current_used_memory_bytes = other_memory_used;
}
@@ -61,15 +59,14 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
DCHECK(statistics != nullptr);
- statistics->__set_scan_bytes(scan_bytes.load(std::memory_order_relaxed));
- statistics->__set_scan_rows(scan_rows.load(std::memory_order_relaxed));
- statistics->__set_cpu_ms(cpu_nanos.load(std::memory_order_relaxed) /
NANOS_PER_MILLIS);
-
statistics->__set_returned_rows(returned_rows.load(std::memory_order_relaxed));
-
statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed));
- statistics->__set_current_used_memory_bytes(
- current_used_memory_bytes.load(std::memory_order_relaxed));
-
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
-
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
+ statistics->__set_scan_bytes(scan_bytes);
+ statistics->__set_scan_rows(scan_rows);
+ statistics->__set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
+ statistics->__set_returned_rows(returned_rows);
+ statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
+ statistics->__set_current_used_memory_bytes(current_used_memory_bytes);
+ statistics->__set_shuffle_send_bytes(shuffle_send_bytes);
+ statistics->__set_shuffle_send_rows(shuffle_send_rows);
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}
@@ -82,42 +79,6 @@ void QueryStatistics::from_pb(const PQueryStatistics&
statistics) {
_scan_bytes_from_remote_storage =
statistics.scan_bytes_from_remote_storage();
}
-void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
- recvr->merge(this);
-}
-
-void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
- DCHECK(recvr != nullptr);
- auto QueryStatisticsptr = recvr->find(sender_id);
- if (QueryStatisticsptr) {
- merge(*QueryStatisticsptr);
- }
-}
-
QueryStatistics::~QueryStatistics() {}
-void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int
sender_id) {
- std::lock_guard<std::mutex> l(_lock);
- if (!_query_statistics.contains(sender_id)) {
- _query_statistics[sender_id] = std::make_shared<QueryStatistics>();
- }
- _query_statistics[sender_id]->from_pb(statistics);
-}
-
-void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int
sender_id) {
- if (!statistics->collected()) return;
- if (_query_statistics.contains(sender_id)) return;
- std::lock_guard<std::mutex> l(_lock);
- _query_statistics[sender_id] = statistics;
-}
-
-QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _query_statistics.find(sender_id);
- if (it != _query_statistics.end()) {
- return it->second;
- }
- return nullptr;
-}
-
} // namespace doris
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index e71a136789a..0a19dfd46f0 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -31,7 +31,6 @@
namespace doris {
-class QueryStatisticsRecvr;
class PNodeStatistics;
class PQueryStatistics;
@@ -53,82 +52,44 @@ public:
void merge(const QueryStatistics& other);
- void add_scan_rows(int64_t delta_scan_rows) {
- this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed);
- }
+ void add_scan_rows(int64_t delta_scan_rows) { scan_rows +=
delta_scan_rows; }
- void add_scan_bytes(int64_t delta_scan_bytes) {
- this->scan_bytes.fetch_add(delta_scan_bytes,
std::memory_order_relaxed);
- }
+ void add_scan_bytes(int64_t delta_scan_bytes) { scan_bytes +=
delta_scan_bytes; }
- void add_cpu_nanos(int64_t delta_cpu_time) {
- this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
- }
+ void add_cpu_nanos(int64_t delta_cpu_time) { cpu_nanos += delta_cpu_time; }
- void add_shuffle_send_bytes(int64_t delta_bytes) {
- this->shuffle_send_bytes.fetch_add(delta_bytes,
std::memory_order_relaxed);
- }
+ void add_shuffle_send_bytes(int64_t delta_bytes) { shuffle_send_bytes +=
delta_bytes; }
- void add_shuffle_send_rows(int64_t delta_rows) {
- this->shuffle_send_rows.fetch_add(delta_rows,
std::memory_order_relaxed);
- }
+ void add_shuffle_send_rows(int64_t delta_rows) { shuffle_send_rows +=
delta_rows; }
void add_scan_bytes_from_local_storage(int64_t
scan_bytes_from_local_storage) {
- this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
+ _scan_bytes_from_local_storage += scan_bytes_from_local_storage;
}
void add_scan_bytes_from_remote_storage(int64_t
scan_bytes_from_remote_storage) {
- this->_scan_bytes_from_remote_storage +=
scan_bytes_from_remote_storage;
+ _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
}
- void add_returned_rows(int64_t num_rows) {
- this->returned_rows.fetch_add(num_rows, std::memory_order_relaxed);
- }
+ void add_returned_rows(int64_t num_rows) { returned_rows += num_rows; }
void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
- this->max_peak_memory_bytes.store(max_peak_memory_bytes,
std::memory_order_relaxed);
+ this->max_peak_memory_bytes = max_peak_memory_bytes;
}
void set_current_used_memory_bytes(int64_t current_used_memory) {
- this->current_used_memory_bytes.store(current_used_memory,
std::memory_order_relaxed);
- }
-
- void merge(QueryStatisticsRecvr* recvr);
-
- void merge(QueryStatisticsRecvr* recvr, int sender_id);
-
- void clearNodeStatistics();
-
- void clear() {
- scan_rows.store(0, std::memory_order_relaxed);
- scan_bytes.store(0, std::memory_order_relaxed);
- cpu_nanos.store(0, std::memory_order_relaxed);
- shuffle_send_bytes.store(0, std::memory_order_relaxed);
- shuffle_send_rows.store(0, std::memory_order_relaxed);
- _scan_bytes_from_local_storage.store(0);
- _scan_bytes_from_remote_storage.store(0);
-
- returned_rows.store(0, std::memory_order_relaxed);
- max_peak_memory_bytes.store(0, std::memory_order_relaxed);
- clearNodeStatistics();
- //clear() is used before collection, so calling "clear" is equivalent
to being collected.
- set_collected();
+ current_used_memory_bytes = current_used_memory;
}
void to_pb(PQueryStatistics* statistics);
void to_thrift(TQueryStatistics* statistics) const;
void from_pb(const PQueryStatistics& statistics);
bool collected() const { return _collected; }
- void set_collected() { _collected = true; }
- int64_t get_scan_rows() { return
scan_rows.load(std::memory_order_relaxed); }
- int64_t get_scan_bytes() { return
scan_bytes.load(std::memory_order_relaxed); }
- int64_t get_current_used_memory_bytes() {
- return current_used_memory_bytes.load(std::memory_order_relaxed);
- }
+ int64_t get_scan_rows() { return scan_rows; }
+ int64_t get_scan_bytes() { return scan_bytes; }
+ int64_t get_current_used_memory_bytes() { return
current_used_memory_bytes; }
private:
- friend class QueryStatisticsRecvr;
std::atomic<int64_t> scan_rows;
std::atomic<int64_t> scan_bytes;
std::atomic<int64_t> cpu_nanos;
@@ -148,30 +109,5 @@ private:
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
-class QueryStatisticsRecvr {
-public:
- ~QueryStatisticsRecvr() = default;
-
- // Transmitted via RPC, incurring serialization overhead.
- void insert(const PQueryStatistics& statistics, int sender_id);
-
- // using local_exchange for transmission, only need to hold a shared
pointer.
- void insert(QueryStatisticsPtr statistics, int sender_id);
-
- QueryStatisticsPtr find(int sender_id);
-
-private:
- friend class QueryStatistics;
-
- void merge(QueryStatistics* statistics) {
- std::lock_guard<std::mutex> l(_lock);
- for (auto& pair : _query_statistics) {
- statistics->merge(*(pair.second));
- }
- }
-
- std::map<int, QueryStatisticsPtr> _query_statistics;
- std::mutex _lock;
-};
} // namespace doris
diff --git a/regression-test/suites/query_p0/dry_run/dry_run.groovy
b/regression-test/suites/query_p0/dry_run/dry_run.groovy
index 19fc6e011d0..98a7d14d713 100644
--- a/regression-test/suites/query_p0/dry_run/dry_run.groovy
+++ b/regression-test/suites/query_p0/dry_run/dry_run.groovy
@@ -38,6 +38,8 @@ suite ("dry_run") {
sql "insert into d_table select -4,-4,-4,'d';"
sql "insert into d_table(k4,k2) values('d',4);"
+ sql "sync"
+
sql "set dry_run_query=true;"
qt_select_star "select * from d_table;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]