jamesge commented on a change in pull request #972: Redis server protocol URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r358587955
########## File path: src/brpc/policy/redis_protocol.cpp ########## @@ -52,62 +54,202 @@ struct InputResponse : public InputMessageBase { } }; -// "Message" = "Response" as we only implement the client for redis. +// This class is as parsing_context in socket. +class RedisConnContext : public Destroyable { +public: + RedisConnContext() + : redis_service(NULL) + , batched_size(0) {} + + ~RedisConnContext(); + // @Destroyable + void Destroy() override; + + SocketId socket_id; + RedisService* redis_service; + // If user starts a transaction, handler_continue indicates the + // handler pointer that runs the transaction command. + std::unique_ptr<RedisCommandHandler> handler_continue; + // >0 if command handler is run in batched mode. + int batched_size; + + RedisCommandParser parser; +}; + +static std::string ToLowercase(const std::string& command) { + std::string res; + res.resize(command.size()); + std::transform(command.begin(), command.end(), res.begin(), + [](unsigned char c){ return std::tolower(c); }); + return res; +} + +int ConsumeCommand(RedisConnContext* ctx, + const std::unique_ptr<const char*[]>& commands, + int len, butil::Arena* arena, + bool is_last, + butil::IOBuf* sendbuf) { + RedisReply output(arena); + RedisCommandHandler::Result result = RedisCommandHandler::OK; + if (ctx->handler_continue) { + result = ctx->handler_continue->Run(len, commands.get(), &output, is_last); + if (result == RedisCommandHandler::OK) { + ctx->handler_continue.reset(NULL); + } else if (result == RedisCommandHandler::BATCHED) { + LOG(ERROR) << "BATCHED should not be returned in redis transaction process."; + return -1; + } + } else { + std::string lcname = ToLowercase(commands[0]); + RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(lcname); + if (!ch) { + char buf[64]; + snprintf(buf, sizeof(buf), "ERR unknown command `%s`", lcname.c_str()); + output.SetError(buf); + } else { + result = ch->Run(len, commands.get(), &output, is_last); + if (result == RedisCommandHandler::CONTINUE) { + if (ctx->batched_size) { + LOG(ERROR) << "CONTINUE should not be returned in redis batched process."; + return -1; + } + ctx->handler_continue.reset(ch->NewTransactionHandler()); + } else if (result == RedisCommandHandler::BATCHED) { + ctx->batched_size++; + } + } + } + if (result == RedisCommandHandler::OK && ctx->batched_size) { + if ((int)output.size() != (ctx->batched_size + 1)) { + LOG(ERROR) << "reply array size can't be matched with batched size, " + << " expected=" << ctx->batched_size + 1 << " actual=" << output.size(); + return -1; + } + for (int i = 0; i < (int)output.size(); ++i) { + output[i].SerializeTo(sendbuf); + } + ctx->batched_size = 0; + } else if (result != RedisCommandHandler::BATCHED) { + output.SerializeTo(sendbuf); + } // else result == RedisCommandHandler::BATCHED, do not serialize to buf + return 0; +} + +// ========== impl of RedisConnContext ========== + +RedisConnContext::~RedisConnContext() { } + +void RedisConnContext::Destroy() { + delete this; +} + +// ========== impl of RedisConnContext ========== + ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, - bool /*read_eof*/, const void* /*arg*/) { - if (source->empty()) { + bool read_eof, const void* arg) { + if (read_eof || source->empty()) { return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); } - // NOTE(gejun): PopPipelinedInfo() is actually more contended than what - // I thought before. The Socket._pipeline_q is a SPSC queue pushed before - // sending and popped when response comes back, being protected by a - // mutex. Previously the mutex is shared with Socket._id_wait_list. When - // 200 bthreads access one redis-server, ~1.5s in total is spent on - // contention in 10-second duration. If the mutex is separated, the time - // drops to ~0.25s. I further replaced PeekPipelinedInfo() with - // GivebackPipelinedInfo() to lock only once(when receiving response) - // in most cases, and the time decreases to ~0.14s. - PipelinedInfo pi; - if (!socket->PopPipelinedInfo(&pi)) { - LOG(WARNING) << "No corresponding PipelinedInfo in socket"; - return MakeParseError(PARSE_ERROR_TRY_OTHERS); - } - - do { - InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context()); - if (msg == NULL) { - msg = new InputResponse; - socket->reset_parsing_context(msg); + const Server* server = static_cast<const Server*>(arg); + if (server) { + RedisService* rs = server->options().redis_service; + if (!rs) { + return MakeParseError(PARSE_ERROR_TRY_OTHERS); } + RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context()); + if (ctx == NULL) { + ctx = new RedisConnContext; + ctx->socket_id = socket->id(); + ctx->redis_service = rs; + socket->reset_parsing_context(ctx); + } + butil::Arena arena; + std::unique_ptr<const char*[]> current_commands; + int current_len = 0; + butil::IOBuf sendbuf; + ParseError err = PARSE_OK; - const int consume_count = (pi.with_auth ? 1 : pi.count); - - ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count); + err = ctx->parser.Consume(*source, ¤t_commands, ¤t_len, &arena); if (err != PARSE_OK) { - socket->GivebackPipelinedInfo(pi); return MakeParseError(err); } + while (true) { + std::unique_ptr<const char*[]> next_commands; + int next_len = 0; + err = ctx->parser.Consume(*source, &next_commands, &next_len, &arena); + if (err != PARSE_OK) { + break; + } + // safe to read first element. + // current_commands and next_commands both have at least one element(NULL). + bool is_last = (strcasecmp(current_commands[0], next_commands[0]) != 0); Review comment: is_last应该不需要比较内容。 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org