This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 5e9db799 fix(replication): slave blocks until keepalive timer is reached when master is gone without fin/rst notification (#2662)
5e9db799 is described below

commit 5e9db7995192be8cfc0edc3ac9775081580580c2
Author: sryan yuan <[email protected]>
AuthorDate: Sat Nov 16 23:22:56 2024 +0800

    fix(replication): slave blocks until keepalive timer is reached when master is gone without fin/rst notification (#2662)
    
    Co-authored-by: yxj25245 <[email protected]>
    Co-authored-by: hulk <[email protected]>
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 kvrocks.conf               | 14 ++++++++++++++
 src/cluster/replication.cc | 20 +++++++++++++++++---
 src/common/io_util.cc      | 12 ++++++++++--
 src/common/status.h        |  4 ++++
 src/config/config.cc       |  2 ++
 src/config/config.h        |  2 ++
 6 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index 0ff0ce50..13b1fb6c 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -174,6 +174,20 @@ slave-read-only yes
 # By default the priority is 100.
 slave-priority 100
 
+# Change the default timeout in milliseconds for socket connect during replication.
+# The default value is 3100, and 0 means no timeout.
+#
+# If the master is unreachable before connecting, not having a timeout may block future
+# 'clusterx setnodes' commands because the replication thread is blocked on connect.
+replication-connect-timeout-ms 3100
+
+# Change the default timeout in milliseconds for socket recv during fullsync.
+# The default value is 3200, and 0 means no timeout.
+#
+# If the master is unreachable when fetching SST files, not having a timeout may block
+# future 'clusterx setnodes' commands because the replication thread is blocked on recv.
+replication-recv-timeout-ms 3200
+
 # TCP listen() backlog.
 #
 # In high requests-per-second environments you need an high backlog in order
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index dff2d3d7..cd7fe197 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -252,7 +252,6 @@ void ReplicationThread::CallbacksStateMachine::Start() {
   }
 
   uint64_t last_connect_timestamp = 0;
-  int connect_timeout_ms = 3100;
 
   while (!repl_->stop_flag_ && bev == nullptr) {
     if (util::GetTimeStampMS() - last_connect_timestamp < 1000) {
@@ -260,7 +259,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
       sleep(1);
     }
     last_connect_timestamp = util::GetTimeStampMS();
-    auto cfd = util::SockConnect(repl_->host_, repl_->port_, 
connect_timeout_ms);
+    auto cfd = util::SockConnect(repl_->host_, repl_->port_, 
repl_->srv_->GetConfig()->replication_connect_timeout_ms);
     if (!cfd) {
       LOG(ERROR) << "[replication] Failed to connect the master, err: " << 
cfd.Msg();
       continue;
@@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const 
std::string &dir,
           }
           auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
 #endif
-          int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, 
ssl).Prefixed("connect the server err"));
+          int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, 
ssl,
+                                                     
this->srv_->GetConfig()->replication_connect_timeout_ms,
+                                                     
this->srv_->GetConfig()->replication_recv_timeout_ms)
+                                       .Prefixed("connect the server err"));
 #ifdef ENABLE_OPENSSL
           exit.Disable();
 #endif
@@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer 
*evbuf, const std::str
     UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
     if (!line) {
       if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
+        if (s.Is<Status::TryAgain>()) {
+          if (stop_flag_) {
+            return {Status::NotOK, "replication thread was stopped"};
+          }
+          continue;
+        }
         return std::move(s).Prefixed("read size");
       }
       continue;
@@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer 
*evbuf, const std::str
       remain -= data_len;
     } else {
       if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
+        if (s.Is<Status::TryAgain>()) {
+          if (stop_flag_) {
+            return {Status::NotOK, "replication thread was stopped"};
+          }
+          continue;
+        }
         return std::move(s).Prefixed("read sst file");
       }
     }
diff --git a/src/common/io_util.cc b/src/common/io_util.cc
index 35fa80d9..23cccc69 100644
--- a/src/common/io_util.cc
+++ b/src/common/io_util.cc
@@ -502,7 +502,12 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t 
fd, int howmuch, [[may
       howmuch = BUFFER_SIZE;
     }
     if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) {
-      return {Status::NotOK, fmt::format("failed to read from SSL connection: 
{}", fmt::streamed(SSLError(howmuch)))};
+      int err = SSL_get_error(ssl, howmuch);
+      if (err == SSL_ERROR_ZERO_RETURN) {
+        return {Status::EndOfFile, "EOF encountered while reading from SSL 
connection"};
+      }
+      return {(err == SSL_ERROR_WANT_READ) ? Status::TryAgain : Status::NotOK,
+              fmt::format("failed to read from SSL connection: {}", 
fmt::streamed(SSLError(howmuch)))};
     }
 
     if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) {
@@ -514,8 +519,11 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t 
fd, int howmuch, [[may
 #endif
   if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) {
     return ret;
+  } else if (ret == 0) {
+    return {Status::EndOfFile, "EOF encountered while reading from socket"};
   } else {
-    return {Status::NotOK, fmt::format("failed to read from socket: {}", 
strerror(errno))};
+    return {(errno == EWOULDBLOCK || errno == EAGAIN) ? Status::TryAgain : 
Status::NotOK,
+            fmt::format("failed to read from socket: {}", strerror(errno))};
   }
 }
 
diff --git a/src/common/status.h b/src/common/status.h
index b4b228a0..823e5681 100644
--- a/src/common/status.h
+++ b/src/common/status.h
@@ -75,6 +75,10 @@ class [[nodiscard]] Status {
     // Search
     NoPrefixMatched,
     TypeMismatched,
+
+    // IO
+    TryAgain,
+    EndOfFile,
   };
 
   Status() : impl_{nullptr} {}
diff --git a/src/config/config.cc b/src/config/config.cc
index 165e352f..c2491844 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,8 @@ Config::Config() {
       {"slave-empty-db-before-fullsync", false, new 
YesNoField(&slave_empty_db_before_fullsync, false)},
       {"slave-priority", false, new IntField(&slave_priority, 100, 0, 
INT_MAX)},
       {"slave-read-only", false, new YesNoField(&slave_readonly, true)},
+      {"replication-connect-timeout-ms", false, new 
IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
+      {"replication-recv-timeout-ms", false, new 
IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
       {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
       {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 
0, 0, 100)},
       {"profiling-sample-record-max-len", false, new 
IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
diff --git a/src/config/config.h b/src/config/config.h
index 3b9d99de..3dcc8d87 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -105,6 +105,8 @@ struct Config {
   bool slave_serve_stale_data = true;
   bool slave_empty_db_before_fullsync = false;
   int slave_priority = 100;
+  int replication_connect_timeout_ms = 3100;
+  int replication_recv_timeout_ms = 3200;
   int max_db_size = 0;
   int max_replication_mb = 0;
   int max_io_mb = 0;

Reply via email to