This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 02e97074 fix(stream): NOGROUP/BUSYGROUP error message should return 
without 'ERR' prefix (#2397)
02e97074 is described below

commit 02e97074eb7921a611928a9f2fdfb5b358e00367
Author: hulk <[email protected]>
AuthorDate: Wed Jul 10 09:14:58 2024 +0800

    fix(stream): NOGROUP/BUSYGROUP error message should return without 'ERR' 
prefix (#2397)
---
 src/commands/cmd_stream.cc                   | 22 ++++++++++----
 src/common/status.h                          |  2 ++
 src/server/redis_reply.cc                    | 15 +++++-----
 src/types/redis_stream.cc                    | 44 +++++-----------------------
 tests/gocase/unit/type/stream/stream_test.go | 32 ++++++++++++++++++--
 5 files changed, 63 insertions(+), 52 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 7faeb9c4..f227aecf 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -321,13 +321,12 @@ class CommandXClaim : public Commander {
     auto s = stream_db.ClaimPelEntries(stream_name_, group_name_, 
consumer_name_, min_idle_time_ms_, entry_ids_,
                                        stream_claim_options_, &result);
     if (!s.ok()) {
+      if (s.IsNotFound()) {
+        return {Status::RedisNoGroup, "No such key '" + stream_name_ + "' or 
consumer group '" + group_name_ + "'"};
+      }
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    if (s.IsNotFound()) {
-      return {Status::RedisExecErr, errNoSuchKey};
-    }
-
     if (!stream_claim_options_.just_id) {
       output->append(redis::MultiLen(result.entries.size()));
 
@@ -411,8 +410,7 @@ class CommandAutoClaim : public Commander {
     auto s = stream_db.AutoClaim(key_name_, group_name_, consumer_name_, 
options_, &result);
     if (!s.ok()) {
       if (s.IsNotFound()) {
-        return {Status::RedisExecErr,
-                "NOGROUP No such key '" + key_name_ + "' or consumer group '" 
+ group_name_ + "'"};
+        return {Status::RedisNoGroup, "No such key '" + key_name_ + "' or 
consumer group '" + group_name_ + "'"};
       }
       return {Status::RedisExecErr, s.ToString()};
     }
@@ -536,6 +534,9 @@ class CommandXGroup : public Commander {
     if (subcommand_ == "create") {
       auto s = stream_db.CreateGroup(stream_name_, xgroup_create_options_, 
group_name_);
       if (!s.ok()) {
+        if (s.IsBusy()) {
+          return {Status::RedisBusyGroup, "consumer group name '" + 
group_name_ + "' already exists"};
+        }
         return {Status::RedisExecErr, s.ToString()};
       }
 
@@ -560,6 +561,9 @@ class CommandXGroup : public Commander {
       int created_number = 0;
       auto s = stream_db.CreateConsumer(stream_name_, group_name_, 
consumer_name_, &created_number);
       if (!s.ok()) {
+        if (s.IsNotFound()) {
+          return {Status::RedisNoGroup, "No such consumer group " + 
group_name_ + " for key name " + stream_name_};
+        }
         return {Status::RedisExecErr, s.ToString()};
       }
 
@@ -570,6 +574,9 @@ class CommandXGroup : public Commander {
       uint64_t deleted_pel = 0;
       auto s = stream_db.DestroyConsumer(stream_name_, group_name_, 
consumer_name_, deleted_pel);
       if (!s.ok()) {
+        if (s.IsNotFound()) {
+          return {Status::RedisNoGroup, "No such consumer group " + 
group_name_ + " for key name " + stream_name_};
+        }
         return {Status::RedisExecErr, s.ToString()};
       }
 
@@ -579,6 +586,9 @@ class CommandXGroup : public Commander {
     if (subcommand_ == "setid") {
       auto s = stream_db.GroupSetId(stream_name_, group_name_, 
xgroup_create_options_);
       if (!s.ok()) {
+        if (s.IsNotFound()) {
+          return {Status::RedisNoGroup, "No such consumer group " + 
group_name_ + " for key name " + stream_name_};
+        }
         return {Status::RedisExecErr, s.ToString()};
       }
 
diff --git a/src/common/status.h b/src/common/status.h
index d06c2f06..2bd610ad 100644
--- a/src/common/status.h
+++ b/src/common/status.h
@@ -59,6 +59,8 @@ class [[nodiscard]] Status {
     RedisWrongType,
     RedisReadOnly,
     RedisExecAbort,
+    RedisBusyGroup,
+    RedisNoGroup,
     RedisMoved,
     RedisCrossSlot,
     RedisTryAgain,
diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc
index 20e15e51..52fc180e 100644
--- a/src/server/redis_reply.cc
+++ b/src/server/redis_reply.cc
@@ -24,13 +24,14 @@
 #include <numeric>
 
 const std::map<Status::Code, std::string> redisErrorPrefixMapping = {
-    {Status::RedisErrorNoPrefix, ""},         {Status::RedisNoProto, 
"NOPROTO"},
-    {Status::RedisLoading, "LOADING"},        {Status::RedisMasterDown, 
"MASTERDOWN"},
-    {Status::RedisNoScript, "NOSCRIPT"},      {Status::RedisNoAuth, "NOAUTH"},
-    {Status::RedisWrongType, "WRONGTYPE"},    {Status::RedisReadOnly, 
"READONLY"},
-    {Status::RedisExecAbort, "EXECABORT"},    {Status::RedisMoved, "MOVED"},
-    {Status::RedisCrossSlot, "CROSSSLOT"},    {Status::RedisTryAgain, 
"TRYAGAIN"},
-    {Status::RedisClusterDown, "CLUSTERDOWN"}};
+    {Status::RedisErrorNoPrefix, ""},          {Status::RedisNoProto, 
"NOPROTO"},
+    {Status::RedisLoading, "LOADING"},         {Status::RedisMasterDown, 
"MASTERDOWN"},
+    {Status::RedisNoScript, "NOSCRIPT"},       {Status::RedisNoAuth, "NOAUTH"},
+    {Status::RedisWrongType, "WRONGTYPE"},     {Status::RedisReadOnly, 
"READONLY"},
+    {Status::RedisExecAbort, "EXECABORT"},     {Status::RedisMoved, "MOVED"},
+    {Status::RedisCrossSlot, "CROSSSLOT"},     {Status::RedisTryAgain, 
"TRYAGAIN"},
+    {Status::RedisClusterDown, "CLUSTERDOWN"}, {Status::RedisNoGroup, 
"NOGROUP"},
+    {Status::RedisBusyGroup, "BUSYGROUP"}};
 
 namespace redis {
 
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 97fcf948..a3f6c330 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -392,15 +392,9 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice 
&stream_name, const std::str
   std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
   std::string get_group_value;
   s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
-  if (!s.ok() && !s.IsNotFound()) {
-    return s;
-  }
-  if (s.IsNotFound()) {
-    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
-                                            stream_name.ToString());
-  }
-  StreamConsumerGroupMetadata group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_group_value);
+  if (!s.ok()) return s;
 
+  StreamConsumerGroupMetadata group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_group_value);
   std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
   std::string get_consumer_value;
   s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
@@ -716,7 +710,7 @@ rocksdb::Status Stream::CreateGroup(const Slice 
&stream_name, const StreamXGroup
     if (!s.ok()) {
       return s;
     }
-    return rocksdb::Status::InvalidArgument("BUSYGROUP Consumer Group name 
already exists");
+    return rocksdb::Status::Busy();
   }
 
   batch->Put(stream_cf_handle_, entry_key, entry_value);
@@ -799,13 +793,7 @@ rocksdb::Status Stream::createConsumerWithoutLock(const 
Slice &stream_name, cons
   std::string entry_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
   std::string get_entry_value;
   s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&get_entry_value);
-  if (!s.ok() && !s.IsNotFound()) {
-    return s;
-  }
-  if (s.IsNotFound()) {
-    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
-                                            stream_name.ToString());
-  }
+  if (!s.ok()) return s;
 
   StreamConsumerMetadata consumer_metadata;
   auto now = util::GetTimeStampMS();
@@ -856,13 +844,7 @@ rocksdb::Status Stream::DestroyConsumer(const Slice 
&stream_name, const std::str
   std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
   std::string get_group_value;
   s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
-  if (!s.ok() && !s.IsNotFound()) {
-    return s;
-  }
-  if (s.IsNotFound()) {
-    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
-                                            stream_name.ToString());
-  }
+  if (!s.ok()) return s;
 
   std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
   std::string get_consumer_value;
@@ -932,13 +914,7 @@ rocksdb::Status Stream::GroupSetId(const Slice 
&stream_name, const std::string &
   std::string entry_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
   std::string get_entry_value;
   s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&get_entry_value);
-  if (!s.ok() && !s.IsNotFound()) {
-    return s;
-  }
-  if (s.IsNotFound()) {
-    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
-                                            stream_name.ToString());
-  }
+  if (!s.ok()) return s;
 
   StreamConsumerGroupMetadata consumer_group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_entry_value);
   if (options.last_id == "$") {
@@ -1455,13 +1431,7 @@ rocksdb::Status Stream::RangeWithPending(const Slice 
&stream_name, StreamRangeOp
   std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
   std::string get_group_value;
   s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
-  if (!s.ok() && !s.IsNotFound()) {
-    return s;
-  }
-  if (s.IsNotFound()) {
-    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
-                                            stream_name.ToString());
-  }
+  if (!s.ok()) return s;
 
   std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
   std::string get_consumer_value;
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index b8958e70..e4048eaf 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -889,6 +889,9 @@ func TestStreamOffset(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, "myStream").Err())
                require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
                require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", 
"$").Err())
+               // duplicate create group
+               require.EqualError(t, rdb.XGroupCreate(ctx, "myStream", 
"myGroup", "$").Err(),
+                       "BUSYGROUP consumer group name 'myGroup' already 
exists")
                result, err := rdb.XGroupDestroy(ctx, "myStream", 
"myGroup").Result()
                require.NoError(t, err)
                require.Equal(t, int64(1), result)
@@ -931,8 +934,12 @@ func TestStreamOffset(t *testing.T) {
                        ID:     "1-0",
                        Values: []string{"data", "a"},
                }).Err())
+
                //no such group
-               require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, 
groupName, consumerName).Err())
+               expectedError := fmt.Sprintf("NOGROUP No such consumer group %s 
for key name %s", groupName, streamName)
+               require.EqualError(t, rdb.XGroupCreateConsumer(ctx, streamName, 
groupName, consumerName).Err(), expectedError)
+               require.EqualError(t, rdb.XGroupDelConsumer(ctx, streamName, 
groupName, consumerName).Err(), expectedError)
+
                require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"$").Err())
                require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, 
groupName, consumerName).Err())
 
@@ -1009,7 +1016,8 @@ func TestStreamOffset(t *testing.T) {
                        Values: []string{"data", "a"},
                }).Err())
                //No such group
