This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e7b8e06770ceab9639ffca0e26dddbaaa5c2370c
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Fri Apr 14 15:06:30 2023 +0200

    MINIFICPP-2030 Expose InFlightMessageCounter in PublishMQTT as processor 
metric
    
    Closes #1527
    Signed-off-by: Marton Szasz <[email protected]>
---
 extensions/mqtt/processors/AbstractMQTTProcessor.h |  4 ++--
 extensions/mqtt/processors/PublishMQTT.cpp         | 26 ++++++++++++++++++++++
 extensions/mqtt/processors/PublishMQTT.h           | 16 +++++++++++--
 extensions/mqtt/tests/PublishMQTTTests.cpp         | 21 +++++++++++++++++
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h 
b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index 58aca34c0..5808520e4 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -36,8 +36,8 @@ static constexpr const char* const MQTT_SECURITY_PROTOCOL_SSL 
= "ssl";
 
 class AbstractMQTTProcessor : public core::Processor {
  public:
-  explicit AbstractMQTTProcessor(std::string name, const utils::Identifier& 
uuid = {})
-      : core::Processor(std::move(name), uuid) {
+  explicit AbstractMQTTProcessor(std::string name, const utils::Identifier& 
uuid = {}, std::shared_ptr<core::ProcessorMetrics> metrics = {})
+      : core::Processor(std::move(name), uuid, std::move(metrics)) {
   }
 
   ~AbstractMQTTProcessor() override {
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp 
b/extensions/mqtt/processors/PublishMQTT.cpp
index 37e453eaa..7629d7ef9 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -289,4 +289,30 @@ void PublishMQTT::InFlightMessageCounter::decrease() {
   cv_.notify_one();
 }
 
+uint16_t PublishMQTT::InFlightMessageCounter::getCounter() const {
+  std::lock_guard lock{mutex_};
+  return counter_;
+}
+
+PublishMQTT::PublishMQTTMetrics::PublishMQTTMetrics(const core::Processor& 
source_processor, const InFlightMessageCounter& in_flight_message_counter)
+  : core::ProcessorMetrics(source_processor),
+    in_flight_message_counter_(&in_flight_message_counter) {
+}
+
+std::vector<state::response::SerializedResponseNode> 
PublishMQTT::PublishMQTTMetrics::serialize() {
+  auto metrics_vector = core::ProcessorMetrics::serialize();
+  gsl_Expects(!metrics_vector.empty());
+  auto& metrics = metrics_vector[0];
+
+  state::response::SerializedResponseNode 
in_flight_message_count_node{"InFlightMessageCount", 
static_cast<uint32_t>(in_flight_message_counter_->getCounter())};
+  metrics.children.push_back(in_flight_message_count_node);
+
+  return metrics_vector;
+}
+
+std::vector<state::PublishedMetric> 
PublishMQTT::PublishMQTTMetrics::calculateMetrics() {
+  auto metrics = core::ProcessorMetrics::calculateMetrics();
+  metrics.push_back({"in_flight_message_count", 
static_cast<double>(in_flight_message_counter_->getCounter()), 
getCommonLabels()});
+  return metrics;
+}
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/PublishMQTT.h 
b/extensions/mqtt/processors/PublishMQTT.h
index 396810a9d..8be2b146e 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -38,7 +38,7 @@ namespace org::apache::nifi::minifi::processors {
 class PublishMQTT : public processors::AbstractMQTTProcessor {
  public:
   explicit PublishMQTT(std::string name, const utils::Identifier& uuid = {})
-      : processors::AbstractMQTTProcessor(std::move(name), uuid) {
+      : processors::AbstractMQTTProcessor(std::move(name), uuid, 
std::make_shared<PublishMQTTMetrics>(*this, in_flight_message_counter_)) {
   }
 
   EXTENSIONAPI static constexpr const char* Description = "PublishMQTT 
serializes FlowFile content as an MQTT payload, sending the message to the 
configured topic and broker.";
@@ -84,14 +84,26 @@ class PublishMQTT : public 
processors::AbstractMQTTProcessor {
     void increase();
     void decrease();
 
+    uint16_t getCounter() const;
+
    private:
     bool enabled_ = false;
-    std::mutex mutex_;
+    mutable std::mutex mutex_;
     std::condition_variable cv_;
     uint16_t counter_{0};
     uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
   };
 
+  class PublishMQTTMetrics : public core::ProcessorMetrics {
+   public:
+    PublishMQTTMetrics(const core::Processor& source_processor, const 
InFlightMessageCounter& in_flight_message_counter);
+    std::vector<state::response::SerializedResponseNode> serialize() override;
+    std::vector<state::PublishedMetric> calculateMetrics() override;
+
+   private:
+    gsl::not_null<const InFlightMessageCounter*> in_flight_message_counter_;
+  };
+
   // MQTT static async callbacks, calling their notify with context being 
pointer to a packaged_task to notify()
   static void sendSuccess(void* context, MQTTAsync_successData* response);
   static void sendSuccess5(void* context, MQTTAsync_successData5* response);
diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp 
b/extensions/mqtt/tests/PublishMQTTTests.cpp
index 0111da0a8..491bbb1c8 100644
--- a/extensions/mqtt/tests/PublishMQTTTests.cpp
+++ b/extensions/mqtt/tests/PublishMQTTTests.cpp
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include "range/v3/algorithm/find_if.hpp"
+
 #include "Catch.h"
 #include "TestBase.h"
 #include "../processors/PublishMQTT.h"
@@ -72,3 +74,22 @@ TEST_CASE_METHOD(Fixture, "PublishMQTTTest_ContentType_V_3", 
"[publishMQTTTest]"
   REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x 
specification does not support Content Types. Property is not used.", 1s));
 }
+
+TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the number of in-flight 
messages as a metric") {
+  const auto node = publishMqttProcessor_->getResponseNode();
+
+  SECTION("heartbeat metric") {
+    const auto serialized_nodes = 
minifi::state::response::ResponseNode::serializeAndMergeResponseNodes({node});
+    REQUIRE_FALSE(serialized_nodes.empty());
+    const auto it = ranges::find_if(serialized_nodes[0].children, [](const 
auto& metric) { return metric.name == "InFlightMessageCount"; });
+    REQUIRE(it != serialized_nodes[0].children.end());
+    CHECK(it->value == "0");
+  }
+
+  SECTION("Prometheus metric") {
+    const auto metrics = node->calculateMetrics();
+    const auto it = ranges::find_if(metrics, [](const auto& metric) { return 
metric.name == "in_flight_message_count"; });
+    REQUIRE(it != metrics.end());
+    CHECK(it->value == 0.0);
+  }
+}

Reply via email to