This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 5e99b276 Improve RESP handling code in replication (#2334)
5e99b276 is described below

commit 5e99b276e5983acc142a7606fd62445b4914b41a
Author: Twice <[email protected]>
AuthorDate: Wed May 29 21:17:50 2024 +0900

    Improve RESP handling code in replication (#2334)
---
 src/cluster/replication.cc            | 33 ++++++++++++++++++---------------
 src/cluster/replication.h             |  6 +++---
 src/commands/cmd_replication.cc       |  5 +++--
 src/commands/cmd_server.cc            |  4 +++-
 src/commands/error_constants.h        |  2 ++
 src/common/event_util.h               |  3 +++
 src/server/redis_connection.cc        |  8 +++++---
 src/server/redis_reply.cc             |  4 ++--
 src/server/redis_reply.h              |  4 +++-
 tests/gocase/unit/hello/hello_test.go |  4 ++--
 10 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index ea9c5587..a3567284 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -33,6 +33,7 @@
 #include <string>
 #include <thread>
 
+#include "commands/error_constants.h"
 #include "event_util.h"
 #include "fmt/format.h"
 #include "io_util.h"
@@ -402,13 +403,13 @@ ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
   return CBState::NEXT;
 }
 
-inline bool ResponseLineIsOK(const char *line) { return strncmp(line, "+OK", 3) == 0; }
+inline bool ResponseLineIsOK(std::string_view line) { return line == RESP_PREFIX_SIMPLE_STRING "OK"; }
 
 ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) {  // NOLINT
   auto input = bufferevent_get_input(bev);
   UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
-  if (!ResponseLineIsOK(line.get())) {
+  if (!ResponseLineIsOK(line.View())) {
     // Auth failed
     LOG(ERROR) << "[replication] Auth failed: " << line.get();
     return CBState::RESTART;
@@ -430,7 +431,7 @@ ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(bufferevent *bev
   if (!line) return CBState::AGAIN;
 
   if (line[0] == '-') {
-    if (isRestoringError(line.get())) {
+    if (isRestoringError(line.View())) {
       LOG(WARNING) << "The master was restoring the db, retry later";
     } else {
       LOG(ERROR) << "Failed to get the db name, " << line.get();
@@ -468,18 +469,18 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) {
   if (!line) return CBState::AGAIN;
 
   // on unknown option: first try without announce ip, if it fails again - do nothing (to prevent infinite loop)
-  if (isUnknownOption(line.get()) && !next_try_without_announce_ip_address_) {
+  if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) {
     next_try_without_announce_ip_address_ = true;
     LOG(WARNING) << "The old version master, can't handle ip-address, "
                  << "try without it again";
     // Retry previous state, i.e. send replconf again
     return CBState::PREV;
   }
-  if (line[0] == '-' && isRestoringError(line.get())) {
+  if (line[0] == '-' && isRestoringError(line.View())) {
     LOG(WARNING) << "The master was restoring the db, retry later";
     return CBState::RESTART;
   }
-  if (!ResponseLineIsOK(line.get())) {
+  if (!ResponseLineIsOK(line.View())) {
     LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
     //  backward compatible with old version that doesn't support replconf cmd
     return CBState::NEXT;
@@ -530,12 +531,12 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
   UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
 
-  if (line[0] == '-' && isRestoringError(line.get())) {
+  if (line[0] == '-' && isRestoringError(line.View())) {
     LOG(WARNING) << "The master was restoring the db, retry later";
     return CBState::RESTART;
   }
 
-  if (line[0] == '-' && isWrongPsyncNum(line.get())) {
+  if (line[0] == '-' && isWrongPsyncNum(line.View())) {
     next_try_old_psync_ = true;
     LOG(WARNING) << "The old version master, can't handle new PSYNC, "
                  << "try old PSYNC again";
@@ -543,7 +544,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
     return CBState::PREV;
   }
 
-  if (!ResponseLineIsOK(line.get())) {
+  if (!ResponseLineIsOK(line.View())) {
     // PSYNC isn't OK, we should use FullSync
     // Switch to fullsync state machine
     fullsync_steps_.Start();
@@ -844,7 +845,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) {
       }
       UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
       if (!line) continue;
-      if (!ResponseLineIsOK(line.get())) {
+      if (!ResponseLineIsOK(line.View())) {
         return {Status::NotOK, "auth got invalid response"};
       }
       break;
@@ -998,15 +999,17 @@ Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
   return Status::OK();
 }
 
-bool ReplicationThread::isRestoringError(const char *err) {
-  return std::string(err) == "-ERR restoring the db from backup";
+bool ReplicationThread::isRestoringError(std::string_view err) {
+  return err == std::string(RESP_PREFIX_ERROR) + redis::errRestoringBackup;
 }
 
-bool ReplicationThread::isWrongPsyncNum(const char *err) {
-  return std::string(err) == "-ERR wrong number of arguments";
+bool ReplicationThread::isWrongPsyncNum(std::string_view err) {
+  return err == std::string(RESP_PREFIX_ERROR) + redis::errWrongNumArguments;
 }
 
-bool ReplicationThread::isUnknownOption(const char *err) { return std::string(err) == "-ERR unknown option"; }
+bool ReplicationThread::isUnknownOption(std::string_view err) {
+  return err == fmt::format("{}ERR {}", RESP_PREFIX_ERROR, redis::errUnknownOption);
+}
 
 rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
                                          const rocksdb::Slice &value) {
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index b223bd6a..8da25713 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -204,9 +204,9 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
   Status fetchFiles(int sock_fd, const std::string &dir, const std::vector<std::string> &files,
                     const std::vector<uint32_t> &crcs, const FetchFileCallback &fn, ssl_st *ssl);
   Status parallelFetchFile(const std::string &dir, const std::vector<std::pair<std::string, uint32_t>> &files);
-  static bool isRestoringError(const char *err);
-  static bool isWrongPsyncNum(const char *err);
-  static bool isUnknownOption(const char *err);
+  static bool isRestoringError(std::string_view err);
+  static bool isWrongPsyncNum(std::string_view err);
+  static bool isUnknownOption(std::string_view err);
 
   Status parseWriteBatch(const std::string &batch_string);
 };
diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc
index 5c46e23d..5ec6faa0 100644
--- a/src/commands/cmd_replication.cc
+++ b/src/commands/cmd_replication.cc
@@ -22,6 +22,7 @@
 #include "error_constants.h"
 #include "io_util.h"
 #include "scope_exit.h"
+#include "server/redis_reply.h"
 #include "server/server.h"
 #include "thread_util.h"
 #include "time_util.h"
@@ -101,7 +102,7 @@ class CommandPSync : public Commander {
     srv->stats.IncrPSyncOKCount();
     s = srv->AddSlave(conn, next_repl_seq_);
     if (!s.IsOK()) {
-      std::string err = "-ERR " + s.Msg() + "\r\n";
+      std::string err = redis::Error(s.Msg());
       s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent());
       if (!s.IsOK()) {
         LOG(WARNING) << "failed to send error message to the replica: " << s.Msg();
@@ -229,7 +230,7 @@ class CommandFetchMeta : public Commander {
       std::string files;
       auto s = engine::Storage::ReplDataManager::GetFullReplDataInfo(srv->storage, &files);
       if (!s.IsOK()) {
-        s = util::SockSend(repl_fd, "-ERR can't create db checkpoint", bev);
+        s = util::SockSend(repl_fd, redis::Error("can't create db checkpoint"), bev);
         if (!s.IsOK()) {
           LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg();
         }
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 28c31603..7362e298 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -29,6 +29,7 @@
 #include "config/config.h"
 #include "error_constants.h"
 #include "server/redis_connection.h"
+#include "server/redis_reply.h"
 #include "server/server.h"
 #include "stats/disk_stats.h"
 #include "storage/rdb.h"
@@ -740,7 +741,8 @@ class CommandHello final : public Commander {
       // kvrocks only supports REPL2 by now, but for supporting some
       // `hello 3`, it will not report error when using 3.
       if (protocol < 2 || protocol > 3) {
-        return {Status::NotOK, "-NOPROTO unsupported protocol version"};
+        conn->Reply(redis::Error("NOPROTO unsupported protocol version"));
+        return Status::OK();
       }
     }
 
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index 43c7440d..2074a705 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -43,5 +43,7 @@ inline constexpr const char *errValueIsNotFloat = "value is not a valid float";
 inline constexpr const char *errNoMatchingScript = "NOSCRIPT No matching script. Please use EVAL";
 inline constexpr const char *errUnknownOption = "unknown option";
 inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown subcommand or wrong number of arguments";
+inline constexpr const char *errWrongNumArguments = "ERR wrong number of arguments";
+inline constexpr const char *errRestoringBackup = "LOADING kvrocks is restoring the db from backup";
 
 }  // namespace redis
diff --git a/src/common/event_util.h b/src/common/event_util.h
index fccdfd53..f3c83012 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -22,6 +22,7 @@
 
 #include <cstdlib>
 #include <memory>
+#include <string_view>
 #include <utility>
 
 #include "event2/buffer.h"
@@ -44,6 +45,8 @@ struct UniqueEvbufReadln : UniqueFreePtr<char[]> {
       : UniqueFreePtr(evbuffer_readln(buffer, &length, eol_style)) {}
 
   size_t length;
+
+  std::string_view View() { return {get(), length}; }
 };
 
 using StaticEvbufFree = StaticFunction<decltype(evbuffer_free), evbuffer_free>;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 4a84f0a7..536c4d20 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -26,7 +26,9 @@
 #include <shared_mutex>
 
 #include "commands/commander.h"
+#include "commands/error_constants.h"
 #include "fmt/format.h"
+#include "server/redis_reply.h"
 #include "string_util.h"
 #ifdef ENABLE_OPENSSL
 #include <event2/bufferevent_ssl.h>
@@ -138,7 +140,7 @@ std::string Connection::Bool(bool b) const {
 }
 
 std::string Connection::MultiBulkString(const std::vector<std::string> &values) const {
-  std::string result = "*" + std::to_string(values.size()) + CRLF;
+  std::string result = MultiLen(values.size());
   for (const auto &value : values) {
     if (value.empty()) {
       result += NilString();
@@ -151,7 +153,7 @@ std::string Connection::MultiBulkString(const std::vector<std::string> &values)
 
 std::string Connection::MultiBulkString(const std::vector<std::string> &values,
                                         const std::vector<rocksdb::Status> &statuses) const {
-  std::string result = "*" + std::to_string(values.size()) + CRLF;
+  std::string result = MultiLen(values.size());
   for (size_t i = 0; i < values.size(); i++) {
     if (i < statuses.size() && !statuses[i].ok()) {
       result += NilString();
@@ -470,7 +472,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
     }
 
     if (srv_->IsLoading() && !(cmd_flags & kCmdLoading)) {
-      Reply(redis::Error("LOADING kvrocks is restoring the db from backup"));
+      Reply(redis::Error(errRestoringBackup));
       if (is_multi_exec) multi_error_ = true;
       continue;
     }
diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc
index 95bbf9fd..d2143b6a 100644
--- a/src/server/redis_reply.cc
+++ b/src/server/redis_reply.cc
@@ -34,7 +34,7 @@ std::string BulkString(const std::string &data) { return "$" + std::to_string(da
 
 std::string Array(const std::vector<std::string> &list) {
   size_t n = std::accumulate(list.begin(), list.end(), 0, [](size_t n, const std::string &s) { return n + s.size(); });
-  std::string result = "*" + std::to_string(list.size()) + CRLF;
+  std::string result = MultiLen(list.size());
   std::string::size_type final_size = result.size() + n;
   result.reserve(final_size);
   for (const auto &i : list) result += i;
@@ -42,7 +42,7 @@ std::string Array(const std::vector<std::string> &list) {
 }
 
 std::string ArrayOfBulkStrings(const std::vector<std::string> &elems) {
-  std::string result = "*" + std::to_string(elems.size()) + CRLF;
+  std::string result = MultiLen(elems.size());
   for (const auto &elem : elems) {
     result += BulkString(elem);
   }
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index 213a8bc0..c7fd3b2a 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -25,7 +25,9 @@
 #include <string>
 #include <vector>
 
-#define CRLF "\r\n"  // NOLINT
+#define CRLF "\r\n"                    // NOLINT
+#define RESP_PREFIX_ERROR "-"          // NOLINT
+#define RESP_PREFIX_SIMPLE_STRING "+"  // NOLINT
 
 namespace redis {
 
diff --git a/tests/gocase/unit/hello/hello_test.go b/tests/gocase/unit/hello/hello_test.go
index 24f8baf3..5b6eea35 100644
--- a/tests/gocase/unit/hello/hello_test.go
+++ b/tests/gocase/unit/hello/hello_test.go
@@ -38,7 +38,7 @@ func TestHello(t *testing.T) {
 
        t.Run("hello with wrong protocol", func(t *testing.T) {
                r := rdb.Do(ctx, "HELLO", "1")
-               require.ErrorContains(t, r.Err(), "-NOPROTO unsupported protocol version")
+               require.ErrorContains(t, r.Err(), "NOPROTO unsupported protocol version")
        })
 
        t.Run("hello with protocol 2", func(t *testing.T) {
@@ -61,7 +61,7 @@ func TestHello(t *testing.T) {
 
        t.Run("hello with wrong protocol", func(t *testing.T) {
                r := rdb.Do(ctx, "HELLO", "5")
-               require.ErrorContains(t, r.Err(), "-NOPROTO unsupported protocol version")
+               require.ErrorContains(t, r.Err(), "NOPROTO unsupported protocol version")
        })
 
        t.Run("hello with non protocol", func(t *testing.T) {

Reply via email to