This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/develop-cpp by this push:
     new cd87b5d2 fix: release write hold when OnReadDone with ok=false and there is no inflight write
cd87b5d2 is described below

commit cd87b5d26eae581a35018ad39102f49c8f59865a
Author: Li Zhanhui <[email protected]>
AuthorDate: Mon Mar 25 22:57:20 2024 +0800

    fix: release write hold when OnReadDone with ok=false and there is no inflight write
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/source/client/TelemetryBidiReactor.cpp       | 46 ++++++++++++++++--------
 cpp/source/client/include/TelemetryBidiReactor.h | 29 +++++++--------
 2 files changed, 44 insertions(+), 31 deletions(-)

diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index b5eeb616..d45e5058 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -65,7 +65,17 @@ bool TelemetryBidiReactor::await() {
   return server_setting_received_;
 }
 
+void TelemetryBidiReactor::changeStreamStateThenNotify(StreamState state) {
+  absl::MutexLock lk(&stream_state_mtx_);
+  if (state == stream_state_) {
+    return;
+  }
+  stream_state_ = state;
+  stream_state_cv_.SignalAll();
+}
+
 void TelemetryBidiReactor::OnWriteDone(bool ok) {
+  SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
   {
     bool expected = true;
     if (!command_inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
@@ -103,16 +113,20 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
 }
 
 void TelemetryBidiReactor::OnReadDone(bool ok) {
+  SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
   if (!ok) {
     RemoveHold();
     {
       absl::MutexLock lk(&stream_state_mtx_);
-      if (!ok) {
-        if (streamStateGood()) {
-          stream_state_ = StreamState::ReadFailure;
-          SPDLOG_WARN("Faild to read from telemetry stream from {}", 
peer_address_);
-        }
-        return;
+      if (streamStateGood() && stream_state_ != StreamState::Closing) {
+        stream_state_ = StreamState::ReadFailure;
+        SPDLOG_WARN("Faild to read from telemetry stream from {}", 
peer_address_);
+      }
+
+      bool expected = false;
+      if (command_inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
+        // There is no inflight write request, remove write hold on its behalf.
+        RemoveHold();
       }
     }
     return;
@@ -296,7 +310,7 @@ void TelemetryBidiReactor::fireRead() {
   }
 
   bool expected = false;
-  if (read_stream_started_.compare_exchange_weak(expected, true, 
std::memory_order_relaxed)) {
+  if (read_stream_started_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
     // Hold for the read stream.
     AddHold();
     StartRead(&read_);
@@ -304,6 +318,7 @@ void TelemetryBidiReactor::fireRead() {
 }
 
 void TelemetryBidiReactor::write(TelemetryCommand command) {
+  SPDLOG_DEBUG("{}#write", peer_address_);
   {
     absl::MutexLock lk(&stream_state_mtx_);
     // Reject incoming write commands if the stream state is closing or has 
witnessed some error.
@@ -320,6 +335,7 @@ void TelemetryBidiReactor::write(TelemetryCommand command) {
 }
 
 void TelemetryBidiReactor::tryWriteNext() {
+  SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
   {
     absl::MutexLock lk(&stream_state_mtx_);
     if (!streamStateGood()) {
@@ -362,7 +378,6 @@ void TelemetryBidiReactor::tryWriteNext() {
 
 void TelemetryBidiReactor::fireClose() {
   SPDLOG_INFO("{}#fireClose", peer_address_);
-
   {
     absl::MutexLock lk(&stream_state_mtx_);
     if (!streamStateGood()) {
@@ -377,9 +392,16 @@ void TelemetryBidiReactor::fireClose() {
     stream_state_ = StreamState::Closing;
   }
   tryWriteNext();
+  {
+    absl::MutexLock lk(&stream_state_mtx_);
+    if (stream_state_cv_.WaitWithTimeout(&stream_state_mtx_, 
absl::Seconds(3))) {
+      SPDLOG_WARN("StreamState CondVar timed out before getting signalled");
+    }
+  }
 }
 
 void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
+  SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
   // Remove the hold for the write stream.
   RemoveHold();
 
@@ -419,12 +441,7 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& 
status) {
 
   {
     SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (streamStateGood()) {
-      stream_state_ = StreamState::Closed;
-    }
-
-    stream_state_cv_.SignalAll();
+    changeStreamStateThenNotify(StreamState::Closed);
   }
 
   auto client = client_.lock();
@@ -438,6 +455,7 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& 
status) {
 }
 
 void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
+  SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
   if (!ok) {
     absl::MutexLock lk(&stream_state_mtx_);
     if (streamStateGood()) {
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h
index 90d2f63d..63c6f5e6 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -32,7 +32,8 @@
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-enum class StreamState : std::uint8_t {
+enum class StreamState : std::uint8_t
+{
   Active = 0,
 
   /// Once the stream enters this state, new write requests shall be rejected
@@ -48,13 +49,10 @@ enum class StreamState : std::uint8_t {
   WriteFailure = 5,
 };
 
-class TelemetryBidiReactor
-    : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
-      public std::enable_shared_from_this<TelemetryBidiReactor> {
+class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, 
TelemetryCommand>,
+                             public 
std::enable_shared_from_this<TelemetryBidiReactor> {
 public:
-  TelemetryBidiReactor(std::weak_ptr<Client> client,
-                       rmq::MessagingService::Stub *stub,
-                       std::string peer_address);
+  TelemetryBidiReactor(std::weak_ptr<Client> client, 
rmq::MessagingService::Stub* stub, std::string peer_address);
 
   ~TelemetryBidiReactor();
 
@@ -65,7 +63,7 @@ public:
   /// (like failure to remove a hold).
   ///
   /// \param[in] s The status outcome of this RPC
-  void OnDone(const grpc::Status &status) override;
+  void OnDone(const grpc::Status& status) override;
 
   /// Notifies the application that a read of initial metadata from the
   /// server is done. If the application chooses not to implement this method,
@@ -99,8 +97,6 @@ public:
   ///               further Start* should be called.
   void OnWritesDoneDone(bool ok) override;
 
-  
-
   /// Core API method to initiate this bidirectional stream.
   void write(TelemetryCommand command);
 
@@ -150,18 +146,17 @@ private:
   absl::Mutex server_setting_received_mtx_;
   absl::CondVar server_setting_received_cv_;
 
+  void changeStreamStateThenNotify(StreamState state);
+
   void onVerifyMessageResult(TelemetryCommand command);
 
-  void applySettings(const rmq::Settings &settings);
+  void applySettings(const rmq::Settings& settings);
 
-  void applyBackoffPolicy(const rmq::Settings &settings,
-                          std::shared_ptr<Client> &client);
+  void applyBackoffPolicy(const rmq::Settings& settings, 
std::shared_ptr<Client>& client);
 
-  void applyPublishingConfig(const rmq::Settings &settings,
-                             std::shared_ptr<Client> client);
+  void applyPublishingConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
 
-  void applySubscriptionConfig(const rmq::Settings &settings,
-                               std::shared_ptr<Client> client);
+  void applySubscriptionConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
 
   /**
    * Indicate if the underlying gRPC bidirectional stream is good enough to 
fire

Reply via email to