This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 353f2d0a add inline redis protocol support (#3024) 353f2d0a is described below commit 353f2d0af11b4fb951b25ad4bb3e671d5c5e67ad Author: Jerry Zhao <ch3nz...@gmail.com> AuthorDate: Fri Jul 25 19:35:33 2025 +0800 add inline redis protocol support (#3024) * add inline redis protocol support * complete code * add check * fix * add inline unitest * use find * fix --- src/brpc/redis_command.cpp | 57 ++++++++++++++++++++++++++++++++++++++++++-- test/brpc_redis_unittest.cpp | 23 +++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp index 6eabd59b..5803aaa8 100644 --- a/src/brpc/redis_command.cpp +++ b/src/brpc/redis_command.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include <cctype> #include <limits> #include "butil/logging.h" @@ -376,13 +377,65 @@ size_t RedisCommandParser::ParsedArgsSize() { ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::vector<butil::StringPiece>* args, butil::Arena* arena) { - const char* pfc = (const char*)buf.fetch1(); + const auto pfc = static_cast<const char *>(buf.fetch1()); if (pfc == NULL) { return PARSE_ERROR_NOT_ENOUGH_DATA; } // '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..." if (!_parsing_array && *pfc != '*') { - return PARSE_ERROR_TRY_OTHERS; + if (!std::isalpha(static_cast<unsigned char>(*pfc))) { + return PARSE_ERROR_TRY_OTHERS; + } + const size_t buf_size = buf.size(); + const auto copy_str = static_cast<char *>(arena->allocate(buf_size + 1)); + buf.copy_to(copy_str, buf_size); + if (*copy_str == ' ') { + return PARSE_ERROR_ABSOLUTELY_WRONG; + } + copy_str[buf_size] = '\0'; + const size_t crlf_pos = butil::StringPiece(copy_str, buf_size).find("\r\n"); + if (crlf_pos == butil::StringPiece::npos) { // not enough data + return PARSE_ERROR_NOT_ENOUGH_DATA; + } + args->clear(); + size_t offset = 0; + while (offset < crlf_pos && copy_str[offset] != ' ') { + ++offset; + } + const auto first_arg = static_cast<char*>(arena->allocate(offset)); + memcpy(first_arg, copy_str, offset); + for (size_t i = 0; i < offset; ++i) { + first_arg[i] = tolower(first_arg[i]); + } + args->push_back(butil::StringPiece(first_arg, offset)); + if (offset == crlf_pos) { + // only one argument, directly return + buf.pop_front(crlf_pos + 2); + return PARSE_OK; + } + size_t arg_start_pos = ++offset; + + for (; offset < crlf_pos; ++offset) { + if (copy_str[offset] != ' ') { + continue; + } + const auto arg_length = offset - arg_start_pos; + const auto arg = static_cast<char *>(arena->allocate(arg_length)); + memcpy(arg, copy_str + arg_start_pos, arg_length); + args->push_back(butil::StringPiece(arg, arg_length)); + arg_start_pos = ++offset; + } + + if (arg_start_pos < crlf_pos) { + // process the last argument + const auto arg_length = crlf_pos - arg_start_pos; + const auto arg = static_cast<char *>(arena->allocate(arg_length)); + memcpy(arg, copy_str + arg_start_pos, arg_length); + args->push_back(butil::StringPiece(arg, arg_length)); + } + + buf.pop_front(crlf_pos + 2); + return PARSE_OK; } // '$' stands for bulk string "$<length>\r\n<string>\r\n" if (_parsing_array && *pfc != '$') { diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp index c1e8d059..5e38c374 100644 --- a/test/brpc_redis_unittest.cpp +++ b/test/brpc_redis_unittest.cpp @@ -582,7 +582,7 @@ TEST_F(RedisTest, command_parser) { ASSERT_EQ(command, GetCompleteCommand(command_out)); } { - // simulate parsing from network + // simulate parsing from network following RESP int t = 100; std::string raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n"); int size = raw_string.size(); @@ -602,6 +602,27 @@ TEST_F(RedisTest, command_parser) { ASSERT_EQ(GetCompleteCommand(command_out), "set abc def"); } } + { + // simulate parsing from network under inline protocol + int t = 100; + std::string raw_string("set abc def\r\n"); + int size = raw_string.size(); + while (t--) { + for (int i = 0; i < size; ++i) { + buf.push_back(raw_string[i]); + if (i == size - 1) { + ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena)); + } else { + if (butil::fast_rand_less_than(2) == 0) { + ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, + parser.Consume(buf, &command_out, &arena)); + } + } + } + ASSERT_TRUE(buf.empty()); + ASSERT_EQ(GetCompleteCommand(command_out), "set abc def"); + } + } { // there is a non-string message in command and parse should fail buf.append("*3\r\n$3"); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org