This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new d22b7a09c9 (sink) stat rpc time in tablet_sink (#12251)
d22b7a09c9 is described below

commit d22b7a09c9a23eb9d391d9f3ea44c9e63791b3d4
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 1 16:35:58 2022 +0800

    (sink) stat rpc time in tablet_sink (#12251)
---
 be/src/exec/tablet_sink.cpp | 16 ++++++++++------
 be/src/exec/tablet_sink.h   |  7 ++++++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 560189c03b..a600f83e29 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -202,6 +202,7 @@ Status NodeChannel::open_wait() {
             // will be blocked.
             _add_batches_finished = true;
         }
+        _add_batch_counter.add_batch_rpc_time_us += _add_batch_closure->watch.elapsed_time() / 1000;
     });
 
     _add_batch_closure->addSuccessHandler([this](const PTabletWriterAddBatchResult& result,
@@ -242,6 +243,7 @@ Status NodeChannel::open_wait() {
             _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us();
             _add_batch_counter.add_batch_num++;
         }
+        _add_batch_counter.add_batch_rpc_time_us += _add_batch_closure->watch.elapsed_time() / 1000;
     });
     return status;
 }
@@ -330,13 +332,14 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
 }
 
 void NodeChannel::_sleep_if_memory_exceed() {
-    size_t begin_us = _mem_exceeded_block_ns / 1000;
+    static size_t times = 1;
     while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
-        if (_mem_exceeded_block_ns / 1000 - begin_us > 5000000) {
-            begin_us = _mem_exceeded_block_ns / 1000;
-            LOG(INFO) << "sink sleeps too long, pending_batches_bytes = " << _pending_batches_bytes
+        if (_mem_exceeded_block_ns / 1000 / times > 60000000) {
+            ++times;
+            LOG(INFO) << "sink sleep, mem_exceeded_block_us = " << _mem_exceeded_block_ns
+                      << ", pending_batches_bytes = " << _pending_batches_bytes
                       << ", max_pending_batches_bytes = " << _max_pending_batches_bytes
                       << ", is_packet_in_flight = " << _add_batch_closure->is_packet_in_flight()
                       << ", next_packet_seq = " << _next_packet_seq
@@ -1088,11 +1091,12 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
         std::stringstream ss;
         ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
            << ", txn_id=" << _txn_id
-           << ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: ";
+           << ", node add batch time(ms)/wait execution time(ms)/rpc time(ms)/close time(ms)/num: ";
         for (auto const& pair : node_add_batch_counter_map) {
             ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
                << ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")("
-               << pair.second.close_wait_time_ms << ")(" << pair.second.add_batch_num << ")} ";
+               << pair.second.close_wait_time_ms << ")(" << (pair.second.add_batch_rpc_time_us / 1000)
+               << ")(" << pair.second.add_batch_num << ")} ";
         }
         LOG(INFO) << ss.str();
     } else {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 34abf496c0..2a258d62da 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -67,12 +67,15 @@ struct AddBatchCounter {
     int64_t add_batch_num = 0;
     // time passed between marked close and finish close
     int64_t close_wait_time_ms = 0;
+    // time consumed by rpc
+    int64_t add_batch_rpc_time_us = 0;
 
     AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
         add_batch_execution_time_us += rhs.add_batch_execution_time_us;
         add_batch_wait_execution_time_us += rhs.add_batch_wait_execution_time_us;
         add_batch_num += rhs.add_batch_num;
         close_wait_time_ms += rhs.close_wait_time_ms;
+        add_batch_rpc_time_us += rhs.add_batch_rpc_time_us;
         return *this;
     }
     friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) {
@@ -88,7 +91,7 @@ struct AddBatchCounter {
 template <typename T>
 class ReusableClosure : public google::protobuf::Closure {
 public:
-    ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
+    ReusableClosure() : cid(INVALID_BTHREAD_ID) { watch.start(); }
     ~ReusableClosure() {
         // shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
         join();
@@ -120,6 +123,7 @@ public:
     void reset() {
         cntl.Reset();
         cid = cntl.call_id();
+        watch.reset();
     }
 
     bool try_set_in_flight() {
@@ -152,6 +156,7 @@ public:
 
     brpc::Controller cntl;
     T result;
+    MonotonicStopWatch watch;
 
 private:
     brpc::CallId cid;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to