jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r358587955
 
 

 ##########
 File path: src/brpc/policy/redis_protocol.cpp
 ##########
 @@ -52,62 +54,202 @@ struct InputResponse : public InputMessageBase {
     }
 };
 
-// "Message" = "Response" as we only implement the client for redis.
+// This class is as parsing_context in socket.
+class RedisConnContext : public Destroyable  {
+public:
+    RedisConnContext()
+        : redis_service(NULL)
+        , batched_size(0) {}
+
+    ~RedisConnContext();
+    // @Destroyable
+    void Destroy() override;
+
+    SocketId socket_id;
+    RedisService* redis_service;
+    // If user starts a transaction, handler_continue indicates the
+    // handler pointer that runs the transaction command.
+    std::unique_ptr<RedisCommandHandler> handler_continue;
+    // >0 if command handler is run in batched mode.
+    int batched_size;
+
+    RedisCommandParser parser;
+};
+
+static std::string ToLowercase(const std::string& command) {
+    std::string res;
+    res.resize(command.size());
+    std::transform(command.begin(), command.end(), res.begin(),
+        [](unsigned char c){ return std::tolower(c); });
+    return res;
+}
+
+int ConsumeCommand(RedisConnContext* ctx,
+                   const std::unique_ptr<const char*[]>& commands,
+                   int len, butil::Arena* arena,
+                   bool is_last,
+                   butil::IOBuf* sendbuf) {
+    RedisReply output(arena);
+    RedisCommandHandler::Result result = RedisCommandHandler::OK;
+    if (ctx->handler_continue) {
+        result = ctx->handler_continue->Run(len, commands.get(), &output, 
is_last);
+        if (result == RedisCommandHandler::OK) {
+            ctx->handler_continue.reset(NULL);
+        } else if (result == RedisCommandHandler::BATCHED) {
+            LOG(ERROR) << "BATCHED should not be returned in redis transaction 
process.";
+            return -1;
+        }
+    } else {
+        std::string lcname = ToLowercase(commands[0]);
+        RedisCommandHandler* ch = 
ctx->redis_service->FindCommandHandler(lcname);
+        if (!ch) {
+            char buf[64];
+            snprintf(buf, sizeof(buf), "ERR unknown command `%s`", 
lcname.c_str());
+            output.SetError(buf);
+        } else {
+            result = ch->Run(len, commands.get(), &output, is_last);
+            if (result == RedisCommandHandler::CONTINUE) {
+                if (ctx->batched_size) {
+                    LOG(ERROR) << "CONTINUE should not be returned in redis 
batched process.";
+                    return -1;
+                }
+                ctx->handler_continue.reset(ch->NewTransactionHandler());
+            } else if (result == RedisCommandHandler::BATCHED) {
+                ctx->batched_size++;
+            }
+        }
+    }
+    if (result == RedisCommandHandler::OK && ctx->batched_size) {
+        if ((int)output.size() != (ctx->batched_size + 1)) {
+            LOG(ERROR) << "reply array size can't be matched with batched 
size, "
+                << " expected=" << ctx->batched_size + 1 << " actual=" << 
output.size();
+            return -1;
+        }
+        for (int i = 0; i < (int)output.size(); ++i) {
+            output[i].SerializeTo(sendbuf);
+        }
+        ctx->batched_size = 0;
+    } else if (result != RedisCommandHandler::BATCHED) {
+        output.SerializeTo(sendbuf);
+    } // else result == RedisCommandHandler::BATCHED, do not serialize to buf
+    return 0;
+}
+
+// ========== impl of RedisConnContext ==========
+
+RedisConnContext::~RedisConnContext() { }
+
+void RedisConnContext::Destroy() {
+    delete this;
+}
+
+// ========== impl of RedisConnContext ==========
+
 ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
-                              bool /*read_eof*/, const void* /*arg*/) {
-    if (source->empty()) {
+                              bool read_eof, const void* arg) {
+    if (read_eof || source->empty()) {
         return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
     }
-    // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
-    // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
-    // sending and popped when response comes back, being protected by a
-    // mutex. Previously the mutex is shared with Socket._id_wait_list. When
-    // 200 bthreads access one redis-server, ~1.5s in total is spent on
-    // contention in 10-second duration. If the mutex is separated, the time
-    // drops to ~0.25s. I further replaced PeekPipelinedInfo() with
-    // GivebackPipelinedInfo() to lock only once(when receiving response)
-    // in most cases, and the time decreases to ~0.14s.
-    PipelinedInfo pi;
-    if (!socket->PopPipelinedInfo(&pi)) {
-        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
-        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
-    }
-
-    do {
-        InputResponse* msg = 
static_cast<InputResponse*>(socket->parsing_context());
-        if (msg == NULL) {
-            msg = new InputResponse;
-            socket->reset_parsing_context(msg);
+    const Server* server = static_cast<const Server*>(arg);
+    if (server) {
+        RedisService* rs = server->options().redis_service;
+        if (!rs) {
+            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
         }
+        RedisConnContext* ctx = 
static_cast<RedisConnContext*>(socket->parsing_context());
+        if (ctx == NULL) {
+            ctx = new RedisConnContext;
+            ctx->socket_id = socket->id();
+            ctx->redis_service = rs;
+            socket->reset_parsing_context(ctx);
+        }
+        butil::Arena arena;
+        std::unique_ptr<const char*[]> current_commands;
+        int current_len = 0;
+        butil::IOBuf sendbuf;
+        ParseError err = PARSE_OK;
 
-        const int consume_count = (pi.with_auth ? 1 : pi.count);
-
-        ParseError err = msg->response.ConsumePartialIOBuf(*source, 
consume_count);
+        err = ctx->parser.Consume(*source, &current_commands, &current_len, 
&arena);
         if (err != PARSE_OK) {
-            socket->GivebackPipelinedInfo(pi);
             return MakeParseError(err);
         }
+        while (true) {
+            std::unique_ptr<const char*[]> next_commands;
+            int next_len = 0;
+            err = ctx->parser.Consume(*source, &next_commands, &next_len, 
&arena);
+            if (err != PARSE_OK) {
+                break;
+            }
+            // safe to read first element.
+            // current_commands and next_commands both have at least one 
element(NULL).
+            bool is_last = (strcasecmp(current_commands[0], next_commands[0]) 
!= 0);
 
 Review comment:
   is_last应该不需要比较内容。
   (English: computing `is_last` should not need to compare the command contents.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org

Reply via email to