This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5e99b276 Improve RESP handling code in replication (#2334)
5e99b276 is described below
commit 5e99b276e5983acc142a7606fd62445b4914b41a
Author: Twice <[email protected]>
AuthorDate: Wed May 29 21:17:50 2024 +0900
Improve RESP handling code in replication (#2334)
---
src/cluster/replication.cc | 33 ++++++++++++++++++---------------
src/cluster/replication.h | 6 +++---
src/commands/cmd_replication.cc | 5 +++--
src/commands/cmd_server.cc | 4 +++-
src/commands/error_constants.h | 2 ++
src/common/event_util.h | 3 +++
src/server/redis_connection.cc | 8 +++++---
src/server/redis_reply.cc | 4 ++--
src/server/redis_reply.h | 4 +++-
tests/gocase/unit/hello/hello_test.go | 4 ++--
10 files changed, 44 insertions(+), 29 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index ea9c5587..a3567284 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -33,6 +33,7 @@
#include <string>
#include <thread>
+#include "commands/error_constants.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
@@ -402,13 +403,13 @@ ReplicationThread::CBState
ReplicationThread::authWriteCB(bufferevent *bev) {
return CBState::NEXT;
}
-inline bool ResponseLineIsOK(const char *line) { return strncmp(line, "+OK",
3) == 0; }
+inline bool ResponseLineIsOK(std::string_view line) { return line ==
RESP_PREFIX_SIMPLE_STRING "OK"; }
ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) {
// NOLINT
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
- if (!ResponseLineIsOK(line.get())) {
+ if (!ResponseLineIsOK(line.View())) {
// Auth failed
LOG(ERROR) << "[replication] Auth failed: " << line.get();
return CBState::RESTART;
@@ -430,7 +431,7 @@ ReplicationThread::CBState
ReplicationThread::checkDBNameReadCB(bufferevent *bev
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- if (isRestoringError(line.get())) {
+ if (isRestoringError(line.View())) {
LOG(WARNING) << "The master was restoring the db, retry later";
} else {
LOG(ERROR) << "Failed to get the db name, " << line.get();
@@ -468,18 +469,18 @@ ReplicationThread::CBState
ReplicationThread::replConfReadCB(bufferevent *bev) {
if (!line) return CBState::AGAIN;
// on unknown option: first try without announce ip, if it fails again - do
nothing (to prevent infinite loop)
- if (isUnknownOption(line.get()) && !next_try_without_announce_ip_address_) {
+ if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) {
next_try_without_announce_ip_address_ = true;
LOG(WARNING) << "The old version master, can't handle ip-address, "
<< "try without it again";
// Retry previous state, i.e. send replconf again
return CBState::PREV;
}
- if (line[0] == '-' && isRestoringError(line.get())) {
+ if (line[0] == '-' && isRestoringError(line.View())) {
LOG(WARNING) << "The master was restoring the db, retry later";
return CBState::RESTART;
}
- if (!ResponseLineIsOK(line.get())) {
+ if (!ResponseLineIsOK(line.View())) {
LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
// backward compatible with old version that doesn't support replconf cmd
return CBState::NEXT;
@@ -530,12 +531,12 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
- if (line[0] == '-' && isRestoringError(line.get())) {
+ if (line[0] == '-' && isRestoringError(line.View())) {
LOG(WARNING) << "The master was restoring the db, retry later";
return CBState::RESTART;
}
- if (line[0] == '-' && isWrongPsyncNum(line.get())) {
+ if (line[0] == '-' && isWrongPsyncNum(line.View())) {
next_try_old_psync_ = true;
LOG(WARNING) << "The old version master, can't handle new PSYNC, "
<< "try old PSYNC again";
@@ -543,7 +544,7 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
return CBState::PREV;
}
- if (!ResponseLineIsOK(line.get())) {
+ if (!ResponseLineIsOK(line.View())) {
// PSYNC isn't OK, we should use FullSync
// Switch to fullsync state machine
fullsync_steps_.Start();
@@ -844,7 +845,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st
*ssl) {
}
UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) continue;
- if (!ResponseLineIsOK(line.get())) {
+ if (!ResponseLineIsOK(line.View())) {
return {Status::NotOK, "auth got invalid response"};
}
break;
@@ -998,15 +999,17 @@ Status ReplicationThread::parseWriteBatch(const
std::string &batch_string) {
return Status::OK();
}
-bool ReplicationThread::isRestoringError(const char *err) {
- return std::string(err) == "-ERR restoring the db from backup";
+bool ReplicationThread::isRestoringError(std::string_view err) {
+ return err == std::string(RESP_PREFIX_ERROR) + redis::errRestoringBackup;
}
-bool ReplicationThread::isWrongPsyncNum(const char *err) {
- return std::string(err) == "-ERR wrong number of arguments";
+bool ReplicationThread::isWrongPsyncNum(std::string_view err) {
+ return err == std::string(RESP_PREFIX_ERROR) + redis::errWrongNumArguments;
}
-bool ReplicationThread::isUnknownOption(const char *err) { return
std::string(err) == "-ERR unknown option"; }
+bool ReplicationThread::isUnknownOption(std::string_view err) {
+ return err == fmt::format("{}ERR {}", RESP_PREFIX_ERROR,
redis::errUnknownOption);
+}
rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const
rocksdb::Slice &key,
const rocksdb::Slice &value) {
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index b223bd6a..8da25713 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -204,9 +204,9 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
Status fetchFiles(int sock_fd, const std::string &dir, const
std::vector<std::string> &files,
const std::vector<uint32_t> &crcs, const FetchFileCallback
&fn, ssl_st *ssl);
Status parallelFetchFile(const std::string &dir, const
std::vector<std::pair<std::string, uint32_t>> &files);
- static bool isRestoringError(const char *err);
- static bool isWrongPsyncNum(const char *err);
- static bool isUnknownOption(const char *err);
+ static bool isRestoringError(std::string_view err);
+ static bool isWrongPsyncNum(std::string_view err);
+ static bool isUnknownOption(std::string_view err);
Status parseWriteBatch(const std::string &batch_string);
};
diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc
index 5c46e23d..5ec6faa0 100644
--- a/src/commands/cmd_replication.cc
+++ b/src/commands/cmd_replication.cc
@@ -22,6 +22,7 @@
#include "error_constants.h"
#include "io_util.h"
#include "scope_exit.h"
+#include "server/redis_reply.h"
#include "server/server.h"
#include "thread_util.h"
#include "time_util.h"
@@ -101,7 +102,7 @@ class CommandPSync : public Commander {
srv->stats.IncrPSyncOKCount();
s = srv->AddSlave(conn, next_repl_seq_);
if (!s.IsOK()) {
- std::string err = "-ERR " + s.Msg() + "\r\n";
+ std::string err = redis::Error(s.Msg());
s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent());
if (!s.IsOK()) {
LOG(WARNING) << "failed to send error message to the replica: " <<
s.Msg();
@@ -229,7 +230,7 @@ class CommandFetchMeta : public Commander {
std::string files;
auto s =
engine::Storage::ReplDataManager::GetFullReplDataInfo(srv->storage, &files);
if (!s.IsOK()) {
- s = util::SockSend(repl_fd, "-ERR can't create db checkpoint", bev);
+ s = util::SockSend(repl_fd, redis::Error("can't create db
checkpoint"), bev);
if (!s.IsOK()) {
LOG(WARNING) << "[replication] Failed to send error response: " <<
s.Msg();
}
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 28c31603..7362e298 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -29,6 +29,7 @@
#include "config/config.h"
#include "error_constants.h"
#include "server/redis_connection.h"
+#include "server/redis_reply.h"
#include "server/server.h"
#include "stats/disk_stats.h"
#include "storage/rdb.h"
@@ -740,7 +741,8 @@ class CommandHello final : public Commander {
// kvrocks only supports REPL2 by now, but for supporting some
// `hello 3`, it will not report error when using 3.
if (protocol < 2 || protocol > 3) {
- return {Status::NotOK, "-NOPROTO unsupported protocol version"};
+ conn->Reply(redis::Error("NOPROTO unsupported protocol version"));
+ return Status::OK();
}
}
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index 43c7440d..2074a705 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -43,5 +43,7 @@ inline constexpr const char *errValueIsNotFloat = "value is
not a valid float";
inline constexpr const char *errNoMatchingScript = "NOSCRIPT No matching
script. Please use EVAL";
inline constexpr const char *errUnknownOption = "unknown option";
inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown
subcommand or wrong number of arguments";
+inline constexpr const char *errWrongNumArguments = "ERR wrong number of
arguments";
+inline constexpr const char *errRestoringBackup = "LOADING kvrocks is
restoring the db from backup";
} // namespace redis
diff --git a/src/common/event_util.h b/src/common/event_util.h
index fccdfd53..f3c83012 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -22,6 +22,7 @@
#include <cstdlib>
#include <memory>
+#include <string_view>
#include <utility>
#include "event2/buffer.h"
@@ -44,6 +45,8 @@ struct UniqueEvbufReadln : UniqueFreePtr<char[]> {
: UniqueFreePtr(evbuffer_readln(buffer, &length, eol_style)) {}
size_t length;
+
+ std::string_view View() { return {get(), length}; }
};
using StaticEvbufFree = StaticFunction<decltype(evbuffer_free), evbuffer_free>;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 4a84f0a7..536c4d20 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -26,7 +26,9 @@
#include <shared_mutex>
#include "commands/commander.h"
+#include "commands/error_constants.h"
#include "fmt/format.h"
+#include "server/redis_reply.h"
#include "string_util.h"
#ifdef ENABLE_OPENSSL
#include <event2/bufferevent_ssl.h>
@@ -138,7 +140,7 @@ std::string Connection::Bool(bool b) const {
}
std::string Connection::MultiBulkString(const std::vector<std::string>
&values) const {
- std::string result = "*" + std::to_string(values.size()) + CRLF;
+ std::string result = MultiLen(values.size());
for (const auto &value : values) {
if (value.empty()) {
result += NilString();
@@ -151,7 +153,7 @@ std::string Connection::MultiBulkString(const
std::vector<std::string> &values)
std::string Connection::MultiBulkString(const std::vector<std::string> &values,
const std::vector<rocksdb::Status>
&statuses) const {
- std::string result = "*" + std::to_string(values.size()) + CRLF;
+ std::string result = MultiLen(values.size());
for (size_t i = 0; i < values.size(); i++) {
if (i < statuses.size() && !statuses[i].ok()) {
result += NilString();
@@ -470,7 +472,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
}
if (srv_->IsLoading() && !(cmd_flags & kCmdLoading)) {
- Reply(redis::Error("LOADING kvrocks is restoring the db from backup"));
+ Reply(redis::Error(errRestoringBackup));
if (is_multi_exec) multi_error_ = true;
continue;
}
diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc
index 95bbf9fd..d2143b6a 100644
--- a/src/server/redis_reply.cc
+++ b/src/server/redis_reply.cc
@@ -34,7 +34,7 @@ std::string BulkString(const std::string &data) { return "$"
+ std::to_string(da
std::string Array(const std::vector<std::string> &list) {
size_t n = std::accumulate(list.begin(), list.end(), 0, [](size_t n, const
std::string &s) { return n + s.size(); });
- std::string result = "*" + std::to_string(list.size()) + CRLF;
+ std::string result = MultiLen(list.size());
std::string::size_type final_size = result.size() + n;
result.reserve(final_size);
for (const auto &i : list) result += i;
@@ -42,7 +42,7 @@ std::string Array(const std::vector<std::string> &list) {
}
std::string ArrayOfBulkStrings(const std::vector<std::string> &elems) {
- std::string result = "*" + std::to_string(elems.size()) + CRLF;
+ std::string result = MultiLen(elems.size());
for (const auto &elem : elems) {
result += BulkString(elem);
}
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index 213a8bc0..c7fd3b2a 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -25,7 +25,9 @@
#include <string>
#include <vector>
-#define CRLF "\r\n" // NOLINT
+#define CRLF "\r\n" // NOLINT
+#define RESP_PREFIX_ERROR "-" // NOLINT
+#define RESP_PREFIX_SIMPLE_STRING "+" // NOLINT
namespace redis {
diff --git a/tests/gocase/unit/hello/hello_test.go
b/tests/gocase/unit/hello/hello_test.go
index 24f8baf3..5b6eea35 100644
--- a/tests/gocase/unit/hello/hello_test.go
+++ b/tests/gocase/unit/hello/hello_test.go
@@ -38,7 +38,7 @@ func TestHello(t *testing.T) {
t.Run("hello with wrong protocol", func(t *testing.T) {
r := rdb.Do(ctx, "HELLO", "1")
- require.ErrorContains(t, r.Err(), "-NOPROTO unsupported
protocol version")
+ require.ErrorContains(t, r.Err(), "NOPROTO unsupported protocol
version")
})
t.Run("hello with protocol 2", func(t *testing.T) {
@@ -61,7 +61,7 @@ func TestHello(t *testing.T) {
t.Run("hello with wrong protocol", func(t *testing.T) {
r := rdb.Do(ctx, "HELLO", "5")
- require.ErrorContains(t, r.Err(), "-NOPROTO unsupported
protocol version")
+ require.ErrorContains(t, r.Err(), "NOPROTO unsupported protocol
version")
})
t.Run("hello with non protocol", func(t *testing.T) {