jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r356973158
##########
File path: src/brpc/policy/redis_protocol.cpp
##########
@@ -52,62 +58,206 @@ struct InputResponse : public InputMessageBase {
}
};
-// "Message" = "Response" as we only implement the client for redis.
-ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
- bool /*read_eof*/, const void* /*arg*/) {
- if (source->empty()) {
- return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+// This class is as parsing_context in socket.
+class RedisConnContext : public Destroyable {
+public:
+ RedisConnContext()
+ : redis_service(NULL)
+ , handler_continue(NULL) {}
+ ~RedisConnContext();
+ // @Destroyable
+ void Destroy() override;
+
+ int Init();
+
+ SocketId socket_id;
+ RedisService* redis_service;
+ // If user starts a transaction, handler_continue indicates the
+ // first handler pointer that triggers the transaction.
+ RedisCommandHandler* handler_continue;
+ // The redis command are parsed and pushed into this queue
+ bthread::ExecutionQueueId<std::string*> queue;
+
+ RedisCommandParser parser;
+};
+
+int ConsumeTask(RedisConnContext* ctx, std::string* command, butil::IOBuf* sendbuf) {
+ butil::Arena arena;
+ RedisReply output(&arena);
+ if (ctx->handler_continue) {
+ RedisCommandHandler::Result result =
+ ctx->handler_continue->Run(command->c_str(), &output);
+ if (result == RedisCommandHandler::OK) {
+ ctx->handler_continue = NULL;
+ }
+ } else {
+ std::string comm;
+ comm.reserve(8);
+ for (int i = 0; i < (int)command->size() && (*command)[i] != ' '; ++i) {
+ comm.push_back(std::tolower((*command)[i]));
+ }
+ RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm);
+ if (!ch) {
+ char buf[64];
+ snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
+ output.SetError(buf);
+ } else {
+ RedisCommandHandler::Result result = ch->Run(command->c_str(), &output);
+ if (result == RedisCommandHandler::CONTINUE) {
+ ctx->handler_continue = ch;
+ }
+ }
+ }
+ output.SerializeToIOBuf(sendbuf);
+ return 0;
+}
+
+int Consume(void* ctx, bthread::TaskIterator<std::string*>& iter) {
+ RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
+ if (iter.is_queue_stopped()) {
+ delete qctx;
+ return 0;
+ }
+ SocketUniquePtr s;
+ bool has_err = false;
+ if (Socket::Address(qctx->socket_id, &s) != 0) {
+ LOG(WARNING) << "Fail to address redis socket";
Review comment:
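这个address应该在插入execq时就address好?那样可以在外面用Readdress
[English translation: Shouldn't the socket already be addressed at the point where the task is inserted into the execution queue? That way ReAddress could be used outside.]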
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]