This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5e9db799 fix(replication): slave blocks until keepalive timer is
reached when master is gone without fin/rst notification (#2662)
5e9db799 is described below
commit 5e9db7995192be8cfc0edc3ac9775081580580c2
Author: sryan yuan <[email protected]>
AuthorDate: Sat Nov 16 23:22:56 2024 +0800
fix(replication): slave blocks until keepalive timer is reached when master
is gone without fin/rst notification (#2662)
Co-authored-by: yxj25245 <[email protected]>
Co-authored-by: hulk <[email protected]>
Co-authored-by: Twice <[email protected]>
Co-authored-by: Twice <[email protected]>
---
kvrocks.conf | 14 ++++++++++++++
src/cluster/replication.cc | 20 +++++++++++++++++---
src/common/io_util.cc | 12 ++++++++++--
src/common/status.h | 4 ++++
src/config/config.cc | 2 ++
src/config/config.h | 2 ++
6 files changed, 49 insertions(+), 5 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index 0ff0ce50..13b1fb6c 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -174,6 +174,20 @@ slave-read-only yes
# By default the priority is 100.
slave-priority 100
+# Change the default timeout in milliseconds for socket connect during replication.
+# The default value is 3100, and 0 means no timeout.
+#
+# If the master is unreachable when connecting, having no timeout may block subsequent
+# 'clusterx setnodes' commands because the replication thread is blocked on connect.
+replication-connect-timeout-ms 3100
+
+# Change the default timeout in milliseconds for socket recv during fullsync.
+# The default value is 3200, and 0 means no timeout.
+#
+# If the master becomes unreachable while SST files are being fetched, having no timeout
+# may block subsequent 'clusterx setnodes' commands because the replication thread is
+# blocked on recv.
+replication-recv-timeout-ms 3200
+
# TCP listen() backlog.
#
# In high requests-per-second environments you need an high backlog in order
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index dff2d3d7..cd7fe197 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -252,7 +252,6 @@ void ReplicationThread::CallbacksStateMachine::Start() {
}
uint64_t last_connect_timestamp = 0;
- int connect_timeout_ms = 3100;
while (!repl_->stop_flag_ && bev == nullptr) {
if (util::GetTimeStampMS() - last_connect_timestamp < 1000) {
@@ -260,7 +259,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
sleep(1);
}
last_connect_timestamp = util::GetTimeStampMS();
- auto cfd = util::SockConnect(repl_->host_, repl_->port_,
connect_timeout_ms);
+ auto cfd = util::SockConnect(repl_->host_, repl_->port_,
repl_->srv_->GetConfig()->replication_connect_timeout_ms);
if (!cfd) {
LOG(ERROR) << "[replication] Failed to connect the master, err: " <<
cfd.Msg();
continue;
@@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const
std::string &dir,
}
auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
- int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_,
ssl).Prefixed("connect the server err"));
+ int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_,
ssl,
+
this->srv_->GetConfig()->replication_connect_timeout_ms,
+
this->srv_->GetConfig()->replication_recv_timeout_ms)
+ .Prefixed("connect the server err"));
#ifdef ENABLE_OPENSSL
exit.Disable();
#endif
@@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer
*evbuf, const std::str
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
+ if (s.Is<Status::TryAgain>()) {
+ if (stop_flag_) {
+ return {Status::NotOK, "replication thread was stopped"};
+ }
+ continue;
+ }
return std::move(s).Prefixed("read size");
}
continue;
@@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer
*evbuf, const std::str
remain -= data_len;
} else {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
+ if (s.Is<Status::TryAgain>()) {
+ if (stop_flag_) {
+ return {Status::NotOK, "replication thread was stopped"};
+ }
+ continue;
+ }
return std::move(s).Prefixed("read sst file");
}
}
diff --git a/src/common/io_util.cc b/src/common/io_util.cc
index 35fa80d9..23cccc69 100644
--- a/src/common/io_util.cc
+++ b/src/common/io_util.cc
@@ -502,7 +502,12 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t
fd, int howmuch, [[may
howmuch = BUFFER_SIZE;
}
if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) {
- return {Status::NotOK, fmt::format("failed to read from SSL connection:
{}", fmt::streamed(SSLError(howmuch)))};
+ int err = SSL_get_error(ssl, howmuch);
+ if (err == SSL_ERROR_ZERO_RETURN) {
+ return {Status::EndOfFile, "EOF encountered while reading from SSL
connection"};
+ }
+ return {(err == SSL_ERROR_WANT_READ) ? Status::TryAgain : Status::NotOK,
+ fmt::format("failed to read from SSL connection: {}",
fmt::streamed(SSLError(howmuch)))};
}
if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) {
@@ -514,8 +519,11 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t
fd, int howmuch, [[may
#endif
if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) {
return ret;
+ } else if (ret == 0) {
+ return {Status::EndOfFile, "EOF encountered while reading from socket"};
} else {
- return {Status::NotOK, fmt::format("failed to read from socket: {}",
strerror(errno))};
+ return {(errno == EWOULDBLOCK || errno == EAGAIN) ? Status::TryAgain :
Status::NotOK,
+ fmt::format("failed to read from socket: {}", strerror(errno))};
}
}
diff --git a/src/common/status.h b/src/common/status.h
index b4b228a0..823e5681 100644
--- a/src/common/status.h
+++ b/src/common/status.h
@@ -75,6 +75,10 @@ class [[nodiscard]] Status {
// Search
NoPrefixMatched,
TypeMismatched,
+
+ // IO
+ TryAgain,
+ EndOfFile,
};
Status() : impl_{nullptr} {}
diff --git a/src/config/config.cc b/src/config/config.cc
index 165e352f..c2491844 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,8 @@ Config::Config() {
{"slave-empty-db-before-fullsync", false, new
YesNoField(&slave_empty_db_before_fullsync, false)},
{"slave-priority", false, new IntField(&slave_priority, 100, 0,
INT_MAX)},
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
+ {"replication-connect-timeout-ms", false, new
IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
+ {"replication-recv-timeout-ms", false, new
IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio,
0, 0, 100)},
{"profiling-sample-record-max-len", false, new
IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
diff --git a/src/config/config.h b/src/config/config.h
index 3b9d99de..3dcc8d87 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -105,6 +105,8 @@ struct Config {
bool slave_serve_stale_data = true;
bool slave_empty_db_before_fullsync = false;
int slave_priority = 100;
+ int replication_connect_timeout_ms = 3100;
+ int replication_recv_timeout_ms = 3200;
int max_db_size = 0;
int max_replication_mb = 0;
int max_io_mb = 0;