This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 645013bc Remove recursion of redis command parser (#3136)
645013bc is described below

commit 645013bce13ebff4ea3f099f86705b3f61a7a6d0
Author: sunhao <[email protected]>
AuthorDate: Sat Nov 15 14:42:21 2025 +0800

    Remove recursion of redis command parser (#3136)
---
 src/brpc/redis_command.cpp   | 88 +++++++++++++++++++++++++++++---------------
 src/brpc/redis_command.h     | 14 +++++++
 test/brpc_redis_unittest.cpp | 19 ++++++++++
 3 files changed, 92 insertions(+), 29 deletions(-)

diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp
index 5803aaa8..d5e76c39 100644
--- a/src/brpc/redis_command.cpp
+++ b/src/brpc/redis_command.cpp
@@ -377,27 +377,50 @@ size_t RedisCommandParser::ParsedArgsSize() {
 ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
                                        std::vector<butil::StringPiece>* args,
                                        butil::Arena* arena) {
+    ParseError err = PARSE_OK;
+    do {
+        RedisCommandConsumeState state = ConsumeImpl(buf, arena, &err);
+        if (state == CONSUME_STATE_CONTINUE) {
+            continue;
+        } else if (state == CONSUME_STATE_DONE) {
+            break;
+        } else {
+            return err;
+        }
+    } while (true);
+    
+    args->swap(_args);
+    Reset();
+    return PARSE_OK;
+}
+
+RedisCommandConsumeState RedisCommandParser::ConsumeImpl(butil::IOBuf& buf,
+                                                         butil::Arena* arena,
+                                                         ParseError* err) {
     const auto pfc = static_cast<const char *>(buf.fetch1());
     if (pfc == NULL) {
-        return PARSE_ERROR_NOT_ENOUGH_DATA;
+        *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+        return CONSUME_STATE_ERROR;
     }
     // '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
     if (!_parsing_array && *pfc != '*') {
         if (!std::isalpha(static_cast<unsigned char>(*pfc))) {
-            return PARSE_ERROR_TRY_OTHERS;
+            *err = PARSE_ERROR_TRY_OTHERS;
+            return CONSUME_STATE_ERROR;
         }
         const size_t buf_size = buf.size();
         const auto copy_str = static_cast<char *>(arena->allocate(buf_size + 1));
         buf.copy_to(copy_str, buf_size);
         if (*copy_str == ' ') {
-            return PARSE_ERROR_ABSOLUTELY_WRONG;
+            *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+            return CONSUME_STATE_ERROR;
         }
         copy_str[buf_size] = '\0';
         const size_t crlf_pos = butil::StringPiece(copy_str, buf_size).find("\r\n");
         if (crlf_pos == butil::StringPiece::npos) {  // not enough data
-            return PARSE_ERROR_NOT_ENOUGH_DATA;
+            *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+            return CONSUME_STATE_ERROR;
         }
-        args->clear();
         size_t offset = 0;
         while (offset < crlf_pos && copy_str[offset] != ' ') {
             ++offset;
@@ -407,11 +430,11 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
         for (size_t i = 0; i < offset; ++i) {
             first_arg[i] = tolower(first_arg[i]);
         }
-        args->push_back(butil::StringPiece(first_arg, offset));
+        _args.push_back(butil::StringPiece(first_arg, offset));
         if (offset == crlf_pos) {
             // only one argument, directly return
             buf.pop_front(crlf_pos + 2);
-            return PARSE_OK;
+            return CONSUME_STATE_DONE;
         }
         size_t arg_start_pos = ++offset;
 
@@ -422,7 +445,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
             const auto arg_length = offset - arg_start_pos;
             const auto arg = static_cast<char *>(arena->allocate(arg_length));
             memcpy(arg, copy_str + arg_start_pos, arg_length);
-            args->push_back(butil::StringPiece(arg, arg_length));
+            _args.push_back(butil::StringPiece(arg, arg_length));
             arg_start_pos = ++offset;
         }
 
@@ -431,61 +454,69 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
             const auto arg_length = crlf_pos - arg_start_pos;
             const auto arg = static_cast<char *>(arena->allocate(arg_length));
             memcpy(arg, copy_str + arg_start_pos, arg_length);
-            args->push_back(butil::StringPiece(arg, arg_length));
+            _args.push_back(butil::StringPiece(arg, arg_length));
         }
 
         buf.pop_front(crlf_pos + 2);
-        return PARSE_OK;
+        return CONSUME_STATE_DONE;
     }
     // '$' stands for bulk string "$<length>\r\n<string>\r\n"
     if (_parsing_array && *pfc != '$') {
-        return PARSE_ERROR_ABSOLUTELY_WRONG;
+        *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+        return CONSUME_STATE_ERROR;
     }
     char intbuf[32];  // enough for fc + 64-bit decimal + \r\n
     const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
     intbuf[ncopied] = '\0';
     const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
     if (crlf_pos == butil::StringPiece::npos) {  // not enough data
-        return PARSE_ERROR_NOT_ENOUGH_DATA;
+        *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+        return CONSUME_STATE_ERROR;
     }
     char* endptr = NULL;
     int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
     if (endptr != intbuf + crlf_pos) {
         LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
-        return PARSE_ERROR_ABSOLUTELY_WRONG;
+        *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+        return CONSUME_STATE_ERROR;
     }
     if (value < 0) {
         LOG(ERROR) << "Invalid len=" << value << " in redis command";
-        return PARSE_ERROR_ABSOLUTELY_WRONG;
+        *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+        return CONSUME_STATE_ERROR;
     }
     if (!_parsing_array) {
         if (value > (int64_t)(FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece))) {
-            LOG(ERROR) << "command array size exceeds limit! max=" 
-                       << (FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece)) 
+            LOG(ERROR) << "command array size exceeds limit! max="
+                       << (FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece))
                        << ", actually=" << value;
-            return PARSE_ERROR_ABSOLUTELY_WRONG;
+            *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+            return CONSUME_STATE_ERROR;
         }
         buf.pop_front(crlf_pos + 2/*CRLF*/);
         _parsing_array = true;
         _length = value;
         _index = 0;
         _args.resize(value);
