This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new d1acbcba Implement the RESP3 null for the nil string and array (#2017)
d1acbcba is described below
commit d1acbcbae8693bc895fa5749dd55ddc7458bedc8
Author: hulk <[email protected]>
AuthorDate: Mon Jan 15 18:40:23 2024 +0800
Implement the RESP3 null for the nil string and array (#2017)
For the null type in RESP2, the nil string is represented by `$-1\r\n`
and nil array is `*-1\r\n`. But after RESP3, they will be reduced into
a unified string `_\r\n`. So in this PR, most of them are renamed
redis::MultiBulkString/Redis::NilString to connection::MultiBulkString and
connection::NilString.
---
src/cluster/replication.cc | 16 ++---
src/cluster/slot_migrate.cc | 25 ++++----
src/cluster/sync_migrate_context.cc | 2 +-
src/commands/blocking_commander.h | 6 +-
src/commands/cmd_bit.cc | 2 +-
src/commands/cmd_bloom_filter.cc | 4 +-
src/commands/cmd_geo.cc | 28 ++++-----
src/commands/cmd_hash.cc | 20 +++---
src/commands/cmd_json.cc | 58 ++++++++---------
src/commands/cmd_list.cc | 34 +++++-----
src/commands/cmd_pubsub.cc | 33 +++++-----
src/commands/cmd_server.cc | 29 +++++----
src/commands/cmd_set.cc | 16 ++---
src/commands/cmd_stream.cc | 40 ++++++------
src/commands/cmd_string.cc | 18 +++---
src/commands/cmd_txn.cc | 2 +-
src/commands/cmd_zset.cc | 26 ++++----
src/commands/commander.cc | 2 +-
src/commands/scan_base.h | 9 +--
src/server/redis_connection.cc | 33 ++++++++++
src/server/redis_connection.h | 6 ++
src/server/redis_reply.cc | 34 +++-------
src/server/redis_reply.h | 13 ++--
src/server/server.cc | 4 +-
src/stats/log_collector.cc | 2 +-
src/storage/batch_extractor.cc | 12 ++--
src/storage/scripting.cc | 6 +-
tests/cppunit/string_reply_test.cc | 2 +-
tests/gocase/unit/debug/debug_test.go | 5 ++
tests/gocase/unit/protocol/protocol_test.go | 98 +++++++++++++++++++++++++++++
utils/kvrocks2redis/parser.cc | 18 +++---
utils/kvrocks2redis/redis_writer.cc | 2 +-
32 files changed, 364 insertions(+), 241 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index c2d17b5a..51e536c7 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -396,7 +396,7 @@ void ReplicationThread::run() {
}
ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
- SendString(bev, redis::MultiBulkString({"AUTH",
srv_->GetConfig()->masterauth}));
+ SendString(bev, redis::ArrayOfBulkStrings({"AUTH",
srv_->GetConfig()->masterauth}));
LOG(INFO) << "[replication] Auth request was sent, waiting for response";
repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
@@ -418,7 +418,7 @@ ReplicationThread::CBState
ReplicationThread::authReadCB(bufferevent *bev) { //
}
ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent
*bev) {
- SendString(bev, redis::MultiBulkString({"_db_name"}));
+ SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
LOG(INFO) << "[replication] Check db name request was sent, waiting for
response";
return CBState::NEXT;
@@ -456,7 +456,7 @@ ReplicationThread::CBState
ReplicationThread::replConfWriteCB(bufferevent *bev)
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
- SendString(bev, redis::MultiBulkString(data_to_send));
+ SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
LOG(INFO) << "[replication] replconf request was sent, waiting for response";
return CBState::NEXT;
@@ -513,11 +513,11 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
// Also use old PSYNC if replica can't find replication id from WAL and DB.
if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ ||
replid.length() != kReplIdLength) {
next_try_old_psync_ = false; // Reset next_try_old_psync_
- SendString(bev, redis::MultiBulkString({"PSYNC",
std::to_string(next_seq)}));
+ SendString(bev, redis::ArrayOfBulkStrings({"PSYNC",
std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence
id
- SendString(bev, redis::MultiBulkString({"PSYNC", replid,
std::to_string(next_seq)}));
+ SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid,
std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use new psync, current unique
replication sequence id: " << replid << ":"
<< cur_seq;
}
@@ -607,7 +607,7 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
}
ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent
*bev) {
- SendString(bev, redis::MultiBulkString({"_fetch_meta"}));
+ SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
LOG(INFO) << "[replication] Start syncing data with fullsync";
return CBState::NEXT;
@@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st
*ssl) {
std::string auth = srv_->GetConfig()->masterauth;
if (!auth.empty()) {
UniqueEvbuf evbuf;
- const auto auth_command = redis::MultiBulkString({"AUTH", auth});
+ const auto auth_command = redis::ArrayOfBulkStrings({"AUTH", auth});
auto s = util::SockSend(sock_fd, auth_command, ssl);
if (!s.IsOK()) return s.Prefixed("send auth command err");
while (true) {
@@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const
std::string &dir, const
}
files_str.pop_back();
- const auto fetch_command = redis::MultiBulkString({"_fetch_file",
files_str});
+ const auto fetch_command = redis::ArrayOfBulkStrings({"_fetch_file",
files_str});
auto s = util::SockSend(sock_fd, fetch_command, ssl);
if (!s.IsOK()) return s.Prefixed("send fetch file command");
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 09b220ae..424d0b4c 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -438,7 +438,7 @@ void SlotMigrator::clean() {
}
Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
- std::string cmd = redis::MultiBulkString({"auth", password}, false);
+ std::string cmd = redis::ArrayOfBulkStrings({"auth", password});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send AUTH command");
@@ -456,7 +456,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd,
int status) {
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};
std::string cmd =
- redis::MultiBulkString({"cluster", "import",
std::to_string(migrating_slot_), std::to_string(status)});
+ redis::ArrayOfBulkStrings({"cluster", "import",
std::to_string(migrating_slot_), std::to_string(status)});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send command to the destination node");
@@ -666,7 +666,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice
&key, const Metadata
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
- *restore_cmds += redis::MultiBulkString(command, false);
+ *restore_cmds += redis::ArrayOfBulkStrings(command);
current_pipeline_size_++;
// Check whether pipeline needs to be sent
@@ -747,7 +747,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice
&key, const Metadata
if (metadata.Type() != kRedisBitmap) {
item_count++;
if (item_count >= kMaxItemsInCommand) {
- *restore_cmds += redis::MultiBulkString(user_cmd, false);
+ *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
item_count = 0;
// Have to clear saved items
@@ -764,13 +764,13 @@ Status SlotMigrator::migrateComplexKey(const
rocksdb::Slice &key, const Metadata
// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
- *restore_cmds += redis::MultiBulkString(user_cmd, false);
+ *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
}
// Add TTL for complex key
if (metadata.expire > 0) {
- *restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(),
std::to_string(metadata.expire)}, false);
+ *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(),
std::to_string(metadata.expire)});
current_pipeline_size_++;
}
@@ -809,7 +809,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const
StreamMetadata &metad
if (!s.IsOK()) {
return s;
}
- *restore_cmds += redis::MultiBulkString(user_cmd, false);
+ *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());
@@ -822,15 +822,14 @@ Status SlotMigrator::migrateStream(const Slice &key,
const StreamMetadata &metad
// commands like XTRIM and XDEL affect stream's metadata, but we use only
XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according
to the current values on the source
- *restore_cmds += redis::MultiBulkString(
- {"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
"ENTRIESADDED",
- std::to_string(metadata.entries_added), "MAXDELETEDID",
metadata.max_deleted_entry_id.ToString()},
- false);
+ *restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(),
metadata.last_generated_id.ToString(),
+ "ENTRIESADDED",
std::to_string(metadata.entries_added), "MAXDELETEDID",
+
metadata.max_deleted_entry_id.ToString()});
current_pipeline_size_++;
// Add TTL
if (metadata.expire > 0) {
- *restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(),
std::to_string(metadata.expire)}, false);
+ *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(),
std::to_string(metadata.expire)});
current_pipeline_size_++;
}
@@ -862,7 +861,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey
&inkey, std::unique_ptr<
uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx;
user_cmd->emplace_back(std::to_string(offset));
user_cmd->emplace_back("1");
- *restore_cmds += redis::MultiBulkString(*user_cmd, false);
+ *restore_cmds += redis::ArrayOfBulkStrings(*user_cmd);
current_pipeline_size_++;
user_cmd->erase(user_cmd->begin() + 2, user_cmd->end());
}
diff --git a/src/cluster/sync_migrate_context.cc
b/src/cluster/sync_migrate_context.cc
index 0633cecd..3ba2806c 100644
--- a/src/cluster/sync_migrate_context.cc
+++ b/src/cluster/sync_migrate_context.cc
@@ -54,7 +54,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t
events) {
void SyncMigrateContext::TimerCB(int, int16_t events) {
auto &&slot_migrator = srv_->slot_migrator;
- conn_->Reply(redis::NilString());
+ conn_->Reply(conn_->NilString());
timer_.reset();
slot_migrator->CancelSyncCtx();
diff --git a/src/commands/blocking_commander.h
b/src/commands/blocking_commander.h
index 537e770f..05883a8a 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -31,7 +31,7 @@ class BlockingCommander : public Commander,
private EventCallbackBase<BlockingCommander> {
public:
// method to reply when no operation happens
- virtual std::string NoopReply() = 0;
+ virtual std::string NoopReply(const Connection *conn) = 0;
// method to block keys
virtual void BlockKeys() = 0;
@@ -48,7 +48,7 @@ class BlockingCommander : public Commander,
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
if (conn_->IsInExec()) {
- *output = NoopReply();
+ *output = NoopReply(conn_);
return Status::OK(); // no blocking in multi-exec
}
@@ -111,7 +111,7 @@ class BlockingCommander : public Commander,
}
void TimerCB(int, int16_t) {
- conn_->Reply(NoopReply());
+ conn_->Reply(NoopReply(conn_));
timer_.reset();
UnblockKeys();
auto bev = conn_->GetBufferEvent();
diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc
index 6e9fe5ba..65e90d04 100644
--- a/src/commands/cmd_bit.cc
+++ b/src/commands/cmd_bit.cc
@@ -342,7 +342,7 @@ class CommandBitfield : public Commander {
str_rets[i] = redis::Integer(rets[i]->Value());
}
} else {
- str_rets[i] = redis::NilString();
+ str_rets[i] = conn->NilString();
}
}
*output = redis::Array(str_rets);
diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
index e4316a3c..f33979e3 100644
--- a/src/commands/cmd_bloom_filter.cc
+++ b/src/commands/cmd_bloom_filter.cc
@@ -345,7 +345,7 @@ class CommandBFInfo : public Commander {
*output += redis::SimpleString("Number of items inserted");
*output += redis::Integer(info.size);
*output += redis::SimpleString("Expansion rate");
- *output += info.expansion == 0 ? redis::NilString() :
redis::Integer(info.expansion);
+ *output += info.expansion == 0 ? conn->NilString() :
redis::Integer(info.expansion);
break;
case BloomInfoType::kCapacity:
*output = redis::Integer(info.capacity);
@@ -360,7 +360,7 @@ class CommandBFInfo : public Commander {
*output = redis::Integer(info.size);
break;
case BloomInfoType::kExpansion:
- *output = info.expansion == 0 ? redis::NilString() :
redis::Integer(info.expansion);
+ *output = info.expansion == 0 ? conn->NilString() :
redis::Integer(info.expansion);
break;
}
diff --git a/src/commands/cmd_geo.cc b/src/commands/cmd_geo.cc
index 39854368..0f4d98eb 100644
--- a/src/commands/cmd_geo.cc
+++ b/src/commands/cmd_geo.cc
@@ -148,7 +148,7 @@ class CommandGeoDist : public CommandGeoBase {
}
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output =
redis::BulkString(util::Float2String(GetDistanceByUnit(distance)));
}
@@ -177,7 +177,7 @@ class CommandGeoHash : public Commander {
hashes.resize(members_.size(), "");
}
- *output = redis::MultiBulkString(hashes);
+ *output = conn->MultiBulkString(hashes);
return Status::OK();
}
@@ -206,16 +206,16 @@ class CommandGeoPos : public Commander {
if (s.IsNotFound()) {
list.resize(members_.size(), "");
- *output = redis::MultiBulkString(list);
+ *output = conn->MultiBulkString(list);
return Status::OK();
}
for (const auto &member : members_) {
auto iter = geo_points.find(member.ToString());
if (iter == geo_points.end()) {
- list.emplace_back(redis::NilString());
+ list.emplace_back(conn->NilString());
} else {
- list.emplace_back(redis::MultiBulkString(
+ list.emplace_back(conn->MultiBulkString(
{util::Float2String(iter->second.longitude),
util::Float2String(iter->second.latitude)}));
}
}
@@ -314,12 +314,12 @@ class CommandGeoRadius : public CommandGeoBase {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
- *output = GenerateOutput(geo_points);
+ *output = GenerateOutput(conn, geo_points);
}
return Status::OK();
}
- std::string GenerateOutput(const std::vector<GeoPoint> &geo_points) {
+ std::string GenerateOutput(const Connection *conn, const
std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ?
result_length : count_;
std::vector<std::string> list;
@@ -337,8 +337,8 @@ class CommandGeoRadius : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
- one.emplace_back(redis::MultiBulkString(
- {util::Float2String(geo_point.longitude),
util::Float2String(geo_point.latitude)}));
+ one.emplace_back(
+ conn->MultiBulkString({util::Float2String(geo_point.longitude),
util::Float2String(geo_point.latitude)}));
}
list.emplace_back(redis::Array(one));
}
@@ -440,7 +440,7 @@ class CommandGeoSearch : public CommandGeoBase {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
- *output = generateOutput(geo_points);
+ *output = generateOutput(conn, geo_points);
return Status::OK();
}
@@ -496,7 +496,7 @@ class CommandGeoSearch : public CommandGeoBase {
return Status::OK();
}
- std::string generateOutput(const std::vector<GeoPoint> &geo_points) {
+ std::string generateOutput(const Connection *conn, const
std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ?
result_length : count_;
std::vector<std::string> output;
@@ -515,8 +515,8 @@ class CommandGeoSearch : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
- one.emplace_back(redis::MultiBulkString(
- {util::Float2String(geo_point.longitude),
util::Float2String(geo_point.latitude)}));
+ one.emplace_back(
+ conn->MultiBulkString({util::Float2String(geo_point.longitude),
util::Float2String(geo_point.latitude)}));
}
output.emplace_back(redis::Array(one));
}
@@ -644,7 +644,7 @@ class CommandGeoRadiusByMember : public CommandGeoRadius {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
- *output = GenerateOutput(geo_points);
+ *output = GenerateOutput(conn, geo_points);
}
return Status::OK();
diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc
index 7ae90191..8b9526f9 100644
--- a/src/commands/cmd_hash.cc
+++ b/src/commands/cmd_hash.cc
@@ -37,7 +37,7 @@ class CommandHGet : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value);
+ *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value);
return Status::OK();
}
};
@@ -208,9 +208,9 @@ class CommandHMGet : public Commander {
if (s.IsNotFound()) {
values.resize(fields.size(), "");
- *output = redis::MultiBulkString(values);
+ *output = conn->MultiBulkString(values);
} else {
- *output = redis::MultiBulkString(values, statuses);
+ *output = conn->MultiBulkString(values, statuses);
}
return Status::OK();
}
@@ -263,7 +263,7 @@ class CommandHKeys : public Commander {
for (const auto &fv : field_values) {
keys.emplace_back(fv.field);
}
- *output = redis::MultiBulkString(keys);
+ *output = conn->MultiBulkString(keys);
return Status::OK();
}
@@ -284,7 +284,7 @@ class CommandHVals : public Commander {
for (const auto &p : field_values) {
values.emplace_back(p.value);
}
- *output = MultiBulkString(values, false);
+ *output = conn->MultiBulkString(values, false);
return Status::OK();
}
@@ -306,7 +306,7 @@ class CommandHGetAll : public Commander {
kv_pairs.emplace_back(p.field);
kv_pairs.emplace_back(p.value);
}
- *output = MultiBulkString(kv_pairs, false);
+ *output = conn->MultiBulkString(kv_pairs, false);
return Status::OK();
}
@@ -350,7 +350,7 @@ class CommandHRangeByLex : public Commander {
kv_pairs.emplace_back(p.field);
kv_pairs.emplace_back(p.value);
}
- *output = MultiBulkString(kv_pairs, false);
+ *output = conn->MultiBulkString(kv_pairs, false);
return Status::OK();
}
@@ -372,7 +372,7 @@ class CommandHScan : public CommandSubkeyScanBase {
return {Status::RedisExecErr, s.ToString()};
}
- *output = GenerateOutput(srv, fields, values, CursorType::kTypeHash);
+ *output = GenerateOutput(srv, conn, fields, values, CursorType::kTypeHash);
return Status::OK();
}
};
@@ -415,9 +415,9 @@ class CommandHRandField : public Commander {
}
if (no_parameters_)
- *output = s.IsNotFound() ? redis::NilString() :
redis::BulkString(result_entries[0]);
+ *output = s.IsNotFound() ? conn->NilString() :
redis::BulkString(result_entries[0]);
else
- *output = redis::MultiBulkString(result_entries, false);
+ *output = conn->MultiBulkString(result_entries, false);
return Status::OK();
}
diff --git a/src/commands/cmd_json.cc b/src/commands/cmd_json.cc
index 737581f8..5377ed31 100644
--- a/src/commands/cmd_json.cc
+++ b/src/commands/cmd_json.cc
@@ -33,13 +33,13 @@
namespace redis {
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
-std::string OptionalsToString(Optionals<T> &opts) {
+std::string OptionalsToString(const Connection *conn, Optionals<T> &opts) {
std::string str = MultiLen(opts.size());
for (const auto &opt : opts) {
if (opt.has_value()) {
str += redis::Integer(opt.value());
} else {
- str += redis::NilString();
+ str += conn->NilString();
}
}
return str;
@@ -100,7 +100,7 @@ class CommandJsonGet : public Commander {
JsonValue result;
auto s = json.Get(args_[1], paths_, &result);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
@@ -129,7 +129,7 @@ class CommandJsonInfo : public Commander {
auto format_str = storage_format == JsonStorageFormat::JSON ? "json"
: storage_format == JsonStorageFormat::CBOR ? "cbor"
: "unknown";
- output->append(redis::MultiBulkString({"storage_format", format_str}));
+ output->append(conn->MultiBulkString({"storage_format", format_str}));
return Status::OK();
}
};
@@ -144,7 +144,7 @@ class CommandJsonArrAppend : public Commander {
auto s = json.ArrAppend(args_[1], args_[2], {args_.begin() + 3,
args_.end()}, &results);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
};
@@ -169,12 +169,12 @@ class CommandJsonArrInsert : public Commander {
auto s = json.ArrInsert(args_[1], args_[2], index_, {args_.begin() + 4,
args_.end()}, &results);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
@@ -198,11 +198,11 @@ class CommandJsonType : public Commander {
auto s = json.Type(args_[1], path, &types);
if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr,
s.ToString()};
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
- *output = redis::MultiBulkString(types);
+ *output = conn->MultiBulkString(types);
return Status::OK();
}
};
@@ -219,16 +219,16 @@ class CommandJsonObjkeys : public Commander {
auto s = json.ObjKeys(args_[1], path, &results);
if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr,
s.ToString()};
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
*output = redis::MultiLen(results.size());
for (const auto &item : results) {
if (item.has_value()) {
- *output += redis::MultiBulkString(item.value(), false);
+ *output += conn->MultiBulkString(item.value(), false);
} else {
- *output += redis::NilString();
+ *output += conn->NilString();
}
}
@@ -248,7 +248,7 @@ class CommandJsonClear : public Commander {
auto s = json.Clear(args_[1], path, &result);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
@@ -269,11 +269,11 @@ class CommandJsonToggle : public Commander {
auto s = json.Toggle(args_[1], path, &results);
if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr,
s.ToString()};
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
};
@@ -293,12 +293,12 @@ class CommandJsonArrLen : public Commander {
Optionals<uint64_t> results;
auto s = json.ArrLen(args_[1], path, &results);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
};
@@ -320,7 +320,7 @@ class CommandJsonMerge : public Commander {
}
if (!result) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output = redis::SimpleString("OK");
}
@@ -356,7 +356,7 @@ class CommandJsonArrPop : public Commander {
if (data.has_value()) {
*output += redis::BulkString(GET_OR_RET(data->Print()));
} else {
- *output += redis::NilString();
+ *output += conn->NilString();
}
}
@@ -383,12 +383,12 @@ class CommandJsonObjLen : public Commander {
Optionals<uint64_t> results;
auto s = json.ObjLen(args_[1], path, &results);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
};
@@ -411,12 +411,12 @@ class CommandJsonArrTrim : public Commander {
auto s = json.ArrTrim(args_[1], path_, start_, stop_, &results);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
@@ -452,13 +452,13 @@ class CommanderJsonArrIndex : public Commander {
auto s = json.ArrIndex(args_[1], args_[2], args_[3], start_, end_,
&results);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
@@ -480,7 +480,7 @@ class CommandJsonDel : public Commander {
}
auto s = json.Del(args_[1], path, &result);
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
@@ -540,7 +540,7 @@ class CommandJsonStrAppend : public Commander {
auto s = json.StrAppend(args_[1], path, args_[3], &results);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
};
@@ -561,7 +561,7 @@ class CommandJsonStrLen : public Commander {
auto s = json.StrLen(args_[1], path, &results);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
- *output = OptionalsToString(results);
+ *output = OptionalsToString(conn, results);
return Status::OK();
}
};
@@ -589,7 +589,7 @@ class CommandJsonMGet : public Commander {
}
}
- *output = MultiBulkString(values, statuses);
+ *output = conn->MultiBulkString(values, statuses);
return Status::OK();
}
};
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index 86a0708d..5470d444 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -119,9 +119,9 @@ class CommandPop : public Commander {
}
if (s.IsNotFound()) {
- *output = redis::MultiLen(-1);
+ *output = conn->NilArray();
} else {
- *output = redis::MultiBulkString(elems);
+ *output = conn->MultiBulkString(elems);
}
} else {
std::string elem;
@@ -131,7 +131,7 @@ class CommandPop : public Commander {
}
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output = redis::BulkString(elem);
}
@@ -209,9 +209,9 @@ class CommandLMPop : public Commander {
}
if (elems.empty()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
- std::string elems_bulk = redis::MultiBulkString(elems);
+ std::string elems_bulk = conn->MultiBulkString(elems);
*output = redis::Array({redis::BulkString(chosen_key),
std::move(elems_bulk)});
}
@@ -298,10 +298,10 @@ class CommandBPop : public BlockingCommander {
if (s.ok()) {
if (!last_key_ptr) {
- conn_->Reply(redis::MultiBulkString({"", ""}));
+ conn_->Reply(conn_->MultiBulkString({"", ""}));
} else {
conn_->GetServer()->UpdateWatchedKeysManually({*last_key_ptr});
- conn_->Reply(redis::MultiBulkString({*last_key_ptr, std::move(elem)}));
+ conn_->Reply(conn_->MultiBulkString({*last_key_ptr, std::move(elem)}));
}
} else if (!s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
@@ -315,7 +315,7 @@ class CommandBPop : public BlockingCommander {
return !s.IsNotFound();
}
- std::string NoopReply() override { return redis::NilString(); }
+ std::string NoopReply(const Connection *conn) override { return
conn->NilString(); }
private:
bool left_ = false;
@@ -410,7 +410,7 @@ class CommandBLMPop : public BlockingCommander {
if (s.ok()) {
if (!elems.empty()) {
conn_->GetServer()->UpdateWatchedKeysManually({chosen_key});
- std::string elems_bulk = redis::MultiBulkString(elems);
+ std::string elems_bulk = conn_->MultiBulkString(elems);
conn_->Reply(redis::Array({redis::BulkString(chosen_key),
std::move(elems_bulk)}));
}
} else if (!s.IsNotFound()) {
@@ -437,7 +437,7 @@ class CommandBLMPop : public BlockingCommander {
return !s.IsNotFound();
}
- std::string NoopReply() override { return redis::NilString(); }
+ std::string NoopReply(const Connection *conn) override { return
conn->NilString(); }
static const inline CommandKeyRangeGen keyRangeGen = [](const
std::vector<std::string> &args) {
CommandKeyRange range;
@@ -536,7 +536,7 @@ class CommandLRange : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = redis::MultiBulkString(elems, false);
+ *output = conn->MultiBulkString(elems, false);
return Status::OK();
}
@@ -580,7 +580,7 @@ class CommandLIndex : public Commander {
}
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output = redis::BulkString(elem);
}
@@ -663,7 +663,7 @@ class CommandRPopLPUSH : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(elem);
+ *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(elem);
return Status::OK();
}
};
@@ -694,7 +694,7 @@ class CommandLMove : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(elem);
+ *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(elem);
return Status::OK();
}
@@ -769,7 +769,7 @@ class CommandBLMove : public BlockingCommander {
return !empty;
}
- std::string NoopReply() override { return redis::MultiLen(-1); }
+ std::string NoopReply(const Connection *conn) override { return
conn->NilArray(); }
private:
bool src_left_;
@@ -826,7 +826,7 @@ class CommandLPos : public Commander {
// We return nil or a single value if `COUNT` option is not given.
if (!spec_.count.has_value()) {
if (s.IsNotFound() || indexes.empty()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
assert(indexes.size() == 1);
*output = redis::Integer(indexes[0]);
@@ -839,7 +839,7 @@ class CommandLPos : public Commander {
for (const auto &index : indexes) {
values.emplace_back(std::to_string(index));
}
- *output = redis::MultiBulkString(values, false);
+ *output = conn->MultiBulkString(values, false);
}
return Status::OK();
}
diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc
index 6ec61eea..8f0ddfbd 100644
--- a/src/commands/cmd_pubsub.cc
+++ b/src/commands/cmd_pubsub.cc
@@ -73,10 +73,11 @@ class CommandMPublish : public Commander {
}
};
-void SubscribeCommandReply(std::string *output, const std::string &name, const
std::string &sub_name, int num) {
+void SubscribeCommandReply(const Connection *conn, std::string *output, const
std::string &name,
+ const std::string &sub_name, int num) {
output->append(redis::MultiLen(3));
output->append(redis::BulkString(name));
- output->append(sub_name.empty() ? redis::NilString() :
redis::BulkString(sub_name));
+ output->append(sub_name.empty() ? conn->NilString() : BulkString(sub_name));
output->append(redis::Integer(num));
}
@@ -85,7 +86,8 @@ class CommandSubscribe : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->SubscribeChannel(args_[i]);
- SubscribeCommandReply(output, "subscribe", args_[i],
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
+ SubscribeCommandReply(conn, output, "subscribe", args_[i],
+ conn->SubscriptionsCount() +
conn->PSubscriptionsCount());
}
return Status::OK();
}
@@ -95,13 +97,13 @@ class CommandUnSubscribe : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
if (args_.size() == 1) {
- conn->UnsubscribeAll([output](const std::string &sub_name, int num) {
- SubscribeCommandReply(output, "unsubscribe", sub_name, num);
+ conn->UnsubscribeAll([conn, output](const std::string &sub_name, int
num) {
+ SubscribeCommandReply(conn, output, "unsubscribe", sub_name, num);
});
} else {
for (size_t i = 1; i < args_.size(); i++) {
conn->UnsubscribeChannel(args_[i]);
- SubscribeCommandReply(output, "unsubscribe", args_[i],
+ SubscribeCommandReply(conn, output, "unsubscribe", args_[i],
conn->SubscriptionsCount() +
conn->PSubscriptionsCount());
}
}
@@ -114,7 +116,8 @@ class CommandPSubscribe : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (size_t i = 1; i < args_.size(); i++) {
conn->PSubscribeChannel(args_[i]);
- SubscribeCommandReply(output, "psubscribe", args_[i],
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
+ SubscribeCommandReply(conn, output, "psubscribe", args_[i],
+ conn->SubscriptionsCount() +
conn->PSubscriptionsCount());
}
return Status::OK();
}
@@ -124,13 +127,13 @@ class CommandPUnSubscribe : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
if (args_.size() == 1) {
- conn->PUnsubscribeAll([output](const std::string &sub_name, int num) {
- SubscribeCommandReply(output, "punsubscribe", sub_name, num);
+ conn->PUnsubscribeAll([conn, output](const std::string &sub_name, int
num) {
+ SubscribeCommandReply(conn, output, "punsubscribe", sub_name, num);
});
} else {
for (size_t i = 1; i < args_.size(); i++) {
conn->PUnsubscribeChannel(args_[i]);
- SubscribeCommandReply(output, "punsubscribe", args_[i],
+ SubscribeCommandReply(conn, output, "punsubscribe", args_[i],
conn->SubscriptionsCount() +
conn->PSubscriptionsCount());
}
}
@@ -153,7 +156,7 @@ class CommandSSubscribe : public Commander {
for (unsigned int i = 1; i < args_.size(); i++) {
conn->SSubscribeChannel(args_[i], slot);
- SubscribeCommandReply(output, "ssubscribe", args_[i],
conn->SSubscriptionsCount());
+ SubscribeCommandReply(conn, output, "ssubscribe", args_[i],
conn->SSubscriptionsCount());
}
return Status::OK();
}
@@ -163,13 +166,13 @@ class CommandSUnSubscribe : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
if (args_.size() == 1) {
- conn->SUnsubscribeAll([output](const std::string &sub_name, int num) {
- SubscribeCommandReply(output, "sunsubscribe", sub_name, num);
+ conn->SUnsubscribeAll([conn, output](const std::string &sub_name, int
num) {
+ SubscribeCommandReply(conn, output, "sunsubscribe", sub_name, num);
});
} else {
for (size_t i = 1; i < args_.size(); i++) {
conn->SUnsubscribeChannel(args_[i], srv->GetConfig()->cluster_enabled
? GetSlotIdFromKey(args_[i]) : 0);
- SubscribeCommandReply(output, "sunsubscribe", args_[i],
conn->SSubscriptionsCount());
+ SubscribeCommandReply(conn, output, "sunsubscribe", args_[i],
conn->SSubscriptionsCount());
}
}
return Status::OK();
@@ -231,7 +234,7 @@ class CommandPubSub : public Commander {
} else {
srv->GetSChannelsByPattern(pattern_, &channels);
}
- *output = redis::MultiBulkString(channels);
+ *output = conn->MultiBulkString(channels);
return Status::OK();
}
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index ef1f9a17..71781cd5 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -105,11 +105,11 @@ class CommandNamespace : public Commander {
}
namespaces.emplace_back(kDefaultNamespace);
namespaces.emplace_back(config->requirepass);
- *output = redis::MultiBulkString(namespaces, false);
+ *output = conn->MultiBulkString(namespaces, false);
} else {
auto token = srv->GetNamespace()->Get(args_[2]);
if (token.Is<Status::NotFound>()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output = redis::BulkString(token.GetValue());
}
@@ -155,7 +155,7 @@ class CommandKeys : public Commander {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
- *output = redis::MultiBulkString(keys);
+ *output = conn->MultiBulkString(keys);
return Status::OK();
}
};
@@ -252,7 +252,7 @@ class CommandConfig : public Commander {
} else if (args_.size() == 3 && sub_command == "get") {
std::vector<std::string> values;
config->Get(args_[2], &values);
- *output = redis::MultiBulkString(values);
+ *output = conn->MultiBulkString(values);
} else if (args_.size() == 4 && sub_command == "set") {
Status s = config->Set(srv, args_[2], args_[3]);
if (!s.IsOK()) {
@@ -302,7 +302,7 @@ class CommandDisk : public Commander {
if (!s.ok()) {
// Redis returns the Nil string when the key does not exist
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
return {Status::RedisExecErr, s.ToString()};
@@ -514,7 +514,7 @@ class CommandClient : public Commander {
return Status::OK();
} else if (subcommand_ == "getname") {
std::string name = conn->GetName();
- *output = name == "" ? redis::NilString() : redis::BulkString(name);
+ *output = name == "" ? conn->NilString() : redis::BulkString(name);
return Status::OK();
} else if (subcommand_ == "id") {
*output = redis::Integer(conn->GetID());
@@ -613,12 +613,14 @@ class CommandDebug : public Commander {
*output += redis::Integer(i);
}
} else if (protocol_type_ == "true") {
- *output = redis::Bool(conn->GetProtocolVersion(), true);
+ *output = conn->Bool(true);
} else if (protocol_type_ == "false") {
- *output = redis::Bool(conn->GetProtocolVersion(), false);
+ *output = conn->Bool(false);
+ } else if (protocol_type_ == "null") {
+ *output = conn->NilString();
} else {
*output =
- redis::Error("Wrong protocol type name. Please use one of the
following: string|int|array|true|false");
+ redis::Error("Wrong protocol type name. Please use one of the
following: string|int|array|true|false|null");
}
} else {
return {Status::RedisInvalidCmd, "Unknown subcommand, should be DEBUG or
PROTOCOL"};
@@ -668,7 +670,7 @@ class CommandCommand : public Commander {
for (const auto &key_index : keys_indexes) {
keys.emplace_back(args_[key_index + 2]);
}
- *output = redis::MultiBulkString(keys);
+ *output = conn->MultiBulkString(keys);
} else {
return {Status::RedisExecErr, "Command subcommand must be one of
COUNT, GETKEYS, INFO"};
}
@@ -807,7 +809,8 @@ class CommandScan : public CommandScanBase {
return Commander::Parse(args);
}
- static std::string GenerateOutput(Server *srv, const
std::vector<std::string> &keys, const std::string &end_cursor) {
+ static std::string GenerateOutput(Server *srv, const Connection *conn, const
std::vector<std::string> &keys,
+ const std::string &end_cursor) {
std::vector<std::string> list;
if (!end_cursor.empty()) {
list.emplace_back(
@@ -816,7 +819,7 @@ class CommandScan : public CommandScanBase {
list.emplace_back(redis::BulkString("0"));
}
- list.emplace_back(redis::MultiBulkString(keys, false));
+ list.emplace_back(conn->MultiBulkString(keys, false));
return redis::Array(list);
}
@@ -831,7 +834,7 @@ class CommandScan : public CommandScanBase {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
- *output = GenerateOutput(srv, keys, end_key);
+ *output = GenerateOutput(srv, conn, keys, end_key);
return Status::OK();
}
};
diff --git a/src/commands/cmd_set.cc b/src/commands/cmd_set.cc
index f2c60a3b..0b5a8bda 100644
--- a/src/commands/cmd_set.cc
+++ b/src/commands/cmd_set.cc
@@ -93,7 +93,7 @@ class CommandSMembers : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = redis::MultiBulkString(members, false);
+ *output = conn->MultiBulkString(members, false);
return Status::OK();
}
};
@@ -171,12 +171,12 @@ class CommandSPop : public Commander {
}
if (with_count_) {
- *output = redis::MultiBulkString(members, false);
+ *output = conn->MultiBulkString(members, false);
} else {
if (members.size() > 0) {
*output = redis::BulkString(members.front());
} else {
- *output = redis::NilString();
+ *output = conn->NilString();
}
}
return Status::OK();
@@ -211,7 +211,7 @@ class CommandSRandMember : public Commander {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
- *output = redis::MultiBulkString(members, false);
+ *output = conn->MultiBulkString(members, false);
return Status::OK();
}
@@ -249,7 +249,7 @@ class CommandSDiff : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = redis::MultiBulkString(members, false);
+ *output = conn->MultiBulkString(members, false);
return Status::OK();
}
};
@@ -269,7 +269,7 @@ class CommandSUnion : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = redis::MultiBulkString(members, false);
+ *output = conn->MultiBulkString(members, false);
return Status::OK();
}
};
@@ -289,7 +289,7 @@ class CommandSInter : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = redis::MultiBulkString(members, false);
+ *output = conn->MultiBulkString(members, false);
return Status::OK();
}
};
@@ -432,7 +432,7 @@ class CommandSScan : public CommandSubkeyScanBase {
return {Status::RedisExecErr, s.ToString()};
}
- *output = CommandScanBase::GenerateOutput(srv, members,
CursorType::kTypeSet);
+ *output = CommandScanBase::GenerateOutput(srv, conn, members,
CursorType::kTypeSet);
return Status::OK();
}
};
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 545d438d..f82497fc 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -153,7 +153,7 @@ class CommandXAdd : public Commander {
}
if (s.IsNotFound() && nomkstream_) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
@@ -464,17 +464,17 @@ class CommandXInfo : public Commander {
if (info.first_entry) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(info.first_entry->key));
- output->append(redis::MultiBulkString(info.first_entry->values));
+ output->append(conn->MultiBulkString(info.first_entry->values));
} else {
- output->append(redis::NilString());
+ output->append(conn->NilString());
}
output->append(redis::BulkString("last-entry"));
if (info.last_entry) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(info.last_entry->key));
- output->append(redis::MultiBulkString(info.last_entry->values));
+ output->append(conn->MultiBulkString(info.last_entry->values));
} else {
- output->append(redis::NilString());
+ output->append(conn->NilString());
}
} else {
output->append(redis::BulkString("entries"));
@@ -482,7 +482,7 @@ class CommandXInfo : public Commander {
for (const auto &e : info.entries) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(e.key));
- output->append(redis::MultiBulkString(e.values));
+ output->append(conn->MultiBulkString(e.values));
}
}
@@ -514,13 +514,13 @@ class CommandXInfo : public Commander {
output->append(redis::BulkString(it.second.last_delivered_id.ToString()));
output->append(redis::BulkString("entries-read"));
if (it.second.entries_read == -1) {
- output->append(redis::NilString());
+ output->append(conn->NilString());
} else {
output->append(redis::Integer(it.second.entries_read));
}
output->append(redis::BulkString("lag"));
if (it.second.lag == UINT64_MAX) {
- output->append(redis::NilString());
+ output->append(conn->NilString());
} else {
output->append(redis::Integer(it.second.lag));
}
@@ -611,7 +611,7 @@ class CommandXRange : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
if (with_count_ && count_ == 0) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
@@ -637,7 +637,7 @@ class CommandXRange : public Commander {
for (const auto &e : result) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(e.key));
- output->append(redis::MultiBulkString(e.values));
+ output->append(conn->MultiBulkString(e.values));
}
return Status::OK();
@@ -704,7 +704,7 @@ class CommandXRevRange : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
if (with_count_ && count_ == 0) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
@@ -730,7 +730,7 @@ class CommandXRevRange : public Commander {
for (const auto &e : result) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(e.key));
- output->append(redis::MultiBulkString(e.values));
+ output->append(conn->MultiBulkString(e.values));
}
return Status::OK();
@@ -862,7 +862,7 @@ class CommandXRead : public Commander,
if (block_ && results.empty()) {
if (conn->IsInExec()) {
- *output = redis::MultiLen(-1);
+ *output = conn->NilArray();
return Status::OK(); // No blocking in multi-exec
}
@@ -870,14 +870,14 @@ class CommandXRead : public Commander,
}
if (!block_ && results.empty()) {
- *output = redis::MultiLen(-1);
+ *output = conn->NilArray();
return Status::OK();
}
- return SendResults(output, results);
+ return SendResults(conn, output, results);
}
- static Status SendResults(std::string *output, const
std::vector<StreamReadResult> &results) {
+ static Status SendResults(Connection *conn, std::string *output, const
std::vector<StreamReadResult> &results) {
output->append(redis::MultiLen(results.size()));
for (const auto &result : results) {
@@ -887,7 +887,7 @@ class CommandXRead : public Commander,
for (const auto &entry : result.entries) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(entry.key));
- output->append(redis::MultiBulkString(entry.values));
+ output->append(conn->MultiBulkString(entry.values));
}
}
@@ -973,7 +973,7 @@ class CommandXRead : public Commander,
}
if (results.empty()) {
- conn_->Reply(redis::MultiLen(-1));
+ conn_->Reply(conn_->NilArray());
}
SendReply(results);
@@ -991,7 +991,7 @@ class CommandXRead : public Commander,
for (const auto &entry : result.entries) {
output.append(redis::MultiLen(2));
output.append(redis::BulkString(entry.key));
- output.append(redis::MultiBulkString(entry.values));
+ output.append(conn_->MultiBulkString(entry.values));
}
}
@@ -1009,7 +1009,7 @@ class CommandXRead : public Commander,
}
void TimerCB(int, int16_t events) {
- conn_->Reply(redis::NilString());
+ conn_->Reply(conn_->NilString());
timer_.reset();
diff --git a/src/commands/cmd_string.cc b/src/commands/cmd_string.cc
index a0e1a690..debf5e3f 100644
--- a/src/commands/cmd_string.cc
+++ b/src/commands/cmd_string.cc
@@ -54,7 +54,7 @@ class CommandGet : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value);
+ *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value);
return Status::OK();
}
};
@@ -101,7 +101,7 @@ class CommandGetEx : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value);
+ *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value);
return Status::OK();
}
@@ -142,7 +142,7 @@ class CommandGetSet : public Commander {
if (old_value.has_value()) {
*output = redis::BulkString(old_value.value());
} else {
- *output = redis::NilString();
+ *output = conn->NilString();
}
return Status::OK();
}
@@ -159,7 +159,7 @@ class CommandGetDel : public Commander {
}
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output = redis::BulkString(value);
}
@@ -190,7 +190,7 @@ class CommandGetRange : public Commander {
}
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
@@ -199,7 +199,7 @@ class CommandGetRange : public Commander {
if (start_ < 0) start_ = 0;
if (stop_ > static_cast<int>(value.size())) stop_ =
static_cast<int>(value.size());
if (start_ > stop_) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output = redis::BulkString(value.substr(start_, stop_ - start_ + 1));
}
@@ -255,7 +255,7 @@ class CommandMGet : public Commander {
std::vector<std::string> values;
// always return OK
auto statuses = string_db.MGet(keys, &values);
- *output = redis::MultiBulkString(values, statuses);
+ *output = conn->MultiBulkString(values, statuses);
return Status::OK();
}
};
@@ -323,13 +323,13 @@ class CommandSet : public Commander {
if (ret.has_value()) {
*output = redis::BulkString(ret.value());
} else {
- *output = redis::NilString();
+ *output = conn->NilString();
}
} else {
if (ret.has_value()) {
*output = redis::SimpleString("OK");
} else {
- *output = redis::NilString();
+ *output = conn->NilString();
}
}
return Status::OK();
diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc
index c99c6ddc..fa1a47aa 100644
--- a/src/commands/cmd_txn.cc
+++ b/src/commands/cmd_txn.cc
@@ -73,7 +73,7 @@ class CommandExec : public Commander {
}
if (srv->IsWatchedKeysModified(conn)) {
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc
index f1160274..acddad82 100644
--- a/src/commands/cmd_zset.cc
+++ b/src/commands/cmd_zset.cc
@@ -83,7 +83,7 @@ class CommandZAdd : public Commander {
auto new_score = member_scores_[0].score;
if ((flags_.HasNX() || flags_.HasXX() || flags_.HasLT() ||
flags_.HasGT()) && old_score == new_score &&
ret == 0) { // not the first time using incr && score not changed
- *output = redis::NilString();
+ *output = conn->NilString();
return Status::OK();
}
@@ -336,7 +336,7 @@ class CommandBZPop : public BlockingCommander {
return StartBlocking(timeout_, output);
}
- std::string NoopReply() override { return redis::MultiLen(-1); }
+ std::string NoopReply(const Connection *conn) override { return
conn->NilArray(); }
void BlockKeys() override {
for (const auto &key : keys_) {
@@ -454,7 +454,7 @@ class CommandZMPop : public Commander {
SendMembersWithScoresForZMpop(conn, user_key, member_scores);
return Status::OK();
}
- *output = redis::MultiLen(-1);
+ *output = conn->NilArray();
return Status::OK();
}
@@ -538,7 +538,7 @@ class CommandBZMPop : public BlockingCommander {
}
}
- std::string NoopReply() override { return redis::NilString(); }
+ std::string NoopReply(const Connection *conn) override { return
conn->NilString(); }
bool OnBlockingWrite() override {
std::string user_key;
@@ -797,14 +797,14 @@ class CommandZRangeGeneric : public Commander {
break;
case kZRangeScore:
if (score_spec_.count == 0) {
- *output = redis::MultiBulkString({});
+ *output = conn->MultiBulkString({});
return Status::OK();
}
s = zset_db.RangeByScore(key_, score_spec_, &member_scores, nullptr);
break;
case kZRangeLex:
if (lex_spec_.count == 0) {
- *output = redis::MultiBulkString({});
+ *output = conn->MultiBulkString({});
return Status::OK();
}
s = zset_db.RangeByLex(key_, lex_spec_, &member_scores, nullptr);
@@ -896,9 +896,9 @@ class CommandZRank : public Commander {
if (rank == -1) {
if (with_score_) {
- output->append(redis::MultiLen(-1));
+ output->append(conn->NilArray());
} else {
- *output = redis::NilString();
+ *output = conn->NilString();
}
} else {
if (with_score_) {
@@ -1045,7 +1045,7 @@ class CommandZScore : public Commander {
}
if (s.IsNotFound()) {
- *output = redis::NilString();
+ *output = conn->NilString();
} else {
*output = redis::BulkString(util::Float2String(score));
}
@@ -1080,7 +1080,7 @@ class CommandZMScore : public Commander {
}
}
}
- *output = redis::MultiBulkString(values);
+ *output = conn->MultiBulkString(values);
return Status::OK();
}
};
@@ -1355,7 +1355,7 @@ class CommandZScan : public CommandSubkeyScanBase {
for (const auto &score : scores) {
score_strings.emplace_back(util::Float2String(score));
}
- *output = GenerateOutput(srv, members, score_strings,
CursorType::kTypeZSet);
+ *output = GenerateOutput(srv, conn, members, score_strings,
CursorType::kTypeZSet);
return Status::OK();
}
};
@@ -1407,9 +1407,9 @@ class CommandZRandMember : public Commander {
}
if (no_parameters_)
- *output = s.IsNotFound() ? redis::NilString() :
redis::BulkString(result_entries[0]);
+ *output = s.IsNotFound() ? conn->NilString() :
redis::BulkString(result_entries[0]);
else
- *output = redis::MultiBulkString(result_entries, false);
+ *output = conn->MultiBulkString(result_entries, false);
return Status::OK();
}
diff --git a/src/commands/commander.cc b/src/commands/commander.cc
index 024db614..6ac10581 100644
--- a/src/commands/commander.cc
+++ b/src/commands/commander.cc
@@ -68,7 +68,7 @@ void CommandTable::GetCommandsInfo(std::string *info, const
std::vector<std::str
for (const auto &cmd_name : cmd_names) {
auto cmd_iter = original_commands.find(util::ToLower(cmd_name));
if (cmd_iter == original_commands.end()) {
- info->append(redis::NilString());
+ info->append(NilString(RESP::v2));
} else {
auto command_attribute = cmd_iter->second;
auto command_info = GetCommandInfo(command_attribute);
diff --git a/src/commands/scan_base.h b/src/commands/scan_base.h
index 0bfb188b..bab86e21 100644
--- a/src/commands/scan_base.h
+++ b/src/commands/scan_base.h
@@ -64,7 +64,8 @@ class CommandScanBase : public Commander {
}
}
- std::string GenerateOutput(Server *srv, const std::vector<std::string>
&keys, CursorType cursor_type) const {
+ std::string GenerateOutput(Server *srv, const Connection *conn, const
std::vector<std::string> &keys,
+ CursorType cursor_type) const {
std::vector<std::string> list;
if (keys.size() == static_cast<size_t>(limit_)) {
auto end_cursor = srv->GenerateCursorFromKeyName(keys.back(),
cursor_type);
@@ -73,7 +74,7 @@ class CommandScanBase : public Commander {
list.emplace_back(redis::BulkString("0"));
}
- list.emplace_back(redis::MultiBulkString(keys, false));
+ list.emplace_back(conn->MultiBulkString(keys, false));
return redis::Array(list);
}
@@ -111,7 +112,7 @@ class CommandSubkeyScanBase : public CommandScanBase {
return Commander::Parse(args);
}
- std::string GenerateOutput(Server *srv, const std::vector<std::string>
&fields,
+ std::string GenerateOutput(Server *srv, const Connection *conn, const
std::vector<std::string> &fields,
const std::vector<std::string> &values,
CursorType cursor_type) {
std::vector<std::string> list;
auto items_count = fields.size();
@@ -128,7 +129,7 @@ class CommandSubkeyScanBase : public CommandScanBase {
fvs.emplace_back(values[i]);
}
}
- list.emplace_back(redis::MultiBulkString(fvs, false));
+ list.emplace_back(conn->MultiBulkString(fvs, false));
return redis::Array(list);
}
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index d6e0b5f6..87370ffa 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -130,6 +130,39 @@ void Connection::Reply(const std::string &msg) {
redis::Reply(bufferevent_get_output(bev_), msg);
}
+std::string Connection::Bool(bool b) const {
+ if (protocol_version_ == RESP::v3) {
+ return b ? "#t" CRLF : "#f" CRLF;
+ }
+ return Integer(b ? 1 : 0);
+}
+
+std::string Connection::MultiBulkString(const std::vector<std::string> &values,
+ bool output_nil_for_empty_string)
const {
+ std::string result = "*" + std::to_string(values.size()) + CRLF;
+ for (const auto &value : values) {
+ if (value.empty() && output_nil_for_empty_string) {
+ result += NilString();
+ } else {
+ result += BulkString(value);
+ }
+ }
+ return result;
+}
+
+std::string Connection::MultiBulkString(const std::vector<std::string> &values,
+ const std::vector<rocksdb::Status>
&statuses) const {
+ std::string result = "*" + std::to_string(values.size()) + CRLF;
+ for (size_t i = 0; i < values.size(); i++) {
+ if (i < statuses.size() && !statuses[i].ok()) {
+ result += NilString();
+ } else {
+ result += BulkString(values[i]);
+ }
+ }
+ return result;
+}
+
void Connection::SendFile(int fd) {
// NOTE: we don't need to close the fd, the libevent will do that
auto output = bufferevent_get_output(bev_);
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 34fbcbae..f35a3889 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -64,6 +64,12 @@ class Connection : public EvbufCallbackBase<Connection> {
void Reply(const std::string &msg);
RESP GetProtocolVersion() const { return protocol_version_; }
void SetProtocolVersion(RESP version) { protocol_version_ = version; }
+ std::string Bool(bool b) const;
+ std::string NilString() const { return redis::NilString(protocol_version_); }
+ std::string NilArray() const { return protocol_version_ == RESP::v3 ? "_"
CRLF : "*-1" CRLF; }
+ std::string MultiBulkString(const std::vector<std::string> &values, bool
output_nil_for_empty_string = true) const;
+ std::string MultiBulkString(const std::vector<std::string> &values,
+ const std::vector<rocksdb::Status> &statuses)
const;
using UnsubscribeCallback = std::function<void(std::string, int)>;
void SubscribeChannel(const std::string &channel);
diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc
index 768db4eb..95bbf9fd 100644
--- a/src/server/redis_reply.cc
+++ b/src/server/redis_reply.cc
@@ -32,32 +32,6 @@ std::string Error(const std::string &err) { return "-" + err
+ CRLF; }
std::string BulkString(const std::string &data) { return "$" +
std::to_string(data.length()) + CRLF + data + CRLF; }
-std::string NilString() { return "$-1" CRLF; }
-
-std::string MultiBulkString(const std::vector<std::string> &values, bool
output_nil_for_empty_string) {
- std::string result = "*" + std::to_string(values.size()) + CRLF;
- for (const auto &value : values) {
- if (value.empty() && output_nil_for_empty_string) {
- result += NilString();
- } else {
- result += BulkString(value);
- }
- }
- return result;
-}
-
-std::string MultiBulkString(const std::vector<std::string> &values, const
std::vector<rocksdb::Status> &statuses) {
- std::string result = "*" + std::to_string(values.size()) + CRLF;
- for (size_t i = 0; i < values.size(); i++) {
- if (i < statuses.size() && !statuses[i].ok()) {
- result += NilString();
- } else {
- result += BulkString(values[i]);
- }
- }
- return result;
-}
-
std::string Array(const std::vector<std::string> &list) {
size_t n = std::accumulate(list.begin(), list.end(), 0, [](size_t n, const
std::string &s) { return n + s.size(); });
std::string result = "*" + std::to_string(list.size()) + CRLF;
@@ -67,6 +41,12 @@ std::string Array(const std::vector<std::string> &list) {
return result;
}
-std::string Command2RESP(const std::vector<std::string> &cmd_args) { return
MultiBulkString(cmd_args, false); }
+std::string ArrayOfBulkStrings(const std::vector<std::string> &elems) {
+ std::string result = "*" + std::to_string(elems.size()) + CRLF;
+ for (const auto &elem : elems) {
+ result += BulkString(elem);
+ }
+ return result;
+}
} // namespace redis
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index e23380cc..213a8bc0 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -21,10 +21,8 @@
#pragma once
#include <event2/buffer.h>
-#include <rocksdb/status.h>
#include <string>
-#include <type_traits>
#include <vector>
#define CRLF "\r\n" // NOLINT
@@ -42,15 +40,14 @@ std::string Integer(T data) {
return ":" + std::to_string(data) + CRLF;
}
-inline std::string Bool(const RESP ver, const bool b) {
+inline std::string NilString(const RESP ver) {
if (ver == RESP::v3) {
- return b ? "#t" CRLF : "#f" CRLF;
+ return "_" CRLF;
}
- return Integer(b ? 1 : 0);
+ return "$-1" CRLF;
}
std::string BulkString(const std::string &data);
-std::string NilString();
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string MultiLen(T len) {
@@ -58,8 +55,6 @@ std::string MultiLen(T len) {
}
std::string Array(const std::vector<std::string> &list);
-std::string MultiBulkString(const std::vector<std::string> &values, bool
output_nil_for_empty_string = true);
-std::string MultiBulkString(const std::vector<std::string> &values, const
std::vector<rocksdb::Status> &statuses);
-std::string Command2RESP(const std::vector<std::string> &cmd_args);
+std::string ArrayOfBulkStrings(const std::vector<std::string> &elements);
} // namespace redis
diff --git a/src/server/server.cc b/src/server/server.cc
index efe721b2..bc4c2939 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -1024,7 +1024,7 @@ void Server::GetRoleInfo(std::string *info) {
roles.emplace_back("connecting");
}
roles.emplace_back(std::to_string(storage->LatestSeqNumber()));
- *info = redis::MultiBulkString(roles);
+ *info = redis::ArrayOfBulkStrings(roles);
} else {
std::vector<std::string> list;
@@ -1032,7 +1032,7 @@ void Server::GetRoleInfo(std::string *info) {
for (const auto &slave : slave_threads_) {
if (slave->IsStopped()) continue;
- list.emplace_back(redis::MultiBulkString({
+ list.emplace_back(redis::ArrayOfBulkStrings({
slave->GetConn()->GetAnnounceIP(),
std::to_string(slave->GetConn()->GetListeningPort()),
std::to_string(slave->GetCurrentReplSeq()),
diff --git a/src/stats/log_collector.cc b/src/stats/log_collector.cc
index 1d3f9f40..842a7e55 100644
--- a/src/stats/log_collector.cc
+++ b/src/stats/log_collector.cc
@@ -32,7 +32,7 @@ std::string SlowEntry::ToRedisString() const {
output.append(redis::Integer(id));
output.append(redis::Integer(time));
output.append(redis::Integer(duration));
- output.append(redis::MultiBulkString(args));
+ output.append(redis::ArrayOfBulkStrings(args));
output.append(redis::BulkString(ip + ":" + std::to_string(port)));
output.append(redis::BulkString(client_name));
return output;
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index db7174d0..56f58874 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -63,10 +63,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
if (metadata.Type() == kRedisString) {
command_args = {"SET", user_key,
value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
- resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+ resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
if (metadata.expire > 0) {
command_args = {"PEXPIREAT", user_key,
std::to_string(metadata.expire)};
- resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
} else if (metadata.expire > 0) {
auto args = log_data_.GetArguments();
@@ -80,7 +80,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
auto cmd = static_cast<RedisCommand>(*parse_result);
if (cmd == kRedisCmdExpire) {
command_args = {"PEXPIREAT", user_key,
std::to_string(metadata.expire)};
- resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
}
}
@@ -103,7 +103,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
std::to_string(stream_metadata.entries_added),
"MAXDELETEDID",
stream_metadata.max_deleted_entry_id.ToString()};
- resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+ resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
return rocksdb::Status::OK();
@@ -262,7 +262,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
}
if (!command_args.empty()) {
- resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+ resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
return rocksdb::Status::OK();
@@ -387,7 +387,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
}
if (!command_args.empty()) {
- resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
+ resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
return rocksdb::Status::OK();
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index b8fbc02a..a6e73fa0 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -1086,9 +1086,9 @@ std::string ReplyToRedisReply(redis::Connection *conn,
lua_State *lua) {
break;
case LUA_TBOOLEAN:
if (conn->GetProtocolVersion() == redis::RESP::v2) {
- output = lua_toboolean(lua, -1) ? redis::Integer(1) :
redis::NilString();
+ output = lua_toboolean(lua, -1) ? redis::Integer(1) :
conn->NilString();
} else {
- output = redis::Bool(redis::RESP::v3, lua_toboolean(lua, -1));
+ output = conn->Bool(lua_toboolean(lua, -1));
}
break;
case LUA_TNUMBER:
@@ -1138,7 +1138,7 @@ std::string ReplyToRedisReply(redis::Connection *conn,
lua_State *lua) {
}
break;
default:
- output = redis::NilString();
+ output = conn->NilString();
}
return output;
}
diff --git a/tests/cppunit/string_reply_test.cc
b/tests/cppunit/string_reply_test.cc
index 80e74972..8aa93507 100644
--- a/tests/cppunit/string_reply_test.cc
+++ b/tests/cppunit/string_reply_test.cc
@@ -39,7 +39,7 @@ class StringReplyTest : public testing::Test {
std::vector<std::string> StringReplyTest::values;
TEST_F(StringReplyTest, MultiBulkString) {
- std::string result = redis::MultiBulkString(values);
+ std::string result = redis::ArrayOfBulkStrings(values);
ASSERT_EQ(result.length(), 13 * 10 + 14 * 90 + 15 * 900 + 17 * 9000 + 18 *
90000 + 9);
}
diff --git a/tests/gocase/unit/debug/debug_test.go
b/tests/gocase/unit/debug/debug_test.go
index 7221d830..6b65ad8c 100644
--- a/tests/gocase/unit/debug/debug_test.go
+++ b/tests/gocase/unit/debug/debug_test.go
@@ -52,6 +52,9 @@ func TestDebugProtocolV2(t *testing.T) {
require.NoError(t, r.Err())
require.EqualValues(t, expectedValue, r.Val())
}
+
+ r := rdb.Do(ctx, "DEBUG", "PROTOCOL", "null")
+ require.EqualError(t, r.Err(), redis.Nil.Error())
})
t.Run("lua script return value type", func(t *testing.T) {
@@ -90,6 +93,8 @@ func TestDebugProtocolV3(t *testing.T) {
require.NoError(t, r.Err())
require.EqualValues(t, expectedValue, r.Val())
}
+ r := rdb.Do(ctx, "DEBUG", "PROTOCOL", "null")
+ require.EqualError(t, r.Err(), redis.Nil.Error())
})
t.Run("lua script return value type", func(t *testing.T) {
diff --git a/tests/gocase/unit/protocol/protocol_test.go
b/tests/gocase/unit/protocol/protocol_test.go
index 9ba40bec..7896cf00 100644
--- a/tests/gocase/unit/protocol/protocol_test.go
+++ b/tests/gocase/unit/protocol/protocol_test.go
@@ -137,3 +137,101 @@ func TestProtocolNetwork(t *testing.T) {
c.MustRead(t, "+string")
})
}
+
+func TestProtocolRESP2(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{
+ "resp3-enabled": "no",
+ })
+ defer srv.Close()
+
+ c := srv.NewTCPClient()
+ defer func() {
+ require.NoError(t, c.Close())
+ }()
+
+ t.Run("debug protocol string", func(t *testing.T) {
+ types := map[string][]string{
+ "string": {"$11", "Hello World"},
+ "integer": {":12345"},
+ "array": {"*3", ":0", ":1", ":2"},
+ "true": {":1"},
+ "false": {":0"},
+ "null": {"$-1"},
+ }
+ for typ, expected := range types {
+ args := []string{"DEBUG", "PROTOCOL", typ}
+ require.NoError(t, c.WriteArgs(args...))
+ for _, line := range expected {
+ c.MustRead(t, line)
+ }
+ }
+ })
+
+ t.Run("multi bulk strings with null string", func(t *testing.T) {
+ require.NoError(t, c.WriteArgs("HSET", "hash", "f1", "v1"))
+ c.MustRead(t, ":1")
+
+ require.NoError(t, c.WriteArgs("HMGET", "hash", "f1", "f2"))
+ c.MustRead(t, "*2")
+ c.MustRead(t, "$2")
+ c.MustRead(t, "v1")
+ c.MustRead(t, "$-1")
+ })
+
+ t.Run("null array", func(t *testing.T) {
+ require.NoError(t, c.WriteArgs("ZRANK", "no-exists-zset", "m0",
"WITHSCORE"))
+ c.MustRead(t, "*-1")
+ })
+}
+
+func TestProtocolRESP3(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{
+ "resp3-enabled": "yes",
+ })
+ defer srv.Close()
+
+ c := srv.NewTCPClient()
+ defer func() {
+ require.NoError(t, c.Close())
+ }()
+
+ t.Run("debug protocol string", func(t *testing.T) {
+ require.NoError(t, c.WriteArgs("HELLO", "3"))
+ values := []string{"*6", "$6", "server", "$5", "redis", "$5",
"proto", ":3", "$4", "mode", "$10", "standalone"}
+ for _, line := range values {
+ c.MustRead(t, line)
+ }
+
+ types := map[string][]string{
+ "string": {"$11", "Hello World"},
+ "integer": {":12345"},
+ "array": {"*3", ":0", ":1", ":2"},
+ "true": {"#t"},
+ "false": {"#f"},
+ "null": {"_"},
+ }
+ for typ, expected := range types {
+ args := []string{"DEBUG", "PROTOCOL", typ}
+ require.NoError(t, c.WriteArgs(args...))
+ for _, line := range expected {
+ c.MustRead(t, line)
+ }
+ }
+ })
+
+ t.Run("multi bulk strings with null", func(t *testing.T) {
+ require.NoError(t, c.WriteArgs("HSET", "hash", "f1", "v1"))
+ c.MustRead(t, ":1")
+
+ require.NoError(t, c.WriteArgs("HMGET", "hash", "f1", "f2"))
+ c.MustRead(t, "*2")
+ c.MustRead(t, "$2")
+ c.MustRead(t, "v1")
+ c.MustRead(t, "_")
+ })
+
+ t.Run("null array", func(t *testing.T) {
+ require.NoError(t, c.WriteArgs("ZRANK", "no-exists-zset", "m0",
"WITHSCORE"))
+ c.MustRead(t, "_")
+ })
+}
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 86b3e1a5..9d4db5ec 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -67,12 +67,12 @@ Status Parser::parseSimpleKV(const Slice &ns_key, const
Slice &value, uint64_t e
auto [ns, user_key] = ExtractNamespaceKey<std::string>(ns_key,
slot_id_encoded_);
auto command =
- redis::Command2RESP({"SET", user_key,
value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))});
+ redis::ArrayOfBulkStrings({"SET", user_key,
value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))});
Status s = writer_->Write(ns, {command});
if (!s.IsOK()) return s;
if (expire > 0) {
- command = redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire
/ 1000)});
+ command = redis::ArrayOfBulkStrings({"EXPIREAT", user_key,
std::to_string(expire / 1000)});
s = writer_->Write(ns, {command});
}
@@ -105,17 +105,17 @@ Status Parser::parseComplexKV(const Slice &ns_key, const
Metadata &metadata) {
std::string value = iter->value().ToString();
switch (type) {
case kRedisHash:
- output = redis::Command2RESP({"HSET", user_key, sub_key, value});
+ output = redis::ArrayOfBulkStrings({"HSET", user_key, sub_key, value});
break;
case kRedisSet:
- output = redis::Command2RESP({"SADD", user_key, sub_key});
+ output = redis::ArrayOfBulkStrings({"SADD", user_key, sub_key});
break;
case kRedisList:
- output = redis::Command2RESP({"RPUSH", user_key, value});
+ output = redis::ArrayOfBulkStrings({"RPUSH", user_key, value});
break;
case kRedisZSet: {
double score = DecodeDouble(value.data());
- output = redis::Command2RESP({"ZADD", user_key,
util::Float2String(score), sub_key});
+ output = redis::ArrayOfBulkStrings({"ZADD", user_key,
util::Float2String(score), sub_key});
break;
}
case kRedisBitmap: {
@@ -126,7 +126,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const
Metadata &metadata) {
}
case kRedisSortedint: {
std::string val =
std::to_string(DecodeFixed64(ikey.GetSubKey().data()));
- output = redis::Command2RESP({"ZADD", user_key, val, val});
+ output = redis::ArrayOfBulkStrings({"ZADD", user_key, val, val});
break;
}
default:
@@ -140,7 +140,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const
Metadata &metadata) {
}
if (metadata.expire > 0) {
- output = redis::Command2RESP({"EXPIREAT", user_key,
std::to_string(metadata.expire / 1000)});
+ output = redis::ArrayOfBulkStrings({"EXPIREAT", user_key,
std::to_string(metadata.expire / 1000)});
Status s = writer_->Write(ns, {output});
if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to
AOF");
}
@@ -158,7 +158,7 @@ Status Parser::parseBitmapSegment(const Slice &ns, const
Slice &user_key, int in
s = writer_->Write(
ns.ToString(),
- {redis::Command2RESP({"SETBIT", user_key.ToString(),
std::to_string(index * 8 + i * 8 + j), "1"})});
+ {redis::ArrayOfBulkStrings({"SETBIT", user_key.ToString(),
std::to_string(index * 8 + i * 8 + j), "1"})});
if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to
AOF");
}
}
diff --git a/utils/kvrocks2redis/redis_writer.cc
b/utils/kvrocks2redis/redis_writer.cc
index 27e7d1d8..74410669 100644
--- a/utils/kvrocks2redis/redis_writer.cc
+++ b/utils/kvrocks2redis/redis_writer.cc
@@ -72,7 +72,7 @@ Status RedisWriter::FlushDB(const std::string &ns) {
return s;
}
- s = Write(ns, {redis::Command2RESP({"FLUSHDB"})});
+ s = Write(ns, {redis::ArrayOfBulkStrings({"FLUSHDB"})});
if (!s.IsOK()) return s;
return Status::OK();