jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r354708023
 
 

 ##########
 File path: src/brpc/policy/redis_protocol.cpp
 ##########
 @@ -52,62 +57,237 @@ struct InputResponse : public InputMessageBase {
     }
 };
 
-// "Message" = "Response" as we only implement the client for redis.
-ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
-                              bool /*read_eof*/, const void* /*arg*/) {
-    if (source->empty()) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+static bool ParseArgs(const RedisReply& message, std::ostringstream& os) {
+    if (!message.is_array() || message.size() == 0) {
+        LOG(WARNING) << "request message is not array or size equals to zero";
+        return false;
     }
-    // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
-    // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
-    // sending and popped when response comes back, being protected by a
-    // mutex. Previously the mutex is shared with Socket._id_wait_list. When
-    // 200 bthreads access one redis-server, ~1.5s in total is spent on
-    // contention in 10-second duration. If the mutex is separated, the time
-    // drops to ~0.25s. I further replaced PeekPipelinedInfo() with
-    // GivebackPipelinedInfo() to lock only once(when receiving response)
-    // in most cases, and the time decreases to ~0.14s.
-    PipelinedInfo pi;
-    if (!socket->PopPipelinedInfo(&pi)) {
-        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
-        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
-    }
-
-    do {
-        InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
-        if (msg == NULL) {
-            msg = new InputResponse;
-            socket->reset_parsing_context(msg);
+    for (size_t i = 0; i < message.size(); ++i) {
+        if (!message[i].is_string()) {
+            LOG(WARNING) << "request message[" << i << "] is not array";
+            return false;
+        }
+        if (i != 0) {
+            os << " ";
         }
+        os << message[i].c_str();
+    }
+    return true;
+}
+
+struct RedisTask {
+    RedisReply input_message;
+    butil::Arena arena;
+};
 
-        const int consume_count = (pi.with_auth ? 1 : pi.count);
+// This class is as parsing_context in socket.
+class RedisConnContext : public SharedObject
+                       , public Destroyable  {
+public:
+    RedisConnContext()
+        : handler_continue(NULL) {}
+    ~RedisConnContext();
+    // @Destroyable
+    void Destroy();
 
-        ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count);
+    int Init();
+
+    SocketId socket_id;
+    RedisService::CommandMap command_map;
+    // If user starts a transaction, handler_continue indicates the
+    // first handler pointer that triggers the transaction.
+    RedisCommandHandler* handler_continue;
+    // The redis command are parsed and pushed into this queue
+    bthread::ExecutionQueueId<RedisTask*> queue;
+
+    RedisReply parsing_message;
+    butil::Arena arena;
+};
+
+int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
+    RedisReply output(&task->arena);
+    std::ostringstream os;
+    if (!ParseArgs(task->input_message, os)) {
+        LOG(ERROR) << "ERR command not string";
+        output.SetError("ERR command not string");
+        return -1;
+    }
+    if (ctx->handler_continue) {
+        RedisCommandHandler::Result result =
+            ctx->handler_continue->Run(os.str().c_str(), &output);
+        if (result == RedisCommandHandler::OK) {
+            ctx->handler_continue = NULL;
+        }
+    } else {
+        std::string comm;
+        comm.reserve(8);
+        for (const char* c = task->input_message[0].c_str(); *c; ++c) {
+            comm.push_back(std::tolower(*c));
+        }
 
 Review comment:
   没有现成函数么?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to