This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new c9da1ea8 Add new MPUBLISH command (#1657)
c9da1ea8 is described below

commit c9da1ea813afd5fa544bd2c3d6fcf42447b8148c
Author: Yaroslav <[email protected]>
AuthorDate: Fri Aug 11 16:45:02 2023 +0300

    Add new MPUBLISH command (#1657)
    
    The new `MPUBLISH` command allows publishing one or more messages to a channel.
    Syntax: `MPUBLISH channel-name message1 message2 ... messageN`
---
 src/commands/cmd_pubsub.cc              | 39 +++++++++++++++++++---
 src/commands/error_constants.h          |  1 +
 tests/gocase/unit/pubsub/pubsub_test.go | 57 +++++++++++++++++++++++++++++++++
 3 files changed, 92 insertions(+), 5 deletions(-)

diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc
index 01422ea8..352930a0 100644
--- a/src/commands/cmd_pubsub.cc
+++ b/src/commands/cmd_pubsub.cc
@@ -27,12 +27,13 @@ namespace redis {
 
 class CommandPublish : public Commander {
  public:
-  // mark is_write as false here because slave should be able to execute publish command
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     if (!svr->IsSlave()) {
-      // Compromise: can't replicate message to sub-replicas in a cascading-like structure.
-      // Replication relies on WAL seq, increase the seq on slave will break the replication, hence the compromise
+      // Compromise: can't replicate a message to sub-replicas in a cascading-like structure.
+      // Replication relies on WAL seq; increasing the seq on a replica will break the replication process,
+      // hence the compromise solution
       redis::PubSub pubsub_db(svr->storage);
+
       auto s = pubsub_db.Publish(args_[1], args_[2]);
       if (!s.ok()) {
         return {Status::RedisExecErr, s.ToString()};
@@ -40,7 +41,34 @@ class CommandPublish : public Commander {
     }
 
     int receivers = svr->PublishMessage(args_[1], args_[2]);
+
     *output = redis::Integer(receivers);
+
+    return Status::OK();
+  }
+};
+
+class CommandMPublish : public Commander {
+ public:
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    int total_receivers = 0;
+
+    for (size_t i = 2; i < args_.size(); i++) {
+      if (!svr->IsSlave()) {
+        redis::PubSub pubsub_db(svr->storage);
+
+        auto s = pubsub_db.Publish(args_[1], args_[i]);
+        if (!s.ok()) {
+          return {Status::RedisExecErr, s.ToString()};
+        }
+      }
+
+      int receivers = svr->PublishMessage(args_[1], args_[i]);
+      total_receivers += receivers;
+    }
+
+    *output = redis::Integer(total_receivers);
+
     return Status::OK();
   }
 };
@@ -132,7 +160,7 @@ class CommandPubSub : public Commander {
       return Status::OK();
     }
 
-    return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of arguments"};
+    return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
   }
 
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -161,7 +189,7 @@ class CommandPubSub : public Commander {
       return Status::OK();
     }
 
-    return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of arguments"};
+    return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
   }
 
  private:
@@ -172,6 +200,7 @@ class CommandPubSub : public Commander {
 
 REDIS_REGISTER_COMMANDS(
     MakeCmdAttr<CommandPublish>("publish", 3, "read-only pub-sub", 0, 0, 0),
+    MakeCmdAttr<CommandMPublish>("mpublish", -3, "read-only pub-sub", 0, 0, 0),
     MakeCmdAttr<CommandSubscribe>("subscribe", -2, "read-only pub-sub no-multi no-script", 0, 0, 0),
     MakeCmdAttr<CommandUnSubscribe>("unsubscribe", -1, "read-only pub-sub no-multi no-script", 0, 0, 0),
     MakeCmdAttr<CommandPSubscribe>("psubscribe", -2, "read-only pub-sub no-multi no-script", 0, 0, 0),
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index b9c05ed5..df650fab 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -41,5 +41,6 @@ inline constexpr const char *errScoreIsNotValidFloat = "score is not a valid flo
 inline constexpr const char *errValueIsNotFloat = "value is not a valid float";
 inline constexpr const char *errNoMatchingScript = "NOSCRIPT No matching script. Please use EVAL";
 inline constexpr const char *errUnknownOption = "unknown option";
+inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown subcommand or wrong number of arguments";
 
 }  // namespace redis
diff --git a/tests/gocase/unit/pubsub/pubsub_test.go b/tests/gocase/unit/pubsub/pubsub_test.go
index 3259d76c..d3f8c4a1 100644
--- a/tests/gocase/unit/pubsub/pubsub_test.go
+++ b/tests/gocase/unit/pubsub/pubsub_test.go
@@ -115,6 +115,63 @@ func TestPubSub(t *testing.T) {
                require.Equal(t, "hello", msg.Payload)
        })
 
+       t.Run("MPUBLISH basics", func(t *testing.T) {
+               var (
+                       channelName = "channel1"
+                       msg1        = "hello"
+                       msg2        = "world"
+                       msg3        = "!"
+                       msg4        = "foo-bar"
+               )
+
+               c1 := srv.NewClient()
+               defer func() { require.NoError(t, c1.Close()) }()
+               c2 := srv.NewClient()
+               defer func() { require.NoError(t, c2.Close()) }()
+
+               pubsub1 := c1.Subscribe(ctx, channelName)
+               pubsub2 := c2.Subscribe(ctx, channelName)
+
+               require.EqualValues(t, 1, receiveType(t, pubsub1, &redis.Subscription{}).Count)
+               require.EqualValues(t, 1, receiveType(t, pubsub2, &redis.Subscription{}).Count)
+
+               require.EqualValues(t, 6, rdb.Do(ctx, "MPUBLISH", channelName, msg1, msg2, msg3).Val())
+
+               msg := receiveType(t, pubsub1, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg1, msg.Payload)
+
+               msg = receiveType(t, pubsub2, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg1, msg.Payload)
+
+               msg = receiveType(t, pubsub1, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg2, msg.Payload)
+
+               msg = receiveType(t, pubsub2, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg2, msg.Payload)
+
+               msg = receiveType(t, pubsub1, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg3, msg.Payload)
+
+               msg = receiveType(t, pubsub2, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg3, msg.Payload)
+
+               require.EqualValues(t, 2, rdb.Do(ctx, "MPUBLISH", channelName, msg4).Val())
+
+               msg = receiveType(t, pubsub1, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg4, msg.Payload)
+
+               msg = receiveType(t, pubsub2, &redis.Message{})
+               require.Equal(t, channelName, msg.Channel)
+               require.Equal(t, msg4, msg.Payload)
+       })
+
        t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
                pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
                require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)

Reply via email to