This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new a6427c4b Fix a deadlock that happened in the ClearAbandonedStreamsImpl
path (issues/1778) (#1781)
a6427c4b is described below
commit a6427c4bce76d1024798da16b8625d57d9fbb65c
Author: Jiashun Zhu <[email protected]>
AuthorDate: Tue Jun 7 09:54:15 2022 +0200
Fix a deadlock that happened in the ClearAbandonedStreamsImpl path (issues/1778)
(#1781)
---
src/brpc/policy/http2_rpc_protocol.cpp | 23 +++++++++++++----------
src/brpc/policy/http2_rpc_protocol.h | 2 +-
2 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/src/brpc/policy/http2_rpc_protocol.cpp
b/src/brpc/policy/http2_rpc_protocol.cpp
index ab2d2799..ed57bcfe 100644
--- a/src/brpc/policy/http2_rpc_protocol.cpp
+++ b/src/brpc/policy/http2_rpc_protocol.cpp
@@ -380,10 +380,17 @@ H2StreamContext* H2Context::RemoveStream(int stream_id) {
return NULL;
}
}
+ return sctx;
+}
+
+H2StreamContext* H2Context::RemoveStreamAndDeferWU(int stream_id) {
+ H2StreamContext* sctx = RemoveStream(stream_id);
// The remote stream will not send any more data, sending back the
// stream-level WINDOW_UPDATE is pointless, just move the value into
// the connection.
- DeferWindowUpdate(sctx->ReleaseDeferredWindowUpdate());
+ if (sctx) {
+ DeferWindowUpdate(sctx->ReleaseDeferredWindowUpdate());
+ }
return sctx;
}
@@ -516,7 +523,7 @@ ParseResult H2Context::Consume(
LOG(WARNING) << "Fail to send RST_STREAM to " << *_socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
- H2StreamContext* sctx = RemoveStream(h2_res.stream_id());
+ H2StreamContext* sctx = RemoveStreamAndDeferWU(h2_res.stream_id());
if (sctx) {
if (is_server_side()) {
delete sctx;
@@ -804,7 +811,7 @@ H2ParseResult H2StreamContext::OnResetStream(
return MakeH2Error(H2_PROTOCOL_ERROR);
}
#endif
- H2StreamContext* sctx = _conn_ctx->RemoveStream(stream_id());
+ H2StreamContext* sctx = _conn_ctx->RemoveStreamAndDeferWU(stream_id());
if (sctx == NULL) {
LOG(ERROR) << "Fail to find stream_id=" << stream_id();
return MakeH2Error(H2_PROTOCOL_ERROR);
@@ -831,7 +838,7 @@ H2ParseResult H2StreamContext::OnEndStream() {
return MakeH2Error(H2_PROTOCOL_ERROR);
}
#endif
- H2StreamContext* sctx = _conn_ctx->RemoveStream(stream_id());
+ H2StreamContext* sctx = _conn_ctx->RemoveStreamAndDeferWU(stream_id());
if (sctx == NULL) {
RPC_VLOG << "Fail to find stream_id=" << stream_id();
return MakeH2Message(NULL);
@@ -1140,20 +1147,16 @@ void H2Context::AddAbandonedStream(uint32_t stream_id) {
}
inline void H2Context::ClearAbandonedStreams() {
- if (!_abandoned_streams.empty()) {
- ClearAbandonedStreamsImpl();
- }
-}
-
-void H2Context::ClearAbandonedStreamsImpl() {
std::unique_lock<butil::Mutex> mu(_abandoned_streams_mutex);
while (!_abandoned_streams.empty()) {
const uint32_t stream_id = _abandoned_streams.back();
_abandoned_streams.pop_back();
+ mu.unlock();
H2StreamContext* sctx = RemoveStream(stream_id);
if (sctx != NULL) {
delete sctx;
}
+ mu.lock();
}
}
diff --git a/src/brpc/policy/http2_rpc_protocol.h
b/src/brpc/policy/http2_rpc_protocol.h
index 85747239..7e2061b4 100644
--- a/src/brpc/policy/http2_rpc_protocol.h
+++ b/src/brpc/policy/http2_rpc_protocol.h
@@ -370,11 +370,11 @@ friend void InitFrameHandlers();
H2ParseResult OnWindowUpdate(butil::IOBufBytesIterator&, const
H2FrameHead&);
H2ParseResult OnContinuation(butil::IOBufBytesIterator&, const
H2FrameHead&);
+ H2StreamContext* RemoveStreamAndDeferWU(int stream_id);
H2StreamContext* RemoveStream(int stream_id);
void RemoveGoAwayStreams(int goaway_stream_id,
std::vector<H2StreamContext*>* out_streams);
H2StreamContext* FindStream(int stream_id);
- void ClearAbandonedStreamsImpl();
// True if the connection is established by client, otherwise it's
// accepted by server.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]