-        return Consume(buf, args, arena);
+        return CONSUME_STATE_CONTINUE;
     }
     CHECK(_index < _length) << "a complete command has been parsed. "
-            "impl of RedisCommandParser::Parse is buggy";
+                               "impl of RedisCommandParser::Parse is buggy";
     const int64_t len = value;  // `value' is length of the string
     if (len < 0) {
         LOG(ERROR) << "string in command is nil!";
-        return PARSE_ERROR_ABSOLUTELY_WRONG;
+        *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+        return CONSUME_STATE_ERROR;
     }
     if (len > FLAGS_redis_max_allocation_size) {
-        LOG(ERROR) << "command string exceeds max allocation size! max=" 
+        LOG(ERROR) << "command string exceeds max allocation size! max="
                    << FLAGS_redis_max_allocation_size << ", actually=" << len;
-        return PARSE_ERROR_ABSOLUTELY_WRONG;
+        *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+        return CONSUME_STATE_ERROR;
     }
     if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
-        return PARSE_ERROR_NOT_ENOUGH_DATA;
+        *err = PARSE_ERROR_NOT_ENOUGH_DATA;
+        return CONSUME_STATE_ERROR;
     }
     buf.pop_front(crlf_pos + 2/*CRLF*/);
     char* d = (char*)arena->allocate((len/8 + 1) * 8);
@@ -502,14 +533,13 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
     buf.cutn(crlf, sizeof(crlf));
     if (crlf[0] != '\r' || crlf[1] != '\n') {
         LOG(ERROR) << "string in command is not ended with CRLF";
-        return PARSE_ERROR_ABSOLUTELY_WRONG;
+        *err = PARSE_ERROR_ABSOLUTELY_WRONG;
+        return CONSUME_STATE_ERROR;
     }
-    if (++_index < _length) {
-        return Consume(buf, args, arena);
+    if (++_index == _length) {
+        return CONSUME_STATE_DONE;
     }
-    args->swap(_args);
-    Reset();
-    return PARSE_OK;
+    return CONSUME_STATE_CONTINUE;
 }
 
 void RedisCommandParser::Reset() {
diff --git a/src/brpc/redis_command.h b/src/brpc/redis_command.h
index 5ddfb8e9..25bd8859 100644
--- a/src/brpc/redis_command.h
+++ b/src/brpc/redis_command.h
@@ -43,6 +43,12 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf,
                                       const butil::StringPiece* components,
                                       size_t num_components);
 
+enum RedisCommandConsumeState {
+    CONSUME_STATE_CONTINUE,
+    CONSUME_STATE_DONE,
+    CONSUME_STATE_ERROR,
+};
+
 // A parser used to parse redis raw command.
 class RedisCommandParser {
 public:
@@ -59,6 +65,14 @@ private:
     // Reset parser to the initial state.
     void Reset();
 
+    // Run one parsing step on `buf': array header, bulk string or inline command.
+    // Return CONSUME_STATE_CONTINUE if the step succeeded and more steps remain.
+    // Return CONSUME_STATE_DONE if the parser has parsed a complete command.
+    // Return CONSUME_STATE_ERROR and set `*err' on failure (incl. not enough data).
+    RedisCommandConsumeState ConsumeImpl(butil::IOBuf& buf,
+                                         butil::Arena* arena,
+                                         ParseError* err);
+
     bool _parsing_array;            // if the parser has met array indicator '*'
     int _length;                    // array length
     int _index;                     // current parsing array index
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index 5c8aef12..4dffd55e 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -1423,6 +1423,25 @@ TEST_F(RedisTest, memory_allocation_limits) {
         brpc::ParseError err = parser.Consume(buf, &args, &arena);
         ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, err);
     }
+
+    {
+        // Test that a command array as large as the limit can be parsed
+        int32_t original_limit_tmp = brpc::FLAGS_redis_max_allocation_size;
+        brpc::FLAGS_redis_max_allocation_size = 1024 * 1024;
+        brpc::RedisCommandParser parser;
+        butil::IOBuf buf;
+        int32_t large_array_size = brpc::FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece);
+        std::string large_array_cmd = "*" + std::to_string(large_array_size) + "\r\n";
+        for(int i = 0; i < large_array_size; i++){
+            large_array_cmd.append("$1\r\n1\r\n");
+        }
+        buf.append(large_array_cmd);
+
+        std::vector<butil::StringPiece> args;
+        brpc::ParseError err = parser.Consume(buf, &args, &arena);
+        ASSERT_EQ(brpc::PARSE_OK, err);
+        brpc::FLAGS_redis_max_allocation_size = original_limit_tmp;
+    }
     
     // Test valid cases within limits
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to