This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5af0b3b4 Implement RESP3 set type for the set data structure (#2024)
5af0b3b4 is described below
commit 5af0b3b46f423ec40b5faaf3785f3cb1b5b41394
Author: hulk <[email protected]>
AuthorDate: Wed Jan 17 13:38:16 2024 +0800
Implement RESP3 set type for the set data structure (#2024)
---
src/commands/cmd_server.cc | 9 +++++++--
src/commands/cmd_set.cc | 12 ++++++------
src/server/redis_connection.cc | 9 +++++++++
src/server/redis_connection.h | 5 +++++
tests/gocase/unit/debug/debug_test.go | 2 ++
tests/gocase/unit/protocol/protocol_test.go | 2 ++
tests/gocase/unit/type/set/set_test.go | 12 +++++++++++-
7 files changed, 42 insertions(+), 9 deletions(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 71781cd5..6ad11e4c 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -612,6 +612,11 @@ class CommandDebug : public Commander {
for (int i = 0; i < 3; i++) {
*output += redis::Integer(i);
}
+ } else if (protocol_type_ == "set") {
+ *output = conn->SizeOfSet(3);
+ for (int i = 0; i < 3; i++) {
+ *output += redis::Integer(i);
+ }
} else if (protocol_type_ == "true") {
*output = conn->Bool(true);
} else if (protocol_type_ == "false") {
@@ -619,8 +624,8 @@ class CommandDebug : public Commander {
} else if (protocol_type_ == "null") {
*output = conn->NilString();
} else {
- *output =
- redis::Error("Wrong protocol type name. Please use one of the
following: string|int|array|true|false|null");
+ *output = redis::Error(
+ "Wrong protocol type name. Please use one of the following:
string|int|array|set|true|false|null");
}
} else {
return {Status::RedisInvalidCmd, "Unknown subcommand, should be DEBUG or
PROTOCOL"};
diff --git a/src/commands/cmd_set.cc b/src/commands/cmd_set.cc
index 0b5a8bda..079b2d26 100644
--- a/src/commands/cmd_set.cc
+++ b/src/commands/cmd_set.cc
@@ -93,7 +93,7 @@ class CommandSMembers : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = conn->MultiBulkString(members, false);
+ *output = conn->ArrayOfSet(members);
return Status::OK();
}
};
@@ -171,7 +171,7 @@ class CommandSPop : public Commander {
}
if (with_count_) {
- *output = conn->MultiBulkString(members, false);
+ *output = conn->ArrayOfSet(members);
} else {
if (members.size() > 0) {
*output = redis::BulkString(members.front());
@@ -211,7 +211,7 @@ class CommandSRandMember : public Commander {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
- *output = conn->MultiBulkString(members, false);
+ *output = conn->ArrayOfSet(members);
return Status::OK();
}
@@ -249,7 +249,7 @@ class CommandSDiff : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = conn->MultiBulkString(members, false);
+ *output = conn->ArrayOfSet(members);
return Status::OK();
}
};
@@ -269,7 +269,7 @@ class CommandSUnion : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = conn->MultiBulkString(members, false);
+ *output = conn->ArrayOfSet(members);
return Status::OK();
}
};
@@ -289,7 +289,7 @@ class CommandSInter : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- *output = conn->MultiBulkString(members, false);
+ *output = conn->ArrayOfSet(members);
return Status::OK();
}
};
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 87370ffa..5c93a214 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -163,6 +163,15 @@ std::string Connection::MultiBulkString(const
std::vector<std::string> &values,
return result;
}
+std::string Connection::ArrayOfSet(const std::vector<std::string> &elems)
const {
+ std::string result;
+ result += SizeOfSet(elems.size());
+ for (const auto &elem : elems) {
+ result += BulkString(elem);
+ }
+ return result;
+}
+
void Connection::SendFile(int fd) {
// NOTE: we don't need to close the fd, the libevent will do that
auto output = bufferevent_get_output(bev_);
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index f35a3889..c2a0d8cb 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -70,6 +70,11 @@ class Connection : public EvbufCallbackBase<Connection> {
std::string MultiBulkString(const std::vector<std::string> &values, bool
output_nil_for_empty_string = true) const;
std::string MultiBulkString(const std::vector<std::string> &values,
const std::vector<rocksdb::Status> &statuses)
const;
+ template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
+ std::string SizeOfSet(T len) const {
+ return protocol_version_ == RESP::v3 ? "~" + std::to_string(len) + CRLF :
MultiLen(len);
+ }
+ std::string ArrayOfSet(const std::vector<std::string> &elems) const;
using UnsubscribeCallback = std::function<void(std::string, int)>;
void SubscribeChannel(const std::string &channel);
diff --git a/tests/gocase/unit/debug/debug_test.go
b/tests/gocase/unit/debug/debug_test.go
index 6b65ad8c..a8a158c3 100644
--- a/tests/gocase/unit/debug/debug_test.go
+++ b/tests/gocase/unit/debug/debug_test.go
@@ -44,6 +44,7 @@ func TestDebugProtocolV2(t *testing.T) {
"string": "Hello World",
"integer": int64(12345),
"array": []interface{}{int64(0), int64(1), int64(2)},
+ "set": []interface{}{int64(0), int64(1), int64(2)},
"true": int64(1),
"false": int64(0),
}
@@ -85,6 +86,7 @@ func TestDebugProtocolV3(t *testing.T) {
"string": "Hello World",
"integer": int64(12345),
"array": []interface{}{int64(0), int64(1), int64(2)},
+ "set": []interface{}{int64(0), int64(1), int64(2)},
"true": true,
"false": false,
}
diff --git a/tests/gocase/unit/protocol/protocol_test.go
b/tests/gocase/unit/protocol/protocol_test.go
index 7896cf00..d7f57145 100644
--- a/tests/gocase/unit/protocol/protocol_test.go
+++ b/tests/gocase/unit/protocol/protocol_test.go
@@ -154,6 +154,7 @@ func TestProtocolRESP2(t *testing.T) {
"string": {"$11", "Hello World"},
"integer": {":12345"},
"array": {"*3", ":0", ":1", ":2"},
+ "set": {"*3", ":0", ":1", ":2"},
"true": {":1"},
"false": {":0"},
"null": {"$-1"},
@@ -206,6 +207,7 @@ func TestProtocolRESP3(t *testing.T) {
"string": {"$11", "Hello World"},
"integer": {":12345"},
"array": {"*3", ":0", ":1", ":2"},
+ "set": {"~3", ":0", ":1", ":2"},
"true": {"#t"},
"false": {"#f"},
"null": {"_"},
diff --git a/tests/gocase/unit/type/set/set_test.go
b/tests/gocase/unit/type/set/set_test.go
index 8a58c7b9..0d596620 100644
--- a/tests/gocase/unit/type/set/set_test.go
+++ b/tests/gocase/unit/type/set/set_test.go
@@ -57,7 +57,17 @@ func GetArrayUnion(arrays ...[]string) []string {
}
func TestSet(t *testing.T) {
- srv := util.StartServer(t, map[string]string{})
+ setTests(t, "no")
+}
+
+func TestSetWithRESP3(t *testing.T) {
+ setTests(t, "yes")
+}
+
+var setTests = func(t *testing.T, enabledRESP3 string) {
+ srv := util.StartServer(t, map[string]string{
+ "resp3-enabled": enabledRESP3,
+ })
defer srv.Close()
ctx := context.Background()
rdb := srv.NewClient()