This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new d1acbcba Implement the RESP3 null for the nil string and array (#2017)
d1acbcba is described below

commit d1acbcbae8693bc895fa5749dd55ddc7458bedc8
Author: hulk <[email protected]>
AuthorDate: Mon Jan 15 18:40:23 2024 +0800

    Implement the RESP3 null for the nil string and array (#2017)
    
    For the null type in RESP2, the nil string is represented by `$-1\r\n`
    and the nil array by `*-1\r\n`. In RESP3, both are reduced to a single
    unified null, `_\r\n`. So in this PR, most uses of
    redis::MultiBulkString/redis::NilString are replaced with the Connection
    member functions Connection::MultiBulkString and Connection::NilString.
---
 src/cluster/replication.cc                  | 16 ++---
 src/cluster/slot_migrate.cc                 | 25 ++++----
 src/cluster/sync_migrate_context.cc         |  2 +-
 src/commands/blocking_commander.h           |  6 +-
 src/commands/cmd_bit.cc                     |  2 +-
 src/commands/cmd_bloom_filter.cc            |  4 +-
 src/commands/cmd_geo.cc                     | 28 ++++-----
 src/commands/cmd_hash.cc                    | 20 +++---
 src/commands/cmd_json.cc                    | 58 ++++++++---------
 src/commands/cmd_list.cc                    | 34 +++++-----
 src/commands/cmd_pubsub.cc                  | 33 +++++-----
 src/commands/cmd_server.cc                  | 29 +++++----
 src/commands/cmd_set.cc                     | 16 ++---
 src/commands/cmd_stream.cc                  | 40 ++++++------
 src/commands/cmd_string.cc                  | 18 +++---
 src/commands/cmd_txn.cc                     |  2 +-
 src/commands/cmd_zset.cc                    | 26 ++++----
 src/commands/commander.cc                   |  2 +-
 src/commands/scan_base.h                    |  9 +--
 src/server/redis_connection.cc              | 33 ++++++++++
 src/server/redis_connection.h               |  6 ++
 src/server/redis_reply.cc                   | 34 +++-------
 src/server/redis_reply.h                    | 13 ++--
 src/server/server.cc                        |  4 +-
 src/stats/log_collector.cc                  |  2 +-
 src/storage/batch_extractor.cc              | 12 ++--
 src/storage/scripting.cc                    |  6 +-
 tests/cppunit/string_reply_test.cc          |  2 +-
 tests/gocase/unit/debug/debug_test.go       |  5 ++
 tests/gocase/unit/protocol/protocol_test.go | 98 +++++++++++++++++++++++++++++
 utils/kvrocks2redis/parser.cc               | 18 +++---
 utils/kvrocks2redis/redis_writer.cc         |  2 +-
 32 files changed, 364 insertions(+), 241 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index c2d17b5a..51e536c7 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -396,7 +396,7 @@ void ReplicationThread::run() {
 }
 
 ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
-  SendString(bev, redis::MultiBulkString({"AUTH", 
srv_->GetConfig()->masterauth}));
+  SendString(bev, redis::ArrayOfBulkStrings({"AUTH", 
srv_->GetConfig()->masterauth}));
   LOG(INFO) << "[replication] Auth request was sent, waiting for response";
   repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
   return CBState::NEXT;
@@ -418,7 +418,7 @@ ReplicationThread::CBState 
ReplicationThread::authReadCB(bufferevent *bev) {  //
 }
 
 ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent 
*bev) {
-  SendString(bev, redis::MultiBulkString({"_db_name"}));
+  SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
   repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
   LOG(INFO) << "[replication] Check db name request was sent, waiting for 
response";
   return CBState::NEXT;
@@ -456,7 +456,7 @@ ReplicationThread::CBState 
ReplicationThread::replConfWriteCB(bufferevent *bev)
     data_to_send.emplace_back("ip-address");
     data_to_send.emplace_back(config->replica_announce_ip);
   }
-  SendString(bev, redis::MultiBulkString(data_to_send));
+  SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
   repl_state_.store(kReplReplConf, std::memory_order_relaxed);
   LOG(INFO) << "[replication] replconf request was sent, waiting for response";
   return CBState::NEXT;
@@ -513,11 +513,11 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
   // Also use old PSYNC if replica can't find replication id from WAL and DB.
   if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || 
replid.length() != kReplIdLength) {
     next_try_old_psync_ = false;  // Reset next_try_old_psync_
-    SendString(bev, redis::MultiBulkString({"PSYNC", 
std::to_string(next_seq)}));
+    SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", 
std::to_string(next_seq)}));
     LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
   } else {
     // NEW PSYNC "Unique Replication Sequence ID": replication id and sequence 
id
-    SendString(bev, redis::MultiBulkString({"PSYNC", replid, 
std::to_string(next_seq)}));
+    SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid, 
std::to_string(next_seq)}));
     LOG(INFO) << "[replication] Try to use new psync, current unique 
replication sequence id: " << replid << ":"
               << cur_seq;
   }
@@ -607,7 +607,7 @@ ReplicationThread::CBState 
ReplicationThread::incrementBatchLoopCB(bufferevent *
 }
 
 ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent 
*bev) {
-  SendString(bev, redis::MultiBulkString({"_fetch_meta"}));
+  SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
   repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
   LOG(INFO) << "[replication] Start syncing data with fullsync";
   return CBState::NEXT;
@@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st 
*ssl) {
   std::string auth = srv_->GetConfig()->masterauth;
   if (!auth.empty()) {
     UniqueEvbuf evbuf;
-    const auto auth_command = redis::MultiBulkString({"AUTH", auth});
+    const auto auth_command = redis::ArrayOfBulkStrings({"AUTH", auth});
     auto s = util::SockSend(sock_fd, auth_command, ssl);
     if (!s.IsOK()) return s.Prefixed("send auth command err");
     while (true) {
@@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const 
std::string &dir, const
   }
   files_str.pop_back();
 
-  const auto fetch_command = redis::MultiBulkString({"_fetch_file", 
files_str});
+  const auto fetch_command = redis::ArrayOfBulkStrings({"_fetch_file", 
files_str});
   auto s = util::SockSend(sock_fd, fetch_command, ssl);
   if (!s.IsOK()) return s.Prefixed("send fetch file command");
 
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 09b220ae..424d0b4c 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -438,7 +438,7 @@ void SlotMigrator::clean() {
 }
 
 Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
-  std::string cmd = redis::MultiBulkString({"auth", password}, false);
+  std::string cmd = redis::ArrayOfBulkStrings({"auth", password});
   auto s = util::SockSend(sock_fd, cmd);
   if (!s.IsOK()) {
     return s.Prefixed("failed to send AUTH command");
@@ -456,7 +456,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, 
int status) {
   if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};
 
   std::string cmd =
-      redis::MultiBulkString({"cluster", "import", 
std::to_string(migrating_slot_), std::to_string(status)});
+      redis::ArrayOfBulkStrings({"cluster", "import", 
std::to_string(migrating_slot_), std::to_string(status)});
   auto s = util::SockSend(sock_fd, cmd);
   if (!s.IsOK()) {
     return s.Prefixed("failed to send command to the destination node");
@@ -666,7 +666,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice 
&key, const Metadata
     command.emplace_back("PXAT");
     command.emplace_back(std::to_string(metadata.expire));
   }
-  *restore_cmds += redis::MultiBulkString(command, false);
+  *restore_cmds += redis::ArrayOfBulkStrings(command);
   current_pipeline_size_++;
 
   // Check whether pipeline needs to be sent
@@ -747,7 +747,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice 
&key, const Metadata
     if (metadata.Type() != kRedisBitmap) {
       item_count++;
       if (item_count >= kMaxItemsInCommand) {
-        *restore_cmds += redis::MultiBulkString(user_cmd, false);
+        *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
         current_pipeline_size_++;
         item_count = 0;
         // Have to clear saved items
@@ -764,13 +764,13 @@ Status SlotMigrator::migrateComplexKey(const 
rocksdb::Slice &key, const Metadata
 
   // Have to check the item count of the last command list
   if (item_count % kMaxItemsInCommand != 0) {
-    *restore_cmds += redis::MultiBulkString(user_cmd, false);
+    *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
     current_pipeline_size_++;
   }
 
   // Add TTL for complex key
   if (metadata.expire > 0) {
-    *restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), 
std::to_string(metadata.expire)}, false);
+    *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), 
std::to_string(metadata.expire)});
     current_pipeline_size_++;
   }
 
@@ -809,7 +809,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const 
StreamMetadata &metad
     if (!s.IsOK()) {
       return s;
     }
-    *restore_cmds += redis::MultiBulkString(user_cmd, false);
+    *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
     current_pipeline_size_++;
 
     user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());
@@ -822,15 +822,14 @@ Status SlotMigrator::migrateStream(const Slice &key, 
const StreamMetadata &metad
 
   // commands like XTRIM and XDEL affect stream's metadata, but we use only 
XADD for a slot migration
   // XSETID is used to adjust stream's info on the destination node according 
to the current values on the source
-  *restore_cmds += redis::MultiBulkString(
-      {"XSETID", key.ToString(), metadata.last_generated_id.ToString(), 
"ENTRIESADDED",
-       std::to_string(metadata.entries_added), "MAXDELETEDID", 
metadata.max_deleted_entry_id.ToString()},
-      false);
+  *restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), 
metadata.last_generated_id.ToString(),
+                                              "ENTRIESADDED", 
std::to_string(metadata.entries_added), "MAXDELETEDID",
+                                              
metadata.max_deleted_entry_id.ToString()});
   current_pipeline_size_++;
 
   // Add TTL
   if (metadata.expire > 0) {
-    *restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), 
std::to_string(metadata.expire)}, false);
+    *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), 
std::to_string(metadata.expire)});
     current_pipeline_size_++;
   }
 
