This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c9da1ea8 Add new MPUBLISH command (#1657)
c9da1ea8 is described below
commit c9da1ea813afd5fa544bd2c3d6fcf42447b8148c
Author: Yaroslav <[email protected]>
AuthorDate: Fri Aug 11 16:45:02 2023 +0300
Add new MPUBLISH command (#1657)
The new `MPUBLISH` command allows publishing one or more messages to a
channel.
Syntax: `MPUBLISH channel-name message1 message2 ... messageN`
---
src/commands/cmd_pubsub.cc | 39 +++++++++++++++++++---
src/commands/error_constants.h | 1 +
tests/gocase/unit/pubsub/pubsub_test.go | 57 +++++++++++++++++++++++++++++++++
3 files changed, 92 insertions(+), 5 deletions(-)
diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc
index 01422ea8..352930a0 100644
--- a/src/commands/cmd_pubsub.cc
+++ b/src/commands/cmd_pubsub.cc
@@ -27,12 +27,13 @@ namespace redis {
class CommandPublish : public Commander {
public:
- // mark is_write as false here because slave should be able to execute publish command
Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (!svr->IsSlave()) {
- // Compromise: can't replicate message to sub-replicas in a cascading-like structure.
- // Replication relies on WAL seq, increase the seq on slave will break the replication, hence the compromise
+ // Compromise: can't replicate a message to sub-replicas in a cascading-like structure.
+ // Replication relies on WAL seq; increasing the seq on a replica will break the replication process,
+ // hence the compromise solution
redis::PubSub pubsub_db(svr->storage);
+
auto s = pubsub_db.Publish(args_[1], args_[2]);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
@@ -40,7 +41,34 @@ class CommandPublish : public Commander {
}
int receivers = svr->PublishMessage(args_[1], args_[2]);
+
*output = redis::Integer(receivers);
+
+ return Status::OK();
+ }
+};
+
+class CommandMPublish : public Commander {
+ public:
+ Status Execute(Server *svr, Connection *conn, std::string *output) override {
+ int total_receivers = 0;
+
+ for (size_t i = 2; i < args_.size(); i++) {
+ if (!svr->IsSlave()) {
+ redis::PubSub pubsub_db(svr->storage);
+
+ auto s = pubsub_db.Publish(args_[1], args_[i]);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+ }
+
+ int receivers = svr->PublishMessage(args_[1], args_[i]);
+ total_receivers += receivers;
+ }
+
+ *output = redis::Integer(total_receivers);
+
return Status::OK();
}
};
@@ -132,7 +160,7 @@ class CommandPubSub : public Commander {
return Status::OK();
}
- return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of
arguments"};
+ return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
}
Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -161,7 +189,7 @@ class CommandPubSub : public Commander {
return Status::OK();
}
- return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of
arguments"};
+ return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
}
private:
@@ -172,6 +200,7 @@ class CommandPubSub : public Commander {
REDIS_REGISTER_COMMANDS(
MakeCmdAttr<CommandPublish>("publish", 3, "read-only pub-sub", 0, 0, 0),
+ MakeCmdAttr<CommandMPublish>("mpublish", -3, "read-only pub-sub", 0, 0, 0),
MakeCmdAttr<CommandSubscribe>("subscribe", -2, "read-only pub-sub no-multi
no-script", 0, 0, 0),
MakeCmdAttr<CommandUnSubscribe>("unsubscribe", -1, "read-only pub-sub
no-multi no-script", 0, 0, 0),
MakeCmdAttr<CommandPSubscribe>("psubscribe", -2, "read-only pub-sub
no-multi no-script", 0, 0, 0),
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index b9c05ed5..df650fab 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -41,5 +41,6 @@ inline constexpr const char *errScoreIsNotValidFloat = "score is not a valid flo
inline constexpr const char *errValueIsNotFloat = "value is not a valid float";
inline constexpr const char *errNoMatchingScript = "NOSCRIPT No matching
script. Please use EVAL";
inline constexpr const char *errUnknownOption = "unknown option";
+inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown
subcommand or wrong number of arguments";
} // namespace redis
diff --git a/tests/gocase/unit/pubsub/pubsub_test.go b/tests/gocase/unit/pubsub/pubsub_test.go
index 3259d76c..d3f8c4a1 100644
--- a/tests/gocase/unit/pubsub/pubsub_test.go
+++ b/tests/gocase/unit/pubsub/pubsub_test.go
@@ -115,6 +115,63 @@ func TestPubSub(t *testing.T) {
require.Equal(t, "hello", msg.Payload)
})
+ t.Run("MPUBLISH basics", func(t *testing.T) {
+ var (
+ channelName = "channel1"
+ msg1 = "hello"
+ msg2 = "world"
+ msg3 = "!"
+ msg4 = "foo-bar"
+ )
+
+ c1 := srv.NewClient()
+ defer func() { require.NoError(t, c1.Close()) }()
+ c2 := srv.NewClient()
+ defer func() { require.NoError(t, c2.Close()) }()
+
+ pubsub1 := c1.Subscribe(ctx, channelName)
+ pubsub2 := c2.Subscribe(ctx, channelName)
+
+ require.EqualValues(t, 1, receiveType(t, pubsub1,
&redis.Subscription{}).Count)
+ require.EqualValues(t, 1, receiveType(t, pubsub2,
&redis.Subscription{}).Count)
+
+ require.EqualValues(t, 6, rdb.Do(ctx, "MPUBLISH", channelName,
msg1, msg2, msg3).Val())
+
+ msg := receiveType(t, pubsub1, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg1, msg.Payload)
+
+ msg = receiveType(t, pubsub2, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg1, msg.Payload)
+
+ msg = receiveType(t, pubsub1, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg2, msg.Payload)
+
+ msg = receiveType(t, pubsub2, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg2, msg.Payload)
+
+ msg = receiveType(t, pubsub1, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg3, msg.Payload)
+
+ msg = receiveType(t, pubsub2, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg3, msg.Payload)
+
+ require.EqualValues(t, 2, rdb.Do(ctx, "MPUBLISH", channelName,
msg4).Val())
+
+ msg = receiveType(t, pubsub1, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg4, msg.Payload)
+
+ msg = receiveType(t, pubsub2, &redis.Message{})
+ require.Equal(t, channelName, msg.Channel)
+ require.Equal(t, msg4, msg.Payload)
+ })
+
t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t
*testing.T) {
pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
require.EqualValues(t, 1, receiveType(t, pubsub,
&redis.Subscription{}).Count)