This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 196b6e9c9387d3f5a7a9fe61d4a9e085a783058d
Author: Li Zhanhui <[email protected]>
AuthorDate: Fri Mar 15 13:12:37 2024 +0800

    fix: fix stream state transition with gRPC reactor
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/source/client/TelemetryBidiReactor.cpp       | 104 +++++++++++++++--------
 cpp/source/client/include/TelemetryBidiReactor.h |  60 ++++++++++---
 2 files changed, 119 insertions(+), 45 deletions(-)

diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index a55a7473..827fdda2 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -35,8 +35,8 @@ ROCKETMQ_NAMESPACE_BEGIN
 TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
                                            rmq::MessagingService::Stub* stub,
                                            std::string peer_address)
-    : client_(client), peer_address_(std::move(peer_address)), 
stream_state_(StreamState::Created) {
-  auto ptr = client.lock();
+    : client_(client), peer_address_(std::move(peer_address)), 
stream_state_(StreamState::Active) {
+  auto ptr = client_.lock();
   auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
   context_.set_deadline(deadline);
   Metadata metadata;
@@ -45,6 +45,7 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
     context_.AddMetadata(entry.first, entry.second);
   }
   stub->async()->Telemetry(&context_, this);
+  fireRead();
   StartCall();
 }
 
@@ -64,8 +65,6 @@ bool TelemetryBidiReactor::await() {
 }
 
 void TelemetryBidiReactor::OnWriteDone(bool ok) {
-  SPDLOG_DEBUG("OnWriteDone: {}", ok);
-
   {
     bool expect = true;
     if (!command_inflight_.compare_exchange_strong(expect, false, 
std::memory_order_relaxed)) {
@@ -77,36 +76,28 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
     SPDLOG_WARN("Failed to write telemetry command {} to {}", 
write_.DebugString(), peer_address_);
     {
       absl::MutexLock lk(&stream_state_mtx_);
-      stream_state_ = StreamState::WriteDone;
+      if (streamStateGood()) {
+        stream_state_ = StreamState::WriteFailure;
+      }
     }
-
-    fireClose();
     return;
   }
 
-  {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (StreamState::Created == stream_state_) {
-      stream_state_ = StreamState::Active;
-      fireRead();
-    }
-  }
-
   fireWrite();
 }
 
 void TelemetryBidiReactor::OnReadDone(bool ok) {
-  SPDLOG_DEBUG("OnReadDone: ok={}", ok);
   if (!ok) {
-    if (client_.lock()) {
-      SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
-    }
-
     {
       absl::MutexLock lk(&stream_state_mtx_);
-      stream_state_ = StreamState::ReadDone;
+      if (!ok) {
+        if (streamStateGood()) {
+          stream_state_ = StreamState::ReadFailure;
+          SPDLOG_WARN("Faild to read from telemetry stream from {}", 
peer_address_);
+        }
+        return;
+      }
     }
-    fireClose();
     return;
   }
 
@@ -283,6 +274,16 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings
 
 void TelemetryBidiReactor::fireRead() {
   SPDLOG_DEBUG("{}#fireRead", peer_address_);
+
+  {
+    absl::MutexLock lk(&stream_state_mtx_);
+    if (!streamStateGood()) {
+      SPDLOG_WARN("Further read from {} is not allowded due to 
stream-state={}", peer_address_,
+                  static_cast<std::uint8_t>(stream_state_));
+      return;
+    }
+  }
+
   StartRead(&read_);
 }
 
@@ -295,13 +296,11 @@ void TelemetryBidiReactor::write(TelemetryCommand command) {
 }
 
 void TelemetryBidiReactor::fireWrite() {
-  SPDLOG_DEBUG("{}#fireWrite", peer_address_);
-
   {
     absl::MutexLock lk(&stream_state_mtx_);
-    if (stream_state_ != StreamState::Active && stream_state_ != 
StreamState::Created) {
-      SPDLOG_WARN("TelemetryBidiReactor to {} is closed or half-closed, 
ignoring fireWrite event. stream-state={}",
-                  peer_address_, static_cast<std::uint8_t>(stream_state_));
+    if (!streamStateGood()) {
+      SPDLOG_WARN("Further write to {} is not allowded due to 
stream-state={}", peer_address_,
+                  static_cast<std::uint8_t>(stream_state_));
       return;
     }
   }
@@ -328,18 +327,33 @@ void TelemetryBidiReactor::fireWrite() {
 
 void TelemetryBidiReactor::fireClose() {
   SPDLOG_INFO("{}#fireClose", peer_address_);
-  if (StreamState::Active == stream_state_) {
-    StartWritesDone();
-    {
-      absl::MutexLock lk(&stream_state_mtx_);
-      if (StreamState::Active == stream_state_) {
-        stream_state_cv_.Wait(&stream_state_mtx_);
-      }
+
+  {
+    absl::MutexLock lk(&stream_state_mtx_);
+    if (!streamStateGood()) {
+      SPDLOG_WARN("No futher Read/Write call to {} is allowed due to 
stream-state={}", peer_address_,
+                  static_cast<std::uint8_t>(stream_state_));
+      return;
     }
   }
+
+  StartWritesDone();
+  {
+    absl::MutexLock lk(&stream_state_mtx_);
+    stream_state_cv_.Wait(&stream_state_mtx_);
+  }
 }
 
 void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
+  if (!ok) {
+    absl::MutexLock lk(&stream_state_mtx_);
+    if (streamStateGood()) {
+      stream_state_ = StreamState::WriteFailure;
+    }
+    SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
+    return;
+  }
+
   SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
 }
 
@@ -368,7 +382,10 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
   {
     SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
     absl::MutexLock lk(&stream_state_mtx_);
-    stream_state_ = StreamState::Closed;
+    if (streamStateGood()) {
+      stream_state_ = StreamState::Closed;
+    }
+
     stream_state_cv_.SignalAll();
   }
 
@@ -382,4 +399,21 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
   }
 }
 
+void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
+  if (!ok) {
+    absl::MutexLock lk(&stream_state_mtx_);
+    if (streamStateGood()) {
+      stream_state_ = StreamState::ReadInitialMetadataFailure;
+    }
+    SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
+    return;
+  }
+
+  SPDLOG_DEBUG("Received initial metadata from {}", peer_address_);
+}
+
+bool TelemetryBidiReactor::streamStateGood() {
+  return StreamState::Active == stream_state_;
+}
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h
index aba116eb..72f21951 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -33,11 +33,13 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 enum class StreamState : std::uint8_t
 {
-  Created = 0,
-  Active = 1,
-  ReadDone = 2,
-  WriteDone = 3,
-  Closed = 4,
+  Active = 0,
+
+  // Once stream state reaches one of the following, Start* should not be called.
+  Closed = 1,
+  ReadInitialMetadataFailure = 2,
+  ReadFailure = 3,
+  WriteFailure = 4,
 };
 
 class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, 
TelemetryCommand>,
@@ -47,13 +49,46 @@ public:
 
   ~TelemetryBidiReactor();
 
-  void OnWriteDone(bool ok) override;
-
-  void OnWritesDoneDone(bool ok) override;
+  /// Notifies the application that all operations associated with this RPC
+  /// have completed and all Holds have been removed. OnDone provides the RPC
+  /// status outcome for both successful and failed RPCs and will be called in
+  /// all cases. If it is not called, it indicates an application-level problem
+  /// (like failure to remove a hold).
+  ///
+  /// \param[in] status The status outcome of this RPC
+  void OnDone(const grpc::Status& status) override;
 
+  /// Notifies the application that a read of initial metadata from the
+  /// server is done. If the application chooses not to implement this method,
+  /// it can assume that the initial metadata has been read before the first
+  /// call of OnReadDone or OnDone.
+  ///
+  /// \param[in] ok Was the initial metadata read successfully? If false, no
+  ///               new read/write operation will succeed, and any further
+  ///               Start* operations should not be called.
+  void OnReadInitialMetadataDone(bool /*ok*/) override;
+
+  /// Notifies the application that a StartRead operation completed.
+  ///
+  /// \param[in] ok Was it successful? If false, no new read/write operation
+  ///               will succeed, and any further Start* should not be called.
   void OnReadDone(bool ok) override;
 
-  void OnDone(const grpc::Status& status) override;
+  /// Notifies the application that a StartWrite or StartWriteLast operation
+  /// completed.
+  ///
+  /// \param[in] ok Was it successful? If false, no new read/write operation
+  ///               will succeed, and any further Start* should not be called.
+  void OnWriteDone(bool ok) override;
+
+  /// Notifies the application that a StartWritesDone operation completed. Note
+  /// that this is only used on explicit StartWritesDone operations and not for
+  /// those that are implicitly invoked as part of a StartWriteLast.
+  ///
+  /// \param[in] ok Was it successful? If false, the application will later see
+  ///               the failure reflected as a bad status in OnDone and no
+  ///               further Start* should be called.
+  void OnWritesDoneDone(bool ok) override;
 
   void fireRead();
 
@@ -87,7 +122,7 @@ private:
   TelemetryCommand write_;
 
   /**
-   * @brief Each TelemetryBidiReactor belongs to a specific client as its owner. 
+   * @brief Each TelemetryBidiReactor belongs to a specific client as its owner.
    */
   std::weak_ptr<Client> client_;
 
@@ -118,6 +153,11 @@ private:
   void applyPublishingConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
 
   void applySubscriptionConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
+
+  /**
+   * Indicate if the underlying gRPC bidirectional stream is good enough to fire further Start* calls.
+   */
+  bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_);
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file

Reply via email to