@@ -862,7 +861,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey 
&inkey, std::unique_ptr<
           uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx;
           user_cmd->emplace_back(std::to_string(offset));
           user_cmd->emplace_back("1");
-          *restore_cmds += redis::MultiBulkString(*user_cmd, false);
+          *restore_cmds += redis::ArrayOfBulkStrings(*user_cmd);
           current_pipeline_size_++;
           user_cmd->erase(user_cmd->begin() + 2, user_cmd->end());
         }
diff --git a/src/cluster/sync_migrate_context.cc 
b/src/cluster/sync_migrate_context.cc
index 0633cecd..3ba2806c 100644
--- a/src/cluster/sync_migrate_context.cc
+++ b/src/cluster/sync_migrate_context.cc
@@ -54,7 +54,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t 
events) {
 void SyncMigrateContext::TimerCB(int, int16_t events) {
   auto &&slot_migrator = srv_->slot_migrator;
 
-  conn_->Reply(redis::NilString());
+  conn_->Reply(conn_->NilString());
   timer_.reset();
 
   slot_migrator->CancelSyncCtx();
diff --git a/src/commands/blocking_commander.h 
b/src/commands/blocking_commander.h
index 537e770f..05883a8a 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -31,7 +31,7 @@ class BlockingCommander : public Commander,
                           private EventCallbackBase<BlockingCommander> {
  public:
   // method to reply when no operation happens
-  virtual std::string NoopReply() = 0;
+  virtual std::string NoopReply(const Connection *conn) = 0;
 
   // method to block keys
   virtual void BlockKeys() = 0;
@@ -48,7 +48,7 @@ class BlockingCommander : public Commander,
   // usually put to the end of the Execute method
   Status StartBlocking(int64_t timeout, std::string *output) {
     if (conn_->IsInExec()) {
-      *output = NoopReply();
+      *output = NoopReply(conn_);
       return Status::OK();  // no blocking in multi-exec
     }
 
@@ -111,7 +111,7 @@ class BlockingCommander : public Commander,
   }
 
   void TimerCB(int, int16_t) {
-    conn_->Reply(NoopReply());
+    conn_->Reply(NoopReply(conn_));
     timer_.reset();
     UnblockKeys();
     auto bev = conn_->GetBufferEvent();
diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc
index 6e9fe5ba..65e90d04 100644
--- a/src/commands/cmd_bit.cc
+++ b/src/commands/cmd_bit.cc
@@ -342,7 +342,7 @@ class CommandBitfield : public Commander {
           str_rets[i] = redis::Integer(rets[i]->Value());
         }
       } else {
-        str_rets[i] = redis::NilString();
+        str_rets[i] = conn->NilString();
       }
     }
     *output = redis::Array(str_rets);
diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
index e4316a3c..f33979e3 100644
--- a/src/commands/cmd_bloom_filter.cc
+++ b/src/commands/cmd_bloom_filter.cc
@@ -345,7 +345,7 @@ class CommandBFInfo : public Commander {
         *output += redis::SimpleString("Number of items inserted");
         *output += redis::Integer(info.size);
         *output += redis::SimpleString("Expansion rate");
-        *output += info.expansion == 0 ? redis::NilString() : 
redis::Integer(info.expansion);
+        *output += info.expansion == 0 ? conn->NilString() : 
redis::Integer(info.expansion);
         break;
       case BloomInfoType::kCapacity:
         *output = redis::Integer(info.capacity);
@@ -360,7 +360,7 @@ class CommandBFInfo : public Commander {
         *output = redis::Integer(info.size);
         break;
       case BloomInfoType::kExpansion:
-        *output = info.expansion == 0 ? redis::NilString() : 
redis::Integer(info.expansion);
+        *output = info.expansion == 0 ? conn->NilString() : 
redis::Integer(info.expansion);
         break;
     }
 
diff --git a/src/commands/cmd_geo.cc b/src/commands/cmd_geo.cc
index 39854368..0f4d98eb 100644
--- a/src/commands/cmd_geo.cc
+++ b/src/commands/cmd_geo.cc
@@ -148,7 +148,7 @@ class CommandGeoDist : public CommandGeoBase {
     }
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
     } else {
       *output = 
redis::BulkString(util::Float2String(GetDistanceByUnit(distance)));
     }
@@ -177,7 +177,7 @@ class CommandGeoHash : public Commander {
       hashes.resize(members_.size(), "");
     }
 
-    *output = redis::MultiBulkString(hashes);
+    *output = conn->MultiBulkString(hashes);
     return Status::OK();
   }
 
@@ -206,16 +206,16 @@ class CommandGeoPos : public Commander {
 
     if (s.IsNotFound()) {
       list.resize(members_.size(), "");
-      *output = redis::MultiBulkString(list);
+      *output = conn->MultiBulkString(list);
       return Status::OK();
     }
 
     for (const auto &member : members_) {
       auto iter = geo_points.find(member.ToString());
       if (iter == geo_points.end()) {
-        list.emplace_back(redis::NilString());
+        list.emplace_back(conn->NilString());
       } else {
-        list.emplace_back(redis::MultiBulkString(
+        list.emplace_back(conn->MultiBulkString(
             {util::Float2String(iter->second.longitude), 
util::Float2String(iter->second.latitude)}));
       }
     }
@@ -314,12 +314,12 @@ class CommandGeoRadius : public CommandGeoBase {
     if (store_key_.size() != 0) {
       *output = redis::Integer(geo_points.size());
     } else {
-      *output = GenerateOutput(geo_points);
+      *output = GenerateOutput(conn, geo_points);
     }
     return Status::OK();
   }
 
-  std::string GenerateOutput(const std::vector<GeoPoint> &geo_points) {
+  std::string GenerateOutput(const Connection *conn, const 
std::vector<GeoPoint> &geo_points) {
     int result_length = static_cast<int>(geo_points.size());
     int returned_items_count = (count_ == 0 || result_length < count_) ? 
result_length : count_;
     std::vector<std::string> list;
@@ -337,8 +337,8 @@ class CommandGeoRadius : public CommandGeoBase {
           
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
         }
         if (with_coord_) {
-          one.emplace_back(redis::MultiBulkString(
-              {util::Float2String(geo_point.longitude), 
util::Float2String(geo_point.latitude)}));
+          one.emplace_back(
+              conn->MultiBulkString({util::Float2String(geo_point.longitude), 
util::Float2String(geo_point.latitude)}));
         }
         list.emplace_back(redis::Array(one));
       }
@@ -440,7 +440,7 @@ class CommandGeoSearch : public CommandGeoBase {
     if (!s.ok()) {
       return {Status::RedisExecErr, s.ToString()};
     }
-    *output = generateOutput(geo_points);
+    *output = generateOutput(conn, geo_points);
 
     return Status::OK();
   }
@@ -496,7 +496,7 @@ class CommandGeoSearch : public CommandGeoBase {
     return Status::OK();
   }
 
-  std::string generateOutput(const std::vector<GeoPoint> &geo_points) {
+  std::string generateOutput(const Connection *conn, const 
std::vector<GeoPoint> &geo_points) {
     int result_length = static_cast<int>(geo_points.size());
     int returned_items_count = (count_ == 0 || result_length < count_) ? 
result_length : count_;
     std::vector<std::string> output;
@@ -515,8 +515,8 @@ class CommandGeoSearch : public CommandGeoBase {
           
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
         }
         if (with_coord_) {
-          one.emplace_back(redis::MultiBulkString(
-              {util::Float2String(geo_point.longitude), 
util::Float2String(geo_point.latitude)}));
+          one.emplace_back(
+              conn->MultiBulkString({util::Float2String(geo_point.longitude), 
util::Float2String(geo_point.latitude)}));
         }
         output.emplace_back(redis::Array(one));
       }
@@ -644,7 +644,7 @@ class CommandGeoRadiusByMember : public CommandGeoRadius {
     if (store_key_.size() != 0) {
       *output = redis::Integer(geo_points.size());
     } else {
-      *output = GenerateOutput(geo_points);
+      *output = GenerateOutput(conn, geo_points);
     }
 
     return Status::OK();
diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc
index 7ae90191..8b9526f9 100644
--- a/src/commands/cmd_hash.cc
+++ b/src/commands/cmd_hash.cc
@@ -37,7 +37,7 @@ class CommandHGet : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value);
+    *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value);
     return Status::OK();
   }
 };
@@ -208,9 +208,9 @@ class CommandHMGet : public Commander {
 
     if (s.IsNotFound()) {
       values.resize(fields.size(), "");
-      *output = redis::MultiBulkString(values);
+      *output = conn->MultiBulkString(values);
     } else {
-      *output = redis::MultiBulkString(values, statuses);
+      *output = conn->MultiBulkString(values, statuses);
     }
     return Status::OK();
   }
@@ -263,7 +263,7 @@ class CommandHKeys : public Commander {
     for (const auto &fv : field_values) {
       keys.emplace_back(fv.field);
     }
-    *output = redis::MultiBulkString(keys);
+    *output = conn->MultiBulkString(keys);
 
     return Status::OK();
   }
@@ -284,7 +284,7 @@ class CommandHVals : public Commander {
     for (const auto &p : field_values) {
       values.emplace_back(p.value);
     }
-    *output = MultiBulkString(values, false);
+    *output = conn->MultiBulkString(values, false);
 
     return Status::OK();
   }
@@ -306,7 +306,7 @@ class CommandHGetAll : public Commander {
       kv_pairs.emplace_back(p.field);
       kv_pairs.emplace_back(p.value);
     }
-    *output = MultiBulkString(kv_pairs, false);
+    *output = conn->MultiBulkString(kv_pairs, false);
 
     return Status::OK();
   }
@@ -350,7 +350,7 @@ class CommandHRangeByLex : public Commander {
       kv_pairs.emplace_back(p.field);
       kv_pairs.emplace_back(p.value);
     }
-    *output = MultiBulkString(kv_pairs, false);
+    *output = conn->MultiBulkString(kv_pairs, false);
 
     return Status::OK();
   }
@@ -372,7 +372,7 @@ class CommandHScan : public CommandSubkeyScanBase {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = GenerateOutput(srv, fields, values, CursorType::kTypeHash);
+    *output = GenerateOutput(srv, conn, fields, values, CursorType::kTypeHash);
     return Status::OK();
   }
 };
