This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/fifo_opt by this push:
new 8a5704ca fix: log sending sending stages
8a5704ca is described below
commit 8a5704cafb46449587b9c06108640a0b00bd4942
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 22:19:57 2024 +0800
fix: log sending sending stages
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/examples/ExampleFifoProducer.cpp | 5 +++--
cpp/source/rocketmq/FifoProducerPartition.cpp | 18 +++++++++++++++++-
cpp/source/rocketmq/include/FifoProducerImpl.h | 3 ++-
cpp/source/rocketmq/include/FifoProducerPartition.h | 4 +++-
cpp/tools/trouble_shooting.sh | 0
5 files changed, 25 insertions(+), 5 deletions(-)
diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
index ff3a14db..9d99be36 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -106,8 +106,8 @@ int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
auto& logger = getLogger();
- logger.setConsoleLevel(Level::Info);
- logger.setLevel(Level::Info);
+ logger.setConsoleLevel(Level::Debug);
+ logger.setLevel(Level::Debug);
logger.init();
// Access Key/Secret pair may be acquired from management console
@@ -173,6 +173,7 @@ int main(int argc, char* argv[]) {
semaphore->acquire();
producer.send(std::move(message), callback);
+ std::cout << "Cached No." << i << " message" << std::endl;
}
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp
index 37526f76..94e1c722 100644
--- a/cpp/source/rocketmq/FifoProducerPartition.cpp
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -19,6 +19,7 @@ void FifoProducerPartition::add(FifoContext&& context) {
{
absl::MutexLock lk(&messages_mtx_);
messages_.emplace_back(std::move(context));
+ SPDLOG_DEBUG("{} has {} pending messages after #add", name_,
messages_.size());
}
trySend();
@@ -29,6 +30,11 @@ void FifoProducerPartition::trySend() {
if (inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
absl::MutexLock lk(&messages_mtx_);
+ if (messages_.empty()) {
+ SPDLOG_DEBUG("There is no more messages to send");
+ return;
+ }
+
FifoContext& ctx = messages_.front();
MessageConstPtr message = std::move(ctx.message);
SendCallback send_callback = ctx.callback;
@@ -37,12 +43,22 @@ void FifoProducerPartition::trySend() {
auto fifo_callback = [=](const std::error_code& ec, const SendReceipt&
receipt) mutable {
partition->onComplete(ec, receipt, send_callback);
};
+ SPDLOG_DEBUG("Sending FIFO message from {}", name_);
producer_->send(std::move(message), fifo_callback);
messages_.pop_front();
+ SPDLOG_DEBUG("In addition to the inflight one, there is {} messages
pending in {}", messages_.size(), name_);
+ } else {
+ SPDLOG_DEBUG("There is an inflight message");
}
}
void FifoProducerPartition::onComplete(const std::error_code& ec, const
SendReceipt& receipt, SendCallback& callback) {
+ if (ec) {
+ SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
+ } else {
+ SPDLOG_DEBUG("{} completed OK", name_);
+ }
+
if (!ec) {
callback(ec, receipt);
// update inflight status
@@ -50,7 +66,7 @@ void FifoProducerPartition::onComplete(const std::error_code& ec, const SendRece
if (inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
trySend();
} else {
- SPDLOG_ERROR("Unexpected inflight status");
+ SPDLOG_ERROR("{}: Unexpected inflight status", name_);
}
return;
}
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h
index cc11dcf6..180c3f93 100644
--- a/cpp/source/rocketmq/include/FifoProducerImpl.h
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -6,6 +6,7 @@
#include "FifoProducerPartition.h"
#include "ProducerImpl.h"
+#include "fmt/format.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendCallback.h"
@@ -16,7 +17,7 @@ public:
FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t
concurrency)
: producer_(producer), concurrency_(concurrency),
partitions_(concurrency) {
for (auto i = 0; i < concurrency; i++) {
- partitions_[i] = std::make_shared<FifoProducerPartition>(producer_);
+ partitions_[i] = std::make_shared<FifoProducerPartition>(producer_,
fmt::format("slot-{}", i));
}
};
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h
index 406b8fa6..96bb96f6 100644
--- a/cpp/source/rocketmq/include/FifoProducerPartition.h
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -18,7 +18,8 @@ ROCKETMQ_NAMESPACE_BEGIN
class FifoProducerPartition : public
std::enable_shared_from_this<FifoProducerPartition> {
public:
- FifoProducerPartition(std::shared_ptr<ProducerImpl> producer) :
producer_(producer) {
+ FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&&
name)
+ : producer_(producer), name_(std::move(name)) {
}
void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
@@ -32,6 +33,7 @@ private:
std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
absl::Mutex messages_mtx_;
std::atomic_bool inflight_{false};
+ std::string name_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh
old mode 100644
new mode 100755