lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r972927409


##########
METRICS.md:
##########
@@ -153,3 +132,40 @@ DeviceInfoNode is a system level metric that reports 
metrics about the system re
 | connection_name | Name of the connection defined in the flow configuration   
  |
 | component_uuid  | UUID of the component                                      
  |
 | component_name  | Name of the component                                      
  |
+
+## Processor Metrics
+
+Processor level metrics can be accessed for any processor provided by MiNiFi. 
These metrics correspond to the name of the processor appended by the "Metrics" 
suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
+
+### General Metrics
+
+There are general metrics that are available for all processors. Besides these 
metrics processors can implement additional metrics that are speicific to that 
processor.
+
+| Metric name                            | Labels                              
         | Description                                                          
               |
+|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
+| average_onTrigger_runtime_milliseconds | metric_class, processor_name, 
processor_uuid | The average runtime in milliseconds of the last 10 onTrigger 
calls of the processor |
+| last_onTrigger_runtime_milliseconds    | metric_class, processor_name, 
processor_uuid | The runtime in milliseconds of the last onTrigger call of the 
processor             |
+| transferred_flow_files                 | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a relationship             
                     |
+| transferred_bytes                      | metric_class, processor_name, 
processor_uuid | Number of bytes transferred to a relationship                  
                     |
+| transferred_to_\<relationship\>        | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a specific relationship    
                     |
+
+| Label          | Description                                                 
   |
+|----------------|----------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to GetFileMetrics 
   |

Review Comment:
   Fixed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090";)

Review Comment:
   It must have been removed by mistake, added in 
6da42b94abc78aaa874eaba0258bf07c49d21660



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090";)
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, 
timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, 
timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(metric_class, processor_name)
+        else:
+            return self.verify_general_processor_metrics(metric_class, 
processor_name)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not 
implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}]
+        for labels in label_list:
+            if not self.verify_metrics_exist(['minifi_is_running', 
'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels):
+                return False
+        return True
+
+    def verify_queue_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'QueueMetrics')
+
+    def verify_general_processor_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return 
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 
'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+            
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 
'minifi_transferred_flow_files', 'minifi_transferred_to_success', 
'minifi_transferred_bytes'], metric_class, labels)
+
+    def verify_getfile_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return self.verify_general_processor_metrics(metric_class, 
processor_name) and \
+            self.verify_metrics_exist(['minifi_input_bytes', 
'minifi_accepted_files'], metric_class, labels)
+
+    def verify_flow_information_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'FlowInformation') and \
+            self.verify_metric_exists('minifi_is_running', 'FlowInformation', 
{'component_name': 'FlowController'})
+
+    def verify_device_info_node_metrics(self):
+        return self.verify_metrics_exist(['minifi_physical_mem', 
'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
+
+    def verify_metric_exists(self, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
+        return 
len(self.prometheus_client.get_current_metric_value(metric_name=metric_name, 
label_config=labels)) > 0
+
+    def verify_metrics_exist(self, metric_names, metric_class, labels={}):
+        for metric_name in metric_names:
+            if not self.verify_metric_exists(metric_name, metric_class, 
labels):
+                return False
+        return True
+
+    def verify_metric_larger_than_zero(self, metric_name, metric_class, 
labels={}):
+        labels['metric_class'] = metric_class
+        result = 
self.prometheus_client.get_current_metric_value(metric_name=metric_name, 
label_config=labels)
+        print(result)

Review Comment:
   Yes, good catch, removed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
extensions/standard-processors/processors/GetFile.h:
##########
@@ -48,70 +48,46 @@ struct GetFileRequest {
   std::string inputDirectory;
 };
 
-class GetFileMetrics : public state::response::ResponseNode {
+class GetFileMetrics : public core::ProcessorMetrics {
  public:
-  explicit GetFileMetrics(const CoreComponent& source_component)
-    : state::response::ResponseNode("GetFileMetrics"),
-      source_component_(source_component) {
-  }
-
-  std::string getName() const override {
-    return core::Connectable::getName();
+  explicit GetFileMetrics(const core::Processor& source_processor)
+    : core::ProcessorMetrics(source_processor) {
   }
 
   std::vector<state::response::SerializedResponseNode> serialize() override {
-    std::vector<state::response::SerializedResponseNode> resp;
-
-    state::response::SerializedResponseNode root_node;
-    root_node.name = source_component_.getUUIDStr();
-
-    state::response::SerializedResponseNode iter;
-    iter.name = "OnTriggerInvocations";
-    iter.value = (uint32_t)iterations_.load();
+    auto resp = core::ProcessorMetrics::serialize();
+    auto& root_node = resp[0];
 
-    root_node.children.push_back(iter);
+    state::response::SerializedResponseNode accepted_files_node;
+    accepted_files_node.name = "AcceptedFiles";
+    accepted_files_node.value = (uint32_t)accepted_files.load();
 
-    state::response::SerializedResponseNode accepted_files;
-    accepted_files.name = "AcceptedFiles";
-    accepted_files.value = (uint32_t)accepted_files_.load();
+    root_node.children.push_back(accepted_files_node);
 
-    root_node.children.push_back(accepted_files);
+    state::response::SerializedResponseNode input_bytes_node;
+    input_bytes_node.name = "InputBytes";
+    input_bytes_node.value = (uint32_t)input_bytes.load();

Review Comment:
   Good point, updated in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
extensions/standard-processors/tests/unit/ProcessorTests.cpp:
##########
@@ -822,3 +822,11 @@ TEST_CASE("isSingleThreaded - two threads for a single 
threaded processor", "[is
   REQUIRE(LogTestController::getInstance().contains("[warning] Processor 
myProc can not be run in parallel, its "
                                                     "\"max concurrent tasks\" 
value is too high. It was set to 1 from 2."));
 }
+
+TEST_CASE("Test getProcessorName", "[getProcessorName]") {

Review Comment:
   Fixed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/ProcessSession.h:
##########
@@ -37,6 +37,7 @@
 #include "WeakReference.h"
 #include "provenance/Provenance.h"
 #include "utils/gsl.h"
+#include "Processor.h"

Review Comment:
   Good idea, moved in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32

Review Comment:
   Fixed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    return org::apache::nifi::minifi::utils::StringUtils::split(__FUNCDNAME__, 
"@")[1]; \
+  }
+#else
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    auto splitted = 
org::apache::nifi::minifi::utils::StringUtils::split(__PRETTY_FUNCTION__, 
"::"); \
+    return splitted[splitted.size() - 2]; \
+  }
+#endif

Review Comment:
   I was trying to use that previously, but seemed more complicated and did not 
really have an good option for it on Windows. Boost's demangling function would 
be the best option for this, but wouldn't want to introduce that as a strict 
dependency. I think this version is simpler for our use case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to