@@ -415,9 +415,9 @@ class CommandHRandField : public Commander {
     }
 
     if (no_parameters_)
-      *output = s.IsNotFound() ? redis::NilString() : 
redis::BulkString(result_entries[0]);
+      *output = s.IsNotFound() ? conn->NilString() : 
redis::BulkString(result_entries[0]);
     else
-      *output = redis::MultiBulkString(result_entries, false);
+      *output = conn->MultiBulkString(result_entries, false);
     return Status::OK();
   }
 
diff --git a/src/commands/cmd_json.cc b/src/commands/cmd_json.cc
index 737581f8..5377ed31 100644
--- a/src/commands/cmd_json.cc
+++ b/src/commands/cmd_json.cc
@@ -33,13 +33,13 @@
 namespace redis {
 
 template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
-std::string OptionalsToString(Optionals<T> &opts) {
+std::string OptionalsToString(const Connection *conn, Optionals<T> &opts) {
   std::string str = MultiLen(opts.size());
   for (const auto &opt : opts) {
     if (opt.has_value()) {
       str += redis::Integer(opt.value());
     } else {
-      str += redis::NilString();
+      str += conn->NilString();
     }
   }
   return str;
@@ -100,7 +100,7 @@ class CommandJsonGet : public Commander {
     JsonValue result;
     auto s = json.Get(args_[1], paths_, &result);
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
@@ -129,7 +129,7 @@ class CommandJsonInfo : public Commander {
     auto format_str = storage_format == JsonStorageFormat::JSON   ? "json"
                       : storage_format == JsonStorageFormat::CBOR ? "cbor"
                                                                   : "unknown";
-    output->append(redis::MultiBulkString({"storage_format", format_str}));
+    output->append(conn->MultiBulkString({"storage_format", format_str}));
     return Status::OK();
   }
 };
@@ -144,7 +144,7 @@ class CommandJsonArrAppend : public Commander {
     auto s = json.ArrAppend(args_[1], args_[2], {args_.begin() + 3, 
args_.end()}, &results);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 };
@@ -169,12 +169,12 @@ class CommandJsonArrInsert : public Commander {
 
     auto s = json.ArrInsert(args_[1], args_[2], index_, {args_.begin() + 4, 
args_.end()}, &results);
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 
@@ -198,11 +198,11 @@ class CommandJsonType : public Commander {
     auto s = json.Type(args_[1], path, &types);
     if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, 
s.ToString()};
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
-    *output = redis::MultiBulkString(types);
+    *output = conn->MultiBulkString(types);
     return Status::OK();
   }
 };
@@ -219,16 +219,16 @@ class CommandJsonObjkeys : public Commander {
     auto s = json.ObjKeys(args_[1], path, &results);
     if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, 
s.ToString()};
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
     *output = redis::MultiLen(results.size());
     for (const auto &item : results) {
       if (item.has_value()) {
-        *output += redis::MultiBulkString(item.value(), false);
+        *output += conn->MultiBulkString(item.value(), false);
       } else {
-        *output += redis::NilString();
+        *output += conn->NilString();
       }
     }
 
@@ -248,7 +248,7 @@ class CommandJsonClear : public Commander {
     auto s = json.Clear(args_[1], path, &result);
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
@@ -269,11 +269,11 @@ class CommandJsonToggle : public Commander {
     auto s = json.Toggle(args_[1], path, &results);
     if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, 
s.ToString()};
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 };
@@ -293,12 +293,12 @@ class CommandJsonArrLen : public Commander {
     Optionals<uint64_t> results;
     auto s = json.ArrLen(args_[1], path, &results);
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 };
@@ -320,7 +320,7 @@ class CommandJsonMerge : public Commander {
     }
 
     if (!result) {
-      *output = redis::NilString();
+      *output = conn->NilString();
     } else {
       *output = redis::SimpleString("OK");
     }
@@ -356,7 +356,7 @@ class CommandJsonArrPop : public Commander {
       if (data.has_value()) {
         *output += redis::BulkString(GET_OR_RET(data->Print()));
       } else {
-        *output += redis::NilString();
+        *output += conn->NilString();
       }
     }
 
@@ -383,12 +383,12 @@ class CommandJsonObjLen : public Commander {
     Optionals<uint64_t> results;
     auto s = json.ObjLen(args_[1], path, &results);
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 };
@@ -411,12 +411,12 @@ class CommandJsonArrTrim : public Commander {
     auto s = json.ArrTrim(args_[1], path_, start_, stop_, &results);
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 
@@ -452,13 +452,13 @@ class CommanderJsonArrIndex : public Commander {
     auto s = json.ArrIndex(args_[1], args_[2], args_[3], start_, end_, 
&results);
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 
@@ -480,7 +480,7 @@ class CommandJsonDel : public Commander {
     }
     auto s = json.Del(args_[1], path, &result);
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
@@ -540,7 +540,7 @@ class CommandJsonStrAppend : public Commander {
     auto s = json.StrAppend(args_[1], path, args_[3], &results);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 };
@@ -561,7 +561,7 @@ class CommandJsonStrLen : public Commander {
     auto s = json.StrLen(args_[1], path, &results);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = OptionalsToString(results);
+    *output = OptionalsToString(conn, results);
     return Status::OK();
   }
 };
@@ -589,7 +589,7 @@ class CommandJsonMGet : public Commander {
       }
     }
 
-    *output = MultiBulkString(values, statuses);
+    *output = conn->MultiBulkString(values, statuses);
     return Status::OK();
   }
 };
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index 86a0708d..5470d444 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -119,9 +119,9 @@ class CommandPop : public Commander {
       }
 
       if (s.IsNotFound()) {
-        *output = redis::MultiLen(-1);
+        *output = conn->NilArray();
       } else {
-        *output = redis::MultiBulkString(elems);
+        *output = conn->MultiBulkString(elems);
       }
     } else {
       std::string elem;
@@ -131,7 +131,7 @@ class CommandPop : public Commander {
       }
 
       if (s.IsNotFound()) {
-        *output = redis::NilString();
+        *output = conn->NilString();
       } else {
         *output = redis::BulkString(elem);
       }
@@ -209,9 +209,9 @@ class CommandLMPop : public Commander {
     }
 
     if (elems.empty()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
     } else {
-      std::string elems_bulk = redis::MultiBulkString(elems);
+      std::string elems_bulk = conn->MultiBulkString(elems);
       *output = redis::Array({redis::BulkString(chosen_key), 
std::move(elems_bulk)});
     }
 
@@ -298,10 +298,10 @@ class CommandBPop : public BlockingCommander {
 
     if (s.ok()) {
       if (!last_key_ptr) {
-        conn_->Reply(redis::MultiBulkString({"", ""}));
+        conn_->Reply(conn_->MultiBulkString({"", ""}));
       } else {
         conn_->GetServer()->UpdateWatchedKeysManually({*last_key_ptr});
-        conn_->Reply(redis::MultiBulkString({*last_key_ptr, std::move(elem)}));
+        conn_->Reply(conn_->MultiBulkString({*last_key_ptr, std::move(elem)}));
       }
     } else if (!s.IsNotFound()) {
       conn_->Reply(redis::Error("ERR " + s.ToString()));
@@ -315,7 +315,7 @@ class CommandBPop : public BlockingCommander {
     return !s.IsNotFound();
   }
 
-  std::string NoopReply() override { return redis::NilString(); }
+  std::string NoopReply(const Connection *conn) override { return 
conn->NilString(); }
 
  private:
   bool left_ = false;
@@ -410,7 +410,7 @@ class CommandBLMPop : public BlockingCommander {
     if (s.ok()) {
       if (!elems.empty()) {
         conn_->GetServer()->UpdateWatchedKeysManually({chosen_key});
-        std::string elems_bulk = redis::MultiBulkString(elems);
+        std::string elems_bulk = conn_->MultiBulkString(elems);
         conn_->Reply(redis::Array({redis::BulkString(chosen_key), 
std::move(elems_bulk)}));
       }
     } else if (!s.IsNotFound()) {
@@ -437,7 +437,7 @@ class CommandBLMPop : public BlockingCommander {
     return !s.IsNotFound();
   }
 
-  std::string NoopReply() override { return redis::NilString(); }
+  std::string NoopReply(const Connection *conn) override { return 
conn->NilString(); }
 
   static const inline CommandKeyRangeGen keyRangeGen = [](const 
std::vector<std::string> &args) {
     CommandKeyRange range;
@@ -536,7 +536,7 @@ class CommandLRange : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = redis::MultiBulkString(elems, false);
+    *output = conn->MultiBulkString(elems, false);
     return Status::OK();
   }
 
@@ -580,7 +580,7 @@ class CommandLIndex : public Commander {
     }
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
     } else {
       *output = redis::BulkString(elem);
     }
@@ -663,7 +663,7 @@ class CommandRPopLPUSH : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(elem);
+    *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(elem);
     return Status::OK();
   }
 };
@@ -694,7 +694,7 @@ class CommandLMove : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(elem);
+    *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(elem);
     return Status::OK();
   }
 
@@ -769,7 +769,7 @@ class CommandBLMove : public BlockingCommander {
     return !empty;
   }
 
-  std::string NoopReply() override { return redis::MultiLen(-1); }
+  std::string NoopReply(const Connection *conn) override { return 
conn->NilArray(); }
 
  private:
   bool src_left_;
@@ -826,7 +826,7 @@ class CommandLPos : public Commander {
     // We return nil or a single value if `COUNT` option is not given.
     if (!spec_.count.has_value()) {
       if (s.IsNotFound() || indexes.empty()) {
-        *output = redis::NilString();
+        *output = conn->NilString();
       } else {
         assert(indexes.size() == 1);
         *output = redis::Integer(indexes[0]);
@@ -839,7 +839,7 @@ class CommandLPos : public Commander {
       for (const auto &index : indexes) {
         values.emplace_back(std::to_string(index));
       }
-      *output = redis::MultiBulkString(values, false);
+      *output = conn->MultiBulkString(values, false);
     }
     return Status::OK();
   }
diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc
index 6ec61eea..8f0ddfbd 100644
--- a/src/commands/cmd_pubsub.cc
+++ b/src/commands/cmd_pubsub.cc
@@ -73,10 +73,11 @@ class CommandMPublish : public Commander {
   }
 };
 
-void SubscribeCommandReply(std::string *output, const std::string &name, const 
std::string &sub_name, int num) {
+void SubscribeCommandReply(const Connection *conn, std::string *output, const 
std::string &name,
+                           const std::string &sub_name, int num) {
   output->append(redis::MultiLen(3));
   output->append(redis::BulkString(name));
-  output->append(sub_name.empty() ? redis::NilString() : 
redis::BulkString(sub_name));
+  output->append(sub_name.empty() ? conn->NilString() : BulkString(sub_name));
   output->append(redis::Integer(num));
 }
 
@@ -85,7 +86,8 @@ class CommandSubscribe : public Commander {
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     for (unsigned i = 1; i < args_.size(); i++) {
       conn->SubscribeChannel(args_[i]);
-      SubscribeCommandReply(output, "subscribe", args_[i], 
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
+      SubscribeCommandReply(conn, output, "subscribe", args_[i],
+                            conn->SubscriptionsCount() + 
conn->PSubscriptionsCount());
     }
     return Status::OK();
   }
@@ -95,13 +97,13 @@ class CommandUnSubscribe : public Commander {
  public:
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     if (args_.size() == 1) {
-      conn->UnsubscribeAll([output](const std::string &sub_name, int num) {
-        SubscribeCommandReply(output, "unsubscribe", sub_name, num);
+      conn->UnsubscribeAll([conn, output](const std::string &sub_name, int 
num) {
+        SubscribeCommandReply(conn, output, "unsubscribe", sub_name, num);
       });
     } else {
       for (size_t i = 1; i < args_.size(); i++) {
         conn->UnsubscribeChannel(args_[i]);
-        SubscribeCommandReply(output, "unsubscribe", args_[i],
+        SubscribeCommandReply(conn, output, "unsubscribe", args_[i],
                               conn->SubscriptionsCount() + 
conn->PSubscriptionsCount());
       }
     }
@@ -114,7 +116,8 @@ class CommandPSubscribe : public Commander {
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     for (size_t i = 1; i < args_.size(); i++) {
       conn->PSubscribeChannel(args_[i]);
-      SubscribeCommandReply(output, "psubscribe", args_[i], 
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
+      SubscribeCommandReply(conn, output, "psubscribe", args_[i],
+                            conn->SubscriptionsCount() + 
conn->PSubscriptionsCount());
     }
     return Status::OK();
   }
@@ -124,13 +127,13 @@ class CommandPUnSubscribe : public Commander {
  public:
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     if (args_.size() == 1) {
-      conn->PUnsubscribeAll([output](const std::string &sub_name, int num) {
-        SubscribeCommandReply(output, "punsubscribe", sub_name, num);
+      conn->PUnsubscribeAll([conn, output](const std::string &sub_name, int 
num) {
+        SubscribeCommandReply(conn, output, "punsubscribe", sub_name, num);
       });
     } else {
       for (size_t i = 1; i < args_.size(); i++) {
         conn->PUnsubscribeChannel(args_[i]);
-        SubscribeCommandReply(output, "punsubscribe", args_[i],
+        SubscribeCommandReply(conn, output, "punsubscribe", args_[i],
                               conn->SubscriptionsCount() + 
conn->PSubscriptionsCount());
       }
     }
@@ -153,7 +156,7 @@ class CommandSSubscribe : public Commander {
 
     for (unsigned int i = 1; i < args_.size(); i++) {
       conn->SSubscribeChannel(args_[i], slot);
-      SubscribeCommandReply(output, "ssubscribe", args_[i], 
conn->SSubscriptionsCount());
+      SubscribeCommandReply(conn, output, "ssubscribe", args_[i], 
conn->SSubscriptionsCount());
     }
     return Status::OK();
   }
@@ -163,13 +166,13 @@ class CommandSUnSubscribe : public Commander {
  public:
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     if (args_.size() == 1) {
-      conn->SUnsubscribeAll([output](const std::string &sub_name, int num) {
-        SubscribeCommandReply(output, "sunsubscribe", sub_name, num);
+      conn->SUnsubscribeAll([conn, output](const std::string &sub_name, int 
num) {
+        SubscribeCommandReply(conn, output, "sunsubscribe", sub_name, num);
       });
     } else {
       for (size_t i = 1; i < args_.size(); i++) {
         conn->SUnsubscribeChannel(args_[i], srv->GetConfig()->cluster_enabled 
? GetSlotIdFromKey(args_[i]) : 0);
-        SubscribeCommandReply(output, "sunsubscribe", args_[i], 
conn->SSubscriptionsCount());
+        SubscribeCommandReply(conn, output, "sunsubscribe", args_[i], 
conn->SSubscriptionsCount());
       }
     }
     return Status::OK();
@@ -231,7 +234,7 @@ class CommandPubSub : public Commander {
       } else {
         srv->GetSChannelsByPattern(pattern_, &channels);
       }
-      *output = redis::MultiBulkString(channels);
+      *output = conn->MultiBulkString(channels);
       return Status::OK();
     }
 
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index ef1f9a17..71781cd5 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -105,11 +105,11 @@ class CommandNamespace : public Commander {
         }
         namespaces.emplace_back(kDefaultNamespace);
         namespaces.emplace_back(config->requirepass);
-        *output = redis::MultiBulkString(namespaces, false);
+        *output = conn->MultiBulkString(namespaces, false);
       } else {
         auto token = srv->GetNamespace()->Get(args_[2]);
         if (token.Is<Status::NotFound>()) {
-          *output = redis::NilString();
+          *output = conn->NilString();
         } else {
           *output = redis::BulkString(token.GetValue());
         }
@@ -155,7 +155,7 @@ class CommandKeys : public Commander {
     if (!s.ok()) {
       return {Status::RedisExecErr, s.ToString()};
     }
-    *output = redis::MultiBulkString(keys);
+    *output = conn->MultiBulkString(keys);
     return Status::OK();
   }
 };
@@ -252,7 +252,7 @@ class CommandConfig : public Commander {
     } else if (args_.size() == 3 && sub_command == "get") {
       std::vector<std::string> values;
       config->Get(args_[2], &values);
-      *output = redis::MultiBulkString(values);
+      *output = conn->MultiBulkString(values);
     } else if (args_.size() == 4 && sub_command == "set") {
       Status s = config->Set(srv, args_[2], args_[3]);
       if (!s.IsOK()) {
@@ -302,7 +302,7 @@ class CommandDisk : public Commander {
     if (!s.ok()) {
       // Redis returns the Nil string when the key does not exist
       if (s.IsNotFound()) {
-        *output = redis::NilString();
+        *output = conn->NilString();
         return Status::OK();
       }
       return {Status::RedisExecErr, s.ToString()};
@@ -514,7 +514,7 @@ class CommandClient : public Commander {
       return Status::OK();
     } else if (subcommand_ == "getname") {
       std::string name = conn->GetName();
-      *output = name == "" ? redis::NilString() : redis::BulkString(name);
+      *output = name == "" ? conn->NilString() : redis::BulkString(name);
       return Status::OK();
     } else if (subcommand_ == "id") {
       *output = redis::Integer(conn->GetID());
@@ -613,12 +613,14 @@ class CommandDebug : public Commander {
           *output += redis::Integer(i);
         }
       } else if (protocol_type_ == "true") {
-        *output = redis::Bool(conn->GetProtocolVersion(), true);
+        *output = conn->Bool(true);
       } else if (protocol_type_ == "false") {
-        *output = redis::Bool(conn->GetProtocolVersion(), false);
+        *output = conn->Bool(false);
+      } else if (protocol_type_ == "null") {
+        *output = conn->NilString();
       } else {
         *output =
-            redis::Error("Wrong protocol type name. Please use one of the 
following: string|int|array|true|false");
+            redis::Error("Wrong protocol type name. Please use one of the 
following: string|int|array|true|false|null");
       }
     } else {
       return {Status::RedisInvalidCmd, "Unknown subcommand, should be DEBUG or 
PROTOCOL"};
@@ -668,7 +670,7 @@ class CommandCommand : public Commander {
         for (const auto &key_index : keys_indexes) {
           keys.emplace_back(args_[key_index + 2]);
         }
-        *output = redis::MultiBulkString(keys);
+        *output = conn->MultiBulkString(keys);
       } else {
         return {Status::RedisExecErr, "Command subcommand must be one of 
COUNT, GETKEYS, INFO"};
       }
@@ -807,7 +809,8 @@ class CommandScan : public CommandScanBase {
     return Commander::Parse(args);
   }
 
-  static std::string GenerateOutput(Server *srv, const 
std::vector<std::string> &keys, const std::string &end_cursor) {
+  static std::string GenerateOutput(Server *srv, const Connection *conn, const 
std::vector<std::string> &keys,
+                                    const std::string &end_cursor) {
     std::vector<std::string> list;
     if (!end_cursor.empty()) {
       list.emplace_back(
@@ -816,7 +819,7 @@ class CommandScan : public CommandScanBase {
       list.emplace_back(redis::BulkString("0"));
     }
 
-    list.emplace_back(redis::MultiBulkString(keys, false));
+    list.emplace_back(conn->MultiBulkString(keys, false));
 
     return redis::Array(list);
   }
@@ -831,7 +834,7 @@ class CommandScan : public CommandScanBase {
     if (!s.ok()) {
       return {Status::RedisExecErr, s.ToString()};
     }
-    *output = GenerateOutput(srv, keys, end_key);
+    *output = GenerateOutput(srv, conn, keys, end_key);
     return Status::OK();
   }
 };
diff --git a/src/commands/cmd_set.cc b/src/commands/cmd_set.cc
index f2c60a3b..0b5a8bda 100644
--- a/src/commands/cmd_set.cc
+++ b/src/commands/cmd_set.cc
@@ -93,7 +93,7 @@ class CommandSMembers : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = redis::MultiBulkString(members, false);
+    *output = conn->MultiBulkString(members, false);
     return Status::OK();
   }
 };
@@ -171,12 +171,12 @@ class CommandSPop : public Commander {
     }
 
     if (with_count_) {
-      *output = redis::MultiBulkString(members, false);
+      *output = conn->MultiBulkString(members, false);
     } else {
       if (members.size() > 0) {
         *output = redis::BulkString(members.front());
       } else {
-        *output = redis::NilString();
+        *output = conn->NilString();
       }
     }
     return Status::OK();
@@ -211,7 +211,7 @@ class CommandSRandMember : public Commander {
     if (!s.ok()) {
       return {Status::RedisExecErr, s.ToString()};
     }
-    *output = redis::MultiBulkString(members, false);
+    *output = conn->MultiBulkString(members, false);
     return Status::OK();
   }
 
@@ -249,7 +249,7 @@ class CommandSDiff : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = redis::MultiBulkString(members, false);
+    *output = conn->MultiBulkString(members, false);
     return Status::OK();
   }
 };
@@ -269,7 +269,7 @@ class CommandSUnion : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = redis::MultiBulkString(members, false);
+    *output = conn->MultiBulkString(members, false);
     return Status::OK();
   }
 };
@@ -289,7 +289,7 @@ class CommandSInter : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = redis::MultiBulkString(members, false);
+    *output = conn->MultiBulkString(members, false);
     return Status::OK();
   }
 };
@@ -432,7 +432,7 @@ class CommandSScan : public CommandSubkeyScanBase {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = CommandScanBase::GenerateOutput(srv, members, 
CursorType::kTypeSet);
+    *output = CommandScanBase::GenerateOutput(srv, conn, members, 
CursorType::kTypeSet);
     return Status::OK();
   }
 };
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 545d438d..f82497fc 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -153,7 +153,7 @@ class CommandXAdd : public Commander {
     }
 
     if (s.IsNotFound() && nomkstream_) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
@@ -464,17 +464,17 @@ class CommandXInfo : public Commander {
       if (info.first_entry) {
         output->append(redis::MultiLen(2));
         output->append(redis::BulkString(info.first_entry->key));
-        output->append(redis::MultiBulkString(info.first_entry->values));
+        output->append(conn->MultiBulkString(info.first_entry->values));
       } else {
-        output->append(redis::NilString());
+        output->append(conn->NilString());
       }
       output->append(redis::BulkString("last-entry"));
       if (info.last_entry) {
         output->append(redis::MultiLen(2));
         output->append(redis::BulkString(info.last_entry->key));
-        output->append(redis::MultiBulkString(info.last_entry->values));
+        output->append(conn->MultiBulkString(info.last_entry->values));
       } else {
-        output->append(redis::NilString());
+        output->append(conn->NilString());
       }
     } else {
       output->append(redis::BulkString("entries"));
@@ -482,7 +482,7 @@ class CommandXInfo : public Commander {
       for (const auto &e : info.entries) {
         output->append(redis::MultiLen(2));
         output->append(redis::BulkString(e.key));
-        output->append(redis::MultiBulkString(e.values));
+        output->append(conn->MultiBulkString(e.values));
       }
     }
 
@@ -514,13 +514,13 @@ class CommandXInfo : public Commander {
       
output->append(redis::BulkString(it.second.last_delivered_id.ToString()));
       output->append(redis::BulkString("entries-read"));
       if (it.second.entries_read == -1) {
-        output->append(redis::NilString());
+        output->append(conn->NilString());
       } else {
         output->append(redis::Integer(it.second.entries_read));
       }
       output->append(redis::BulkString("lag"));
       if (it.second.lag == UINT64_MAX) {
-        output->append(redis::NilString());
+        output->append(conn->NilString());
       } else {
         output->append(redis::Integer(it.second.lag));
       }
@@ -611,7 +611,7 @@ class CommandXRange : public Commander {
 
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     if (with_count_ && count_ == 0) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
@@ -637,7 +637,7 @@ class CommandXRange : public Commander {
     for (const auto &e : result) {
       output->append(redis::MultiLen(2));
       output->append(redis::BulkString(e.key));
-      output->append(redis::MultiBulkString(e.values));
+      output->append(conn->MultiBulkString(e.values));
     }
 
     return Status::OK();
@@ -704,7 +704,7 @@ class CommandXRevRange : public Commander {
 
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     if (with_count_ && count_ == 0) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
@@ -730,7 +730,7 @@ class CommandXRevRange : public Commander {
     for (const auto &e : result) {
       output->append(redis::MultiLen(2));
       output->append(redis::BulkString(e.key));
-      output->append(redis::MultiBulkString(e.values));
+      output->append(conn->MultiBulkString(e.values));
     }
 
     return Status::OK();
@@ -862,7 +862,7 @@ class CommandXRead : public Commander,
 
     if (block_ && results.empty()) {
       if (conn->IsInExec()) {
-        *output = redis::MultiLen(-1);
+        *output = conn->NilArray();
         return Status::OK();  // No blocking in multi-exec
       }
 
@@ -870,14 +870,14 @@ class CommandXRead : public Commander,
     }
 
     if (!block_ && results.empty()) {
-      *output = redis::MultiLen(-1);
+      *output = conn->NilArray();
       return Status::OK();
     }
 
-    return SendResults(output, results);
+    return SendResults(conn, output, results);
   }
 
-  static Status SendResults(std::string *output, const 
std::vector<StreamReadResult> &results) {
+  static Status SendResults(Connection *conn, std::string *output, const 
std::vector<StreamReadResult> &results) {
     output->append(redis::MultiLen(results.size()));
 
     for (const auto &result : results) {
@@ -887,7 +887,7 @@ class CommandXRead : public Commander,
       for (const auto &entry : result.entries) {
         output->append(redis::MultiLen(2));
         output->append(redis::BulkString(entry.key));
-        output->append(redis::MultiBulkString(entry.values));
+        output->append(conn->MultiBulkString(entry.values));
       }
     }
 
@@ -973,7 +973,7 @@ class CommandXRead : public Commander,
     }
 
     if (results.empty()) {
-      conn_->Reply(redis::MultiLen(-1));
+      conn_->Reply(conn_->NilArray());
     }
 
     SendReply(results);
@@ -991,7 +991,7 @@ class CommandXRead : public Commander,
       for (const auto &entry : result.entries) {
         output.append(redis::MultiLen(2));
         output.append(redis::BulkString(entry.key));
-        output.append(redis::MultiBulkString(entry.values));
+        output.append(conn_->MultiBulkString(entry.values));
       }
     }
 
@@ -1009,7 +1009,7 @@ class CommandXRead : public Commander,
   }
 
   void TimerCB(int, int16_t events) {
-    conn_->Reply(redis::NilString());
+    conn_->Reply(conn_->NilString());
 
     timer_.reset();
 
diff --git a/src/commands/cmd_string.cc b/src/commands/cmd_string.cc
index a0e1a690..debf5e3f 100644
--- a/src/commands/cmd_string.cc
+++ b/src/commands/cmd_string.cc
@@ -54,7 +54,7 @@ class CommandGet : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value);
+    *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value);
     return Status::OK();
   }
 };
@@ -101,7 +101,7 @@ class CommandGetEx : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value);
+    *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value);
     return Status::OK();
   }
 
@@ -142,7 +142,7 @@ class CommandGetSet : public Commander {
     if (old_value.has_value()) {
       *output = redis::BulkString(old_value.value());
     } else {
-      *output = redis::NilString();
+      *output = conn->NilString();
     }
     return Status::OK();
   }
@@ -159,7 +159,7 @@ class CommandGetDel : public Commander {
     }
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
     } else {
       *output = redis::BulkString(value);
     }
@@ -190,7 +190,7 @@ class CommandGetRange : public Commander {
     }
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
@@ -199,7 +199,7 @@ class CommandGetRange : public Commander {
     if (start_ < 0) start_ = 0;
     if (stop_ > static_cast<int>(value.size())) stop_ = 
static_cast<int>(value.size());
     if (start_ > stop_) {
-      *output = redis::NilString();
+      *output = conn->NilString();
     } else {
       *output = redis::BulkString(value.substr(start_, stop_ - start_ + 1));
     }
@@ -255,7 +255,7 @@ class CommandMGet : public Commander {
     std::vector<std::string> values;
     // always return OK
     auto statuses = string_db.MGet(keys, &values);
-    *output = redis::MultiBulkString(values, statuses);
+    *output = conn->MultiBulkString(values, statuses);
     return Status::OK();
   }
 };
@@ -323,13 +323,13 @@ class CommandSet : public Commander {
       if (ret.has_value()) {
         *output = redis::BulkString(ret.value());
       } else {
-        *output = redis::NilString();
+        *output = conn->NilString();
       }
     } else {
       if (ret.has_value()) {
         *output = redis::SimpleString("OK");
       } else {
-        *output = redis::NilString();
+        *output = conn->NilString();
       }
     }
     return Status::OK();
diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc
index c99c6ddc..fa1a47aa 100644
--- a/src/commands/cmd_txn.cc
+++ b/src/commands/cmd_txn.cc
@@ -73,7 +73,7 @@ class CommandExec : public Commander {
     }
 
     if (srv->IsWatchedKeysModified(conn)) {
-      *output = redis::NilString();
+      *output = conn->NilString();
       return Status::OK();
     }
 
diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc
index f1160274..acddad82 100644
--- a/src/commands/cmd_zset.cc
+++ b/src/commands/cmd_zset.cc
@@ -83,7 +83,7 @@ class CommandZAdd : public Commander {
       auto new_score = member_scores_[0].score;
       if ((flags_.HasNX() || flags_.HasXX() || flags_.HasLT() || 
flags_.HasGT()) && old_score == new_score &&
           ret == 0) {  // not the first time using incr && score not changed
-        *output = redis::NilString();
+        *output = conn->NilString();
         return Status::OK();
       }
 
@@ -336,7 +336,7 @@ class CommandBZPop : public BlockingCommander {
     return StartBlocking(timeout_, output);
   }
 
-  std::string NoopReply() override { return redis::MultiLen(-1); }
+  std::string NoopReply(const Connection *conn) override { return 
conn->NilArray(); }
 
   void BlockKeys() override {
     for (const auto &key : keys_) {
@@ -454,7 +454,7 @@ class CommandZMPop : public Commander {
       SendMembersWithScoresForZMpop(conn, user_key, member_scores);
       return Status::OK();
     }
-    *output = redis::MultiLen(-1);
+    *output = conn->NilArray();
     return Status::OK();
   }
 
@@ -538,7 +538,7 @@ class CommandBZMPop : public BlockingCommander {
     }
   }
 
-  std::string NoopReply() override { return redis::NilString(); }
+  std::string NoopReply(const Connection *conn) override { return 
conn->NilString(); }
 
   bool OnBlockingWrite() override {
     std::string user_key;
@@ -797,14 +797,14 @@ class CommandZRangeGeneric : public Commander {
         break;
       case kZRangeScore:
         if (score_spec_.count == 0) {
-          *output = redis::MultiBulkString({});
+          *output = conn->MultiBulkString({});
           return Status::OK();
         }
         s = zset_db.RangeByScore(key_, score_spec_, &member_scores, nullptr);
         break;
       case kZRangeLex:
         if (lex_spec_.count == 0) {
-          *output = redis::MultiBulkString({});
+          *output = conn->MultiBulkString({});
           return Status::OK();
         }
         s = zset_db.RangeByLex(key_, lex_spec_, &member_scores, nullptr);
@@ -896,9 +896,9 @@ class CommandZRank : public Commander {
 
     if (rank == -1) {
       if (with_score_) {
-        output->append(redis::MultiLen(-1));
+        output->append(conn->NilArray());
       } else {
-        *output = redis::NilString();
+        *output = conn->NilString();
       }
     } else {
       if (with_score_) {
@@ -1045,7 +1045,7 @@ class CommandZScore : public Commander {
     }
 
     if (s.IsNotFound()) {
-      *output = redis::NilString();
+      *output = conn->NilString();
     } else {
       *output = redis::BulkString(util::Float2String(score));
     }
@@ -1080,7 +1080,7 @@ class CommandZMScore : public Commander {
         }
       }
     }
-    *output = redis::MultiBulkString(values);
+    *output = conn->MultiBulkString(values);
     return Status::OK();
   }
 };
@@ -1355,7 +1355,7 @@ class CommandZScan : public CommandSubkeyScanBase {
     for (const auto &score : scores) {
       score_strings.emplace_back(util::Float2String(score));
     }
-    *output = GenerateOutput(srv, members, score_strings, 
CursorType::kTypeZSet);
+    *output = GenerateOutput(srv, conn, members, score_strings, 
CursorType::kTypeZSet);
     return Status::OK();
   }
 };
@@ -1407,9 +1407,9 @@ class CommandZRandMember : public Commander {
     }
 
     if (no_parameters_)
-      *output = s.IsNotFound() ? redis::NilString() : 
redis::BulkString(result_entries[0]);
+      *output = s.IsNotFound() ? conn->NilString() : 
redis::BulkString(result_entries[0]);
     else
-      *output = redis::MultiBulkString(result_entries, false);
+      *output = conn->MultiBulkString(result_entries, false);
     return Status::OK();
   }
 
diff --git a/src/commands/commander.cc b/src/commands/commander.cc
index 024db614..6ac10581 100644
--- a/src/commands/commander.cc
+++ b/src/commands/commander.cc
@@ -68,7 +68,7 @@ void CommandTable::GetCommandsInfo(std::string *info, const 
std::vector<std::str
   for (const auto &cmd_name : cmd_names) {
     auto cmd_iter = original_commands.find(util::ToLower(cmd_name));
     if (cmd_iter == original_commands.end()) {
-      info->append(redis::NilString());
+      info->append(NilString(RESP::v2));
     } else {
       auto command_attribute = cmd_iter->second;
       auto command_info = GetCommandInfo(command_attribute);
diff --git a/src/commands/scan_base.h b/src/commands/scan_base.h
index 0bfb188b..bab86e21 100644
--- a/src/commands/scan_base.h
+++ b/src/commands/scan_base.h
@@ -64,7 +64,8 @@ class CommandScanBase : public Commander {
     }
   }
 
-  std::string GenerateOutput(Server *srv, const std::vector<std::string> 
&keys, CursorType cursor_type) const {
+  std::string GenerateOutput(Server *srv, const Connection *conn, const 
std::vector<std::string> &keys,
+                             CursorType cursor_type) const {
     std::vector<std::string> list;
     if (keys.size() == static_cast<size_t>(limit_)) {
       auto end_cursor = srv->GenerateCursorFromKeyName(keys.back(), 
cursor_type);
@@ -73,7 +74,7 @@ class CommandScanBase : public Commander {
       list.emplace_back(redis::BulkString("0"));
     }
 
-    list.emplace_back(redis::MultiBulkString(keys, false));
+    list.emplace_back(conn->MultiBulkString(keys, false));
 
     return redis::Array(list);
   }
@@ -111,7 +112,7 @@ class CommandSubkeyScanBase : public CommandScanBase {
     return Commander::Parse(args);
   }
 
-  std::string GenerateOutput(Server *srv, const std::vector<std::string> 
&fields,
+  std::string GenerateOutput(Server *srv, const Connection *conn, const 
std::vector<std::string> &fields,
                              const std::vector<std::string> &values, 
CursorType cursor_type) {
     std::vector<std::string> list;
     auto items_count = fields.size();
@@ -128,7 +129,7 @@ class CommandSubkeyScanBase : public CommandScanBase {
         fvs.emplace_back(values[i]);
       }
     }
-    list.emplace_back(redis::MultiBulkString(fvs, false));
+    list.emplace_back(conn->MultiBulkString(fvs, false));
     return redis::Array(list);
   }
 
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index d6e0b5f6..87370ffa 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -130,6 +130,39 @@ void Connection::Reply(const std::string &msg) {
   redis::Reply(bufferevent_get_output(bev_), msg);
 }
 
+std::string Connection::Bool(bool b) const {
+  if (protocol_version_ == RESP::v3) {
+    return b ? "#t" CRLF : "#f" CRLF;
+  }
+  return Integer(b ? 1 : 0);
+}
+
+std::string Connection::MultiBulkString(const std::vector<std::string> &values,
+                                        bool output_nil_for_empty_string) 
const {
+  std::string result = "*" + std::to_string(values.size()) + CRLF;
+  for (const auto &value : values) {
+    if (value.empty() && output_nil_for_empty_string) {
+      result += NilString();
+    } else {
+      result += BulkString(value);
+    }
+  }
+  return result;
+}
+
+std::string Connection::MultiBulkString(const std::vector<std::string> &values,
+                                        const std::vector<rocksdb::Status> 
&statuses) const {
+  std::string result = "*" + std::to_string(values.size()) + CRLF;
+  for (size_t i = 0; i < values.size(); i++) {
+    if (i < statuses.size() && !statuses[i].ok()) {
+      result += NilString();
+    } else {
+      result += BulkString(values[i]);
+    }
+  }
+  return result;
+}
+
 void Connection::SendFile(int fd) {
   // NOTE: we don't need to close the fd, the libevent will do that
   auto output = bufferevent_get_output(bev_);
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 34fbcbae..f35a3889 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -64,6 +64,12 @@ class Connection : public EvbufCallbackBase<Connection> {
   void Reply(const std::string &msg);
   RESP GetProtocolVersion() const { return protocol_version_; }
   void SetProtocolVersion(RESP version) { protocol_version_ = version; }
+  std::string Bool(bool b) const;
+  std::string NilString() const { return redis::NilString(protocol_version_); }
+  std::string NilArray() const { return protocol_version_ == RESP::v3 ? "_" 
CRLF : "*-1" CRLF; }
+  std::string MultiBulkString(const std::vector<std::string> &values, bool 
output_nil_for_empty_string = true) const;
+  std::string MultiBulkString(const std::vector<std::string> &values,
+                              const std::vector<rocksdb::Status> &statuses) 
const;
 
   using UnsubscribeCallback = std::function<void(std::string, int)>;
   void SubscribeChannel(const std::string &channel);
diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc
index 768db4eb..95bbf9fd 100644
--- a/src/server/redis_reply.cc
+++ b/src/server/redis_reply.cc
@@ -32,32 +32,6 @@ std::string Error(const std::string &err) { return "-" + err + CRLF; }
 
 std::string BulkString(const std::string &data) { return "$" + std::to_string(data.length()) + CRLF + data + CRLF; }
 
-std::string NilString() { return "$-1" CRLF; }
-
-std::string MultiBulkString(const std::vector<std::string> &values, bool output_nil_for_empty_string) {
-  std::string result = "*" + std::to_string(values.size()) + CRLF;
-  for (const auto &value : values) {
-    if (value.empty() && output_nil_for_empty_string) {
-      result += NilString();
-    } else {
-      result += BulkString(value);
-    }
-  }
-  return result;
-}
-
-std::string MultiBulkString(const std::vector<std::string> &values, const std::vector<rocksdb::Status> &statuses) {
-  std::string result = "*" + std::to_string(values.size()) + CRLF;
-  for (size_t i = 0; i < values.size(); i++) {
-    if (i < statuses.size() && !statuses[i].ok()) {
-      result += NilString();
-    } else {
-      result += BulkString(values[i]);
-    }
-  }
-  return result;
-}
-
 std::string Array(const std::vector<std::string> &list) {
   size_t n = std::accumulate(list.begin(), list.end(), 0, [](size_t n, const std::string &s) { return n + s.size(); });
   std::string result = "*" + std::to_string(list.size()) + CRLF;
@@ -67,6 +41,12 @@ std::string Array(const std::vector<std::string> &list) {
   return result;
 }
 
-std::string Command2RESP(const std::vector<std::string> &cmd_args) { return MultiBulkString(cmd_args, false); }
+std::string ArrayOfBulkStrings(const std::vector<std::string> &elems) {
+  std::string result = "*" + std::to_string(elems.size()) + CRLF;
+  for (const auto &elem : elems) {
+    result += BulkString(elem);
+  }
+  return result;
+}
 
 }  // namespace redis
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index e23380cc..213a8bc0 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -21,10 +21,8 @@
 #pragma once
 
 #include <event2/buffer.h>
-#include <rocksdb/status.h>
 
 #include <string>
-#include <type_traits>
 #include <vector>
 
 #define CRLF "\r\n"  // NOLINT
@@ -42,15 +40,14 @@ std::string Integer(T data) {
   return ":" + std::to_string(data) + CRLF;
 }
 
-inline std::string Bool(const RESP ver, const bool b) {
+inline std::string NilString(const RESP ver) {
   if (ver == RESP::v3) {
-    return b ? "#t" CRLF : "#f" CRLF;
+    return "_" CRLF;
   }
-  return Integer(b ? 1 : 0);
+  return "$-1" CRLF;
 }
 
 std::string BulkString(const std::string &data);
-std::string NilString();
 
 template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
 std::string MultiLen(T len) {
@@ -58,8 +55,6 @@ std::string MultiLen(T len) {
 }
 
 std::string Array(const std::vector<std::string> &list);
-std::string MultiBulkString(const std::vector<std::string> &values, bool output_nil_for_empty_string = true);
-std::string MultiBulkString(const std::vector<std::string> &values, const std::vector<rocksdb::Status> &statuses);
-std::string Command2RESP(const std::vector<std::string> &cmd_args);
+std::string ArrayOfBulkStrings(const std::vector<std::string> &elements);
 
 }  // namespace redis
diff --git a/src/server/server.cc b/src/server/server.cc
index efe721b2..bc4c2939 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -1024,7 +1024,7 @@ void Server::GetRoleInfo(std::string *info) {
       roles.emplace_back("connecting");
     }
     roles.emplace_back(std::to_string(storage->LatestSeqNumber()));
-    *info = redis::MultiBulkString(roles);
+    *info = redis::ArrayOfBulkStrings(roles);
   } else {
     std::vector<std::string> list;
 
@@ -1032,7 +1032,7 @@ void Server::GetRoleInfo(std::string *info) {
     for (const auto &slave : slave_threads_) {
       if (slave->IsStopped()) continue;
 
-      list.emplace_back(redis::MultiBulkString({
+      list.emplace_back(redis::ArrayOfBulkStrings({
           slave->GetConn()->GetAnnounceIP(),
           std::to_string(slave->GetConn()->GetListeningPort()),
           std::to_string(slave->GetCurrentReplSeq()),
diff --git a/src/stats/log_collector.cc b/src/stats/log_collector.cc
index 1d3f9f40..842a7e55 100644
--- a/src/stats/log_collector.cc
+++ b/src/stats/log_collector.cc
@@ -32,7 +32,7 @@ std::string SlowEntry::ToRedisString() const {
   output.append(redis::Integer(id));
   output.append(redis::Integer(time));
   output.append(redis::Integer(duration));
-  output.append(redis::MultiBulkString(args));
+  output.append(redis::ArrayOfBulkStrings(args));
   output.append(redis::BulkString(ip + ":" + std::to_string(port)));
   output.append(redis::BulkString(client_name));
   return output;
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index db7174d0..56f58874 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -63,10 +63,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
 
     if (metadata.Type() == kRedisString) {
       command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
-      resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+      resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
       if (metadata.expire > 0) {
         command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
-        resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+        resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
       }
     } else if (metadata.expire > 0) {
       auto args = log_data_.GetArguments();
@@ -80,7 +80,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
         auto cmd = static_cast<RedisCommand>(*parse_result);
         if (cmd == kRedisCmdExpire) {
           command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
-          resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+          resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
         }
       }
     }
@@ -103,7 +103,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
                       std::to_string(stream_metadata.entries_added),
                       "MAXDELETEDID",
                       stream_metadata.max_deleted_entry_id.ToString()};
-      resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+      resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
     }
 
     return rocksdb::Status::OK();
@@ -262,7 +262,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
   }
 
   if (!command_args.empty()) {
-    resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+    resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
   }
 
   return rocksdb::Status::OK();
@@ -387,7 +387,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
   }
 
   if (!command_args.empty()) {
-    resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+    resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
   }
 
   return rocksdb::Status::OK();
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index b8fbc02a..a6e73fa0 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -1086,9 +1086,9 @@ std::string ReplyToRedisReply(redis::Connection *conn, lua_State *lua) {
       break;
     case LUA_TBOOLEAN:
       if (conn->GetProtocolVersion() == redis::RESP::v2) {
-        output = lua_toboolean(lua, -1) ? redis::Integer(1) : redis::NilString();
+        output = lua_toboolean(lua, -1) ? redis::Integer(1) : conn->NilString();
       } else {
-        output = redis::Bool(redis::RESP::v3, lua_toboolean(lua, -1));
+        output = conn->Bool(lua_toboolean(lua, -1));
       }
       break;
     case LUA_TNUMBER:
@@ -1138,7 +1138,7 @@ std::string ReplyToRedisReply(redis::Connection *conn, lua_State *lua) {
       }
       break;
     default:
-      output = redis::NilString();
+      output = conn->NilString();
   }
   return output;
 }
diff --git a/tests/cppunit/string_reply_test.cc b/tests/cppunit/string_reply_test.cc
index 80e74972..8aa93507 100644
--- a/tests/cppunit/string_reply_test.cc
+++ b/tests/cppunit/string_reply_test.cc
@@ -39,7 +39,7 @@ class StringReplyTest : public testing::Test {
 std::vector<std::string> StringReplyTest::values;
 
 TEST_F(StringReplyTest, MultiBulkString) {
-  std::string result = redis::MultiBulkString(values);
+  std::string result = redis::ArrayOfBulkStrings(values);
   ASSERT_EQ(result.length(), 13 * 10 + 14 * 90 + 15 * 900 + 17 * 9000 + 18 * 90000 + 9);
 }
 
diff --git a/tests/gocase/unit/debug/debug_test.go b/tests/gocase/unit/debug/debug_test.go
index 7221d830..6b65ad8c 100644
--- a/tests/gocase/unit/debug/debug_test.go
+++ b/tests/gocase/unit/debug/debug_test.go
@@ -52,6 +52,9 @@ func TestDebugProtocolV2(t *testing.T) {
                        require.NoError(t, r.Err())
                        require.EqualValues(t, expectedValue, r.Val())
                }
+
+               r := rdb.Do(ctx, "DEBUG", "PROTOCOL", "null")
+               require.EqualError(t, r.Err(), redis.Nil.Error())
        })
 
        t.Run("lua script return value type", func(t *testing.T) {
@@ -90,6 +93,8 @@ func TestDebugProtocolV3(t *testing.T) {
                        require.NoError(t, r.Err())
                        require.EqualValues(t, expectedValue, r.Val())
                }
+               r := rdb.Do(ctx, "DEBUG", "PROTOCOL", "null")
+               require.EqualError(t, r.Err(), redis.Nil.Error())
        })
 
        t.Run("lua script return value type", func(t *testing.T) {
diff --git a/tests/gocase/unit/protocol/protocol_test.go b/tests/gocase/unit/protocol/protocol_test.go
index 9ba40bec..7896cf00 100644
--- a/tests/gocase/unit/protocol/protocol_test.go
+++ b/tests/gocase/unit/protocol/protocol_test.go
@@ -137,3 +137,101 @@ func TestProtocolNetwork(t *testing.T) {
                c.MustRead(t, "+string")
        })
 }
+
+func TestProtocolRESP2(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{
+               "resp3-enabled": "no",
+       })
+       defer srv.Close()
+
+       c := srv.NewTCPClient()
+       defer func() {
+               require.NoError(t, c.Close())
+       }()
+
+       t.Run("debug protocol string", func(t *testing.T) {
+               types := map[string][]string{
+                       "string":  {"$11", "Hello World"},
+                       "integer": {":12345"},
+                       "array":   {"*3", ":0", ":1", ":2"},
+                       "true":    {":1"},
+                       "false":   {":0"},
+                       "null":    {"$-1"},
+               }
+               for typ, expected := range types {
+                       args := []string{"DEBUG", "PROTOCOL", typ}
+                       require.NoError(t, c.WriteArgs(args...))
+                       for _, line := range expected {
+                               c.MustRead(t, line)
+                       }
+               }
+       })
+
+       t.Run("multi bulk strings with null string", func(t *testing.T) {
+               require.NoError(t, c.WriteArgs("HSET", "hash", "f1", "v1"))
+               c.MustRead(t, ":1")
+
+               require.NoError(t, c.WriteArgs("HMGET", "hash", "f1", "f2"))
+               c.MustRead(t, "*2")
+               c.MustRead(t, "$2")
+               c.MustRead(t, "v1")
+               c.MustRead(t, "$-1")
+       })
+
+       t.Run("null array", func(t *testing.T) {
+               require.NoError(t, c.WriteArgs("ZRANK", "no-exists-zset", "m0", "WITHSCORE"))
+               c.MustRead(t, "*-1")
+       })
+}
+
+func TestProtocolRESP3(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{
+               "resp3-enabled": "yes",
+       })
+       defer srv.Close()
+
+       c := srv.NewTCPClient()
+       defer func() {
+               require.NoError(t, c.Close())
+       }()
+
+       t.Run("debug protocol string", func(t *testing.T) {
+               require.NoError(t, c.WriteArgs("HELLO", "3"))
+               values := []string{"*6", "$6", "server", "$5", "redis", "$5", "proto", ":3", "$4", "mode", "$10", "standalone"}
+               for _, line := range values {
+                       c.MustRead(t, line)
+               }
+
+               types := map[string][]string{
+                       "string":  {"$11", "Hello World"},
+                       "integer": {":12345"},
+                       "array":   {"*3", ":0", ":1", ":2"},
+                       "true":    {"#t"},
+                       "false":   {"#f"},
+                       "null":    {"_"},
+               }
+               for typ, expected := range types {
+                       args := []string{"DEBUG", "PROTOCOL", typ}
+                       require.NoError(t, c.WriteArgs(args...))
+                       for _, line := range expected {
+                               c.MustRead(t, line)
+                       }
+               }
+       })
+
+       t.Run("multi bulk strings with null", func(t *testing.T) {
+               require.NoError(t, c.WriteArgs("HSET", "hash", "f1", "v1"))
+               c.MustRead(t, ":1")
+
+               require.NoError(t, c.WriteArgs("HMGET", "hash", "f1", "f2"))
+               c.MustRead(t, "*2")
+               c.MustRead(t, "$2")
+               c.MustRead(t, "v1")
+               c.MustRead(t, "_")
+       })
+
+       t.Run("null array", func(t *testing.T) {
+               require.NoError(t, c.WriteArgs("ZRANK", "no-exists-zset", "m0", "WITHSCORE"))
+               c.MustRead(t, "_")
+       })
+}
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 86b3e1a5..9d4db5ec 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -67,12 +67,12 @@ Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t e
   auto [ns, user_key] = ExtractNamespaceKey<std::string>(ns_key, slot_id_encoded_);
 
   auto command =
-      redis::Command2RESP({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))});
+      redis::ArrayOfBulkStrings({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))});
   Status s = writer_->Write(ns, {command});
   if (!s.IsOK()) return s;
 
   if (expire > 0) {
-    command = redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire / 1000)});
+    command = redis::ArrayOfBulkStrings({"EXPIREAT", user_key, std::to_string(expire / 1000)});
     s = writer_->Write(ns, {command});
   }
 
@@ -105,17 +105,17 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
     std::string value = iter->value().ToString();
     switch (type) {
       case kRedisHash:
-        output = redis::Command2RESP({"HSET", user_key, sub_key, value});
+        output = redis::ArrayOfBulkStrings({"HSET", user_key, sub_key, value});
         break;
       case kRedisSet:
-        output = redis::Command2RESP({"SADD", user_key, sub_key});
+        output = redis::ArrayOfBulkStrings({"SADD", user_key, sub_key});
         break;
       case kRedisList:
-        output = redis::Command2RESP({"RPUSH", user_key, value});
+        output = redis::ArrayOfBulkStrings({"RPUSH", user_key, value});
         break;
       case kRedisZSet: {
         double score = DecodeDouble(value.data());
-        output = redis::Command2RESP({"ZADD", user_key, util::Float2String(score), sub_key});
+        output = redis::ArrayOfBulkStrings({"ZADD", user_key, util::Float2String(score), sub_key});
         break;
       }
       case kRedisBitmap: {
@@ -126,7 +126,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
       }
       case kRedisSortedint: {
         std::string val = std::to_string(DecodeFixed64(ikey.GetSubKey().data()));
-        output = redis::Command2RESP({"ZADD", user_key, val, val});
+        output = redis::ArrayOfBulkStrings({"ZADD", user_key, val, val});
         break;
       }
       default:
@@ -140,7 +140,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
   }
 
   if (metadata.expire > 0) {
-    output = redis::Command2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)});
+    output = redis::ArrayOfBulkStrings({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)});
     Status s = writer_->Write(ns, {output});
     if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF");
   }
@@ -158,7 +158,7 @@ Status Parser::parseBitmapSegment(const Slice &ns, const Slice &user_key, int in
 
       s = writer_->Write(
           ns.ToString(),
-          {redis::Command2RESP({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})});
+          {redis::ArrayOfBulkStrings({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})});
       if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to AOF");
     }
   }
diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc
index 27e7d1d8..74410669 100644
--- a/utils/kvrocks2redis/redis_writer.cc
+++ b/utils/kvrocks2redis/redis_writer.cc
@@ -72,7 +72,7 @@ Status RedisWriter::FlushDB(const std::string &ns) {
     return s;
   }
 
-  s = Write(ns, {redis::Command2RESP({"FLUSHDB"})});
+  s = Write(ns, {redis::ArrayOfBulkStrings({"FLUSHDB"})});
   if (!s.IsOK()) return s;
 
   return Status::OK();

Reply via email to