This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 353f2d0a add inline redis protocol support (#3024)
353f2d0a is described below

commit 353f2d0af11b4fb951b25ad4bb3e671d5c5e67ad
Author: Jerry Zhao <ch3nz...@gmail.com>
AuthorDate: Fri Jul 25 19:35:33 2025 +0800

    add inline redis protocol support (#3024)
    
    * add inline redis protocol support
    
    * complete code
    
    * add check
    
    * fix
    
    * add inline unitest
    
    * use find
    
    * fix
---
 src/brpc/redis_command.cpp   | 57 ++++++++++++++++++++++++++++++++++++++++++--
 test/brpc_redis_unittest.cpp | 23 +++++++++++++++++-
 2 files changed, 77 insertions(+), 3 deletions(-)

diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp
index 6eabd59b..5803aaa8 100644
--- a/src/brpc/redis_command.cpp
+++ b/src/brpc/redis_command.cpp
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cctype>
 #include <limits>
 
 #include "butil/logging.h"
@@ -376,13 +377,65 @@ size_t RedisCommandParser::ParsedArgsSize() {
 ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
                                        std::vector<butil::StringPiece>* args,
                                        butil::Arena* arena) {
-    const char* pfc = (const char*)buf.fetch1();
+    const auto pfc = static_cast<const char *>(buf.fetch1());
     if (pfc == NULL) {
         return PARSE_ERROR_NOT_ENOUGH_DATA;
     }
     // '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
     if (!_parsing_array && *pfc != '*') {
-        return PARSE_ERROR_TRY_OTHERS;
+        if (!std::isalpha(static_cast<unsigned char>(*pfc))) {
+            return PARSE_ERROR_TRY_OTHERS;
+        }
+        const size_t buf_size = buf.size();
+        const auto copy_str = static_cast<char *>(arena->allocate(buf_size + 
1));
+        buf.copy_to(copy_str, buf_size);
+        if (*copy_str == ' ') {
+            return PARSE_ERROR_ABSOLUTELY_WRONG;
+        }
+        copy_str[buf_size] = '\0';
+        const size_t crlf_pos = butil::StringPiece(copy_str, 
buf_size).find("\r\n");
+        if (crlf_pos == butil::StringPiece::npos) {  // not enough data
+            return PARSE_ERROR_NOT_ENOUGH_DATA;
+        }
+        args->clear();
+        size_t offset = 0;
+        while (offset < crlf_pos && copy_str[offset] != ' ') {
+            ++offset;
+        }
+        const auto first_arg = static_cast<char*>(arena->allocate(offset));
+        memcpy(first_arg, copy_str, offset);
+        for (size_t i = 0; i < offset; ++i) {
+            first_arg[i] = tolower(first_arg[i]);
+        }
+        args->push_back(butil::StringPiece(first_arg, offset));
+        if (offset == crlf_pos) {
+            // only one argument, directly return
+            buf.pop_front(crlf_pos + 2);
+            return PARSE_OK;
+        }
+        size_t arg_start_pos = ++offset;
+
+        for (; offset < crlf_pos; ++offset) {
+            if (copy_str[offset] != ' ') {
+                continue;
+            }
+            const auto arg_length = offset - arg_start_pos;
+            const auto arg = static_cast<char *>(arena->allocate(arg_length));
+            memcpy(arg, copy_str + arg_start_pos, arg_length);
+            args->push_back(butil::StringPiece(arg, arg_length));
+            arg_start_pos = ++offset;
+        }
+
+        if (arg_start_pos < crlf_pos) {
+            // process the last argument
+            const auto arg_length = crlf_pos - arg_start_pos;
+            const auto arg = static_cast<char *>(arena->allocate(arg_length));
+            memcpy(arg, copy_str + arg_start_pos, arg_length);
+            args->push_back(butil::StringPiece(arg, arg_length));
+        }
+
+        buf.pop_front(crlf_pos + 2);
+        return PARSE_OK;
     }
     // '$' stands for bulk string "$<length>\r\n<string>\r\n"
     if (_parsing_array && *pfc != '$') {
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index c1e8d059..5e38c374 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -582,7 +582,7 @@ TEST_F(RedisTest, command_parser) {
         ASSERT_EQ(command, GetCompleteCommand(command_out));
     }
     {
-        // simulate parsing from network
+        // simulate parsing from network following RESP
         int t = 100;
         std::string 
raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n");
         int size = raw_string.size();
@@ -602,6 +602,27 @@ TEST_F(RedisTest, command_parser) {
             ASSERT_EQ(GetCompleteCommand(command_out), "set abc def");
         }
     }
+    {
+        // simulate parsing from network under inline protocol
+        int t = 100;
+        std::string raw_string("set abc def\r\n");
+        int size = raw_string.size();
+        while (t--) {
+            for (int i = 0; i < size; ++i) {
+                buf.push_back(raw_string[i]);
+                if (i == size - 1) {
+                    ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, 
&command_out, &arena));
+                } else {
+                    if (butil::fast_rand_less_than(2) == 0) {
+                        ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
+                                parser.Consume(buf, &command_out, &arena));
+                    }
+                }
+            }
+            ASSERT_TRUE(buf.empty());
+            ASSERT_EQ(GetCompleteCommand(command_out), "set abc def");
+        }
+    }
     {
         // there is a non-string message in command and parse should fail
         buf.append("*3\r\n$3");


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to