jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r356953916
 
 

 ##########
 File path: src/brpc/policy/redis_protocol.cpp
 ##########
 @@ -52,62 +58,206 @@ struct InputResponse : public InputMessageBase {
     }
 };
 
-// "Message" = "Response" as we only implement the client for redis.
-ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
-                              bool /*read_eof*/, const void* /*arg*/) {
-    if (source->empty()) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+// This class is used as the parsing_context in the Socket.
+class RedisConnContext : public Destroyable  {
+public:
+    RedisConnContext()
+        : redis_service(NULL)
+        , handler_continue(NULL) {}
+    ~RedisConnContext();
+    // @Destroyable
+    void Destroy() override;
+
+    int Init();
+
+    SocketId socket_id;
+    RedisService* redis_service;
+    // If user starts a transaction, handler_continue indicates the
+    // first handler pointer that triggers the transaction.
+    RedisCommandHandler* handler_continue;
+    // The redis commands are parsed and pushed into this queue.
+    bthread::ExecutionQueueId<std::string*> queue;
+
+    RedisCommandParser parser;
+};
+
+int ConsumeTask(RedisConnContext* ctx, std::string* command, butil::IOBuf* 
sendbuf) {
+    butil::Arena arena;
+    RedisReply output(&arena);
+    if (ctx->handler_continue) {
+        RedisCommandHandler::Result result =
+            ctx->handler_continue->Run(command->c_str(), &output);
+        if (result == RedisCommandHandler::OK) {
+            ctx->handler_continue = NULL;
+        }
+    } else {
+        std::string comm;
+        comm.reserve(8);
+        for (int i = 0; i < (int)command->size() && (*command)[i] != ' '; ++i) 
{
+            comm.push_back(std::tolower((*command)[i]));
+        }
+        RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm);
+        if (!ch) {
+            char buf[64];
+            snprintf(buf, sizeof(buf), "ERR unknown command `%s`", 
comm.c_str());
+            output.SetError(buf);
+        } else {
+            RedisCommandHandler::Result result = ch->Run(command->c_str(), 
&output);
+            if (result == RedisCommandHandler::CONTINUE) {
+                ctx->handler_continue = ch;
+            }
+        }
+    }
+    output.SerializeToIOBuf(sendbuf);
+    return 0;
+}
+
+int Consume(void* ctx, bthread::TaskIterator<std::string*>& iter) {
+    RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
+    if (iter.is_queue_stopped()) {
+        delete qctx;
+        return 0;
+    }
+    SocketUniquePtr s;
+    bool has_err = false;
+    if (Socket::Address(qctx->socket_id, &s) != 0) {
+        LOG(WARNING) << "Fail to address redis socket";
+        has_err = true;
     }
-    // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
-    // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
-    // sending and popped when response comes back, being protected by a
-    // mutex. Previously the mutex is shared with Socket._id_wait_list. When
-    // 200 bthreads access one redis-server, ~1.5s in total is spent on
-    // contention in 10-second duration. If the mutex is separated, the time
-    // drops to ~0.25s. I further replaced PeekPipelinedInfo() with
-    // GivebackPipelinedInfo() to lock only once(when receiving response)
-    // in most cases, and the time decreases to ~0.14s.
-    PipelinedInfo pi;
-    if (!socket->PopPipelinedInfo(&pi)) {
-        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
-        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
-    }
-
-    do {
-        InputResponse* msg = 
static_cast<InputResponse*>(socket->parsing_context());
-        if (msg == NULL) {
-            msg = new InputResponse;
-            socket->reset_parsing_context(msg);
+    Socket::WriteOptions wopt;
+    wopt.ignore_eovercrowded = true;
+    butil::IOBuf sendbuf;
+    for (; iter; ++iter) {
+        std::unique_ptr<std::string> guard(*iter);
+        if (has_err) {
+            continue;
         }
+        ConsumeTask(qctx, *iter, &sendbuf);
+        // If there are too many tasks to execute, latency of the front
+        // responses will be increased by waiting the following tasks to
+        // be completed. To prevent this, if the current buf size is greater
+            // than FLAGS_redis_batch_flush_data_size, we just write the current
+        // buf first.
+        if ((int)sendbuf.size() >= FLAGS_redis_batch_flush_data_size) {
+            LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
+                << "Fail to send redis reply";
+        }
+    }
+    if (!has_err && !sendbuf.empty()) {
+        LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
+            << "Fail to send redis reply";
+    }
+    return 0;
+}
+
+// ========== impl of RedisConnContext ==========
 
-        const int consume_count = (pi.with_auth ? 1 : pi.count);
+RedisConnContext::~RedisConnContext() { }
 
-        ParseError err = msg->response.ConsumePartialIOBuf(*source, 
consume_count);
+void RedisConnContext::Destroy() {
+    bthread::execution_queue_stop(queue);
+}
+
+int RedisConnContext::Init() {
+    bthread::ExecutionQueueOptions q_opt;
+    q_opt.bthread_attr =
+        FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
+    if (bthread::execution_queue_start(&queue, &q_opt, Consume, this) != 0) {
+        LOG(ERROR) << "Fail to start execution queue";
+        return -1;
+    }
+    return 0;
+}
+
+// ========== impl of RedisConnContext ==========
+
+ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
+                              bool read_eof, const void* arg) {
+    if (read_eof || source->empty()) {
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    const Server* server = static_cast<const Server*>(arg);
+    if (server) {
+        RedisService* rs = server->options().redis_service;
+        if (!rs) {
+            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+        }
+        RedisConnContext* ctx = 
static_cast<RedisConnContext*>(socket->parsing_context());
+        if (ctx == NULL) {
+            ctx = new RedisConnContext;
+            ctx->socket_id = socket->id();
+            ctx->redis_service = rs;
+            if (ctx->Init() != 0) {
+                delete ctx;
+                LOG(ERROR) << "Fail to init redis RedisConnContext";
+                return MakeParseError(PARSE_ERROR_NO_RESOURCE);
+            }
+            socket->reset_parsing_context(ctx);
+        }
+        ParseError err = ctx->parser.Parse(*source);
         if (err != PARSE_OK) {
-            socket->GivebackPipelinedInfo(pi);
             return MakeParseError(err);
         }
+        std::unique_ptr<std::string> command(new std::string);
 
 Review comment:
   Allocating a new std::string with `new` looks very likely unnecessary and not quite right. Either introduce a dedicated struct (which would make it easy to extend with more fields later), or pass the string by value?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to