This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop-cpp in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 196b6e9c9387d3f5a7a9fe61d4a9e085a783058d Author: Li Zhanhui <[email protected]> AuthorDate: Fri Mar 15 13:12:37 2024 +0800 fix: fix stream state transition with gRPC reactor Signed-off-by: Li Zhanhui <[email protected]> --- cpp/source/client/TelemetryBidiReactor.cpp | 104 +++++++++++++++-------- cpp/source/client/include/TelemetryBidiReactor.h | 60 ++++++++++--- 2 files changed, 119 insertions(+), 45 deletions(-) diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index a55a7473..827fdda2 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -35,8 +35,8 @@ ROCKETMQ_NAMESPACE_BEGIN TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, rmq::MessagingService::Stub* stub, std::string peer_address) - : client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Created) { - auto ptr = client.lock(); + : client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Active) { + auto ptr = client_.lock(); auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1); context_.set_deadline(deadline); Metadata metadata; @@ -45,6 +45,7 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, context_.AddMetadata(entry.first, entry.second); } stub->async()->Telemetry(&context_, this); + fireRead(); StartCall(); } @@ -64,8 +65,6 @@ bool TelemetryBidiReactor::await() { } void TelemetryBidiReactor::OnWriteDone(bool ok) { - SPDLOG_DEBUG("OnWriteDone: {}", ok); - { bool expect = true; if (!command_inflight_.compare_exchange_strong(expect, false, std::memory_order_relaxed)) { @@ -77,36 +76,28 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) { SPDLOG_WARN("Failed to write telemetry command {} to {}", write_.DebugString(), peer_address_); { absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::WriteDone; + if (streamStateGood()) { + stream_state_ = StreamState::WriteFailure; + } } - - fireClose(); return; } - { - absl::MutexLock lk(&stream_state_mtx_); - if (StreamState::Created == stream_state_) { - stream_state_ = StreamState::Active; - fireRead(); - } - } - fireWrite(); } void TelemetryBidiReactor::OnReadDone(bool ok) { - SPDLOG_DEBUG("OnReadDone: ok={}", ok); if (!ok) { - if (client_.lock()) { - SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_); - } - { absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::ReadDone; + if (!ok) { + if (streamStateGood()) { + stream_state_ = StreamState::ReadFailure; + SPDLOG_WARN("Faild to read from telemetry stream from {}", peer_address_); + } + return; + } } - fireClose(); return; } @@ -283,6 +274,16 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings void TelemetryBidiReactor::fireRead() { SPDLOG_DEBUG("{}#fireRead", peer_address_); + + { + absl::MutexLock lk(&stream_state_mtx_); + if (!streamStateGood()) { + SPDLOG_WARN("Further read from {} is not allowded due to stream-state={}", peer_address_, + static_cast<std::uint8_t>(stream_state_)); + return; + } + } + StartRead(&read_); } @@ -295,13 +296,11 @@ void TelemetryBidiReactor::write(TelemetryCommand command) { } void TelemetryBidiReactor::fireWrite() { - SPDLOG_DEBUG("{}#fireWrite", peer_address_); - { absl::MutexLock lk(&stream_state_mtx_); - if (stream_state_ != StreamState::Active && stream_state_ != StreamState::Created) { - SPDLOG_WARN("TelemetryBidiReactor to {} is closed or half-closed, ignoring fireWrite event. stream-state={}", - peer_address_, static_cast<std::uint8_t>(stream_state_)); + if (!streamStateGood()) { + SPDLOG_WARN("Further write to {} is not allowded due to stream-state={}", peer_address_, + static_cast<std::uint8_t>(stream_state_)); return; } } @@ -328,18 +327,33 @@ void TelemetryBidiReactor::fireWrite() { void TelemetryBidiReactor::fireClose() { SPDLOG_INFO("{}#fireClose", peer_address_); - if (StreamState::Active == stream_state_) { - StartWritesDone(); - { - absl::MutexLock lk(&stream_state_mtx_); - if (StreamState::Active == stream_state_) { - stream_state_cv_.Wait(&stream_state_mtx_); - } + + { + absl::MutexLock lk(&stream_state_mtx_); + if (!streamStateGood()) { + SPDLOG_WARN("No futher Read/Write call to {} is allowed due to stream-state={}", peer_address_, + static_cast<std::uint8_t>(stream_state_)); + return; } } + + StartWritesDone(); + { + absl::MutexLock lk(&stream_state_mtx_); + stream_state_cv_.Wait(&stream_state_mtx_); + } } void TelemetryBidiReactor::OnWritesDoneDone(bool ok) { + if (!ok) { + absl::MutexLock lk(&stream_state_mtx_); + if (streamStateGood()) { + stream_state_ = StreamState::WriteFailure; + } + SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_); + return; + } + SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); } @@ -368,7 +382,10 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { { SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_); absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::Closed; + if (streamStateGood()) { + stream_state_ = StreamState::Closed; + } + stream_state_cv_.SignalAll(); } @@ -382,4 +399,21 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { } } +void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { + if (!ok) { + absl::MutexLock lk(&stream_state_mtx_); + if (streamStateGood()) { + stream_state_ = StreamState::ReadInitialMetadataFailure; + } + SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_); + return; + } + + SPDLOG_DEBUG("Received initial metadata from {}", peer_address_); +} + +bool TelemetryBidiReactor::streamStateGood() { + return StreamState::Active == stream_state_; +} + ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h index aba116eb..72f21951 100644 --- a/cpp/source/client/include/TelemetryBidiReactor.h +++ b/cpp/source/client/include/TelemetryBidiReactor.h @@ -33,11 +33,13 @@ ROCKETMQ_NAMESPACE_BEGIN enum class StreamState : std::uint8_t { - Created = 0, - Active = 1, - ReadDone = 2, - WriteDone = 3, - Closed = 4, + Active = 0, + + // Once stream state reaches one of the following, Start* should not be called. + Closed = 1, + ReadInitialMetadataFailure = 2, + ReadFailure = 3, + WriteFailure = 4, }; class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>, @@ -47,13 +49,46 @@ public: ~TelemetryBidiReactor(); - void OnWriteDone(bool ok) override; - - void OnWritesDoneDone(bool ok) override; + /// Notifies the application that all operations associated with this RPC + /// have completed and all Holds have been removed. OnDone provides the RPC + /// status outcome for both successful and failed RPCs and will be called in + /// all cases. If it is not called, it indicates an application-level problem + /// (like failure to remove a hold). + /// + /// \param[in] s The status outcome of this RPC + void OnDone(const grpc::Status& status) override; + /// Notifies the application that a read of initial metadata from the + /// server is done. If the application chooses not to implement this method, + /// it can assume that the initial metadata has been read before the first + /// call of OnReadDone or OnDone. + /// + /// \param[in] ok Was the initial metadata read successfully? If false, no + /// new read/write operation will succeed, and any further + /// Start* operations should not be called. + void OnReadInitialMetadataDone(bool /*ok*/) override; + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? If false, no new read/write operation + /// will succeed, and any further Start* should not be called. void OnReadDone(bool ok) override; - void OnDone(const grpc::Status& status) override; + /// Notifies the application that a StartWrite or StartWriteLast operation + /// completed. + /// + /// \param[in] ok Was it successful? If false, no new read/write operation + /// will succeed, and any further Start* should not be called. + void OnWriteDone(bool ok) override; + + /// Notifies the application that a StartWritesDone operation completed. Note + /// that this is only used on explicit StartWritesDone operations and not for + /// those that are implicitly invoked as part of a StartWriteLast. + /// + /// \param[in] ok Was it successful? If false, the application will later see + /// the failure reflected as a bad status in OnDone and no + /// further Start* should be called. + void OnWritesDoneDone(bool ok) override; void fireRead(); @@ -87,7 +122,7 @@ private: TelemetryCommand write_; /** - * @brief Each TelemetryBidiReactor belongs to a specific client as its owner. + * @brief Each TelemetryBidiReactor belongs to a specific client as its owner. */ std::weak_ptr<Client> client_; @@ -118,6 +153,11 @@ private: void applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client); void applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client); + + /** + * Indicate if the underlying gRPC bidirectional stream is good enough to fire further Start* calls. + */ + bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_); }; ROCKETMQ_NAMESPACE_END \ No newline at end of file
