lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r972927586


##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);

Review Comment:
   Good idea, added `Averager.h` as a utility in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/Processor.h:
##########
@@ -62,10 +79,41 @@ constexpr std::chrono::nanoseconds 
MINIMUM_SCHEDULING_NANOS{30000};
 
 #define BUILDING_DLL 1
 
-class Processor : public Connectable, public ConfigurableComponent {
+class Processor;
+
+class ProcessorMetrics : public state::response::ResponseNode {
+ public:
+  explicit ProcessorMetrics(const Processor& source_processor);
+
+  [[nodiscard]] std::string getName() const override;
+
+  std::vector<state::response::SerializedResponseNode> serialize() override;
+  std::vector<state::PublishedMetric> calculateMetrics() override;
+  void incrementRelationshipTransferCount(const std::string& relationship);
+  std::chrono::milliseconds getAverageOnTriggerRuntime() const;
+  std::chrono::milliseconds getLastOnTriggerRuntime() const;
+  void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+
+  std::atomic<size_t> iterations{0};
+  std::atomic<size_t> transferred_flow_files{0};
+  std::atomic<uint64_t> transferred_bytes{0};
+
+ protected:
+  [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() 
const;
+  static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
+
+  std::mutex relationship_mutex_;

Review Comment:
   Renamed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/test/unit/ProcessSessionTests.cpp:
##########
@@ -28,22 +28,6 @@
 
 namespace {
 
-class DummyProcessor : public minifi::core::Processor {
-  using minifi::core::Processor::Processor;
-
- public:
-  static constexpr const char* Description = "A processor that does nothing.";
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  static auto relationships() { return std::array<core::Relationship, 0>{}; }
-  static constexpr bool SupportsDynamicProperties = false;
-  static constexpr bool SupportsDynamicRelationships = false;
-  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
-  static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);

Review Comment:
   Updated in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/test/unit/MetricsTests.cpp:
##########
@@ -203,3 +208,32 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     REQUIRE("0" == size.value);
   }
 }
+
+TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  metrics.addLastOnTriggerRuntime(20ms);
+  metrics.addLastOnTriggerRuntime(30ms);
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastOnTriggerRuntime(50ms);
+  }
+
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
+}

Review Comment:
   Added average check in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> 
ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", 
source_processor_.getName()}, {"processor_uuid", 
source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> 
ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node;
+  root_node.name = source_processor_.getUUIDStr();
+
+  state::response::SerializedResponseNode iter;
+  iter.name = "OnTriggerInvocations";
+  iter.value = static_cast<uint32_t>(iterations.load());
+
+  root_node.children.push_back(iter);
+
+  state::response::SerializedResponseNode average_ontrigger_runtime_node;
+  average_ontrigger_runtime_node.name = "AverageOnTriggerRunTime";
+  average_ontrigger_runtime_node.value = 
static_cast<uint64_t>(getAverageOnTriggerRuntime().count());
+
+  root_node.children.push_back(average_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode last_ontrigger_runtime_node;
+  last_ontrigger_runtime_node.name = "LastOnTriggerRunTime";
+  last_ontrigger_runtime_node.value = 
static_cast<uint64_t>(getLastOnTriggerRuntime().count());
+
+  root_node.children.push_back(last_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode transferred_flow_files_node;
+  transferred_flow_files_node.name = "TransferredFlowFiles";
+  transferred_flow_files_node.value = 
static_cast<uint32_t>(transferred_flow_files.load());
+
+  root_node.children.push_back(transferred_flow_files_node);
+
+  for (const auto& [relationship, count] : transferred_relationships_) {
+    state::response::SerializedResponseNode transferred_to_relationship_node;
+    transferred_to_relationship_node.name = 
std::string("TransferredTo").append(1, 
toupper(relationship[0])).append(relationship.substr(1));
+    transferred_to_relationship_node.value = static_cast<uint32_t>(count);
+
+    root_node.children.push_back(transferred_to_relationship_node);
+  }

Review Comment:
   Added check in 6da42b94abc78aaa874eaba0258bf07c49d21660



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to