-               require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, 
"$").Err())
+               require.EqualError(t, rdb.XGroupSetID(ctx, streamName, 
groupName, "$").Err(),
+                       fmt.Sprintf("NOGROUP No such consumer group %s for key 
name %s", groupName, streamName))
                require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"$").Err())
 
                require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, 
"0-0").Err())
@@ -1342,6 +1350,17 @@ func TestStreamOffset(t *testing.T) {
                        ID:     "1-0",
                        Values: []string{"field1", "data1"},
                }).Err())
+
+               // No such group
+               err := rdb.XClaim(ctx, &redis.XClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: consumer1Name,
+                       MinIdle:  0,
+                       Messages: []string{"1-0"},
+               }).Err()
+               require.EqualError(t, err, fmt.Sprintf("NOGROUP No such key 
'%s' or consumer group '%s'", streamName, groupName))
+
                require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
                r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
                        Group:    groupName,
@@ -1547,6 +1566,15 @@ func TestStreamOffset(t *testing.T) {
                        id4 = rsp.Val()
                }
 
+               // No such group
+               err := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Consumer: "consumer",
+                       MinIdle:  10 * time.Millisecond,
+                       Start:    "-",
+               }).Err()
+               require.EqualError(t, err, fmt.Sprintf("NOGROUP No such key 
'%s' or consumer group '%s'", streamName, groupName))
                require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
 
                consumer1 := "consumer1"

Reply via email to