This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e7b8e06770ceab9639ffca0e26dddbaaa5c2370c Author: Ferenc Gerlits <[email protected]> AuthorDate: Fri Apr 14 15:06:30 2023 +0200 MINIFICPP-2030 Expose InFlightMessageCounter in PublishMQTT as processor metric Closes #1527 Signed-off-by: Marton Szasz <[email protected]> --- extensions/mqtt/processors/AbstractMQTTProcessor.h | 4 ++-- extensions/mqtt/processors/PublishMQTT.cpp | 26 ++++++++++++++++++++++ extensions/mqtt/processors/PublishMQTT.h | 16 +++++++++++-- extensions/mqtt/tests/PublishMQTTTests.cpp | 21 +++++++++++++++++ 4 files changed, 63 insertions(+), 4 deletions(-) diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h index 58aca34c0..5808520e4 100644 --- a/extensions/mqtt/processors/AbstractMQTTProcessor.h +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h @@ -36,8 +36,8 @@ static constexpr const char* const MQTT_SECURITY_PROTOCOL_SSL = "ssl"; class AbstractMQTTProcessor : public core::Processor { public: - explicit AbstractMQTTProcessor(std::string name, const utils::Identifier& uuid = {}) - : core::Processor(std::move(name), uuid) { + explicit AbstractMQTTProcessor(std::string name, const utils::Identifier& uuid = {}, std::shared_ptr<core::ProcessorMetrics> metrics = {}) + : core::Processor(std::move(name), uuid, std::move(metrics)) { } ~AbstractMQTTProcessor() override { diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp index 37e453eaa..7629d7ef9 100644 --- a/extensions/mqtt/processors/PublishMQTT.cpp +++ b/extensions/mqtt/processors/PublishMQTT.cpp @@ -289,4 +289,30 @@ void PublishMQTT::InFlightMessageCounter::decrease() { cv_.notify_one(); } +uint16_t PublishMQTT::InFlightMessageCounter::getCounter() const { + std::lock_guard lock{mutex_}; + return counter_; +} + +PublishMQTT::PublishMQTTMetrics::PublishMQTTMetrics(const core::Processor& source_processor, const InFlightMessageCounter& in_flight_message_counter) + : core::ProcessorMetrics(source_processor), + in_flight_message_counter_(&in_flight_message_counter) { +} + +std::vector<state::response::SerializedResponseNode> PublishMQTT::PublishMQTTMetrics::serialize() { + auto metrics_vector = core::ProcessorMetrics::serialize(); + gsl_Expects(!metrics_vector.empty()); + auto& metrics = metrics_vector[0]; + + state::response::SerializedResponseNode in_flight_message_count_node{"InFlightMessageCount", static_cast<uint32_t>(in_flight_message_counter_->getCounter())}; + metrics.children.push_back(in_flight_message_count_node); + + return metrics_vector; +} + +std::vector<state::PublishedMetric> PublishMQTT::PublishMQTTMetrics::calculateMetrics() { + auto metrics = core::ProcessorMetrics::calculateMetrics(); + metrics.push_back({"in_flight_message_count", static_cast<double>(in_flight_message_counter_->getCounter()), getCommonLabels()}); + return metrics; +} } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h index 396810a9d..8be2b146e 100644 --- a/extensions/mqtt/processors/PublishMQTT.h +++ b/extensions/mqtt/processors/PublishMQTT.h @@ -38,7 +38,7 @@ namespace org::apache::nifi::minifi::processors { class PublishMQTT : public processors::AbstractMQTTProcessor { public: explicit PublishMQTT(std::string name, const utils::Identifier& uuid = {}) - : processors::AbstractMQTTProcessor(std::move(name), uuid) { + : processors::AbstractMQTTProcessor(std::move(name), uuid, std::make_shared<PublishMQTTMetrics>(*this, in_flight_message_counter_)) { } EXTENSIONAPI static constexpr const char* Description = "PublishMQTT serializes FlowFile content as an MQTT payload, sending the message to the configured topic and broker."; @@ -84,14 +84,26 @@ class PublishMQTT : public processors::AbstractMQTTProcessor { void increase(); void decrease(); + uint16_t getCounter() const; + private: bool enabled_ = false; - std::mutex mutex_; + mutable std::mutex mutex_; std::condition_variable cv_; uint16_t counter_{0}; uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM}; }; + class PublishMQTTMetrics : public core::ProcessorMetrics { + public: + PublishMQTTMetrics(const core::Processor& source_processor, const InFlightMessageCounter& in_flight_message_counter); + std::vector<state::response::SerializedResponseNode> serialize() override; + std::vector<state::PublishedMetric> calculateMetrics() override; + + private: + gsl::not_null<const InFlightMessageCounter*> in_flight_message_counter_; + }; + // MQTT static async callbacks, calling their notify with context being pointer to a packaged_task to notify() static void sendSuccess(void* context, MQTTAsync_successData* response); static void sendSuccess5(void* context, MQTTAsync_successData5* response); diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp b/extensions/mqtt/tests/PublishMQTTTests.cpp index 0111da0a8..491bbb1c8 100644 --- a/extensions/mqtt/tests/PublishMQTTTests.cpp +++ b/extensions/mqtt/tests/PublishMQTTTests.cpp @@ -16,6 +16,8 @@ * limitations under the License. */ +#include "range/v3/algorithm/find_if.hpp" + #include "Catch.h" #include "TestBase.h" #include "../processors/PublishMQTT.h" @@ -72,3 +74,22 @@ TEST_CASE_METHOD(Fixture, "PublishMQTTTest_ContentType_V_3", "[publishMQTTTest]" REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_)); REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Content Types. Property is not used.", 1s)); } + +TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the number of in-flight messages as a metric") { + const auto node = publishMqttProcessor_->getResponseNode(); + + SECTION("heartbeat metric") { + const auto serialized_nodes = minifi::state::response::ResponseNode::serializeAndMergeResponseNodes({node}); + REQUIRE_FALSE(serialized_nodes.empty()); + const auto it = ranges::find_if(serialized_nodes[0].children, [](const auto& metric) { return metric.name == "InFlightMessageCount"; }); + REQUIRE(it != serialized_nodes[0].children.end()); + CHECK(it->value == "0"); + } + + SECTION("Prometheus metric") { + const auto metrics = node->calculateMetrics(); + const auto it = ranges::find_if(metrics, [](const auto& metric) { return metric.name == "in_flight_message_count"; }); + REQUIRE(it != metrics.end()); + CHECK(it->value == 0.0); + } +}
