This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/develop-cpp by this push:
new bb533223 feat: revamp TelemetryBidiReactor states and their transition graph
bb533223 is described below
commit bb533223b8c5db8990c262bb63f831470fedd480
Author: Li Zhanhui <[email protected]>
AuthorDate: Tue Mar 26 11:59:55 2024 +0800
feat: revamp TelemetryBidiReactor states and their transition graph
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/source/client/SessionImpl.cpp | 6 +-
cpp/source/client/TelemetryBidiReactor.cpp | 331 +++++++++++++++--------
cpp/source/client/include/TelemetryBidiReactor.h | 61 ++---
3 files changed, 244 insertions(+), 154 deletions(-)
diff --git a/cpp/source/client/SessionImpl.cpp
b/cpp/source/client/SessionImpl.cpp
index 0ca3fff2..36416829 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -16,12 +16,14 @@
*/
#include "SessionImpl.h"
+
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
-SessionImpl::SessionImpl(std::weak_ptr<Client> client,
std::shared_ptr<RpcClient> rpc_client) : client_(client),
rpc_client_(rpc_client) {
+SessionImpl::SessionImpl(std::weak_ptr<Client> client,
std::shared_ptr<RpcClient> rpc_client)
+ : client_(client), rpc_client_(rpc_client) {
telemetry_ = rpc_client->asyncTelemetry(client_);
syncSettings();
}
@@ -39,8 +41,8 @@ void SessionImpl::syncSettings() {
}
SessionImpl::~SessionImpl() {
- SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
telemetry_->fireClose();
+ SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp
b/cpp/source/client/TelemetryBidiReactor.cpp
index d45e5058..61796437 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -35,7 +35,10 @@ ROCKETMQ_NAMESPACE_BEGIN
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
rmq::MessagingService::Stub* stub,
std::string peer_address)
- : client_(client), peer_address_(std::move(peer_address)),
stream_state_(StreamState::Active) {
+ : client_(client),
+ peer_address_(std::move(peer_address)),
+ read_state_(StreamState::Created),
+ write_state_(StreamState::Created) {
auto ptr = client_.lock();
auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
context_.set_deadline(deadline);
@@ -45,59 +48,67 @@
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
context_.AddMetadata(entry.first, entry.second);
}
stub->async()->Telemetry(&context_, this);
+ write_state_ = StreamState::Ready;
// Increase hold for write stream.
AddHold();
StartCall();
}
TelemetryBidiReactor::~TelemetryBidiReactor() {
- SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}",
peer_address_,
- static_cast<std::uint8_t>(stream_state_));
+ SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={},
WriteStreamState={}", peer_address_,
+ static_cast<std::uint8_t>(read_state_),
static_cast<std::uint8_t>(read_state_));
}
bool TelemetryBidiReactor::await() {
- absl::MutexLock lk(&server_setting_received_mtx_);
- if (server_setting_received_) {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Created != write_state_) {
return true;
}
- server_setting_received_cv_.Wait(&server_setting_received_mtx_);
- return server_setting_received_;
-}
-
-void TelemetryBidiReactor::changeStreamStateThenNotify(StreamState state) {
- absl::MutexLock lk(&stream_state_mtx_);
- if (state == stream_state_) {
- return;
- }
- stream_state_ = state;
- stream_state_cv_.SignalAll();
+ state_cv_.Wait(&state_mtx_);
+ return StreamState::Error != write_state_;
}
void TelemetryBidiReactor::OnWriteDone(bool ok) {
SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
- {
- bool expected = true;
- if (!command_inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
- SPDLOG_WARN("Illegal command-inflight state");
- return;
- }
- }
if (!ok) {
RemoveHold();
{
- absl::MutexLock lk(&writes_mtx_);
+ absl::MutexLock lk(&state_mtx_);
SPDLOG_WARN("Failed to write telemetry command {} to {}",
writes_.front().DebugString(), peer_address_);
- }
-
- {
- absl::MutexLock lk(&stream_state_mtx_);
- if (streamStateGood()) {
- stream_state_ = StreamState::WriteFailure;
+ write_state_ = StreamState::Error;
+
+ // Sync read state.
+ switch (read_state_) {
+ case StreamState::Created:
+ case StreamState::Ready: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ break;
+ }
+ case StreamState::Inflight: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closing));
+ read_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing:
+ case StreamState::Error:
+ case StreamState::Closed: {
+ break;
+ }
}
+
+ state_cv_.SignalAll();
}
return;
+ } else {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Inflight == write_state_) {
+ write_state_ = StreamState::Ready;
+ }
}
// Check if the read stream has started.
@@ -114,22 +125,49 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
void TelemetryBidiReactor::OnReadDone(bool ok) {
SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
- if (!ok) {
- RemoveHold();
- {
- absl::MutexLock lk(&stream_state_mtx_);
- if (streamStateGood() && stream_state_ != StreamState::Closing) {
- stream_state_ = StreamState::ReadFailure;
+ {
+ absl::MutexLock lk(&state_mtx_);
+ if (!ok) {
+ // Remove read hold.
+ RemoveHold();
+ {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Error));
+ read_state_ = StreamState::Error;
SPDLOG_WARN("Faild to read from telemetry stream from {}",
peer_address_);
- }
- bool expected = false;
- if (command_inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
- // There is no inflight write request, remove write hold on its behalf.
- RemoveHold();
+ // Sync write state
+ switch (write_state_) {
+ case StreamState::Created: {
+ // Not reachable
+ break;
+ }
+ case StreamState::Ready: {
+ write_state_ = StreamState::Closed;
+ // There is no inflight write request, remove write hold on its
behalf.
+ RemoveHold();
+ state_cv_.SignalAll();
+ break;
+ }
+ case StreamState::Inflight: {
+ write_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing:
+ case StreamState::Error:
+ case StreamState::Closed: {
+ break;
+ }
+ }
}
+ return;
+ } else if (StreamState::Closing == read_state_) {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ state_cv_.SignalAll();
+ return;
}
- return;
}
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_,
read_.DebugString());
@@ -144,13 +182,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
auto settings = read_.settings();
SPDLOG_INFO("Received settings from {}: {}", peer_address_,
settings.DebugString());
applySettings(settings);
- {
- absl::MutexLock lk(&server_setting_received_mtx_);
- if (!server_setting_received_) {
- server_setting_received_ = true;
- server_setting_received_cv_.SignalAll();
- }
- }
break;
}
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
@@ -206,7 +237,18 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
}
- StartRead(&read_);
+ {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Inflight == read_state_) {
+ SPDLOG_DEBUG("Spawn new read op, read-state={}",
static_cast<std::uint8_t>(read_state_));
+ StartRead(&read_);
+ } else if (read_state_ == StreamState::Closing) {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ state_cv_.SignalAll();
+ }
+ }
}
void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
@@ -300,30 +342,35 @@ void TelemetryBidiReactor::applySubscriptionConfig(const
rmq::Settings& settings
}
void TelemetryBidiReactor::fireRead() {
- {
- absl::MutexLock lk(&stream_state_mtx_);
- if (!streamStateGood()) {
- SPDLOG_WARN("Further read from {} is not allowded due to
stream-state={}", peer_address_,
- static_cast<std::uint8_t>(stream_state_));
- return;
- }
- }
-
- bool expected = false;
- if (read_stream_started_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
- // Hold for the read stream.
- AddHold();
- StartRead(&read_);
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Created != read_state_) {
+ SPDLOG_DEBUG("Further read from {} is not allowded due to
stream-state={}", peer_address_,
+ static_cast<std::uint8_t>(read_state_));
+ return;
}
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Ready));
+ read_state_ = StreamState::Ready;
+ AddHold();
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Inflight));
+ read_state_ = StreamState::Inflight;
+ StartRead(&read_);
}
void TelemetryBidiReactor::write(TelemetryCommand command) {
SPDLOG_DEBUG("{}#write", peer_address_);
{
- absl::MutexLock lk(&stream_state_mtx_);
+ absl::MutexLock lk(&state_mtx_);
// Reject incoming write commands if the stream state is closing or has
witnessed some error.
- if (!streamStateGood() || StreamState::Closing == stream_state_) {
- return;
+ switch (write_state_) {
+ case StreamState::Closing:
+ case StreamState::Error:
+ case StreamState::Closed:
+ return;
+ default:
+ // no-op
+ break;
}
}
@@ -337,84 +384,123 @@ void TelemetryBidiReactor::write(TelemetryCommand
command) {
void TelemetryBidiReactor::tryWriteNext() {
SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
{
- absl::MutexLock lk(&stream_state_mtx_);
- if (!streamStateGood()) {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Error == write_state_ || StreamState::Closed ==
write_state_) {
SPDLOG_WARN("Further write to {} is not allowded due to
stream-state={}", peer_address_,
- static_cast<std::uint8_t>(stream_state_));
+ static_cast<std::uint8_t>(write_state_));
return;
}
}
- bool closing = false;
- {
- absl::MutexLock lk(&stream_state_mtx_);
- if (StreamState::Closing == stream_state_) {
- closing = true;
- }
- }
-
{
absl::MutexLock lk(&writes_mtx_);
- if (writes_.empty() && !closing) {
+ if (writes_.empty() && StreamState::Closing != write_state_) {
SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
return;
}
- bool expected = false;
- if (command_inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
- if (writes_.empty()) {
- // Tell server there is no more write requests.
- StartWritesDone();
- } else {
- SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().DebugString());
- StartWrite(&(writes_.front()));
- }
+ if (StreamState::Ready == write_state_) {
+ write_state_ = StreamState::Inflight;
+ }
+
+ if (writes_.empty()) {
+ // Tell server there is no more write requests.
+ StartWritesDone();
} else {
- SPDLOG_DEBUG("Another command is already on the wire. Peer={}",
peer_address_);
- return;
+ SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().DebugString());
+ StartWrite(&(writes_.front()));
}
}
}
void TelemetryBidiReactor::fireClose() {
SPDLOG_INFO("{}#fireClose", peer_address_);
+
{
- absl::MutexLock lk(&stream_state_mtx_);
- if (!streamStateGood()) {
- SPDLOG_WARN("No futher Read/Write call to {} is allowed due to
stream-state={}", peer_address_,
- static_cast<std::uint8_t>(stream_state_));
- return;
+ // Acquire state lock
+ absl::MutexLock lk(&state_mtx_);
+
+ // Transition read state
+ switch (read_state_) {
+ case StreamState::Created:
+ case StreamState::Ready: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ state_cv_.SignalAll();
+ break;
+ }
+
+ case StreamState::Inflight: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closing));
+ read_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing: {
+ break;
+ }
+ case StreamState::Closed:
+ case StreamState::Error: {
+ state_cv_.SignalAll();
+ break;
+ }
+ }
+
+ // Transition write state
+ switch (write_state_) {
+ case StreamState::Created:
+ case StreamState::Ready:
+ case StreamState::Inflight: {
+ SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closing));
+ write_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing: {
+ break;
+ }
+ case StreamState::Closed:
+ case StreamState::Error: {
+ state_cv_.SignalAll();
+ break;
+ }
}
}
- {
- absl::MutexLock lk(&stream_state_mtx_);
- stream_state_ = StreamState::Closing;
+ if (StreamState::Closing == write_state_) {
+ tryWriteNext();
}
- tryWriteNext();
+
{
- absl::MutexLock lk(&stream_state_mtx_);
- if (stream_state_cv_.WaitWithTimeout(&stream_state_mtx_,
absl::Seconds(3))) {
- SPDLOG_WARN("StreamState CondVar timed out before getting signalled");
+ // Acquire state lock
+ absl::MutexLock lk(&state_mtx_);
+ while ((StreamState::Closed != read_state_ && StreamState::Error !=
read_state_) ||
+ (StreamState::Closed != write_state_ && StreamState::Error !=
write_state_)) {
+ if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
+ SPDLOG_WARN("StreamState CondVar timed out before getting signalled:
read-state={}, write-state={}",
+ static_cast<uint8_t>(read_state_),
static_cast<uint8_t>(write_state_));
+ }
}
}
}
void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+ assert(StreamState::Closing == write_state_);
+
+ absl::MutexLock lk(&state_mtx_);
// Remove the hold for the write stream.
RemoveHold();
if (!ok) {
- absl::MutexLock lk(&stream_state_mtx_);
- if (streamStateGood()) {
- stream_state_ = StreamState::WriteFailure;
- }
+ write_state_ = StreamState::Error;
SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
- return;
+ } else {
+ write_state_ = StreamState::Closed;
+ SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
}
-
- SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+ state_cv_.SignalAll();
}
void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
@@ -441,7 +527,18 @@ void TelemetryBidiReactor::OnDone(const grpc::Status&
status) {
{
SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
- changeStreamStateThenNotify(StreamState::Closed);
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Error != read_state_) {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ }
+ if (StreamState::Error != write_state_) {
+ SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ write_state_ = StreamState::Closed;
+ }
+ state_cv_.SignalAll();
}
auto client = client_.lock();
@@ -456,11 +553,13 @@ void TelemetryBidiReactor::OnDone(const grpc::Status&
status) {
void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
+
if (!ok) {
- absl::MutexLock lk(&stream_state_mtx_);
- if (streamStateGood()) {
- stream_state_ = StreamState::ReadInitialMetadataFailure;
- }
+ absl::MutexLock lk(&state_mtx_);
+ SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Error));
+ read_state_ = StreamState::Error;
+ state_cv_.SignalAll();
SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
return;
}
@@ -468,8 +567,4 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool
ok) {
SPDLOG_DEBUG("Received initial metadata from {}", peer_address_);
}
-bool TelemetryBidiReactor::streamStateGood() {
- return StreamState::Active == stream_state_ || StreamState::Closing ==
stream_state_;
-}
-
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h
b/cpp/source/client/include/TelemetryBidiReactor.h
index 63c6f5e6..77ee484f 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -34,21 +34,30 @@ ROCKETMQ_NAMESPACE_BEGIN
enum class StreamState : std::uint8_t
{
- Active = 0,
-
- /// Once the stream enters this state, new write requests shall be rejected
- /// and once currently pending requests are written, write stream should be
- /// closed as soon as possible.
- Closing = 1,
-
- // Once stream state reaches one of the following, Start* should not be
- // called.
- Closed = 2,
- ReadInitialMetadataFailure = 3,
- ReadFailure = 4,
- WriteFailure = 5,
+ Created = 0,
+ Ready = 1,
+ Inflight = 2,
+ Closing = 3,
+ Closed = 4,
+ Error = 5,
};
+/// write-stream-state: created --> ready --> inflight --> ready --> ...
+/// --> error
+/// --> closing --> closed
+/// --> closing --> closed
+/// --> error
+///
+///
+/// read-stream-state: created --> ready --> inflight --> inflight
+/// --> closing --> closed
+/// --> error
+/// --> closed
+/// requirement:
+/// 1, fireClose --> blocking await till bidireactor is closed;
+/// 2, when session is closed and client is still active, recreate a new session to accept incoming commands from server;
+/// 3, after writing the first Settings telemetry command, launch the read directional stream
+///
class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand,
TelemetryCommand>,
public
std::enable_shared_from_this<TelemetryBidiReactor> {
public:
@@ -131,20 +140,10 @@ private:
*/
std::string peer_address_;
- /**
- * @brief Indicate if there is a command being written to network.
- */
- std::atomic_bool command_inflight_{false};
-
- std::atomic_bool read_stream_started_{false};
-
- StreamState stream_state_ GUARDED_BY(stream_state_mtx_);
- absl::Mutex stream_state_mtx_;
- absl::CondVar stream_state_cv_;
-
- bool server_setting_received_
GUARDED_BY(server_setting_received_mtx_){false};
- absl::Mutex server_setting_received_mtx_;
- absl::CondVar server_setting_received_cv_;
+ StreamState read_state_ GUARDED_BY(state_mtx_);
+ StreamState write_state_ GUARDED_BY(state_mtx_);
+ absl::Mutex state_mtx_;
+ absl::CondVar state_cv_;
void changeStreamStateThenNotify(StreamState state);
@@ -158,19 +157,13 @@ private:
void applySubscriptionConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
- /**
- * Indicate if the underlying gRPC bidirectional stream is good enough to
fire
- * further Start* calls.
- */
- bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_);
-
/// Start the read stream.
///
/// Once got the OnReadDone and status is OK, call StartRead immediately.
void fireRead();
/// Attempt to write pending telemetry command to server.
- void tryWriteNext();
+ void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file