This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch feature_collect_cache_stats in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 3a68784d55ab8a91be171b2eec4ddc5efa05d5b5 Author: Li Zhanhui <[email protected]> AuthorDate: Thu Jul 21 15:21:00 2022 +0800 Collect stats for local cache --- .../rocketmq/AsyncReceiveMessageCallback.cpp | 3 +- cpp/source/rocketmq/ProcessQueueImpl.cpp | 59 ++++++---------------- cpp/source/rocketmq/PushConsumerImpl.cpp | 59 ++++++++++++++++++++++ cpp/source/rocketmq/include/ProcessQueue.h | 15 +++--- cpp/source/rocketmq/include/ProcessQueueImpl.h | 48 +++--------------- cpp/source/rocketmq/include/PushConsumerImpl.h | 4 ++ 6 files changed, 91 insertions(+), 97 deletions(-) diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp index c1b57d4..30cdb7a 100644 --- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp +++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp @@ -51,8 +51,7 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}", result.source_host, result.messages.size(), process_queue->simpleName()); - process_queue->cacheMessages(result.messages); - + process_queue->accountCache(result.messages); consumer->getConsumeMessageService()->dispatch(process_queue, result.messages); checkThrottleThenReceive(); } diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp b/cpp/source/rocketmq/ProcessQueueImpl.cpp index 70ab77b..16574e6 100644 --- a/cpp/source/rocketmq/ProcessQueueImpl.cpp +++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp @@ -62,6 +62,14 @@ bool ProcessQueueImpl::expired() const { return false; } +std::uint64_t ProcessQueueImpl::cachedMessageQuantity() const { + return cached_message_quantity_.load(std::memory_order_relaxed); +} + +std::uint64_t ProcessQueueImpl::cachedMessageMemory() const { + return cached_message_memory_.load(std::memory_order_relaxed); +} + bool ProcessQueueImpl::shouldThrottle() const { auto consumer = consumer_.lock(); if (!consumer) { @@ -124,57 +132,20 @@ void ProcessQueueImpl::popMessage() { absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout), callback); } -bool ProcessQueueImpl::hasPendingMessages() const { - absl::MutexLock lk(&messages_mtx_); - return !cached_messages_.empty(); -} - -void ProcessQueueImpl::cacheMessages(const std::vector<MessageConstSharedPtr>& messages) { +void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>& messages) { auto consumer = consumer_.lock(); if (!consumer) { return; } - { - absl::MutexLock messages_lock_guard(&messages_mtx_); - for (const auto& message : messages) { - const std::string& msg_id = message->id(); - if (!filter_expression_.accept(*message)) { - const std::string& topic = message->topic(); - auto callback = [topic, msg_id](const std::error_code& ec) { - if (ec) { - SPDLOG_WARN( - "Failed to ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression. Cause: {}", - topic, msg_id, ec.message()); - } else { - SPDLOG_DEBUG("Ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression", topic, - msg_id); - } - }; - consumer->ack(*message, callback); - continue; - } - cached_messages_.emplace_back(message); - cached_message_quantity_.fetch_add(1, std::memory_order_relaxed); - cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed); - } - } -} - -bool ProcessQueueImpl::take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) { - absl::MutexLock lock(&messages_mtx_); - if (cached_messages_.empty()) { - return false; + for (const auto& message : messages) { + cached_message_quantity_.fetch_add(1, std::memory_order_relaxed); + cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed); } - for (auto it = cached_messages_.begin(); it != cached_messages_.end();) { - if (0 == batch_size--) { - break; - } - messages.push_back(*it); - it = cached_messages_.erase(it); - } - return !cached_messages_.empty(); + SPDLOG_DEBUG("Cache of process-queue={} has {} messages, body of them taking up {} bytes", simple_name_, + cached_message_quantity_.load(std::memory_order_relaxed), + cached_message_memory_.load(std::memory_order_relaxed)); } void ProcessQueueImpl::release(uint64_t body_size) { diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp index 55e9728..c6e67bd 100644 --- a/cpp/source/rocketmq/PushConsumerImpl.cpp +++ b/cpp/source/rocketmq/PushConsumerImpl.cpp @@ -19,7 +19,9 @@ #include <atomic> #include <cassert> #include <chrono> +#include <cstdint> #include <cstdlib> +#include <string> #include <system_error> #include "AsyncReceiveMessageCallback.h" @@ -29,7 +31,9 @@ #include "Protocol.h" #include "RpcClient.h" #include "Signature.h" +#include "Tag.h" #include "google/protobuf/util/time_util.h" +#include "opencensus/stats/stats.h" #include "rocketmq/MQClientException.h" #include "rocketmq/MessageListener.h" @@ -89,9 +93,20 @@ void PushConsumerImpl::start() { scan_assignment_handle_ = client_manager_->getScheduler()->schedule( scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, std::chrono::milliseconds(100), std::chrono::seconds(5)); SPDLOG_INFO("PushConsumer started, groupName={}", client_config_.subscriber.group.name()); + + auto collect_stats_functor = [consumer_weak_ptr] { + auto consumer = consumer_weak_ptr.lock(); + if (consumer) { + consumer->collectCacheStats(); + } + }; + + collect_stats_handle_ = client_manager_->getScheduler()->schedule(collect_stats_functor, COLLECT_STATS_TASK_NAME, + std::chrono::seconds(3), std::chrono::seconds(3)); } const char* PushConsumerImpl::SCAN_ASSIGNMENT_TASK_NAME = "scan-assignment-task"; +const char* PushConsumerImpl::COLLECT_STATS_TASK_NAME = "collect-stats-task"; void PushConsumerImpl::shutdown() { State expecting = State::STARTED; @@ -101,6 +116,11 @@ void PushConsumerImpl::shutdown() { SPDLOG_DEBUG("Scan assignment periodic task cancelled"); } + if (collect_stats_handle_) { + client_manager_->getScheduler()->cancel(collect_stats_handle_); + SPDLOG_DEBUG("Collect cache stats periodic task cancelled"); + } + { absl::MutexLock lock(&process_queue_table_mtx_); process_queue_table_.clear(); @@ -558,4 +578,43 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct } } +void PushConsumerImpl::collectCacheStats() { + absl::flat_hash_map<std::string, std::uint64_t> topic_count; + absl::flat_hash_map<std::string, std::uint64_t> topic_memory; + + { + absl::MutexLock lk(&process_queue_table_mtx_); + for (const auto& entry : process_queue_table_) { + auto&& topic = entry.second->topic(); + std::uint64_t cnt = entry.second->cachedMessageQuantity(); + std::uint64_t memory = entry.second->cachedMessageMemory(); + auto it = topic_count.find(topic); + if (it == topic_count.end()) { + topic_count.insert_or_assign(topic, cnt); + } else { + it->second += cnt; + } + + it = topic_memory.find(topic); + if (it == topic_memory.end()) { + topic_memory.insert_or_assign(topic, memory); + } else { + it->second += memory; + } + } + } + + for (const auto& entry : topic_count) { + opencensus::stats::Record({{stats_.cachedMessageQuantity(), entry.second}}, + {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}}); + SPDLOG_DEBUG("Cache on Quantity {} --> {}", entry.first, entry.second); + } + + for (const auto& entry : topic_memory) { + opencensus::stats::Record({{stats_.cachedMessageBytes(), entry.second}}, + {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}}); + SPDLOG_DEBUG("Cache on Memory {} --> {}", entry.first, entry.second); + } +} + ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/include/ProcessQueue.h b/cpp/source/rocketmq/include/ProcessQueue.h index ac3576d..0e8ce74 100644 --- a/cpp/source/rocketmq/include/ProcessQueue.h +++ b/cpp/source/rocketmq/include/ProcessQueue.h @@ -16,6 +16,7 @@ */ #pragma once +#include <cstdint> #include <memory> #include "AsyncReceiveMessageCallback.h" @@ -39,19 +40,19 @@ public: virtual void receiveMessage() = 0; - virtual bool hasPendingMessages() const = 0; - virtual std::string topic() const = 0; - virtual bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) = 0; - virtual std::weak_ptr<PushConsumerImpl> getConsumer() = 0; virtual const std::string& simpleName() const = 0; virtual void release(uint64_t body_size) = 0; - virtual void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) = 0; + virtual void accountCache(const std::vector<MessageConstSharedPtr>& messages) = 0; + + virtual std::uint64_t cachedMessageQuantity() const = 0; + + virtual std::uint64_t cachedMessageMemory() const = 0; virtual bool shouldThrottle() const = 0; @@ -61,10 +62,6 @@ public: virtual const FilterExpression& getFilterExpression() const = 0; - virtual bool bindFifoConsumeTask() = 0; - - virtual bool unbindFifoConsumeTask() = 0; - virtual const rmq::MessageQueue& messageQueue() const = 0; }; diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h b/cpp/source/rocketmq/include/ProcessQueueImpl.h index 6479718..36464fc 100644 --- a/cpp/source/rocketmq/include/ProcessQueueImpl.h +++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h @@ -56,7 +56,7 @@ public: bool expired() const override; - bool shouldThrottle() const override LOCKS_EXCLUDED(messages_mtx_); + bool shouldThrottle() const override; const FilterExpression& getFilterExpression() const override; @@ -74,46 +74,22 @@ public: return message_queue_.topic().name(); } - bool hasPendingMessages() const override LOCKS_EXCLUDED(messages_mtx_); + std::uint64_t cachedMessageQuantity() const override; + + std::uint64_t cachedMessageMemory() const override; /** * Put message fetched from broker into cache. * * @param messages */ - void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_); - - /** - * @return Number of messages that is not yet dispatched to thread pool, likely, due to topic-rate-limiting. - */ - uint32_t cachedMessagesSize() const LOCKS_EXCLUDED(messages_mtx_) { - absl::MutexLock lk(&messages_mtx_); - return cached_messages_.size(); - } - - /** - * Dispatch messages from cache to thread pool in form of consumeTask. - * @param batch_size - * @param messages - * @return true if there are more messages to consume in cache - */ - bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_); + void accountCache(const std::vector<MessageConstSharedPtr>& messages) override; void syncIdleState() override { idle_since_ = std::chrono::steady_clock::now(); } - void release(uint64_t body_size) override LOCKS_EXCLUDED(messages_mtx_); - - bool unbindFifoConsumeTask() override { - bool expected = true; - return has_fifo_task_bound_.compare_exchange_strong(expected, false, std::memory_order_relaxed); - } - - bool bindFifoConsumeTask() override { - bool expected = false; - return has_fifo_task_bound_.compare_exchange_strong(expected, true, std::memory_order_relaxed); - } + void release(uint64_t body_size) override; const rmq::MessageQueue& messageQueue() const override { return message_queue_; @@ -140,13 +116,6 @@ private: std::shared_ptr<AsyncReceiveMessageCallback> receive_callback_; - /** - * Messages that are pending to be submitted to thread pool. - */ - mutable std::vector<MessageConstSharedPtr> cached_messages_ GUARDED_BY(messages_mtx_); - - mutable absl::Mutex messages_mtx_; - /** * @brief Quantity of the cached messages. * @@ -159,11 +128,6 @@ private: */ std::atomic<uint64_t> cached_message_memory_; - /** - * If this process queue is used in FIFO scenario, this field marks if there is an task in thread pool. - */ - std::atomic_bool has_fifo_task_bound_{false}; - void popMessage(); void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata, rmq::ReceiveMessageRequest& request); diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h index 95a07f2..cfc13a2 100644 --- a/cpp/source/rocketmq/include/PushConsumerImpl.h +++ b/cpp/source/rocketmq/include/PushConsumerImpl.h @@ -212,11 +212,15 @@ private: int32_t max_delivery_attempts_{MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS}; ConsumeStats stats_; + std::uintptr_t collect_stats_handle_{0}; + static const char* COLLECT_STATS_TASK_NAME; void fetchRoutes() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); std::chrono::milliseconds invisibleDuration(std::size_t attempt); + void collectCacheStats() LOCKS_EXCLUDED(process_queue_table_mtx_); + friend class ConsumeMessageService; friend class ConsumeFifoMessageService; friend class ConsumeStandardMessageService;
