This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new d22b7a09c9 (sink) stat rpc time in tablet_sink (#12251)
d22b7a09c9 is described below
commit d22b7a09c9a23eb9d391d9f3ea44c9e63791b3d4
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 1 16:35:58 2022 +0800
(sink) stat rpc time in tablet_sink (#12251)
---
be/src/exec/tablet_sink.cpp | 16 ++++++++++------
be/src/exec/tablet_sink.h | 7 ++++++-
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 560189c03b..a600f83e29 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -202,6 +202,7 @@ Status NodeChannel::open_wait() {
// will be blocked.
_add_batches_finished = true;
}
+ _add_batch_counter.add_batch_rpc_time_us +=
_add_batch_closure->watch.elapsed_time() / 1000;
});
_add_batch_closure->addSuccessHandler([this](const
PTabletWriterAddBatchResult& result,
@@ -242,6 +243,7 @@ Status NodeChannel::open_wait() {
_add_batch_counter.add_batch_wait_execution_time_us +=
result.wait_execution_time_us();
_add_batch_counter.add_batch_num++;
}
+ _add_batch_counter.add_batch_rpc_time_us +=
_add_batch_closure->watch.elapsed_time() / 1000;
});
return status;
}
@@ -330,13 +332,14 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t
tablet_id) {
}
void NodeChannel::_sleep_if_memory_exceed() {
- size_t begin_us = _mem_exceeded_block_ns / 1000;
+ static size_t times = 1;
while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes)
{
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
SleepFor(MonoDelta::FromMilliseconds(10));
- if (_mem_exceeded_block_ns / 1000 - begin_us > 5000000) {
- begin_us = _mem_exceeded_block_ns / 1000;
- LOG(INFO) << "sink sleeps too long, pending_batches_bytes = " <<
_pending_batches_bytes
+ if (_mem_exceeded_block_ns / 1000 / times > 60000000) {
+ ++times;
+ LOG(INFO) << "sink sleep, mem_exceeded_block_us = " <<
_mem_exceeded_block_ns
+ << ", pending_batches_bytes = " << _pending_batches_bytes
<< ", max_pending_batches_bytes = " <<
_max_pending_batches_bytes
<< ", is_packet_in_flight = " <<
_add_batch_closure->is_packet_in_flight()
<< ", next_packet_seq = " << _next_packet_seq
@@ -1088,11 +1091,12 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
std::stringstream ss;
ss << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
<< ", txn_id=" << _txn_id
- << ", node add batch time(ms)/wait execution time(ms)/close
time(ms)/num: ";
+ << ", node add batch time(ms)/wait execution time(ms)/rpc
time(ms)/close time(ms)/num: ";
for (auto const& pair : node_add_batch_counter_map) {
ss << "{" << pair.first << ":(" <<
(pair.second.add_batch_execution_time_us / 1000)
<< ")(" << (pair.second.add_batch_wait_execution_time_us /
1000) << ")("
- << pair.second.close_wait_time_ms << ")(" <<
pair.second.add_batch_num << ")} ";
+ << pair.second.close_wait_time_ms << ")(" <<
(pair.second.add_batch_rpc_time_us / 1000)
+ << ")(" << pair.second.add_batch_num << ")} ";
}
LOG(INFO) << ss.str();
} else {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 34abf496c0..2a258d62da 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -67,12 +67,15 @@ struct AddBatchCounter {
int64_t add_batch_num = 0;
// time passed between marked close and finish close
int64_t close_wait_time_ms = 0;
+ // time consumed by rpc
+ int64_t add_batch_rpc_time_us = 0;
AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
add_batch_execution_time_us += rhs.add_batch_execution_time_us;
add_batch_wait_execution_time_us +=
rhs.add_batch_wait_execution_time_us;
add_batch_num += rhs.add_batch_num;
close_wait_time_ms += rhs.close_wait_time_ms;
+ add_batch_rpc_time_us += rhs.add_batch_rpc_time_us;
return *this;
}
friend AddBatchCounter operator+(const AddBatchCounter& lhs, const
AddBatchCounter& rhs) {
@@ -88,7 +91,7 @@ struct AddBatchCounter {
template <typename T>
class ReusableClosure : public google::protobuf::Closure {
public:
- ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
+ ReusableClosure() : cid(INVALID_BTHREAD_ID) { watch.start(); }
~ReusableClosure() {
// shouldn't delete when Run() is calling or going to be called, wait
for current Run() done.
join();
@@ -120,6 +123,7 @@ public:
void reset() {
cntl.Reset();
cid = cntl.call_id();
+ watch.reset();
}
bool try_set_in_flight() {
@@ -152,6 +156,7 @@ public:
brpc::Controller cntl;
T result;
+ MonotonicStopWatch watch;
private:
brpc::CallId cid;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]