This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/develop-cpp by this push:
new cd87b5d2 fix: release write hold when OnReadDone with ok=false and
there is no inflight write
cd87b5d2 is described below
commit cd87b5d26eae581a35018ad39102f49c8f59865a
Author: Li Zhanhui <[email protected]>
AuthorDate: Mon Mar 25 22:57:20 2024 +0800
fix: release write hold when OnReadDone with ok=false and there is no
inflight write
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/source/client/TelemetryBidiReactor.cpp | 46 ++++++++++++++++--------
cpp/source/client/include/TelemetryBidiReactor.h | 29 +++++++--------
2 files changed, 44 insertions(+), 31 deletions(-)
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp
b/cpp/source/client/TelemetryBidiReactor.cpp
index b5eeb616..d45e5058 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -65,7 +65,17 @@ bool TelemetryBidiReactor::await() {
return server_setting_received_;
}
+void TelemetryBidiReactor::changeStreamStateThenNotify(StreamState state) {
+ absl::MutexLock lk(&stream_state_mtx_);
+ if (state == stream_state_) {
+ return;
+ }
+ stream_state_ = state;
+ stream_state_cv_.SignalAll();
+}
+
void TelemetryBidiReactor::OnWriteDone(bool ok) {
+ SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
{
bool expected = true;
if (!command_inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
@@ -103,16 +113,20 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
}
void TelemetryBidiReactor::OnReadDone(bool ok) {
+ SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
if (!ok) {
RemoveHold();
{
absl::MutexLock lk(&stream_state_mtx_);
- if (!ok) {
- if (streamStateGood()) {
- stream_state_ = StreamState::ReadFailure;
- SPDLOG_WARN("Faild to read from telemetry stream from {}",
peer_address_);
- }
- return;
+ if (streamStateGood() && stream_state_ != StreamState::Closing) {
+ stream_state_ = StreamState::ReadFailure;
+ SPDLOG_WARN("Faild to read from telemetry stream from {}",
peer_address_);
+ }
+
+ bool expected = false;
+ if (command_inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
+ // There is no inflight write request, remove write hold on its behalf.
+ RemoveHold();
}
}
return;
@@ -296,7 +310,7 @@ void TelemetryBidiReactor::fireRead() {
}
bool expected = false;
- if (read_stream_started_.compare_exchange_weak(expected, true,
std::memory_order_relaxed)) {
+ if (read_stream_started_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
// Hold for the read stream.
AddHold();
StartRead(&read_);
@@ -304,6 +318,7 @@ void TelemetryBidiReactor::fireRead() {
}
void TelemetryBidiReactor::write(TelemetryCommand command) {
+ SPDLOG_DEBUG("{}#write", peer_address_);
{
absl::MutexLock lk(&stream_state_mtx_);
// Reject incoming write commands if the stream state is closing or has
witnessed some error.
@@ -320,6 +335,7 @@ void TelemetryBidiReactor::write(TelemetryCommand command) {
}
void TelemetryBidiReactor::tryWriteNext() {
+ SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
{
absl::MutexLock lk(&stream_state_mtx_);
if (!streamStateGood()) {
@@ -362,7 +378,6 @@ void TelemetryBidiReactor::tryWriteNext() {
void TelemetryBidiReactor::fireClose() {
SPDLOG_INFO("{}#fireClose", peer_address_);
-
{
absl::MutexLock lk(&stream_state_mtx_);
if (!streamStateGood()) {
@@ -377,9 +392,16 @@ void TelemetryBidiReactor::fireClose() {
stream_state_ = StreamState::Closing;
}
tryWriteNext();
+ {
+ absl::MutexLock lk(&stream_state_mtx_);
+ if (stream_state_cv_.WaitWithTimeout(&stream_state_mtx_,
absl::Seconds(3))) {
+ SPDLOG_WARN("StreamState CondVar timed out before getting signalled");
+ }
+ }
}
void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
+ SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
// Remove the hold for the write stream.
RemoveHold();
@@ -419,12 +441,7 @@ void TelemetryBidiReactor::OnDone(const grpc::Status&
status) {
{
SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
- absl::MutexLock lk(&stream_state_mtx_);
- if (streamStateGood()) {
- stream_state_ = StreamState::Closed;
- }
-
- stream_state_cv_.SignalAll();
+ changeStreamStateThenNotify(StreamState::Closed);
}
auto client = client_.lock();
@@ -438,6 +455,7 @@ void TelemetryBidiReactor::OnDone(const grpc::Status&
status) {
}
void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
+ SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
if (!ok) {
absl::MutexLock lk(&stream_state_mtx_);
if (streamStateGood()) {
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h
b/cpp/source/client/include/TelemetryBidiReactor.h
index 90d2f63d..63c6f5e6 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -32,7 +32,8 @@
ROCKETMQ_NAMESPACE_BEGIN
-enum class StreamState : std::uint8_t {
+enum class StreamState : std::uint8_t
+{
Active = 0,
/// Once the stream enters this state, new write requests shall be rejected
@@ -48,13 +49,10 @@ enum class StreamState : std::uint8_t {
WriteFailure = 5,
};
-class TelemetryBidiReactor
- : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
- public std::enable_shared_from_this<TelemetryBidiReactor> {
+class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand,
TelemetryCommand>,
+ public
std::enable_shared_from_this<TelemetryBidiReactor> {
public:
- TelemetryBidiReactor(std::weak_ptr<Client> client,
- rmq::MessagingService::Stub *stub,
- std::string peer_address);
+ TelemetryBidiReactor(std::weak_ptr<Client> client,
rmq::MessagingService::Stub* stub, std::string peer_address);
~TelemetryBidiReactor();
@@ -65,7 +63,7 @@ public:
/// (like failure to remove a hold).
///
/// \param[in] s The status outcome of this RPC
- void OnDone(const grpc::Status &status) override;
+ void OnDone(const grpc::Status& status) override;
/// Notifies the application that a read of initial metadata from the
/// server is done. If the application chooses not to implement this method,
@@ -99,8 +97,6 @@ public:
/// further Start* should be called.
void OnWritesDoneDone(bool ok) override;
-
-
/// Core API method to initiate this bidirectional stream.
void write(TelemetryCommand command);
@@ -150,18 +146,17 @@ private:
absl::Mutex server_setting_received_mtx_;
absl::CondVar server_setting_received_cv_;
+ void changeStreamStateThenNotify(StreamState state);
+
void onVerifyMessageResult(TelemetryCommand command);
- void applySettings(const rmq::Settings &settings);
+ void applySettings(const rmq::Settings& settings);
- void applyBackoffPolicy(const rmq::Settings &settings,
- std::shared_ptr<Client> &client);
+ void applyBackoffPolicy(const rmq::Settings& settings,
std::shared_ptr<Client>& client);
- void applyPublishingConfig(const rmq::Settings &settings,
- std::shared_ptr<Client> client);
+ void applyPublishingConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
- void applySubscriptionConfig(const rmq::Settings &settings,
- std::shared_ptr<Client> client);
+ void applySubscriptionConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
/**
* Indicate if the underlying gRPC bidirectional stream is good enough to
fire