This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/develop-cpp by this push:
new 096bd82f fix: revamp TelemetryBidiReactor
096bd82f is described below
commit 096bd82f10fc28fa11829792582be42ed01da301
Author: Li Zhanhui <[email protected]>
AuthorDate: Fri Mar 22 22:14:37 2024 +0800
fix: revamp TelemetryBidiReactor
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/source/client/TelemetryBidiReactor.cpp | 92 +++++++++++++++++-------
cpp/source/client/include/TelemetryBidiReactor.h | 74 +++++++++++--------
cpp/source/stats/MetricBidiReactor.cpp | 40 ++++++++---
cpp/source/stats/include/MetricBidiReactor.h | 24 ++++---
4 files changed, 155 insertions(+), 75 deletions(-)
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp
b/cpp/source/client/TelemetryBidiReactor.cpp
index 827fdda2..b5eeb616 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -45,7 +45,8 @@
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
context_.AddMetadata(entry.first, entry.second);
}
stub->async()->Telemetry(&context_, this);
- fireRead();
+ // Increase hold for write stream.
+ AddHold();
StartCall();
}
@@ -66,14 +67,20 @@ bool TelemetryBidiReactor::await() {
void TelemetryBidiReactor::OnWriteDone(bool ok) {
{
- bool expect = true;
- if (!command_inflight_.compare_exchange_strong(expect, false,
std::memory_order_relaxed)) {
+ bool expected = true;
+ if (!command_inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
SPDLOG_WARN("Illegal command-inflight state");
+ return;
}
}
if (!ok) {
- SPDLOG_WARN("Failed to write telemetry command {} to {}",
write_.DebugString(), peer_address_);
+ RemoveHold();
+ {
+ absl::MutexLock lk(&writes_mtx_);
+ SPDLOG_WARN("Failed to write telemetry command {} to {}",
writes_.front().DebugString(), peer_address_);
+ }
+
{
absl::MutexLock lk(&stream_state_mtx_);
if (streamStateGood()) {
@@ -83,11 +90,21 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
return;
}
- fireWrite();
+ // Check if the read stream has started.
+ fireRead();
+
+ // Remove the command that has been written to server.
+ {
+ absl::MutexLock lk(&writes_mtx_);
+ writes_.pop_front();
+ }
+
+ tryWriteNext();
}
void TelemetryBidiReactor::OnReadDone(bool ok) {
if (!ok) {
+ RemoveHold();
{
absl::MutexLock lk(&stream_state_mtx_);
if (!ok) {
@@ -142,11 +159,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
TelemetryCommand response;
response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace
is not supported");
- {
- absl::MutexLock lk(&writes_mtx_);
- writes_.push_back(response);
- }
- fireWrite();
+ write(std::move(response));
break;
}
@@ -179,7 +192,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
}
- fireRead();
+ StartRead(&read_);
}
void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
@@ -273,8 +286,6 @@ void TelemetryBidiReactor::applySubscriptionConfig(const
rmq::Settings& settings
}
void TelemetryBidiReactor::fireRead() {
- SPDLOG_DEBUG("{}#fireRead", peer_address_);
-
{
absl::MutexLock lk(&stream_state_mtx_);
if (!streamStateGood()) {
@@ -284,18 +295,31 @@ void TelemetryBidiReactor::fireRead() {
}
}
- StartRead(&read_);
+ bool expected = false;
+ if (read_stream_started_.compare_exchange_weak(expected, true,
std::memory_order_relaxed)) {
+ // Hold for the read stream.
+ AddHold();
+ StartRead(&read_);
+ }
}
void TelemetryBidiReactor::write(TelemetryCommand command) {
+ {
+ absl::MutexLock lk(&stream_state_mtx_);
+ // Reject incoming write commands if the stream state is closing or has
witnessed some error.
+ if (!streamStateGood() || StreamState::Closing == stream_state_) {
+ return;
+ }
+ }
+
{
absl::MutexLock lk(&writes_mtx_);
writes_.push_back(command);
}
- fireWrite();
+ tryWriteNext();
}
-void TelemetryBidiReactor::fireWrite() {
+void TelemetryBidiReactor::tryWriteNext() {
{
absl::MutexLock lk(&stream_state_mtx_);
if (!streamStateGood()) {
@@ -305,24 +329,35 @@ void TelemetryBidiReactor::fireWrite() {
}
}
+ bool closing = false;
+ {
+ absl::MutexLock lk(&stream_state_mtx_);
+ if (StreamState::Closing == stream_state_) {
+ closing = true;
+ }
+ }
+
{
absl::MutexLock lk(&writes_mtx_);
- if (writes_.empty()) {
+ if (writes_.empty() && !closing) {
SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
return;
}
- bool expect = false;
- if (command_inflight_.compare_exchange_strong(expect, true,
std::memory_order_relaxed)) {
- write_ = std::move(*writes_.begin());
- writes_.erase(writes_.begin());
+ bool expected = false;
+ if (command_inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
+ if (writes_.empty()) {
+ // Tell the server there are no more write requests.
+ StartWritesDone();
+ } else {
+ SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().DebugString());
+ StartWrite(&(writes_.front()));
+ }
} else {
SPDLOG_DEBUG("Another command is already on the wire. Peer={}",
peer_address_);
return;
}
}
- SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
write_.DebugString());
- StartWrite(&write_);
}
void TelemetryBidiReactor::fireClose() {
@@ -337,14 +372,17 @@ void TelemetryBidiReactor::fireClose() {
}
}
- StartWritesDone();
{
absl::MutexLock lk(&stream_state_mtx_);
- stream_state_cv_.Wait(&stream_state_mtx_);
+ stream_state_ = StreamState::Closing;
}
+ tryWriteNext();
}
void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
+ // Remove the hold for the write stream.
+ RemoveHold();
+
if (!ok) {
absl::MutexLock lk(&stream_state_mtx_);
if (streamStateGood()) {
@@ -362,7 +400,7 @@ void
TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
absl::MutexLock lk(&writes_mtx_);
writes_.emplace_back(command);
}
- fireWrite();
+ tryWriteNext();
}
/// Notifies the application that all operations associated with this RPC
@@ -413,7 +451,7 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool
ok) {
}
bool TelemetryBidiReactor::streamStateGood() {
- return StreamState::Active == stream_state_;
+ return StreamState::Active == stream_state_ || StreamState::Closing ==
stream_state_;
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h
b/cpp/source/client/include/TelemetryBidiReactor.h
index 72f21951..90d2f63d 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <chrono>
#include <cstdint>
+#include <list>
#include <memory>
#include <utility>
#include <vector>
@@ -31,21 +32,29 @@
ROCKETMQ_NAMESPACE_BEGIN
-enum class StreamState : std::uint8_t
-{
+enum class StreamState : std::uint8_t {
Active = 0,
- // Once stream state reaches one of the following, Start* should not be
called.
- Closed = 1,
- ReadInitialMetadataFailure = 2,
- ReadFailure = 3,
- WriteFailure = 4,
+ /// Once the stream enters this state, new write requests shall be rejected
+ /// and once currently pending requests are written, write stream should be
+ /// closed as soon as possible.
+ Closing = 1,
+
+ // Once stream state reaches one of the following, Start* should not be
+ // called.
+ Closed = 2,
+ ReadInitialMetadataFailure = 3,
+ ReadFailure = 4,
+ WriteFailure = 5,
};
-class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand,
TelemetryCommand>,
- public
std::enable_shared_from_this<TelemetryBidiReactor> {
+class TelemetryBidiReactor
+ : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
+ public std::enable_shared_from_this<TelemetryBidiReactor> {
public:
- TelemetryBidiReactor(std::weak_ptr<Client> client,
rmq::MessagingService::Stub* stub, std::string peer_address);
+ TelemetryBidiReactor(std::weak_ptr<Client> client,
+ rmq::MessagingService::Stub *stub,
+ std::string peer_address);
~TelemetryBidiReactor();
@@ -56,7 +65,7 @@ public:
/// (like failure to remove a hold).
///
/// \param[in] s The status outcome of this RPC
- void OnDone(const grpc::Status& status) override;
+ void OnDone(const grpc::Status &status) override;
/// Notifies the application that a read of initial metadata from the
/// server is done. If the application chooses not to implement this method,
@@ -90,16 +99,15 @@ public:
/// further Start* should be called.
void OnWritesDoneDone(bool ok) override;
- void fireRead();
-
- void fireWrite();
-
- void fireClose();
+
+ /// Core API method to initiate this bidirectional stream.
void write(TelemetryCommand command);
bool await();
+ void fireClose();
+
private:
grpc::ClientContext context_;
@@ -111,16 +119,12 @@ private:
/**
* @brief Buffered commands to write to server
*
- * TODO: move buffered commands to a shared container, which may survive
multiple TelemetryBidiReactor lifecycles.
+ * TODO: move buffered commands to a shared container, which may survive
+ * multiple TelemetryBidiReactor lifecycles.
*/
- std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
+ std::list<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
absl::Mutex writes_mtx_;
- /**
- * @brief The command that is currently being written back to server.
- */
- TelemetryCommand write_;
-
/**
* @brief Each TelemetryBidiReactor belongs to a specific client as its
owner.
*/
@@ -136,6 +140,8 @@ private:
*/
std::atomic_bool command_inflight_{false};
+ std::atomic_bool read_stream_started_{false};
+
StreamState stream_state_ GUARDED_BY(stream_state_mtx_);
absl::Mutex stream_state_mtx_;
absl::CondVar stream_state_cv_;
@@ -146,18 +152,30 @@ private:
void onVerifyMessageResult(TelemetryCommand command);
- void applySettings(const rmq::Settings& settings);
+ void applySettings(const rmq::Settings &settings);
- void applyBackoffPolicy(const rmq::Settings& settings,
std::shared_ptr<Client>& client);
+ void applyBackoffPolicy(const rmq::Settings &settings,
+ std::shared_ptr<Client> &client);
- void applyPublishingConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
+ void applyPublishingConfig(const rmq::Settings &settings,
+ std::shared_ptr<Client> client);
- void applySubscriptionConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
+ void applySubscriptionConfig(const rmq::Settings &settings,
+ std::shared_ptr<Client> client);
/**
- * Indicate if the underlying gRPC bidirectional stream is good enough to
fire further Start* calls.
+ * Indicate if the underlying gRPC bidirectional stream is good enough to
fire
+ * further Start* calls.
*/
bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_);
+
+ /// Start the read stream.
+ ///
+ /// Once OnReadDone is received and the status is OK, call StartRead immediately.
+ void fireRead();
+
+ /// Attempt to write pending telemetry command to server.
+ void tryWriteNext();
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/stats/MetricBidiReactor.cpp
b/cpp/source/stats/MetricBidiReactor.cpp
index 22363bd0..e6921378 100644
--- a/cpp/source/stats/MetricBidiReactor.cpp
+++ b/cpp/source/stats/MetricBidiReactor.cpp
@@ -18,10 +18,10 @@
#include <chrono>
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
#include "OpencensusExporter.h"
#include "Signature.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -43,12 +43,15 @@ MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client>
client, std::weak_ptr
return;
}
exporter_ptr->stub()->async()->Export(&context_, this);
+ AddHold();
StartCall();
}
void MetricBidiReactor::OnReadDone(bool ok) {
if (!ok) {
SPDLOG_WARN("Failed to read response");
+ // match the AddHold() call in MetricBidiReactor::fireRead
+ RemoveHold();
return;
}
SPDLOG_DEBUG("OnReadDone OK");
@@ -56,16 +59,32 @@ void MetricBidiReactor::OnReadDone(bool ok) {
}
void MetricBidiReactor::OnWriteDone(bool ok) {
+ {
+ bool expected = true;
+ if (!inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
+ SPDLOG_WARN("Illegal command-inflight state");
+ return;
+ }
+ }
+
if (!ok) {
SPDLOG_WARN("Failed to report metrics");
+ // match AddHold() call in MetricBidiReactor::MetricBidiReactor
+ RemoveHold();
return;
}
SPDLOG_DEBUG("OnWriteDone OK");
+
+ // If the read stream has not started yet, start it now.
fireRead();
- bool expected = true;
- if (inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
- fireWrite();
+
+ // Remove the one that has been written.
+ {
+ absl::MutexLock lk(&requests_mtx_);
+ requests_.pop_front();
}
+
+ tryWriteNext();
}
void MetricBidiReactor::OnDone(const grpc::Status& s) {
@@ -89,13 +108,13 @@ void MetricBidiReactor::write(ExportMetricsServiceRequest
request) {
SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
{
absl::MutexLock lk(&requests_mtx_);
- requests_.emplace_back(std::move(request));
+ requests_.push_back(std::move(request));
}
- fireWrite();
+ tryWriteNext();
}
-void MetricBidiReactor::fireWrite() {
+void MetricBidiReactor::tryWriteNext() {
{
absl::MutexLock lk(&requests_mtx_);
if (requests_.empty()) {
@@ -107,16 +126,15 @@ void MetricBidiReactor::fireWrite() {
bool expected = false;
if (inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
absl::MutexLock lk(&requests_mtx_);
- request_ = std::move(*requests_.begin());
- requests_.erase(requests_.begin());
SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
- StartWrite(&request_);
+ StartWrite(&(requests_.front()));
}
}
void MetricBidiReactor::fireRead() {
bool expected = false;
if (read_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
+ AddHold();
StartRead(&response_);
}
}
diff --git a/cpp/source/stats/include/MetricBidiReactor.h
b/cpp/source/stats/include/MetricBidiReactor.h
index 0a10cd88..e4d75344 100644
--- a/cpp/source/stats/include/MetricBidiReactor.h
+++ b/cpp/source/stats/include/MetricBidiReactor.h
@@ -16,6 +16,8 @@
*/
#pragma once
+#include <list>
+
#include "Client.h"
#include "grpcpp/grpcpp.h"
#include "grpcpp/impl/codegen/client_callback.h"
@@ -25,12 +27,17 @@ ROCKETMQ_NAMESPACE_BEGIN
class OpencensusExporter;
-using ExportMetricsServiceRequest =
opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
-using ExportMetricsServiceResponse =
opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
+using ExportMetricsServiceRequest =
+ opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+using ExportMetricsServiceResponse =
+ opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
-class MetricBidiReactor : public
grpc::ClientBidiReactor<ExportMetricsServiceRequest,
ExportMetricsServiceResponse> {
+class MetricBidiReactor
+ : public grpc::ClientBidiReactor<ExportMetricsServiceRequest,
+ ExportMetricsServiceResponse> {
public:
- MetricBidiReactor(std::weak_ptr<Client> client,
std::weak_ptr<OpencensusExporter> exporter);
+ MetricBidiReactor(std::weak_ptr<Client> client,
+ std::weak_ptr<OpencensusExporter> exporter);
/// Notifies the application that a StartRead operation completed.
///
@@ -52,7 +59,7 @@ public:
/// (like failure to remove a hold).
///
/// \param[in] s The status outcome of this RPC
- void OnDone(const grpc::Status& /*s*/) override;
+ void OnDone(const grpc::Status & /*s*/) override;
void write(ExportMetricsServiceRequest request)
LOCKS_EXCLUDED(requests_mtx_);
@@ -61,9 +68,8 @@ private:
std::weak_ptr<OpencensusExporter> exporter_;
grpc::ClientContext context_;
- ExportMetricsServiceRequest request_;
-
- std::vector<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
+ /// Pending ExportMetricsServiceRequest items to write to server
+ std::list<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
absl::Mutex requests_mtx_;
std::atomic_bool inflight_{false};
@@ -71,7 +77,7 @@ private:
ExportMetricsServiceResponse response_;
- void fireWrite();
+ void tryWriteNext();
void fireRead();
};