This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a363f417 Fix crash issue of TelemetryBidiReactor and MetricBidiReactor
(#809)
a363f417 is described below
commit a363f417c76cd3ae6777e433b64456f30d30b5b1
Author: Wang Chuan <[email protected]>
AuthorDate: Tue Aug 6 17:44:51 2024 +0800
Fix crash issue of TelemetryBidiReactor and MetricBidiReactor (#809)
* Fix crash issue of TelemetryBidiReactor and MetricBidiReactor
* Update TelemetryBidiReactor.cpp
* Update TelemetryBidiReactor.h
* fix: remove macos-11 as there is no runner of this type available
Signed-off-by: Zhanhui Li <[email protected]>
---------
Signed-off-by: Zhanhui Li <[email protected]>
Co-authored-by: Zhanhui Li <[email protected]>
---
.github/workflows/cpp_build.yml | 3 +-
cpp/source/client/SessionImpl.cpp | 4 +-
cpp/source/client/TelemetryBidiReactor.cpp | 330 +++++------------------
cpp/source/client/include/TelemetryBidiReactor.h | 52 +---
cpp/source/stats/MetricBidiReactor.cpp | 6 +-
cpp/source/stats/OpencensusExporter.cpp | 18 +-
cpp/source/stats/include/MetricBidiReactor.h | 4 +-
7 files changed, 104 insertions(+), 313 deletions(-)
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 785995bd..7973881a 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -9,8 +9,9 @@ jobs:
fail-fast: false
matrix:
# Disable VS 2022 before
https://github.com/bazelbuild/bazel/issues/18592 issue is solved
+ # Remove macos-11 since there is no such runner available
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019,
windows-2022]
- os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019]
+ os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019]
steps:
- uses: actions/checkout@v2
- name: Compile On Linux
diff --git a/cpp/source/client/SessionImpl.cpp
b/cpp/source/client/SessionImpl.cpp
index 36416829..b3f8b73b 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -29,7 +29,7 @@ SessionImpl::SessionImpl(std::weak_ptr<Client> client,
std::shared_ptr<RpcClient
}
bool SessionImpl::await() {
- return telemetry_->await();
+ return telemetry_->awaitApplyingSettings();
}
void SessionImpl::syncSettings() {
@@ -41,7 +41,7 @@ void SessionImpl::syncSettings() {
}
SessionImpl::~SessionImpl() {
- telemetry_->fireClose();
+ telemetry_->close();
SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
}
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp
b/cpp/source/client/TelemetryBidiReactor.cpp
index d75ac361..e0a83a28 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -37,87 +37,50 @@
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
std::string peer_address)
: client_(client),
peer_address_(std::move(peer_address)),
- read_state_(StreamState::Created),
- write_state_(StreamState::Created) {
+ state_(StreamState::Ready) {
auto ptr = client_.lock();
auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
context_.set_deadline(deadline);
+ sync_settings_future_ = sync_settings_promise_.get_future();
Metadata metadata;
Signature::sign(ptr->config(), metadata);
for (const auto& entry : metadata) {
context_.AddMetadata(entry.first, entry.second);
}
stub->async()->Telemetry(&context_, this);
- write_state_ = StreamState::Ready;
- // Increase hold for write stream.
+ StartRead(&read_);
+ // for read stream
AddHold();
StartCall();
}
TelemetryBidiReactor::~TelemetryBidiReactor() {
- SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={},
WriteStreamState={}", peer_address_,
- static_cast<std::uint8_t>(read_state_),
static_cast<std::uint8_t>(read_state_));
+ SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}",
peer_address_, static_cast<std::uint8_t>(state_));
}
-bool TelemetryBidiReactor::await() {
- absl::MutexLock lk(&state_mtx_);
- if (StreamState::Created != write_state_) {
- return true;
- }
-
- state_cv_.Wait(&state_mtx_);
- return StreamState::Error != write_state_;
+bool TelemetryBidiReactor::awaitApplyingSettings() {
+ sync_settings_future_.get();
+ return true;
}
void TelemetryBidiReactor::OnWriteDone(bool ok) {
SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
- if (!ok) {
- RemoveHold();
- {
- absl::MutexLock lk(&state_mtx_);
- SPDLOG_WARN("Failed to write telemetry command {} to {}",
writes_.front().DebugString(), peer_address_);
- write_state_ = StreamState::Error;
-
- // Sync read state.
- switch (read_state_) {
- case StreamState::Created:
- case StreamState::Ready: {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closed));
- read_state_ = StreamState::Closed;
- break;
- }
- case StreamState::Inflight: {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closing));
- read_state_ = StreamState::Closing;
- break;
- }
- case StreamState::Closing:
- case StreamState::Error:
- case StreamState::Closed: {
- break;
- }
- }
+ // for write stream
+ RemoveHold();
- state_cv_.SignalAll();
- }
+ if (!ok) {
+ SPDLOG_WARN("Failed to write telemetry command {} to {}",
writes_.front().DebugString(), peer_address_);
+ signalClose();
return;
- } else {
- absl::MutexLock lk(&state_mtx_);
- if (StreamState::Inflight == write_state_) {
- write_state_ = StreamState::Ready;
- }
}
- // Check if the read stream has started.
- fireRead();
-
// Remove the command that has been written to server.
{
absl::MutexLock lk(&writes_mtx_);
- writes_.pop_front();
+ if (!writes_.empty()) {
+ writes_.pop_front();
+ }
}
tryWriteNext();
@@ -125,55 +88,26 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
void TelemetryBidiReactor::OnReadDone(bool ok) {
SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
+ if (!ok) {
+ // for read stream
+ RemoveHold();
+ SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
+ signalClose();
+ return;
+ }
+
{
absl::MutexLock lk(&state_mtx_);
- if (!ok) {
- // Remove read hold.
- RemoveHold();
- {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Error));
- read_state_ = StreamState::Error;
- SPDLOG_WARN("Failed to read from telemetry stream from {}",
peer_address_);
-
- // Sync write state
- switch (write_state_) {
- case StreamState::Created: {
- // Not reachable
- break;
- }
- case StreamState::Ready: {
- write_state_ = StreamState::Closed;
- // There is no inflight write request, remove write hold on its
behalf.
- RemoveHold();
- state_cv_.SignalAll();
- break;
- }
- case StreamState::Inflight: {
- write_state_ = StreamState::Closing;
- break;
- }
- case StreamState::Closing:
- case StreamState::Error:
- case StreamState::Closed: {
- break;
- }
- }
- }
- return;
- } else if (StreamState::Closing == read_state_) {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closed));
- read_state_ = StreamState::Closed;
- state_cv_.SignalAll();
+ if (StreamState::Ready != state_) {
return;
}
}
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_,
read_.DebugString());
- auto ptr = client_.lock();
- if (!ptr) {
+ auto client = client_.lock();
+ if (!client) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
+ signalClose();
return;
}
@@ -182,14 +116,10 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
auto settings = read_.settings();
SPDLOG_INFO("Received settings from {}: {}", peer_address_,
settings.DebugString());
applySettings(settings);
+ sync_settings_promise_.set_value(true);
break;
}
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
- auto client = client_.lock();
- if (!client) {
- fireClose();
- return;
- }
SPDLOG_DEBUG("Receive orphan transaction command: {}",
read_.DebugString());
auto message =
client->manager()->wrapMessage(read_.release_verify_message_command()->message());
auto raw = const_cast<Message*>(message.get());
@@ -209,19 +139,13 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
case rmq::TelemetryCommand::kVerifyMessageCommand: {
- auto client = client_.lock();
- if (!client) {
- fireClose();
- return;
- }
-
std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
auto cb = [ptr](TelemetryCommand command) {
auto reactor = ptr.lock();
if (!reactor) {
return;
}
- reactor->onVerifyMessageResult(std::move(command));
+ reactor->write(std::move(command));
};
auto message =
client->manager()->wrapMessage(read_.verify_message_command().message());
auto raw = const_cast<Message*>(message.get());
@@ -239,14 +163,9 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
{
absl::MutexLock lk(&state_mtx_);
- if (StreamState::Inflight == read_state_) {
- SPDLOG_DEBUG("Spawn new read op, read-state={}",
static_cast<std::uint8_t>(read_state_));
+ if (StreamState::Ready == state_) {
+ SPDLOG_DEBUG("Spawn new read op, state={}",
static_cast<std::uint8_t>(state_));
StartRead(&read_);
- } else if (read_state_ == StreamState::Closing) {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closed));
- read_state_ = StreamState::Closed;
- state_cv_.SignalAll();
}
}
}
@@ -341,36 +260,13 @@ void TelemetryBidiReactor::applySubscriptionConfig(const
rmq::Settings& settings
client->config().subscriber.receive_batch_size =
settings.subscription().receive_batch_size();
}
-void TelemetryBidiReactor::fireRead() {
- absl::MutexLock lk(&state_mtx_);
- if (StreamState::Created != read_state_) {
- SPDLOG_DEBUG("Further read from {} is not allowded due to
stream-state={}", peer_address_,
- static_cast<std::uint8_t>(read_state_));
- return;
- }
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Ready));
- read_state_ = StreamState::Ready;
- AddHold();
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Inflight));
- read_state_ = StreamState::Inflight;
- StartRead(&read_);
-}
-
void TelemetryBidiReactor::write(TelemetryCommand command) {
SPDLOG_DEBUG("{}#write", peer_address_);
{
absl::MutexLock lk(&state_mtx_);
// Reject incoming write commands if the stream state is closing or has
witnessed some error.
- switch (write_state_) {
- case StreamState::Closing:
- case StreamState::Error:
- case StreamState::Closed:
- return;
- default:
- // no-op
- break;
+ if (StreamState::Ready != state_) {
+ return;
}
}
@@ -383,134 +279,57 @@ void TelemetryBidiReactor::write(TelemetryCommand
command) {
void TelemetryBidiReactor::tryWriteNext() {
SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
- {
- absl::MutexLock lk(&state_mtx_);
- if (StreamState::Error == write_state_ || StreamState::Closed ==
write_state_) {
- SPDLOG_WARN("Further write to {} is not allowded due to
stream-state={}", peer_address_,
- static_cast<std::uint8_t>(write_state_));
- return;
- }
+ absl::MutexLock lk(&writes_mtx_);
+ if (StreamState::Ready != state_) {
+ SPDLOG_WARN("Further write to {} is not allowed due to stream-state={}",
peer_address_,
+ static_cast<std::uint8_t>(state_));
+ return;
+ }
+ if (writes_.empty()) {
+ SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_);
+ return;
}
- {
- absl::MutexLock lk(&writes_mtx_);
- if (writes_.empty() && StreamState::Closing != write_state_) {
- SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
- return;
- }
-
- if (StreamState::Ready == write_state_) {
- write_state_ = StreamState::Inflight;
- }
-
- if (writes_.empty()) {
- // Tell server there is no more write requests.
- StartWritesDone();
- } else {
- SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().DebugString());
- StartWrite(&(writes_.front()));
- }
+ if (!writes_.empty()) {
+ SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().DebugString());
+ AddHold();
+ StartWrite(&(writes_.front()));
}
}
-void TelemetryBidiReactor::fireClose() {
+void TelemetryBidiReactor::signalClose() {
+ absl::MutexLock lk(&state_mtx_);
+ state_ = StreamState::Closing;
+}
+
+void TelemetryBidiReactor::close() {
SPDLOG_INFO("{}#fireClose", peer_address_);
{
- // Acquire state lock
absl::MutexLock lk(&state_mtx_);
-
- // Transition read state
- switch (read_state_) {
- case StreamState::Created:
- case StreamState::Ready: {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closed));
- read_state_ = StreamState::Closed;
- state_cv_.SignalAll();
- break;
- }
-
- case StreamState::Inflight: {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closing));
- read_state_ = StreamState::Closing;
- break;
- }
- case StreamState::Closing: {
- break;
- }
- case StreamState::Closed:
- case StreamState::Error: {
- state_cv_.SignalAll();
- break;
- }
- }
-
- // Transition write state
- switch (write_state_) {
- case StreamState::Created:
- case StreamState::Ready:
- case StreamState::Inflight: {
- SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closing));
- write_state_ = StreamState::Closing;
- break;
- }
- case StreamState::Closing: {
- break;
- }
- case StreamState::Closed:
- case StreamState::Error: {
- state_cv_.SignalAll();
- break;
- }
+ if (state_ == StreamState::Ready) {
+ state_ = StreamState::Closing;
}
}
- if (StreamState::Closing == write_state_) {
- tryWriteNext();
+ {
+ absl::MutexLock lk(&writes_mtx_);
+ writes_.clear();
}
+ context_.TryCancel();
{
// Acquire state lock
absl::MutexLock lk(&state_mtx_);
- while ((StreamState::Closed != read_state_ && StreamState::Error !=
read_state_) ||
- (StreamState::Closed != write_state_ && StreamState::Error !=
write_state_)) {
+ while (StreamState::Closed != state_) {
if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
- SPDLOG_WARN("StreamState CondVar timed out before getting signalled:
read-state={}, write-state={}",
- static_cast<uint8_t>(read_state_),
static_cast<uint8_t>(write_state_));
+ SPDLOG_WARN("StreamState CondVar timed out before getting signalled:
state={}",
+ static_cast<uint8_t>(state_));
}
}
}
}
-void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
- SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
- assert(StreamState::Closing == write_state_);
-
- absl::MutexLock lk(&state_mtx_);
- // Remove the hold for the write stream.
- RemoveHold();
-
- if (!ok) {
- write_state_ = StreamState::Error;
- SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
- } else {
- write_state_ = StreamState::Closed;
- SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
- }
- state_cv_.SignalAll();
-}
-
-void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
- {
- absl::MutexLock lk(&writes_mtx_);
- writes_.emplace_back(command);
- }
- tryWriteNext();
-}
-
/// Notifies the application that all operations associated with this RPC
/// have completed and all Holds have been removed. OnDone provides the RPC
/// status outcome for both successful and failed RPCs and will be called in
@@ -524,20 +343,9 @@ void TelemetryBidiReactor::OnDone(const grpc::Status&
status) {
SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={},
status.error_details={}", peer_address_,
status.error_code(), status.error_message(),
status.error_details());
}
-
{
- SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
absl::MutexLock lk(&state_mtx_);
- if (StreamState::Error != read_state_) {
- SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closed));
- read_state_ = StreamState::Closed;
- }
- if (StreamState::Error != write_state_) {
- SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Closed));
- write_state_ = StreamState::Closed;
- }
+ state_ = StreamState::Closed;
state_cv_.SignalAll();
}
@@ -555,12 +363,14 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool
ok) {
SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
if (!ok) {
- absl::MutexLock lk(&state_mtx_);
- SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
- static_cast<std::uint8_t>(StreamState::Error));
- read_state_ = StreamState::Error;
- state_cv_.SignalAll();
+ // for read stream
+ // Remove the hold corresponding to AddHold in
TelemetryBidiReactor::TelemetryBidiReactor.
+ RemoveHold();
+
+ SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_),
+ static_cast<std::uint8_t>(StreamState::Closing));
SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
+ signalClose();
return;
}
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h
b/cpp/source/client/include/TelemetryBidiReactor.h
index 3bdbe3d3..4bd58f96 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <chrono>
#include <cstdint>
+#include <future>
#include <list>
#include <memory>
#include <utility>
@@ -34,29 +35,17 @@ ROCKETMQ_NAMESPACE_BEGIN
enum class StreamState : std::uint8_t
{
- Created = 0,
- Ready = 1,
- Inflight = 2,
- Closing = 3,
- Closed = 4,
- Error = 5,
+ Ready = 0,
+ Closing = 1,
+ Closed = 2,
};
-/// write-stream-state: created --> ready --> inflight --> ready --> ...
-/// --> error
-/// --> closing --> closed
-/// --> closing --> closed
-/// --> error
+/// stream-state: ready --> closing --> closed
///
-///
-/// read-stream-state: created --> ready --> inflight --> inflight
-/// --> closing --> closed
-/// --> error
-/// --> closed
/// requirement:
-/// 1, fireClose --> blocking await till bidireactor is closed;
+/// 1, close --> blocking wait till bidireactor is closed;
/// 2, when session is closed and client is still active, recreate a new
session to accept incoming commands from
-/// server 3, after writing the first Settings telemetry command, launch
the read directional stream
+/// server
///
class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand,
TelemetryCommand>,
public
std::enable_shared_from_this<TelemetryBidiReactor> {
@@ -97,21 +86,12 @@ public:
/// will succeed, and any further Start* should not be called.
void OnWriteDone(bool ok) override;
- /// Notifies the application that a StartWritesDone operation completed. Note
- /// that this is only used on explicit StartWritesDone operations and not for
- /// those that are implicitly invoked as part of a StartWriteLast.
- ///
- /// \param[in] ok Was it successful? If false, the application will later see
- /// the failure reflected as a bad status in OnDone and no
- /// further Start* should be called.
- void OnWritesDoneDone(bool ok) override;
-
/// Core API method to initiate this bidirectional stream.
void write(TelemetryCommand command);
- bool await();
+ bool awaitApplyingSettings();
- void fireClose();
+ void close();
private:
grpc::ClientContext context_;
@@ -140,14 +120,12 @@ private:
*/
std::string peer_address_;
- StreamState read_state_ GUARDED_BY(state_mtx_);
- StreamState write_state_ GUARDED_BY(state_mtx_);
+ StreamState state_ GUARDED_BY(state_mtx_);
absl::Mutex state_mtx_;
absl::CondVar state_cv_;
- void changeStreamStateThenNotify(StreamState state);
-
- void onVerifyMessageResult(TelemetryCommand command);
+ std::promise<bool> sync_settings_promise_;
+ std::future<bool> sync_settings_future_;
void applySettings(const rmq::Settings& settings);
@@ -157,13 +135,9 @@ private:
void applySubscriptionConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
- /// Start the read stream.
- ///
- /// Once got the OnReadDone and status is OK, call StartRead immediately.
- void fireRead();
-
/// Attempt to write pending telemetry command to server.
void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
+ void signalClose();
};
ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/stats/MetricBidiReactor.cpp
b/cpp/source/stats/MetricBidiReactor.cpp
index e6921378..e03e7c61 100644
--- a/cpp/source/stats/MetricBidiReactor.cpp
+++ b/cpp/source/stats/MetricBidiReactor.cpp
@@ -25,12 +25,10 @@
ROCKETMQ_NAMESPACE_BEGIN
-MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client,
std::weak_ptr<OpencensusExporter> exporter)
+MetricBidiReactor::MetricBidiReactor(std::shared_ptr<Client> client,
std::shared_ptr<OpencensusExporter> exporter)
: client_(client), exporter_(exporter) {
- auto ptr = client_.lock();
-
Metadata metadata;
- Signature::sign(ptr->config(), metadata);
+ Signature::sign(client->config(), metadata);
for (const auto& entry : metadata) {
context_.AddMetadata(entry.first, entry.second);
diff --git a/cpp/source/stats/OpencensusExporter.cpp
b/cpp/source/stats/OpencensusExporter.cpp
index effe5120..2c4c187d 100644
--- a/cpp/source/stats/OpencensusExporter.cpp
+++ b/cpp/source/stats/OpencensusExporter.cpp
@@ -167,12 +167,16 @@ void OpencensusExporter::ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>>& data) {
opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
wrap(data, request);
- std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
if (!bidi_reactor_) {
- bidi_reactor_ = absl::make_unique<MetricBidiReactor>(client_, exporter);
+ auto ptr = client_.lock();
+ if (ptr) {
+ bidi_reactor_ = absl::make_unique<MetricBidiReactor>(ptr,
shared_from_this());
+ } else {
+ SPDLOG_INFO("did not create stream since the client is no longer
available.");
+ }
}
- if (request.metrics_size()) {
+ if (request.metrics_size() && bidi_reactor_) {
SPDLOG_DEBUG("ExportMetricRequest: {}", request.DebugString());
bidi_reactor_->write(request);
} else {
@@ -181,8 +185,12 @@ void OpencensusExporter::ExportViewData(
}
void OpencensusExporter::resetStream() {
- std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
- bidi_reactor_.reset(new MetricBidiReactor(client_, exporter));
+ auto ptr = client_.lock();
+ if (ptr) {
+ bidi_reactor_.reset(new MetricBidiReactor(ptr, shared_from_this()));
+ } else {
+ SPDLOG_INFO("did not reset stream since the client is no longer
available.");
+ }
}
ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/stats/include/MetricBidiReactor.h
b/cpp/source/stats/include/MetricBidiReactor.h
index e4d75344..ae49f6e8 100644
--- a/cpp/source/stats/include/MetricBidiReactor.h
+++ b/cpp/source/stats/include/MetricBidiReactor.h
@@ -36,8 +36,8 @@ class MetricBidiReactor
: public grpc::ClientBidiReactor<ExportMetricsServiceRequest,
ExportMetricsServiceResponse> {
public:
- MetricBidiReactor(std::weak_ptr<Client> client,
- std::weak_ptr<OpencensusExporter> exporter);
+ MetricBidiReactor(std::shared_ptr<Client> client,
+ std::shared_ptr<OpencensusExporter> exporter);
/// Notifies the application that a StartRead operation completed.
///