rickonji opened a new issue #1079: In stream mode, data the server has not yet processed is discarded once the client calls StreamClose()
URL: https://github.com/apache/incubator-brpc/issues/1079

**Describe the bug**
In stream mode, if the server still has unprocessed data when the client calls StreamClose(), that data is discarded.

**To Reproduce**
As described above.

**Expected behavior**
According to the official documentation, if pending_buf still holds data when StreamClose is called, that data is processed first and only then is on_closed() executed. After reading the source, I believe this is a bug:

`int Stream::OnReceived(const StreamFrameMeta& fm, butil::IOBuf *buf, Socket* sock) {
    if (_host_socket == NULL) {
        if (SetHostSocket(sock) != 0) {
            return -1;
        }
    }
    switch (fm.frame_type()) {
    case FRAME_TYPE_DATA:
        if (_pending_buf != NULL) {
            _pending_buf->append(*buf);
            buf->clear();
        } else {
            _pending_buf = new butil::IOBuf;
            _pending_buf->swap(*buf);
        }
        if (!fm.has_continuation()) {
            butil::IOBuf *tmp = _pending_buf;
            _pending_buf = NULL;
            if (bthread::execution_queue_execute(_consumer_queue, tmp) != 0) {
                CHECK(false) << "Fail to push into channel";
                delete tmp;
                Close();
            }
        }
        break;
    case FRAME_TYPE_CLOSE:
        RPC_VLOG << "stream=" << id() << " recevied close frame";
        // TODO:: See the comments in Consume
        Close();
        break;
    }
    return 0;
}`

DATA frames are pushed into the execution queue and consumed in order. If a CLOSE frame arrives before the queued data has been consumed, the data still sitting in the execution queue never reaches on_received_messages(); the stream goes straight to on_closed():

`int Stream::Consume(void *meta, bthread::TaskIterator<butil::IOBuf*>& iter) {
    Stream* s = (Stream*)meta;
    s->StopIdleTimer();
    if (iter.is_queue_stopped()) {
        // indicating the queue was closed
        if (s->_host_socket) {
            DereferenceSocket(s->_host_socket);
            s->_host_socket = NULL;
        }
        if (s->_options.handler != NULL) {
            s->_options.handler->on_closed(s->id());
        }
        delete s;
        return 0;
    }
    DEFINE_SMALL_ARRAY(butil::IOBuf*, buf_list, s->_options.messages_in_batch, 256);
    MessageBatcher mb(buf_list, s->_options.messages_in_batch, s);
    bool has_timeout_task = false;
    for (; iter; ++iter) {
        butil::IOBuf* t = *iter;
        if (t == TIMEOUT_TASK) {
            has_timeout_task = true;
        } else {
            if (s->_parse_rpc_response) {
                s->_parse_rpc_response = false;
                s->HandleRpcResponse(t);
            } else {
                mb.push(t);
            }
        }
    }
    if (s->_options.handler != NULL) {
        if (has_timeout_task && mb.total_length() == 0) {
            s->_options.handler->on_idle_timeout(s->id());
        }
    }
    mb.flush();`
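The ordering concern can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyStream`, `OnData`, `CloseDropping`, and `CloseDraining` are hypothetical names invented here, and the code uses a plain `std::deque` instead of brpc's execution queue, so it shows the two possible close semantics rather than what brpc actually does.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Toy stand-in for a stream's consumer queue. Hypothetical; not brpc code.
struct ToyStream {
    std::deque<std::string> queue;       // messages queued but not yet consumed
    std::vector<std::string> delivered;  // messages the handler actually received
    bool closed = false;                 // true once the close callback has fired

    // A DATA frame arrives: enqueue it unless the stream is already closed.
    void OnData(const std::string& msg) {
        if (!closed) {
            queue.push_back(msg);
        }
    }

    // Semantics described in the report: close takes effect immediately and
    // anything still queued is dropped without reaching the handler.
    void CloseDropping() {
        queue.clear();
        closed = true;
    }

    // Semantics the reporter expects: deliver everything still queued,
    // in order, and only then fire the close callback.
    void CloseDraining() {
        while (!queue.empty()) {
            delivered.push_back(queue.front());
            queue.pop_front();
        }
        closed = true;
    }
};
```

With two messages queued, `CloseDropping()` leaves `delivered` empty, while `CloseDraining()` hands both messages to the handler before marking the stream closed.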
