This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/develop-cpp by this push:
     new 096bd82f fix: revamp TelemetryBidiRecator
096bd82f is described below

commit 096bd82f10fc28fa11829792582be42ed01da301
Author: Li Zhanhui <[email protected]>
AuthorDate: Fri Mar 22 22:14:37 2024 +0800

    fix: revamp TelemetryBidiRecator
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/source/client/TelemetryBidiReactor.cpp       | 92 +++++++++++++++++-------
 cpp/source/client/include/TelemetryBidiReactor.h | 74 +++++++++++--------
 cpp/source/stats/MetricBidiReactor.cpp           | 40 ++++++++---
 cpp/source/stats/include/MetricBidiReactor.h     | 24 ++++---
 4 files changed, 155 insertions(+), 75 deletions(-)

diff --git a/cpp/source/client/TelemetryBidiReactor.cpp 
b/cpp/source/client/TelemetryBidiReactor.cpp
index 827fdda2..b5eeb616 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -45,7 +45,8 @@ 
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
     context_.AddMetadata(entry.first, entry.second);
   }
   stub->async()->Telemetry(&context_, this);
-  fireRead();
+  // Increase hold for write stream.
+  AddHold();
   StartCall();
 }
 
@@ -66,14 +67,20 @@ bool TelemetryBidiReactor::await() {
 
 void TelemetryBidiReactor::OnWriteDone(bool ok) {
   {
-    bool expect = true;
-    if (!command_inflight_.compare_exchange_strong(expect, false, 
std::memory_order_relaxed)) {
+    bool expected = true;
+    if (!command_inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
       SPDLOG_WARN("Illegal command-inflight state");
+      return;
     }
   }
 
   if (!ok) {
-    SPDLOG_WARN("Failed to write telemetry command {} to {}", 
write_.DebugString(), peer_address_);
+    RemoveHold();
+    {
+      absl::MutexLock lk(&writes_mtx_);
+      SPDLOG_WARN("Failed to write telemetry command {} to {}", 
writes_.front().DebugString(), peer_address_);
+    }
+
     {
       absl::MutexLock lk(&stream_state_mtx_);
       if (streamStateGood()) {
@@ -83,11 +90,21 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
     return;
   }
 
-  fireWrite();
+  // Start the read stream if it has not been started yet.
+  fireRead();
+
+  // Remove the command that has been written to server.
+  {
+    absl::MutexLock lk(&writes_mtx_);
+    writes_.pop_front();
+  }
+
+  tryWriteNext();
 }
 
 void TelemetryBidiReactor::OnReadDone(bool ok) {
   if (!ok) {
+    RemoveHold();
     {
       absl::MutexLock lk(&stream_state_mtx_);
       if (!ok) {
@@ -142,11 +159,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
       TelemetryCommand response;
       
response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
       
response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace 
is not supported");
-      {
-        absl::MutexLock lk(&writes_mtx_);
-        writes_.push_back(response);
-      }
-      fireWrite();
+      write(std::move(response));
       break;
     }
 
@@ -179,7 +192,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
     }
   }
 
-  fireRead();
+  StartRead(&read_);
 }
 
 void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
@@ -273,8 +286,6 @@ void TelemetryBidiReactor::applySubscriptionConfig(const 
rmq::Settings& settings
 }
 
 void TelemetryBidiReactor::fireRead() {
-  SPDLOG_DEBUG("{}#fireRead", peer_address_);
-
   {
     absl::MutexLock lk(&stream_state_mtx_);
     if (!streamStateGood()) {
@@ -284,18 +295,31 @@ void TelemetryBidiReactor::fireRead() {
     }
   }
 
-  StartRead(&read_);
+  bool expected = false;
+  if (read_stream_started_.compare_exchange_weak(expected, true, 
std::memory_order_relaxed)) {
+    // Hold for the read stream.
+    AddHold();
+    StartRead(&read_);
+  }
 }
 
 void TelemetryBidiReactor::write(TelemetryCommand command) {
+  {
+    absl::MutexLock lk(&stream_state_mtx_);
+    // Reject incoming write commands if the stream state is closing or has 
witnessed some error.
+    if (!streamStateGood() || StreamState::Closing == stream_state_) {
+      return;
+    }
+  }
+
   {
     absl::MutexLock lk(&writes_mtx_);
     writes_.push_back(command);
   }
-  fireWrite();
+  tryWriteNext();
 }
 
-void TelemetryBidiReactor::fireWrite() {
+void TelemetryBidiReactor::tryWriteNext() {
   {
     absl::MutexLock lk(&stream_state_mtx_);
     if (!streamStateGood()) {
@@ -305,24 +329,35 @@ void TelemetryBidiReactor::fireWrite() {
     }
   }
 
+  bool closing = false;
+  {
+    absl::MutexLock lk(&stream_state_mtx_);
+    if (StreamState::Closing == stream_state_) {
+      closing = true;
+    }
+  }
+
   {
     absl::MutexLock lk(&writes_mtx_);
-    if (writes_.empty()) {
+    if (writes_.empty() && !closing) {
       SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
       return;
     }
 
-    bool expect = false;
-    if (command_inflight_.compare_exchange_strong(expect, true, 
std::memory_order_relaxed)) {
-      write_ = std::move(*writes_.begin());
-      writes_.erase(writes_.begin());
+    bool expected = false;
+    if (command_inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
+      if (writes_.empty()) {
+        // Tell the server there are no more write requests.
+        StartWritesDone();
+      } else {
+        SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().DebugString());
+        StartWrite(&(writes_.front()));
+      }
     } else {
       SPDLOG_DEBUG("Another command is already on the wire. Peer={}", 
peer_address_);
       return;
     }
   }
-  SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
write_.DebugString());
-  StartWrite(&write_);
 }
 
 void TelemetryBidiReactor::fireClose() {
@@ -337,14 +372,17 @@ void TelemetryBidiReactor::fireClose() {
     }
   }
 
-  StartWritesDone();
   {
     absl::MutexLock lk(&stream_state_mtx_);
-    stream_state_cv_.Wait(&stream_state_mtx_);
+    stream_state_ = StreamState::Closing;
   }
+  tryWriteNext();
 }
 
 void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
+  // Remove the hold for the write stream.
+  RemoveHold();
+
   if (!ok) {
     absl::MutexLock lk(&stream_state_mtx_);
     if (streamStateGood()) {
@@ -362,7 +400,7 @@ void 
TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
     absl::MutexLock lk(&writes_mtx_);
     writes_.emplace_back(command);
   }
-  fireWrite();
+  tryWriteNext();
 }
 
 /// Notifies the application that all operations associated with this RPC
@@ -413,7 +451,7 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool 
ok) {
 }
 
 bool TelemetryBidiReactor::streamStateGood() {
-  return StreamState::Active == stream_state_;
+  return StreamState::Active == stream_state_ || StreamState::Closing == 
stream_state_;
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h 
b/cpp/source/client/include/TelemetryBidiReactor.h
index 72f21951..90d2f63d 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <chrono>
 #include <cstdint>
+#include <list>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -31,21 +32,29 @@
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-enum class StreamState : std::uint8_t
-{
+enum class StreamState : std::uint8_t {
   Active = 0,
 
-  // Once stream state reaches one of the following, Start* should not be 
called.
-  Closed = 1,
-  ReadInitialMetadataFailure = 2,
-  ReadFailure = 3,
-  WriteFailure = 4,
+  /// Once the stream enters this state, new write requests shall be rejected
+  /// and once currently pending requests are written, write stream should be
+  /// closed as soon as possible.
+  Closing = 1,
+
+  // Once stream state reaches one of the following, Start* should not be
+  // called.
+  Closed = 2,
+  ReadInitialMetadataFailure = 3,
+  ReadFailure = 4,
+  WriteFailure = 5,
 };
 
-class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, 
TelemetryCommand>,
-                             public 
std::enable_shared_from_this<TelemetryBidiReactor> {
+class TelemetryBidiReactor
+    : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
+      public std::enable_shared_from_this<TelemetryBidiReactor> {
 public:
-  TelemetryBidiReactor(std::weak_ptr<Client> client, 
rmq::MessagingService::Stub* stub, std::string peer_address);
+  TelemetryBidiReactor(std::weak_ptr<Client> client,
+                       rmq::MessagingService::Stub *stub,
+                       std::string peer_address);
 
   ~TelemetryBidiReactor();
 
@@ -56,7 +65,7 @@ public:
   /// (like failure to remove a hold).
   ///
   /// \param[in] s The status outcome of this RPC
-  void OnDone(const grpc::Status& status) override;
+  void OnDone(const grpc::Status &status) override;
 
   /// Notifies the application that a read of initial metadata from the
   /// server is done. If the application chooses not to implement this method,
@@ -90,16 +99,15 @@ public:
   ///               further Start* should be called.
   void OnWritesDoneDone(bool ok) override;
 
-  void fireRead();
-
-  void fireWrite();
-
-  void fireClose();
+  
 
+  /// Core API method to initiate this bidirectional stream.
   void write(TelemetryCommand command);
 
   bool await();
 
+  void fireClose();
+
 private:
   grpc::ClientContext context_;
 
@@ -111,16 +119,12 @@ private:
   /**
    * @brief Buffered commands to write to server
    *
-   * TODO: move buffered commands to a shared container, which may survive 
multiple TelemetryBidiReactor lifecycles.
+   * TODO: move buffered commands to a shared container, which may survive
+   * multiple TelemetryBidiReactor lifecycles.
    */
-  std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
+  std::list<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
   absl::Mutex writes_mtx_;
 
-  /**
-   * @brief The command that is currently being written back to server.
-   */
-  TelemetryCommand write_;
-
   /**
    * @brief Each TelemetryBidiReactor belongs to a specific client as its 
owner.
    */
@@ -136,6 +140,8 @@ private:
    */
   std::atomic_bool command_inflight_{false};
 
+  std::atomic_bool read_stream_started_{false};
+
   StreamState stream_state_ GUARDED_BY(stream_state_mtx_);
   absl::Mutex stream_state_mtx_;
   absl::CondVar stream_state_cv_;
@@ -146,18 +152,30 @@ private:
 
   void onVerifyMessageResult(TelemetryCommand command);
 
-  void applySettings(const rmq::Settings& settings);
+  void applySettings(const rmq::Settings &settings);
 
-  void applyBackoffPolicy(const rmq::Settings& settings, 
std::shared_ptr<Client>& client);
+  void applyBackoffPolicy(const rmq::Settings &settings,
+                          std::shared_ptr<Client> &client);
 
-  void applyPublishingConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
+  void applyPublishingConfig(const rmq::Settings &settings,
+                             std::shared_ptr<Client> client);
 
-  void applySubscriptionConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
+  void applySubscriptionConfig(const rmq::Settings &settings,
+                               std::shared_ptr<Client> client);
 
   /**
-   * Indicate if the underlying gRPC bidirectional stream is good enough to 
fire further Start* calls.
+   * Indicate if the underlying gRPC bidirectional stream is good enough to 
fire
+   * further Start* calls.
    */
   bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_);
+
+  /// Start the read stream.
+  ///
+  /// Once OnReadDone has been received with an OK status, StartRead is called again immediately.
+  void fireRead();
+
+  /// Attempt to write pending telemetry command to server.
+  void tryWriteNext();
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/stats/MetricBidiReactor.cpp 
b/cpp/source/stats/MetricBidiReactor.cpp
index 22363bd0..e6921378 100644
--- a/cpp/source/stats/MetricBidiReactor.cpp
+++ b/cpp/source/stats/MetricBidiReactor.cpp
@@ -18,10 +18,10 @@
 
 #include <chrono>
 
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
 #include "OpencensusExporter.h"
 #include "Signature.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -43,12 +43,15 @@ MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> 
client, std::weak_ptr
     return;
   }
   exporter_ptr->stub()->async()->Export(&context_, this);
+  AddHold();
   StartCall();
 }
 
 void MetricBidiReactor::OnReadDone(bool ok) {
   if (!ok) {
     SPDLOG_WARN("Failed to read response");
+    // match the AddHold() call in MetricBidiReactor::fireRead
+    RemoveHold();
     return;
   }
   SPDLOG_DEBUG("OnReadDone OK");
@@ -56,16 +59,32 @@ void MetricBidiReactor::OnReadDone(bool ok) {
 }
 
 void MetricBidiReactor::OnWriteDone(bool ok) {
+  {
+    bool expected = true;
+    if (!inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
+      SPDLOG_WARN("Illegal command-inflight state");
+      return;
+    }
+  }
+
   if (!ok) {
     SPDLOG_WARN("Failed to report metrics");
+    // match AddHold() call in MetricBidiReactor::MetricBidiReactor
+    RemoveHold();
     return;
   }
   SPDLOG_DEBUG("OnWriteDone OK");
+
+  // If the read stream has not started yet, start it now.
   fireRead();
-  bool expected = true;
-  if (inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
-    fireWrite();
+
+  // Remove the request that has just been written.
+  {
+    absl::MutexLock lk(&requests_mtx_);
+    requests_.pop_front();
   }
+
+  tryWriteNext();
 }
 
 void MetricBidiReactor::OnDone(const grpc::Status& s) {
@@ -89,13 +108,13 @@ void MetricBidiReactor::write(ExportMetricsServiceRequest 
request) {
   SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
   {
     absl::MutexLock lk(&requests_mtx_);
-    requests_.emplace_back(std::move(request));
+    requests_.push_back(std::move(request));
   }
 
-  fireWrite();
+  tryWriteNext();
 }
 
-void MetricBidiReactor::fireWrite() {
+void MetricBidiReactor::tryWriteNext() {
   {
     absl::MutexLock lk(&requests_mtx_);
     if (requests_.empty()) {
@@ -107,16 +126,15 @@ void MetricBidiReactor::fireWrite() {
   bool expected = false;
   if (inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
     absl::MutexLock lk(&requests_mtx_);
-    request_ = std::move(*requests_.begin());
-    requests_.erase(requests_.begin());
     SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
-    StartWrite(&request_);
+    StartWrite(&(requests_.front()));
   }
 }
 
 void MetricBidiReactor::fireRead() {
   bool expected = false;
   if (read_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
+    AddHold();
     StartRead(&response_);
   }
 }
diff --git a/cpp/source/stats/include/MetricBidiReactor.h 
b/cpp/source/stats/include/MetricBidiReactor.h
index 0a10cd88..e4d75344 100644
--- a/cpp/source/stats/include/MetricBidiReactor.h
+++ b/cpp/source/stats/include/MetricBidiReactor.h
@@ -16,6 +16,8 @@
  */
 #pragma once
 
+#include <list>
+
 #include "Client.h"
 #include "grpcpp/grpcpp.h"
 #include "grpcpp/impl/codegen/client_callback.h"
@@ -25,12 +27,17 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class OpencensusExporter;
 
-using ExportMetricsServiceRequest = 
opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
-using ExportMetricsServiceResponse = 
opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
+using ExportMetricsServiceRequest =
+    opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+using ExportMetricsServiceResponse =
+    opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
 
-class MetricBidiReactor : public 
grpc::ClientBidiReactor<ExportMetricsServiceRequest, 
ExportMetricsServiceResponse> {
+class MetricBidiReactor
+    : public grpc::ClientBidiReactor<ExportMetricsServiceRequest,
+                                     ExportMetricsServiceResponse> {
 public:
-  MetricBidiReactor(std::weak_ptr<Client> client, 
std::weak_ptr<OpencensusExporter> exporter);
+  MetricBidiReactor(std::weak_ptr<Client> client,
+                    std::weak_ptr<OpencensusExporter> exporter);
 
   /// Notifies the application that a StartRead operation completed.
   ///
@@ -52,7 +59,7 @@ public:
   /// (like failure to remove a hold).
   ///
   /// \param[in] s The status outcome of this RPC
-  void OnDone(const grpc::Status& /*s*/) override;
+  void OnDone(const grpc::Status & /*s*/) override;
 
   void write(ExportMetricsServiceRequest request) 
LOCKS_EXCLUDED(requests_mtx_);
 
@@ -61,9 +68,8 @@ private:
   std::weak_ptr<OpencensusExporter> exporter_;
   grpc::ClientContext context_;
 
-  ExportMetricsServiceRequest request_;
-
-  std::vector<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
+  /// Pending ExportMetricsServiceRequest items to write to server
+  std::list<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
   absl::Mutex requests_mtx_;
 
   std::atomic_bool inflight_{false};
@@ -71,7 +77,7 @@ private:
 
   ExportMetricsServiceResponse response_;
 
-  void fireWrite();
+  void tryWriteNext();
 
   void fireRead();
 };

Reply via email to