This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ec29322  [Bug] Avoid waiting too long when rpc is slow. (#5669)
ec29322 is described below

commit ec29322c104e32c3038f8b04fe81281188c39b4a
Author: Lijia Liu <[email protected]>
AuthorDate: Fri Apr 23 09:46:40 2021 +0800

    [Bug] Avoid waiting too long when rpc is slow. (#5669)
    
    Total execution time should not be longer than the stream load timeout.
---
 be/src/exec/tablet_sink.cpp | 19 +++++++++++++++++--
 be/src/exec/tablet_sink.h   |  2 ++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index d08d291..d6989be 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -31,6 +31,7 @@
 #include "util/debug/sanitizer_scopes.h"
 #include "util/monotime.h"
 #include "util/uid_util.h"
+#include "util/time.h"
 
 namespace doris {
 namespace stream_load {
@@ -83,6 +84,7 @@ Status NodeChannel::init(RuntimeState* state) {
     _cur_add_batch_request.set_eos(false);
 
     _rpc_timeout_ms = state->query_options().query_timeout * 1000;
+    _timeout_watch.start();
 
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
@@ -299,7 +301,11 @@ void NodeChannel::cancel() {
     auto closure = new RefCountClosure<PTabletWriterCancelResult>();
 
     closure->ref();
-    closure->cntl.set_timeout_ms(_rpc_timeout_ms);
+    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
+    if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
+        remain_ms = _min_rpc_timeout_ms;
+    }
+    closure->cntl.set_timeout_ms(remain_ms);
     if (config::tablet_writer_ignore_eovercrowded) {
         closure->cntl.ignore_eovercrowded();
     }
@@ -336,7 +342,16 @@ int NodeChannel::try_send_and_fetch_status() {
         }
 
         _add_batch_closure->reset();
-        _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms);
+        int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
+        if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
+            if (remain_ms <= 0 && !request.eos()) {
+                cancel();
+                return 0;
+            } else {
+                remain_ms = _min_rpc_timeout_ms;
+            }
+        }
+        _add_batch_closure->cntl.set_timeout_ms(remain_ms);
         if (config::tablet_writer_ignore_eovercrowded) {
             _add_batch_closure->cntl.ignore_eovercrowded();
         }
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 4a42202..26ff0c8 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -210,7 +210,9 @@ private:
 
     // this should be set in init() using config
     int _rpc_timeout_ms = 60000;
+    static const int _min_rpc_timeout_ms = 1000; // The min query timeout is 1 second.
     int64_t _next_packet_seq = 0;
+    MonotonicStopWatch _timeout_watch;
 
     // user cancel or get some errors
     std::atomic<bool> _cancelled{false};

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to