This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 02e97074 fix(stream): NOGROUP/BUSYGROUP error message should return
without 'ERR' prefix (#2397)
02e97074 is described below
commit 02e97074eb7921a611928a9f2fdfb5b358e00367
Author: hulk <[email protected]>
AuthorDate: Wed Jul 10 09:14:58 2024 +0800
fix(stream): NOGROUP/BUSYGROUP error message should return without 'ERR'
prefix (#2397)
---
src/commands/cmd_stream.cc | 22 ++++++++++----
src/common/status.h | 2 ++
src/server/redis_reply.cc | 15 +++++-----
src/types/redis_stream.cc | 44 +++++-----------------------
tests/gocase/unit/type/stream/stream_test.go | 32 ++++++++++++++++++--
5 files changed, 63 insertions(+), 52 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 7faeb9c4..f227aecf 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -321,13 +321,12 @@ class CommandXClaim : public Commander {
auto s = stream_db.ClaimPelEntries(stream_name_, group_name_,
consumer_name_, min_idle_time_ms_, entry_ids_,
stream_claim_options_, &result);
if (!s.ok()) {
+ if (s.IsNotFound()) {
+ return {Status::RedisNoGroup, "No such key '" + stream_name_ + "' or
consumer group '" + group_name_ + "'"};
+ }
return {Status::RedisExecErr, s.ToString()};
}
- if (s.IsNotFound()) {
- return {Status::RedisExecErr, errNoSuchKey};
- }
-
if (!stream_claim_options_.just_id) {
output->append(redis::MultiLen(result.entries.size()));
@@ -411,8 +410,7 @@ class CommandAutoClaim : public Commander {
auto s = stream_db.AutoClaim(key_name_, group_name_, consumer_name_,
options_, &result);
if (!s.ok()) {
if (s.IsNotFound()) {
- return {Status::RedisExecErr,
- "NOGROUP No such key '" + key_name_ + "' or consumer group '"
+ group_name_ + "'"};
+ return {Status::RedisNoGroup, "No such key '" + key_name_ + "' or
consumer group '" + group_name_ + "'"};
}
return {Status::RedisExecErr, s.ToString()};
}
@@ -536,6 +534,9 @@ class CommandXGroup : public Commander {
if (subcommand_ == "create") {
auto s = stream_db.CreateGroup(stream_name_, xgroup_create_options_,
group_name_);
if (!s.ok()) {
+ if (s.IsBusy()) {
+ return {Status::RedisBusyGroup, "consumer group name '" +
group_name_ + "' already exists"};
+ }
return {Status::RedisExecErr, s.ToString()};
}
@@ -560,6 +561,9 @@ class CommandXGroup : public Commander {
int created_number = 0;
auto s = stream_db.CreateConsumer(stream_name_, group_name_,
consumer_name_, &created_number);
if (!s.ok()) {
+ if (s.IsNotFound()) {
+ return {Status::RedisNoGroup, "No such consumer group " +
group_name_ + " for key name " + stream_name_};
+ }
return {Status::RedisExecErr, s.ToString()};
}
@@ -570,6 +574,9 @@ class CommandXGroup : public Commander {
uint64_t deleted_pel = 0;
auto s = stream_db.DestroyConsumer(stream_name_, group_name_,
consumer_name_, deleted_pel);
if (!s.ok()) {
+ if (s.IsNotFound()) {
+ return {Status::RedisNoGroup, "No such consumer group " +
group_name_ + " for key name " + stream_name_};
+ }
return {Status::RedisExecErr, s.ToString()};
}
@@ -579,6 +586,9 @@ class CommandXGroup : public Commander {
if (subcommand_ == "setid") {
auto s = stream_db.GroupSetId(stream_name_, group_name_,
xgroup_create_options_);
if (!s.ok()) {
+ if (s.IsNotFound()) {
+ return {Status::RedisNoGroup, "No such consumer group " +
group_name_ + " for key name " + stream_name_};
+ }
return {Status::RedisExecErr, s.ToString()};
}
diff --git a/src/common/status.h b/src/common/status.h
index d06c2f06..2bd610ad 100644
--- a/src/common/status.h
+++ b/src/common/status.h
@@ -59,6 +59,8 @@ class [[nodiscard]] Status {
RedisWrongType,
RedisReadOnly,
RedisExecAbort,
+ RedisBusyGroup,
+ RedisNoGroup,
RedisMoved,
RedisCrossSlot,
RedisTryAgain,
diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc
index 20e15e51..52fc180e 100644
--- a/src/server/redis_reply.cc
+++ b/src/server/redis_reply.cc
@@ -24,13 +24,14 @@
#include <numeric>
const std::map<Status::Code, std::string> redisErrorPrefixMapping = {
- {Status::RedisErrorNoPrefix, ""}, {Status::RedisNoProto,
"NOPROTO"},
- {Status::RedisLoading, "LOADING"}, {Status::RedisMasterDown,
"MASTERDOWN"},
- {Status::RedisNoScript, "NOSCRIPT"}, {Status::RedisNoAuth, "NOAUTH"},
- {Status::RedisWrongType, "WRONGTYPE"}, {Status::RedisReadOnly,
"READONLY"},
- {Status::RedisExecAbort, "EXECABORT"}, {Status::RedisMoved, "MOVED"},
- {Status::RedisCrossSlot, "CROSSSLOT"}, {Status::RedisTryAgain,
"TRYAGAIN"},
- {Status::RedisClusterDown, "CLUSTERDOWN"}};
+ {Status::RedisErrorNoPrefix, ""}, {Status::RedisNoProto,
"NOPROTO"},
+ {Status::RedisLoading, "LOADING"}, {Status::RedisMasterDown,
"MASTERDOWN"},
+ {Status::RedisNoScript, "NOSCRIPT"}, {Status::RedisNoAuth, "NOAUTH"},
+ {Status::RedisWrongType, "WRONGTYPE"}, {Status::RedisReadOnly,
"READONLY"},
+ {Status::RedisExecAbort, "EXECABORT"}, {Status::RedisMoved, "MOVED"},
+ {Status::RedisCrossSlot, "CROSSSLOT"}, {Status::RedisTryAgain,
"TRYAGAIN"},
+ {Status::RedisClusterDown, "CLUSTERDOWN"}, {Status::RedisNoGroup,
"NOGROUP"},
+ {Status::RedisBusyGroup, "BUSYGROUP"}};
namespace redis {
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 97fcf948..a3f6c330 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -392,15 +392,9 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice
&stream_name, const std::str
std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
std::string get_group_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
- if (!s.ok() && !s.IsNotFound()) {
- return s;
- }
- if (s.IsNotFound()) {
- return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
- stream_name.ToString());
- }
- StreamConsumerGroupMetadata group_metadata =
decodeStreamConsumerGroupMetadataValue(get_group_value);
+ if (!s.ok()) return s;
+ StreamConsumerGroupMetadata group_metadata =
decodeStreamConsumerGroupMetadataValue(get_group_value);
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
std::string get_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
@@ -716,7 +710,7 @@ rocksdb::Status Stream::CreateGroup(const Slice
&stream_name, const StreamXGroup
if (!s.ok()) {
return s;
}
- return rocksdb::Status::InvalidArgument("BUSYGROUP Consumer Group name
already exists");
+ return rocksdb::Status::Busy();
}
batch->Put(stream_cf_handle_, entry_key, entry_value);
@@ -799,13 +793,7 @@ rocksdb::Status Stream::createConsumerWithoutLock(const
Slice &stream_name, cons
std::string entry_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
std::string get_entry_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&get_entry_value);
- if (!s.ok() && !s.IsNotFound()) {
- return s;
- }
- if (s.IsNotFound()) {
- return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
- stream_name.ToString());
- }
+ if (!s.ok()) return s;
StreamConsumerMetadata consumer_metadata;
auto now = util::GetTimeStampMS();
@@ -856,13 +844,7 @@ rocksdb::Status Stream::DestroyConsumer(const Slice
&stream_name, const std::str
std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
std::string get_group_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
- if (!s.ok() && !s.IsNotFound()) {
- return s;
- }
- if (s.IsNotFound()) {
- return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
- stream_name.ToString());
- }
+ if (!s.ok()) return s;
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
std::string get_consumer_value;
@@ -932,13 +914,7 @@ rocksdb::Status Stream::GroupSetId(const Slice
&stream_name, const std::string &
std::string entry_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
std::string get_entry_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&get_entry_value);
- if (!s.ok() && !s.IsNotFound()) {
- return s;
- }
- if (s.IsNotFound()) {
- return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
- stream_name.ToString());
- }
+ if (!s.ok()) return s;
StreamConsumerGroupMetadata consumer_group_metadata =
decodeStreamConsumerGroupMetadataValue(get_entry_value);
if (options.last_id == "$") {
@@ -1455,13 +1431,7 @@ rocksdb::Status Stream::RangeWithPending(const Slice
&stream_name, StreamRangeOp
std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
std::string get_group_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
- if (!s.ok() && !s.IsNotFound()) {
- return s;
- }
- if (s.IsNotFound()) {
- return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
- stream_name.ToString());
- }
+ if (!s.ok()) return s;
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
std::string get_consumer_value;
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index b8958e70..e4048eaf 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -889,6 +889,9 @@ func TestStreamOffset(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "myStream").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup",
"$").Err())
+ // duplicate create group
+ require.EqualError(t, rdb.XGroupCreate(ctx, "myStream",
"myGroup", "$").Err(),
+ "BUSYGROUP consumer group name 'myGroup' already
exists")
result, err := rdb.XGroupDestroy(ctx, "myStream",
"myGroup").Result()
require.NoError(t, err)
require.Equal(t, int64(1), result)
@@ -931,8 +934,12 @@ func TestStreamOffset(t *testing.T) {
ID: "1-0",
Values: []string{"data", "a"},
}).Err())
+
//no such group
- require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ expectedError := fmt.Sprintf("NOGROUP No such consumer group %s
for key name %s", groupName, streamName)
+ require.EqualError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err(), expectedError)
+ require.EqualError(t, rdb.XGroupDelConsumer(ctx, streamName,
groupName, consumerName).Err(), expectedError)
+
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
@@ -1009,7 +1016,8 @@ func TestStreamOffset(t *testing.T) {
Values: []string{"data", "a"},
}).Err())
//No such group
- require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName,
"$").Err())
+ require.EqualError(t, rdb.XGroupSetID(ctx, streamName,
groupName, "$").Err(),
+ fmt.Sprintf("NOGROUP No such consumer group %s for key
name %s", groupName, streamName))
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName,
"0-0").Err())
@@ -1342,6 +1350,17 @@ func TestStreamOffset(t *testing.T) {
ID: "1-0",
Values: []string{"field1", "data1"},
}).Err())
+
+ // No such group
+ err := rdb.XClaim(ctx, &redis.XClaimArgs{
+ Stream: streamName,
+ Group: groupName,
+ Consumer: consumer1Name,
+ MinIdle: 0,
+ Messages: []string{"1-0"},
+ }).Err()
+ require.EqualError(t, err, fmt.Sprintf("NOGROUP No such key
'%s' or consumer group '%s'", streamName, groupName))
+
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
@@ -1547,6 +1566,15 @@ func TestStreamOffset(t *testing.T) {
id4 = rsp.Val()
}
+ // No such group
+ err := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
+ Stream: streamName,
+ Group: groupName,
+ Consumer: "consumer",
+ MinIdle: 10 * time.Millisecond,
+ Start: "-",
+ }).Err()
+ require.EqualError(t, err, fmt.Sprintf("NOGROUP No such key
'%s' or consumer group '%s'", streamName, groupName))
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
consumer1 := "consumer1"