This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 645013bc Remove recursion of redis command parser (#3136)
645013bc is described below
commit 645013bce13ebff4ea3f099f86705b3f61a7a6d0
Author: sunhao <[email protected]>
AuthorDate: Sat Nov 15 14:42:21 2025 +0800
Remove recursion of redis command parser (#3136)
---
src/brpc/redis_command.cpp | 88 +++++++++++++++++++++++++++++---------------
src/brpc/redis_command.h | 14 +++++++
test/brpc_redis_unittest.cpp | 19 ++++++++++
3 files changed, 92 insertions(+), 29 deletions(-)
diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp
index 5803aaa8..d5e76c39 100644
--- a/src/brpc/redis_command.cpp
+++ b/src/brpc/redis_command.cpp
@@ -377,27 +377,50 @@ size_t RedisCommandParser::ParsedArgsSize() {
ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
std::vector<butil::StringPiece>* args,
butil::Arena* arena) {
+ ParseError err = PARSE_OK;
+ do {
+ RedisCommandConsumeState state = ConsumeImpl(buf, arena, &err);
+ if (state == CONSUME_STATE_CONTINUE) {
+ continue;
+ } else if (state == CONSUME_STATE_DONE) {
+ break;
+ } else {
+ return err;
+ }
+ } while (true);
+
+ args->swap(_args);
+ Reset();
+ return PARSE_OK;
+}
+
+RedisCommandConsumeState RedisCommandParser::ConsumeImpl(butil::IOBuf& buf,
+ butil::Arena* arena,
+ ParseError* err) {
const auto pfc = static_cast<const char *>(buf.fetch1());
if (pfc == NULL) {
- return PARSE_ERROR_NOT_ENOUGH_DATA;
+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+ return CONSUME_STATE_ERROR;
}
// '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
if (!_parsing_array && *pfc != '*') {
if (!std::isalpha(static_cast<unsigned char>(*pfc))) {
- return PARSE_ERROR_TRY_OTHERS;
+ *err = PARSE_ERROR_TRY_OTHERS;
+ return CONSUME_STATE_ERROR;
}
const size_t buf_size = buf.size();
const auto copy_str = static_cast<char *>(arena->allocate(buf_size +
1));
buf.copy_to(copy_str, buf_size);
if (*copy_str == ' ') {
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
copy_str[buf_size] = '\0';
const size_t crlf_pos = butil::StringPiece(copy_str,
buf_size).find("\r\n");
if (crlf_pos == butil::StringPiece::npos) { // not enough data
- return PARSE_ERROR_NOT_ENOUGH_DATA;
+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+ return CONSUME_STATE_ERROR;
}
- args->clear();
size_t offset = 0;
while (offset < crlf_pos && copy_str[offset] != ' ') {
++offset;
@@ -407,11 +430,11 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
for (size_t i = 0; i < offset; ++i) {
first_arg[i] = tolower(first_arg[i]);
}
- args->push_back(butil::StringPiece(first_arg, offset));
+ _args.push_back(butil::StringPiece(first_arg, offset));
if (offset == crlf_pos) {
// only one argument, directly return
buf.pop_front(crlf_pos + 2);
- return PARSE_OK;
+ return CONSUME_STATE_DONE;
}
size_t arg_start_pos = ++offset;
@@ -422,7 +445,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
const auto arg_length = offset - arg_start_pos;
const auto arg = static_cast<char *>(arena->allocate(arg_length));
memcpy(arg, copy_str + arg_start_pos, arg_length);
- args->push_back(butil::StringPiece(arg, arg_length));
+ _args.push_back(butil::StringPiece(arg, arg_length));
arg_start_pos = ++offset;
}
@@ -431,61 +454,69 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
const auto arg_length = crlf_pos - arg_start_pos;
const auto arg = static_cast<char *>(arena->allocate(arg_length));
memcpy(arg, copy_str + arg_start_pos, arg_length);
- args->push_back(butil::StringPiece(arg, arg_length));
+ _args.push_back(butil::StringPiece(arg, arg_length));
}
buf.pop_front(crlf_pos + 2);
- return PARSE_OK;
+ return CONSUME_STATE_DONE;
}
// '$' stands for bulk string "$<length>\r\n<string>\r\n"
if (_parsing_array && *pfc != '$') {
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
char intbuf[32]; // enough for fc + 64-bit decimal + \r\n
const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
intbuf[ncopied] = '\0';
const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
if (crlf_pos == butil::StringPiece::npos) { // not enough data
- return PARSE_ERROR_NOT_ENOUGH_DATA;
+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+ return CONSUME_STATE_ERROR;
}
char* endptr = NULL;
int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
if (endptr != intbuf + crlf_pos) {
LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
if (value < 0) {
LOG(ERROR) << "Invalid len=" << value << " in redis command";
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
if (!_parsing_array) {
if (value > (int64_t)(FLAGS_redis_max_allocation_size /
sizeof(butil::StringPiece))) {
- LOG(ERROR) << "command array size exceeds limit! max="
- << (FLAGS_redis_max_allocation_size /
sizeof(butil::StringPiece))
+ LOG(ERROR) << "command array size exceeds limit! max="
+ << (FLAGS_redis_max_allocation_size /
sizeof(butil::StringPiece))
<< ", actually=" << value;
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
_parsing_array = true;
_length = value;
_index = 0;
_args.resize(value);
- return Consume(buf, args, arena);
+ return CONSUME_STATE_CONTINUE;
}
CHECK(_index < _length) << "a complete command has been parsed. "
- "impl of RedisCommandParser::Parse is buggy";
+ "impl of RedisCommandParser::Parse is buggy";
const int64_t len = value; // `value' is length of the string
if (len < 0) {
LOG(ERROR) << "string in command is nil!";
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
if (len > FLAGS_redis_max_allocation_size) {
- LOG(ERROR) << "command string exceeds max allocation size! max="
+ LOG(ERROR) << "command string exceeds max allocation size! max="
<< FLAGS_redis_max_allocation_size << ", actually=" << len;
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
- return PARSE_ERROR_NOT_ENOUGH_DATA;
+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+ return CONSUME_STATE_ERROR;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
char* d = (char*)arena->allocate((len/8 + 1) * 8);
@@ -502,14 +533,13 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') {
LOG(ERROR) << "string in command is not ended with CRLF";
- return PARSE_ERROR_ABSOLUTELY_WRONG;
+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+ return CONSUME_STATE_ERROR;
}
- if (++_index < _length) {
- return Consume(buf, args, arena);
+ if (++_index == _length) {
+ return CONSUME_STATE_DONE;
}
- args->swap(_args);
- Reset();
- return PARSE_OK;
+ return CONSUME_STATE_CONTINUE;
}
void RedisCommandParser::Reset() {
diff --git a/src/brpc/redis_command.h b/src/brpc/redis_command.h
index 5ddfb8e9..25bd8859 100644
--- a/src/brpc/redis_command.h
+++ b/src/brpc/redis_command.h
@@ -43,6 +43,12 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf,
const butil::StringPiece* components,
size_t num_components);
+enum RedisCommandConsumeState {
+ CONSUME_STATE_CONTINUE,
+ CONSUME_STATE_DONE,
+ CONSUME_STATE_ERROR,
+};
+
// A parser used to parse redis raw command.
class RedisCommandParser {
public:
@@ -59,6 +65,14 @@ private:
// Reset parser to the initial state.
void Reset();
+ // Consume one arg from `buf'.
+ // Return CONSUME_STATE_CONTINUE if the parser needs more data.
+ // Return CONSUME_STATE_DONE if the parser has parsed a complete command.
+ // Return CONSUME_STATE_ERROR if the parser meets an error.
+ RedisCommandConsumeState ConsumeImpl(butil::IOBuf& buf,
+ butil::Arena* arena,
+ ParseError* err);
+
bool _parsing_array; // if the parser has met array indicator
'*'
int _length; // array length
int _index; // current parsing array index
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index 5c8aef12..4dffd55e 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -1423,6 +1423,25 @@ TEST_F(RedisTest, memory_allocation_limits) {
brpc::ParseError err = parser.Consume(buf, &args, &arena);
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, err);
}
+
+ {
+ // Test large command array work
+ int32_t original_limit_tmp = brpc::FLAGS_redis_max_allocation_size;
+ brpc::FLAGS_redis_max_allocation_size = 1024 * 1024;
+ brpc::RedisCommandParser parser;
+ butil::IOBuf buf;
+ int32_t large_array_size = brpc::FLAGS_redis_max_allocation_size /
sizeof(butil::StringPiece);
+ std::string large_array_cmd = "*" + std::to_string(large_array_size) +
"\r\n";
+ for(int i = 0; i < large_array_size; i++){
+ large_array_cmd.append("$1\r\n1\r\n");
+ }
+ buf.append(large_array_cmd);
+
+ std::vector<butil::StringPiece> args;
+ brpc::ParseError err = parser.Consume(buf, &args, &arena);
+ ASSERT_EQ(brpc::PARSE_OK, err);
+ brpc::FLAGS_redis_max_allocation_size = original_limit_tmp;
+ }
// Test valid cases within limits
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]