This is an automated email from the ASF dual-hosted git repository.
zychen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new f0fad0e Fix a bug where the server sends an unexpected data frame to the
client if errors occur while processing a stream-creation request
new 206864b Merge pull request #1516 from
jenrryyou/fix_parse_create_stream_rpc_error
f0fad0e is described below
commit f0fad0e8adeee736623a578318788a440f82b45f
Author: youzhiyuan <[email protected]>
AuthorDate: Thu Aug 12 14:36:32 2021 +0800
Fix a bug where the server sends an unexpected data frame to the client if
errors occur while processing a stream-creation request
---
src/brpc/policy/baidu_rpc_protocol.cpp | 28 +++++++++++++++++++---------
1 file changed, 19 insertions(+), 9 deletions(-)
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 23a729f..8025e77 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -228,10 +228,12 @@ void SendRpcResponse(int64_t correlation_id,
if (span) {
span->set_response_size(res_buf.size());
}
- if (stream_ptr) {
- CHECK(accessor.remote_stream_settings() != NULL);
+ // Send rpc response over stream even if server side failed to create
+ // stream for some reasons.
+ if(cntl->has_remote_stream()){
// Send the response over stream to notify that this stream connection
// is successfully built.
+ // Response_stream can be INVALID_STREAM_ID when error occurs.
if (SendStreamData(sock, &res_buf,
accessor.remote_stream_settings()->stream_id(),
accessor.response_stream()) != 0) {
@@ -239,13 +241,18 @@ void SendRpcResponse(int64_t correlation_id,
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " <<
*sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
- ((Stream*)stream_ptr->conn())->Close();
+ if(stream_ptr) {
+ ((Stream*)stream_ptr->conn())->Close();
+ }
return;
}
- // Now it's ok the mark this server-side stream as connectted as all
the
- // written user data would follower the RPC response.
- ((Stream*)stream_ptr->conn())->SetConnected();
- } else {
+
+ if(stream_ptr) {
+ // Now it's ok the mark this server-side stream as connectted as
all the
+ // written user data would follower the RPC response.
+ ((Stream*)stream_ptr->conn())->SetConnected();
+ }
+ } else{
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
@@ -552,18 +559,21 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
const bthread_id_t cid = { static_cast<uint64_t>(meta.correlation_id()) };
Controller* cntl = NULL;
+
+ StreamId remote_stream_id = meta.has_stream_settings() ?
meta.stream_settings().stream_id(): INVALID_STREAM_ID;
+
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
- if (meta.has_stream_settings()) {
+ if (remote_stream_id != INVALID_STREAM_ID) {
SendStreamRst(msg->socket(), meta.stream_settings().stream_id());
}
return;
}
ControllerPrivateAccessor accessor(cntl);
- if (meta.has_stream_settings()) {
+ if (remote_stream_id != INVALID_STREAM_ID) {
accessor.set_remote_stream_settings(
new StreamSettings(meta.stream_settings()));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]