This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d357fcc69 MINIFICPP-1848 Create a generic solution for processor 
metrics
d357fcc69 is described below

commit d357fcc69b6e78553ae18dd4200f982421f86159
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Nov 7 17:38:54 2022 +0100

    MINIFICPP-1848 Create a generic solution for processor metrics
    
    Generate common metrics for all MiNiFi processors to be available for 
publishing on C2 and Prometheus interfaces.
    
    Closes #1400
    Signed-off-by: Marton Szasz <[email protected]>
---
 METRICS.md                                         | 105 +++++++++------
 cmake/BuildTests.cmake                             |   2 +-
 .../test/integration/features/prometheus.feature   |   5 +-
 .../integration/minifi/core/DockerTestCluster.py   |  77 +----------
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 .../integration/minifi/core/PrometheusChecker.py   |  81 ++++++++++++
 .../tests/GCPCredentialsControllerServiceTests.cpp |  17 +--
 extensions/http-curl/tests/C2MetricsTest.cpp       |   6 +-
 extensions/libarchive/BinFiles.cpp                 |   1 +
 .../standard-processors/processors/GetFile.cpp     |  16 +--
 .../standard-processors/processors/GetFile.h       |  71 +++-------
 .../standard-processors/processors/GetTCP.cpp      |   8 +-
 extensions/standard-processors/processors/GetTCP.h |  51 +------
 .../tests/unit/ProcessorTests.cpp                  |   8 ++
 libminifi/include/core/ProcessSession.h            |  21 ++-
 libminifi/include/core/Processor.h                 |  28 +++-
 libminifi/include/core/ProcessorMetrics.h          |  85 ++++++++++++
 libminifi/include/core/state/nodes/MetricsBase.h   |  40 +-----
 libminifi/src/core/ProcessSession.cpp              |  34 +++--
 libminifi/src/core/Processor.cpp                   |  19 ++-
 libminifi/src/core/ProcessorMetrics.cpp            | 147 +++++++++++++++++++++
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |   7 +-
 libminifi/test/DummyProcessor.cpp                  |  25 ++++
 libminifi/test/DummyProcessor.h                    |  43 ++++++
 .../test/unit/ContentRepositoryDependentTests.h    |  17 +--
 libminifi/test/unit/MetricsTests.cpp               |  46 +++++++
 libminifi/test/unit/ProcessSessionTests.cpp        |  16 ---
 27 files changed, 626 insertions(+), 352 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index fa4b4c7b5..543b67f4c 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -20,9 +20,18 @@ This readme defines the metrics published by Apache NiFi. 
All options defined ar
 
 ## Table of Contents
 
-- [Description](#description)
-- [Configuration](#configuration)
-- [Metrics](#metrics)
+- [Apache NiFi - MiNiFi - C++ Metrics 
Readme.](#apache-nifi---minifi---c-metrics-readme)
+  - [Table of Contents](#table-of-contents)
+  - [Description](#description)
+  - [Configuration](#configuration)
+  - [System Metrics](#system-metrics)
+    - [QueueMetrics](#queuemetrics)
+    - [RepositoryMetrics](#repositorymetrics)
+    - [DeviceInfoNode](#deviceinfonode)
+    - [FlowInformation](#flowinformation)
+  - [Processor Metrics](#processor-metrics)
+    - [General Metrics](#general-metrics)
+    - [GetFileMetrics](#getfilemetrics)
 
 ## Description
 
@@ -35,22 +44,22 @@ Aside from the publisher exposed metrics, metrics are also 
sent through C2 proto
 
 To configure the a metrics publisher first we have to set which publisher 
class should be used:
 
-       # in minifi.properties
+    # in minifi.properties
 
-       nifi.metrics.publisher.class=PrometheusMetricsPublisher
+    nifi.metrics.publisher.class=PrometheusMetricsPublisher
 
 Currently PrometheusMetricsPublisher is the only available publisher in MiNiFi 
C++ which publishes metrics to a Prometheus server.
 To use the publisher a port should also be configured where the metrics will 
be available to be scraped through:
 
-       # in minifi.properties
+    # in minifi.properties
 
-       nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
+    nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
 
 The following option defines which metric classes should be exposed through 
the metrics publisher in configured with a comma separated value:
 
-       # in minifi.properties
+    # in minifi.properties
 
-       
nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+    
nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
 
 An agent identifier should also be defined to identify which agent the metric 
is exposed from. If not set, the hostname is used as the identifier.
 
@@ -58,7 +67,7 @@ An agent identifier should also be defined to identify which 
agent the metric is
 
        nifi.metrics.publisher.agent.identifier=Agent1
 
-## Metrics
+## System Metrics
 
 The following section defines the currently available metrics to be published 
by the MiNiFi C++ agent.
 
@@ -103,43 +112,15 @@ RepositoryMetrics is a system level metric that reports 
metrics for the register
 
|--------------------------|-----------------------------------------------------------------|
 | repository_name          | Name of the reported repository                   
              |
 
-### GetFileMetrics
-
-Processor level metric that reports metrics for the GetFile processor if 
defined in the flow configuration
-
-| Metric name           | Labels                         | Description         
                           |
-|-----------------------|--------------------------------|------------------------------------------------|
-| onTrigger_invocations | processor_name, processor_uuid | Number of times the 
processor was triggered    |
-| accepted_files        | processor_name, processor_uuid | Number of files 
that matched the set criterias |
-| input_bytes           | processor_name, processor_uuid | Sum of file sizes 
processed                    |
-
-| Label          | Description                                                 
   |
-|----------------|----------------------------------------------------------------|
-| processor_name | Name of the processor                                       
   |
-| processor_uuid | UUID of the processor                                       
   |
-
-### GetTCPMetrics
-
-Processor level metric that reports metrics for the GetTCPMetrics processor if 
defined in the flow configuration
-
-| Metric name           | Labels                         | Description         
                           |
-|-----------------------|--------------------------------|------------------------------------------------|
-| onTrigger_invocations | processor_name, processor_uuid | Number of times the 
processor was triggered    |
-
-| Label          | Description                                                 
   |
-|----------------|----------------------------------------------------------------|
-| processor_name | Name of the processor                                       
   |
-| processor_uuid | UUID of the processor                                       
   |
-
 ### DeviceInfoNode
 
 DeviceInfoNode is a system level metric that reports metrics about the system 
resources used and available
 
-| Metric name     | Labels       | Description               |
-|-----------------|--------------|---------------------------|
-| physical_mem    | -            | Physical memory available |
-| memory_usage    | -            | Memory used by the agent  |
-| cpu_utilization | -            | CPU utilized by the agent |
+| Metric name     | Labels       | Description                         |
+|-----------------|--------------|-------------------------------------|
+| physical_mem    | -            | Physical memory available           |
+| memory_usage    | -            | Physical memory usage of the system |
+| cpu_utilization | -            | CPU utilized by the system          |
 
 ### FlowInformation
 
@@ -159,3 +140,41 @@ DeviceInfoNode is a system level metric that reports 
metrics about the system re
 | connection_name | Name of the connection defined in the flow configuration   
  |
 | component_uuid  | UUID of the component                                      
  |
 | component_name  | Name of the component                                      
  |
+
+## Processor Metrics
+
+Processor level metrics can be accessed for any processor provided by MiNiFi. 
These metrics correspond to the name of the processor appended by the "Metrics" 
suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
+
+### General Metrics
+
+There are general metrics that are available for all processors. Besides these 
metrics processors can implement additional metrics that are speicific to that 
processor.
+
+| Metric name                            | Labels                              
         | Description                                                          
               |
+|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
+| onTrigger_invocations                  | metric_class, processor_name, 
processor_uuid | The number of processor onTrigger calls                        
                     |
+| average_onTrigger_runtime_milliseconds | metric_class, processor_name, 
processor_uuid | The average runtime in milliseconds of the last 10 onTrigger 
calls of the processor |
+| last_onTrigger_runtime_milliseconds    | metric_class, processor_name, 
processor_uuid | The runtime in milliseconds of the last onTrigger call of the 
processor             |
+| transferred_flow_files                 | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a relationship             
                     |
+| transferred_bytes                      | metric_class, processor_name, 
processor_uuid | Number of bytes transferred to a relationship                  
                     |
+| transferred_to_\<relationship\>        | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a specific relationship    
                     |
+
+| Label          | Description                                                 
           |
+|----------------|------------------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to \<processor 
type\>Metrics |
+| processor_name | Name of the processor                                       
           |
+| processor_uuid | UUID of the processor                                       
           |
+
+### GetFileMetrics
+
+Processor level metric that reports metrics for the GetFile processor if 
defined in the flow configuration.
+
+| Metric name           | Labels                                       | 
Description                                    |
+|-----------------------|----------------------------------------------|------------------------------------------------|
+| accepted_files        | metric_class, processor_name, processor_uuid | 
Number of files that matched the set criterias |
+| input_bytes           | metric_class, processor_name, processor_uuid | Sum 
of file sizes processed                    |
+
+| Label          | Description                                                 
   |
+|----------------|----------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to GetFileMetrics 
   |
+| processor_name | Name of the processor                                       
   |
+| processor_uuid | UUID of the processor                                       
   |
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index a0346f41c..62fc03ebf 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -83,7 +83,7 @@ endfunction()
 enable_testing(test)
 
 SET(TEST_BASE_LIB test_base)
-set(TEST_BASE_SOURCES "TestBase.cpp" "RandomServerSocket.cpp" 
"StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp" 
"ReadFromFlowFileTestProcessor.cpp")
+set(TEST_BASE_SOURCES "TestBase.cpp" "RandomServerSocket.cpp" 
"StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp" 
"ReadFromFlowFileTestProcessor.cpp" "DummyProcessor.cpp")
 list(TRANSFORM TEST_BASE_SOURCES PREPEND "${TEST_DIR}/")
 add_library(${TEST_BASE_LIB} STATIC "${TEST_BASE_SOURCES}")
 target_link_libraries(${TEST_BASE_LIB} core-minifi)
diff --git a/docker/test/integration/features/prometheus.feature 
b/docker/test/integration/features/prometheus.feature
index e22aa5faf..8c37a0109 100644
--- a/docker/test/integration/features/prometheus.feature
+++ b/docker/test/integration/features/prometheus.feature
@@ -13,12 +13,15 @@ Feature: MiNiFi can publish metrics to Prometheus server
     Then "RepositoryMetrics" is published to the Prometheus server in less 
than 60 seconds
     And "QueueMetrics" is published to the Prometheus server in less than 60 
seconds
     And "GetFileMetrics" processor metric is published to the Prometheus 
server in less than 60 seconds for "GetFile1" processor
+    And "PutFileMetrics" processor metric is published to the Prometheus 
server in less than 60 seconds for "PutFile" processor
     And "FlowInformation" is published to the Prometheus server in less than 
60 seconds
     And "DeviceInfoNode" is published to the Prometheus server in less than 60 
seconds
 
-   Scenario: Multiple GetFile metrics are reported by Prometheus
+  Scenario: Multiple GetFile metrics are reported by Prometheus
     Given a GetFile processor with the name "GetFile1" and the "Input 
Directory" property set to "/tmp/input"
     And a GetFile processor with the name "GetFile2" and the "Input Directory" 
property set to "/tmp/input"
+    And the "Keep Source File" property of the GetFile1 processor is set to 
"true"
+    And the "Keep Source File" property of the GetFile2 processor is set to 
"true"
     And "GetFile2" processor is a start node
     And a file with the content "test" is present in "/tmp/input"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py 
b/docker/test/integration/minifi/core/DockerTestCluster.py
index 51fa9fd30..e89895dd0 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -24,10 +24,10 @@ import tempfile
 
 from .LogSource import LogSource
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
+from .PrometheusChecker import PrometheusChecker
 from .utils import retry_check
 from azure.storage.blob import BlobServiceClient
 from azure.core.exceptions import ResourceExistsError
-from prometheus_api_client import PrometheusConnect
 
 
 class DockerTestCluster(SingleNodeDockerCluster):
@@ -314,78 +314,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
                 return container.put_archive(os.path.dirname(dst_path), 
data.read())
 
     def wait_for_metric_class_on_prometheus(self, metric_class, 
timeout_seconds):
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            if self.verify_metric_class(metric_class):
-                return True
-            time.sleep(1)
-        return False
+        return 
PrometheusChecker().wait_for_metric_class_on_prometheus(metric_class, 
timeout_seconds)
 
     def wait_for_processor_metric_on_prometheus(self, metric_class, 
timeout_seconds, processor_name):
-        start_time = time.perf_counter()
-        while (time.perf_counter() - start_time) < timeout_seconds:
-            if self.verify_processor_metric(metric_class, processor_name):
-                return True
-            time.sleep(1)
-        return False
-
-    def verify_processor_metric(self, metric_class, processor_name):
-        if metric_class == "GetFileMetrics":
-            return self.verify_getfile_metrics(processor_name)
-        else:
-            raise Exception("Metric class '%s' verification is not 
implemented" % metric_class)
-
-    def verify_metric_class(self, metric_class):
-        if metric_class == "RepositoryMetrics":
-            return self.verify_repository_metrics()
-        elif metric_class == "QueueMetrics":
-            return self.verify_queue_metrics()
-        elif metric_class == "FlowInformation":
-            return self.verify_flow_information_metrics()
-        elif metric_class == "DeviceInfoNode":
-            return self.verify_device_info_node_metrics()
-        else:
-            raise Exception("Metric class '%s' verification is not 
implemented" % metric_class)
-
-    def verify_repository_metrics(self):
-        prom = PrometheusConnect(url="http://localhost:9090";, disable_ssl=True)
-        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}]
-        for labels in label_list:
-            if not (self.verify_metric_exists(prom, 'minifi_is_running', 
'RepositoryMetrics', labels)
-                    and self.verify_metric_exists(prom, 'minifi_is_full', 
'RepositoryMetrics', labels)
-                    and self.verify_metric_exists(prom, 
'minifi_repository_size', 'RepositoryMetrics', labels)):
-                return False
-        return True
-
-    def verify_queue_metrics(self):
-        prom = PrometheusConnect(url="http://localhost:9090";, disable_ssl=True)
-        return self.verify_metric_exists(prom, 'minifi_queue_data_size', 
'QueueMetrics') and \
-            self.verify_metric_exists(prom, 'minifi_queue_data_size_max', 
'QueueMetrics') and \
-            self.verify_metric_exists(prom, 'minifi_queue_size', 
'QueueMetrics') and \
-            self.verify_metric_exists(prom, 'minifi_queue_size_max', 
'QueueMetrics')
-
-    def verify_getfile_metrics(self, processor_name):
-        prom = PrometheusConnect(url="http://localhost:9090";, disable_ssl=True)
-        labels = {'processor_name': processor_name}
-        return self.verify_metric_exists(prom, 'minifi_onTrigger_invocations', 
'GetFileMetrics', labels) and \
-            self.verify_metric_exists(prom, 'minifi_accepted_files', 
'GetFileMetrics', labels) and \
-            self.verify_metric_exists(prom, 'minifi_input_bytes', 
'GetFileMetrics', labels)
-
-    def verify_flow_information_metrics(self):
-        prom = PrometheusConnect(url="http://localhost:9090";, disable_ssl=True)
-        return self.verify_metric_exists(prom, 'minifi_queue_data_size', 
'FlowInformation') and \
-            self.verify_metric_exists(prom, 'minifi_queue_data_size_max', 
'FlowInformation') and \
-            self.verify_metric_exists(prom, 'minifi_queue_size', 
'FlowInformation') and \
-            self.verify_metric_exists(prom, 'minifi_queue_size_max', 
'FlowInformation') and \
-            self.verify_metric_exists(prom, 'minifi_is_running', 
'FlowInformation', {'component_name': 'FlowController'})
-
-    def verify_device_info_node_metrics(self):
-        prom = PrometheusConnect(url="http://localhost:9090";, disable_ssl=True)
-        return self.verify_metric_exists(prom, 'minifi_physical_mem', 
'DeviceInfoNode') and \
-            self.verify_metric_exists(prom, 'minifi_memory_usage', 
'DeviceInfoNode') and \
-            self.verify_metric_exists(prom, 'minifi_cpu_utilization', 
'DeviceInfoNode')
-
-    def verify_metric_exists(self, prometheus_client, metric_name, 
metric_class, labels={}):
-        labels['metric_class'] = metric_class
-        labels['agent_identifier'] = "Agent1"
-        return 
len(prometheus_client.get_current_metric_value(metric_name=metric_name, 
label_config=labels)) > 0
+        return 
PrometheusChecker().wait_for_processor_metric_on_prometheus(metric_class, 
timeout_seconds, processor_name)
diff --git a/docker/test/integration/minifi/core/ImageStore.py 
b/docker/test/integration/minifi/core/ImageStore.py
index 23d46afaa..7f45e8273 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -105,7 +105,7 @@ class ImageStore:
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.metrics.publisher.class=PrometheusMetricsPublisher >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> 
{minifi_root}/conf/minifi.properties
-                RUN echo 
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,FlowInformation,DeviceInfoNode
 >> {minifi_root}/conf/minifi.properties
+                RUN echo 
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode
 >> {minifi_root}/conf/minifi.properties
                 USER minificpp
                 """.format(base_image='apacheminificpp:' + 
MinifiContainer.MINIFI_VERSION,
                            minifi_root=MinifiContainer.MINIFI_ROOT))
diff --git a/docker/test/integration/minifi/core/PrometheusChecker.py 
b/docker/test/integration/minifi/core/PrometheusChecker.py
new file mode 100644
index 000000000..936ae4178
--- /dev/null
+++ b/docker/test/integration/minifi/core/PrometheusChecker.py
@@ -0,0 +1,81 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = 
PrometheusConnect(url="http://localhost:9090";, disable_ssl=True)
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, 
timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, 
timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(metric_class, processor_name)
+        else:
+            return self.verify_general_processor_metrics(metric_class, 
processor_name)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not 
implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}]
+        return all((self.verify_metrics_exist(['minifi_is_running', 
'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels) for 
labels in label_list))
+
+    def verify_queue_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'QueueMetrics')
+
+    def verify_general_processor_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return 
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 
'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+            
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 
'minifi_transferred_flow_files', 'minifi_transferred_to_success', 
'minifi_transferred_bytes'], metric_class, labels)
+
+    def verify_getfile_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return self.verify_general_processor_metrics(metric_class, 
processor_name) and \
+            self.verify_metrics_exist(['minifi_input_bytes', 
'minifi_accepted_files'], metric_class, labels)
+
+    def verify_flow_information_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'FlowInformation') and \
+            self.verify_metric_exists('minifi_is_running', 'FlowInformation', 
{'component_name': 'FlowController'})
+
+    def verify_device_info_node_metrics(self):
+        return self.verify_metrics_exist(['minifi_physical_mem', 
'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
+
+    def verify_metric_exists(self, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
+        labels['agent_identifier'] = "Agent1"
+        return 
len(self.prometheus_client.get_current_metric_value(metric_name=metric_name, 
label_config=labels)) > 0
+
+    def verify_metrics_exist(self, metric_names, metric_class, labels={}):
+        return all((self.verify_metric_exists(metric_name, metric_class, 
labels) for metric_name in metric_names))
+
+    def verify_metric_larger_than_zero(self, metric_name, metric_class, 
labels={}):
+        labels['metric_class'] = metric_class
+        result = 
self.prometheus_client.get_current_metric_value(metric_name=metric_name, 
label_config=labels)
+        return len(result) > 0 and int(result[0]['value'][1]) > 0
+
+    def verify_metrics_larger_than_zero(self, metric_names, metric_class, 
labels={}):
+        return all((self.verify_metric_larger_than_zero(metric_name, 
metric_class, labels) for metric_name in metric_names))
diff --git a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp 
b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
index 9254d0902..b26fd7155 100644
--- a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
+++ b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
@@ -26,6 +26,7 @@
 #include "rapidjson/stream.h"
 #include "rapidjson/writer.h"
 #include "google/cloud/internal/setenv.h"
+#include "DummyProcessor.h"
 
 using GCPCredentialsControllerService = 
org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
 
@@ -62,22 +63,6 @@ std::optional<std::filesystem::path> 
create_mock_json_file(const std::filesystem
   p.close();
   return path;
 }
-
-class DummyProcessor : public org::apache::nifi::minifi::core::Processor {
- public:
-  using minifi::core::Processor::Processor;
-
-  static constexpr const char* Description = "A processor that does nothing.";
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  static auto relationships() { return std::array<core::Relationship, 0>{}; }
-  static constexpr bool SupportsDynamicProperties = false;
-  static constexpr bool SupportsDynamicRelationships = false;
-  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
-  static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);
 }  // namespace
 
 class GCPCredentialsTests : public ::testing::Test {
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp 
b/extensions/http-curl/tests/C2MetricsTest.cpp
index 99fe46e52..0da989316 100644
--- a/extensions/http-curl/tests/C2MetricsTest.cpp
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -173,7 +173,11 @@ class MetricsHandler: public HeartbeatHandler {
     return processor_metrics.HasMember("GetTCPMetrics") &&
       processor_metrics["GetTCPMetrics"].HasMember(GETTCP1_UUID) &&
       
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("OnTriggerInvocations")
 &&
-      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint()
 > 0;
+      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint()
 > 0 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredFlowFiles")
 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("AverageOnTriggerRunTime")
 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("LastOnTriggerRunTime")
 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes");
   }
 
   [[nodiscard]] static std::string getReplacementConfigAsJsonValue(const 
std::string& replacement_config_path) {
diff --git a/extensions/libarchive/BinFiles.cpp 
b/extensions/libarchive/BinFiles.cpp
index 82b31c282..cf2fd4446 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -266,6 +266,7 @@ void BinFiles::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, c
     // for each merge as a rollback erases all
     // previously added files
     core::ProcessSession mergeSession(context);
+    mergeSession.setMetrics(metrics_);
     std::unique_ptr<Bin> bin = std::move(readyBins.front());
     readyBins.pop_front();
     // add bin's flows to the session
diff --git a/extensions/standard-processors/processors/GetFile.cpp 
b/extensions/standard-processors/processors/GetFile.cpp
index c1451d917..10480b518 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -136,8 +136,6 @@ void GetFile::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFact
 }
 
 void GetFile::onTrigger(core::ProcessContext* /*context*/, 
core::ProcessSession* session) {
-  metrics_->iterations_++;
-
   const bool is_dir_empty_before_poll = isListingEmpty();
   logger_->log_debug("Listing is %s before polling directory", 
is_dir_empty_before_poll ? "empty" : "not empty");
   if (is_dir_empty_before_poll) {
@@ -194,7 +192,7 @@ bool GetFile::isListingEmpty() const {
   return directory_listing_.empty();
 }
 
-void GetFile::putListing(std::string fileName) {
+void GetFile::putListing(const std::string& fileName) {
   logger_->log_trace("Adding file to queue: %s", fileName);
 
   std::lock_guard<std::mutex> lock(directory_listing_mutex_);
@@ -213,7 +211,7 @@ std::queue<std::string> GetFile::pollListing(uint64_t 
batch_size) {
   return list;
 }
 
-bool GetFile::fileMatchesRequestCriteria(std::string fullName, std::string 
name, const GetFileRequest &request) {
+bool GetFile::fileMatchesRequestCriteria(const std::string& fullName, const 
std::string& name, const GetFileRequest &request) {
   logger_->log_trace("Checking file: %s", fullName);
 
   std::error_code ec;
@@ -248,8 +246,9 @@ bool GetFile::fileMatchesRequestCriteria(std::string 
fullName, std::string name,
     return false;
   }
 
-  metrics_->input_bytes_ += file_size;
-  metrics_->accepted_files_++;
+  auto* const getfile_metrics = static_cast<GetFileMetrics*>(metrics_.get());
+  getfile_metrics->input_bytes += file_size;
+  ++getfile_metrics->accepted_files;
   return true;
 }
 
@@ -264,11 +263,6 @@ void GetFile::performListing(const GetFileRequest 
&request) {
   utils::file::list_dir(request.inputDirectory, callback, logger_, 
request.recursive);
 }
 
-int16_t 
GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
 &metric_vector) {
-  metric_vector.push_back(metrics_);
-  return 0;
-}
-
 REGISTER_RESOURCE(GetFile, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/GetFile.h 
b/extensions/standard-processors/processors/GetFile.h
index 086ec41bd..9e7d9ca9c 100644
--- a/extensions/standard-processors/processors/GetFile.h
+++ b/extensions/standard-processors/processors/GetFile.h
@@ -48,70 +48,40 @@ struct GetFileRequest {
   std::string inputDirectory;
 };
 
-class GetFileMetrics : public state::response::ResponseNode {
+class GetFileMetrics : public core::ProcessorMetrics {
  public:
-  explicit GetFileMetrics(const CoreComponent& source_component)
-    : state::response::ResponseNode("GetFileMetrics"),
-      source_component_(source_component) {
-  }
-
-  std::string getName() const override {
-    return core::Connectable::getName();
+  explicit GetFileMetrics(const core::Processor& source_processor)
+    : core::ProcessorMetrics(source_processor) {
   }
 
   std::vector<state::response::SerializedResponseNode> serialize() override {
-    std::vector<state::response::SerializedResponseNode> resp;
-
-    state::response::SerializedResponseNode root_node;
-    root_node.name = source_component_.getUUIDStr();
-
-    state::response::SerializedResponseNode iter;
-    iter.name = "OnTriggerInvocations";
-    iter.value = (uint32_t)iterations_.load();
-
-    root_node.children.push_back(iter);
+    auto resp = core::ProcessorMetrics::serialize();
+    auto& root_node = resp[0];
 
-    state::response::SerializedResponseNode accepted_files;
-    accepted_files.name = "AcceptedFiles";
-    accepted_files.value = (uint32_t)accepted_files_.load();
+    state::response::SerializedResponseNode 
accepted_files_node{"AcceptedFiles", accepted_files.load()};
+    root_node.children.push_back(accepted_files_node);
 
-    root_node.children.push_back(accepted_files);
-
-    state::response::SerializedResponseNode input_bytes;
-    input_bytes.name = "InputBytes";
-    input_bytes.value = (uint32_t)input_bytes_.load();
-
-    root_node.children.push_back(input_bytes);
-    resp.push_back(root_node);
+    state::response::SerializedResponseNode input_bytes_node{"InputBytes", 
input_bytes.load()};
+    root_node.children.push_back(input_bytes_node);
 
     return resp;
   }
 
   std::vector<state::PublishedMetric> calculateMetrics() override {
-    return {
-      {"onTrigger_invocations", static_cast<double>(iterations_.load()),
-        {{"metric_class", "GetFileMetrics"}, {"processor_name", 
source_component_.getName()}, {"processor_uuid", 
source_component_.getUUIDStr()}}},
-      {"accepted_files", static_cast<double>(accepted_files_.load()),
-        {{"metric_class", "GetFileMetrics"}, {"processor_name", 
source_component_.getName()}, {"processor_uuid", 
source_component_.getUUIDStr()}}},
-      {"input_bytes", static_cast<double>(input_bytes_.load()),
-        {{"metric_class", "GetFileMetrics"}, {"processor_name", 
source_component_.getName()}, {"processor_uuid", 
source_component_.getUUIDStr()}}}
-    };
+    auto metrics = core::ProcessorMetrics::calculateMetrics();
+    metrics.push_back({"accepted_files", 
static_cast<double>(accepted_files.load()), getCommonLabels()});
+    metrics.push_back({"input_bytes", static_cast<double>(input_bytes.load()), 
getCommonLabels()});
+    return metrics;
   }
 
- protected:
-  friend class GetFile;
-
-  const CoreComponent& source_component_;
-  std::atomic<size_t> iterations_{0};
-  std::atomic<size_t> accepted_files_{0};
-  std::atomic<size_t> input_bytes_{0};
+  std::atomic<uint32_t> accepted_files{0};
+  std::atomic<uint64_t> input_bytes{0};
 };
 
-class GetFile : public core::Processor, public 
state::response::MetricsNodeSource {
+class GetFile : public core::Processor {
  public:
   explicit GetFile(const std::string& name, const utils::Identifier& uuid = {})
-      : Processor(name, uuid),
-        metrics_(std::make_shared<GetFileMetrics>(*this)) {
+      : Processor(name, uuid, std::make_shared<GetFileMetrics>(*this)) {
   }
   ~GetFile() override = default;
 
@@ -164,16 +134,13 @@ class GetFile : public core::Processor, public 
state::response::MetricsNodeSourc
    */
   void performListing(const GetFileRequest &request);
 
-  int16_t 
getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> 
&metric_vector) override;
-
  private:
   bool isListingEmpty() const;
-  void putListing(std::string fileName);
+  void putListing(const std::string& fileName);
   std::queue<std::string> pollListing(uint64_t batch_size);
-  bool fileMatchesRequestCriteria(std::string fullName, std::string name, 
const GetFileRequest &request);
+  bool fileMatchesRequestCriteria(const std::string& fullName, const 
std::string& name, const GetFileRequest &request);
   void getSingleFile(core::ProcessSession& session, const std::string& 
file_name) const;
 
-  std::shared_ptr<GetFileMetrics> metrics_;
   GetFileRequest request_;
   std::queue<std::string> directory_listing_;
   mutable std::mutex directory_listing_mutex_;
diff --git a/extensions/standard-processors/processors/GetTCP.cpp 
b/extensions/standard-processors/processors/GetTCP.cpp
index d4d7ac93d..4eed2d13d 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -77,7 +77,7 @@ const core::Property GetTCP::EndOfMessageByte(
 const core::Relationship GetTCP::Success("success", "All files are routed to 
success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete 
message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(std::string source, uint8_t *message, size_t size, 
bool partial) {
+int16_t DataHandler::handle(const std::string& source, uint8_t *message, 
size_t size, bool partial) {
   std::shared_ptr<core::ProcessSession> my_session = 
sessionFactory_->createSession();
   std::shared_ptr<core::FlowFile> flowFile = my_session->create();
 
@@ -224,7 +224,6 @@ void GetTCP::notifyStop() {
 }
 void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession>& /*session*/) {
   // Perform directory list
-  metrics_->iterations_++;
   std::lock_guard<std::mutex> lock(mutex_);
   // check if the futures are valid. If they've terminated remove it from the 
map.
 
@@ -289,11 +288,6 @@ void GetTCP::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, con
   context->yield();
 }
 
-int16_t 
GetTCP::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
 &metric_vector) {
-  metric_vector.push_back(metrics_);
-  return 0;
-}
-
 REGISTER_RESOURCE(GetTCP, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/GetTCP.h 
b/extensions/standard-processors/processors/GetTCP.h
index 7e326eb19..c2085ce37 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -91,54 +91,13 @@ class DataHandler {
   }
   static const char *SOURCE_ENDPOINT_ATTRIBUTE;
 
-  int16_t handle(std::string source, uint8_t *message, size_t size, bool 
partial);
+  int16_t handle(const std::string& source, uint8_t *message, size_t size, 
bool partial);
 
  private:
   std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
 };
 
-class GetTCPMetrics : public state::response::ResponseNode {
- public:
-  explicit GetTCPMetrics(const CoreComponent& source_component)
-    : state::response::ResponseNode("GetTCPMetrics"),
-      source_component_(source_component) {
-  }
-
-  std::string getName() const override {
-    return core::Connectable::getName();
-  }
-
-  std::vector<state::response::SerializedResponseNode> serialize() override {
-    std::vector<state::response::SerializedResponseNode> resp;
-
-    state::response::SerializedResponseNode root_node;
-    root_node.name = source_component_.getUUIDStr();
-
-    state::response::SerializedResponseNode iter;
-    iter.name = "OnTriggerInvocations";
-    iter.value = (uint32_t)iterations_.load();
-
-    root_node.children.push_back(iter);
-    resp.push_back(root_node);
-
-    return resp;
-  }
-
-  std::vector<state::PublishedMetric> calculateMetrics() override {
-    return {
-      {"onTrigger_invocations", static_cast<double>(iterations_.load()),
-        {{"metric_class", getName()}, {"processor_name", 
source_component_.getName()}, {"processor_uuid", 
source_component_.getUUIDStr()}}}
-    };
-  }
-
- protected:
-  friend class GetTCP;
-
-  const CoreComponent& source_component_;
-  std::atomic<size_t> iterations_{0};
-};
-
-class GetTCP : public core::Processor, public 
state::response::MetricsNodeSource {
+class GetTCP : public core::Processor {
  public:
   explicit GetTCP(const std::string& name, const utils::Identifier& uuid = {})
     : Processor(name, uuid),
@@ -147,8 +106,7 @@ class GetTCP : public core::Processor, public 
state::response::MetricsNodeSource
       concurrent_handlers_(2),
       endOfMessageByte(static_cast<std::byte>(13)),
       receive_buffer_size_(16 * 1024 * 1024),
-      connection_attempt_limit_(3),
-      metrics_(std::make_shared<GetTCPMetrics>(*this)) {
+      connection_attempt_limit_(3) {
   }
 
   ~GetTCP() override {
@@ -200,8 +158,6 @@ class GetTCP : public core::Processor, public 
state::response::MetricsNodeSource
   }
   void initialize() override;
 
-  int16_t 
getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> 
&metric_vector) override;
-
  protected:
   void notifyStop() override;
 
@@ -218,7 +174,6 @@ class GetTCP : public core::Processor, public 
state::response::MetricsNodeSource
   std::chrono::milliseconds reconnect_interval_{5000};
   uint64_t receive_buffer_size_;
   uint16_t connection_attempt_limit_;
-  std::shared_ptr<GetTCPMetrics> metrics_;
   // Mutex for ensuring clients are running
   std::mutex mutex_;
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_service_;
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp 
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index d563baf59..3b4d49079 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -822,3 +822,11 @@ TEST_CASE("isSingleThreaded - two threads for a single 
threaded processor", "[is
   REQUIRE(LogTestController::getInstance().contains("[warning] Processor 
myProc can not be run in parallel, its "
                                                     "\"max concurrent tasks\" 
value is too high. It was set to 1 from 2."));
 }
+
+TEST_CASE("Test getProcessorType", "[getProcessorType]") {
+  TestController testController;
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  auto processor = plan->addProcessor("GenerateFlowFile", "myProc");
+  REQUIRE(processor->getProcessorType() == "GenerateFlowFile");
+}
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 470df5801..702e35231 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -27,6 +27,7 @@
 #include <atomic>
 #include <algorithm>
 #include <set>
+#include <unordered_map>
 
 #include "ProcessContext.h"
 #include "FlowFileRecord.h"
@@ -37,6 +38,7 @@
 #include "WeakReference.h"
 #include "provenance/Provenance.h"
 #include "utils/gsl.h"
+#include "ProcessorMetrics.h"
 
 namespace org::apache::nifi::minifi::core {
 namespace detail {
@@ -82,11 +84,11 @@ class ProcessSession : public ReferenceContainer {
   // Clone a new UUID FlowFile from parent for attributes and sub set of 
parent content resource claim
   std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> 
&parent, int64_t offset, int64_t size);
   // Transfer the FlowFile to the relationship
-  virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, const 
Relationship& relationship);
+  virtual void transfer(const std::shared_ptr<core::FlowFile>& flow, const 
Relationship& relationship);
   // Put Attribute
-  void putAttribute(const std::shared_ptr<core::FlowFile> &flow, const 
std::string& key, const std::string& value);
+  void putAttribute(const std::shared_ptr<core::FlowFile>& flow, const 
std::string& key, const std::string& value);
   // Remove Attribute
-  void removeAttribute(const std::shared_ptr<core::FlowFile> &flow, const 
std::string& key);
+  void removeAttribute(const std::shared_ptr<core::FlowFile>& flow, const 
std::string& key);
   // Remove Flow File
   void remove(const std::shared_ptr<core::FlowFile> &flow);
   // Access the contents of the flow file as an input stream; returns null if 
the flow file has no content claim
@@ -144,6 +146,10 @@ class ProcessSession : public ReferenceContainer {
 
   bool existsFlowFileInRelationship(const Relationship &relationship);
 
+  void setMetrics(const std::shared_ptr<ProcessorMetrics>& metrics) {
+    metrics_ = metrics;
+  }
+
 // Prevent default copy constructor and assignment operation
 // Only support pass by reference or pointer
   ProcessSession(const ProcessSession &parent) = delete;
@@ -174,7 +180,12 @@ class ProcessSession : public ReferenceContainer {
     Error_NoRelationship
   };
 
-  RouteResult routeFlowFile(const std::shared_ptr<FlowFile>& record);
+  struct TransferMetrics {
+    size_t transfer_count = 0;
+    uint64_t transfer_size = 0;
+  };
+
+  RouteResult routeFlowFile(const std::shared_ptr<FlowFile>& record, const 
std::function<void(const FlowFile&, const Relationship&)>& transfer_callback);
 
   void persistFlowFilesBeforeTransfer(
       std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile>>>& 
transactionMap,
@@ -197,6 +208,8 @@ class ProcessSession : public ReferenceContainer {
   CoreComponentStateManager* stateManager_;
 
   static std::shared_ptr<utils::IdGenerator> id_generator_;
+
+  std::shared_ptr<ProcessorMetrics> metrics_;
 };
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/Processor.h 
b/libminifi/include/core/Processor.h
index 4dca4b7b0..900f5d0c5 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -29,6 +29,7 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
@@ -36,12 +37,23 @@
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "ProcessorMetrics.h"
+#include "utils/gsl.h"
+
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    auto class_name = 
org::apache::nifi::minifi::core::getClassName<decltype(*this)>(); \
+    auto splitted = 
org::apache::nifi::minifi::utils::StringUtils::split(class_name, "::"); \
+    return splitted[splitted.size() - 1]; \
+  }
 
 #define ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS \
   bool supportsDynamicProperties() const override { return 
SupportsDynamicProperties; } \
   bool supportsDynamicRelationships() const override { return 
SupportsDynamicRelationships; } \
   minifi::core::annotation::Input getInputRequirement() const override { 
return InputRequirement; } \
-  bool isSingleThreaded() const override { return IsSingleThreaded; }
+  bool isSingleThreaded() const override { return IsSingleThreaded; } \
+  ADD_GET_PROCESSOR_NAME
 
 namespace org::apache::nifi::minifi {
 
@@ -62,10 +74,10 @@ constexpr std::chrono::nanoseconds 
MINIMUM_SCHEDULING_NANOS{30000};
 
 #define BUILDING_DLL 1
 
-class Processor : public Connectable, public ConfigurableComponent {
+class Processor : public Connectable, public ConfigurableComponent, public 
state::response::ResponseNodeSource {
  public:
-  Processor(const std::string& name, const utils::Identifier& uuid);
-  explicit Processor(const std::string& name);
+  Processor(const std::string& name, const utils::Identifier& uuid, 
std::shared_ptr<ProcessorMetrics> metrics = nullptr);
+  explicit Processor(const std::string& name, 
std::shared_ptr<ProcessorMetrics> metrics = nullptr);
 
   Processor(const Processor& parent) = delete;
   Processor& operator=(const Processor& parent) = delete;
@@ -126,6 +138,8 @@ class Processor : public Connectable, public 
ConfigurableComponent {
 
   virtual bool isSingleThreaded() const = 0;
 
+  virtual std::string getProcessorType() const = 0;
+
   void setTriggerWhenEmpty(bool value) {
     _triggerWhenEmpty = value;
   }
@@ -172,7 +186,6 @@ class Processor : public Connectable, public 
ConfigurableComponent {
     return !isRunning();
   }
 
- public:
   virtual void onTrigger(const std::shared_ptr<ProcessContext> &context, const 
std::shared_ptr<ProcessSession> &session) {
     onTrigger(context.get(), session.get());
   }
@@ -206,6 +219,10 @@ class Processor : public Connectable, public 
ConfigurableComponent {
 
   virtual annotation::Input getInputRequirement() const = 0;
 
+  std::shared_ptr<state::response::ResponseNode> getResponseNodes() override {
+    return metrics_;
+  }
+
  protected:
   virtual void notifyStop() {
   }
@@ -222,6 +239,7 @@ class Processor : public Connectable, public 
ConfigurableComponent {
   std::atomic<bool> _triggerWhenEmpty;
 
   std::string cron_period_;
+  gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
 
  private:
   mutable std::mutex mutex_;
diff --git a/libminifi/include/core/ProcessorMetrics.h 
b/libminifi/include/core/ProcessorMetrics.h
new file mode 100644
index 000000000..ae1aa90dc
--- /dev/null
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <chrono>
+#include <atomic>
+#include <unordered_map>
+#include <mutex>
+#include <vector>
+
+#include "core/state/nodes/MetricsBase.h"
+#include "core/state/PublishedMetricProvider.h"
+
+namespace org::apache::nifi::minifi::core {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; }; 
 // NOLINT(readability/braces)
+
+class Processor;
+
+class ProcessorMetrics : public state::response::ResponseNode {
+ public:
+  explicit ProcessorMetrics(const Processor& source_processor);
+
+  [[nodiscard]] std::string getName() const override;
+
+  std::vector<state::response::SerializedResponseNode> serialize() override;
+  std::vector<state::PublishedMetric> calculateMetrics() override;
+  void increaseRelationshipTransferCount(const std::string& relationship, 
size_t count = 1);
+  std::chrono::milliseconds getAverageOnTriggerRuntime() const;
+  std::chrono::milliseconds getLastOnTriggerRuntime() const;
+  void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+
+  std::atomic<size_t> iterations{0};
+  std::atomic<size_t> transferred_flow_files{0};
+  std::atomic<uint64_t> transferred_bytes{0};
+
+ protected:
+  template<typename ValueType>
+  requires Summable<ValueType> && DividableByInteger<ValueType>
+  class Averager {
+   public:
+    explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size), 
next_average_index_(SAMPLE_SIZE_) {
+      values_.reserve(SAMPLE_SIZE_);
+    }
+
+    ValueType getAverage() const;
+    ValueType getLastValue() const;
+    void addValue(ValueType runtime);
+
+   private:
+    const uint32_t SAMPLE_SIZE_;
+    mutable std::mutex average_value_mutex_;
+    uint32_t next_average_index_;
+    std::vector<ValueType> values_;
+  };
+
+  [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() 
const;
+  static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
+
+  std::mutex transferred_relationships_mutex_;
+  std::unordered_map<std::string, size_t> transferred_relationships_;
+  const Processor& source_processor_;
+  Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
+};
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h 
b/libminifi/include/core/state/nodes/MetricsBase.h
index da545fb14..57ef26187 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -154,46 +154,8 @@ class ObjectNode : public ResponseNode {
  */
 class ResponseNodeSource {
  public:
-  ResponseNodeSource() = default;
-
   virtual ~ResponseNodeSource() = default;
-
-  /**
-   * Retrieves all metrics from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
-   */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) = 0;
-
-  virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) = 0;
-};
-
-/**
- * Purpose: Retrieves Metrics from the defined class. The current Metric, 
which is a consumable for any reader of Metrics must have the ability to set 
metrics.
- *
- */
-class MetricsNodeSource : public ResponseNodeSource {
- public:
-  MetricsNodeSource() = default;
-
-  virtual ~MetricsNodeSource() = default;
-
-  /**
-   * Retrieves all metrics from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
-   */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) {
-    return getMetricNodes(metric_vector);
-  }
-
-  virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) = 0;
+  virtual std::shared_ptr<ResponseNode> getResponseNodes() = 0;
 };
 
 class NodeReporter {
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 35dac48fb..b9ca57272 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -201,14 +201,14 @@ void ProcessSession::remove(const 
std::shared_ptr<core::FlowFile> &flow) {
   provenance_report_->drop(flow, reason);
 }
 
-void ProcessSession::putAttribute(const std::shared_ptr<core::FlowFile> &flow, 
const std::string& key, const std::string& value) {
+void ProcessSession::putAttribute(const std::shared_ptr<core::FlowFile>& flow, 
const std::string& key, const std::string& value) {
   flow->setAttribute(key, value);
   std::stringstream details;
   details << process_context_->getProcessorNode()->getName() << " modify flow 
record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
   provenance_report_->modifyAttributes(flow, details.str());
 }
 
-void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> 
&flow, const std::string& key) {
+void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile>& 
flow, const std::string& key) {
   flow->removeAttribute(key);
   std::stringstream details;
   details << process_context_->getProcessorNode()->getName() << " remove flow 
record " << flow->getUUIDStr() << " attribute " + key;
@@ -221,7 +221,7 @@ void ProcessSession::penalize(const 
std::shared_ptr<core::FlowFile> &flow) {
   flow->penalize(penalization_period);
 }
 
-void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, 
const Relationship& relationship) {
+void ProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow, 
const Relationship& relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " 
from " << process_context_->getProcessorNode()->getName() << " to relationship 
" << relationship.getName();
   utils::Identifier uuid = flow->getUUID();
   _transferRelationship[uuid] = relationship;
@@ -743,7 +743,7 @@ void ProcessSession::restore(const std::string &key, const 
std::shared_ptr<core:
   flow->clearStashClaim(key);
 }
 
-ProcessSession::RouteResult ProcessSession::routeFlowFile(const 
std::shared_ptr<FlowFile> &record) {
+ProcessSession::RouteResult ProcessSession::routeFlowFile(const 
std::shared_ptr<FlowFile> &record, const std::function<void(const FlowFile&, 
const Relationship&)>& transfer_callback) {
   if (record->isDeleted()) {
     return RouteResult::Ok_Deleted;
   }
@@ -764,6 +764,8 @@ ProcessSession::RouteResult 
ProcessSession::routeFlowFile(const std::shared_ptr<
     } else {
       // Autoterminated
       remove(record);
+      transfer_callback(*record, relationship);
+      return RouteResult::Ok_AutoTerminated;
     }
   } else {
     // We connections, clone the flow and assign the connection accordingly
@@ -772,13 +774,16 @@ ProcessSession::RouteResult 
ProcessSession::routeFlowFile(const std::shared_ptr<
       if (itConnection == connections.begin()) {
         // First connection which the flow need be routed to
         record->setConnection(connection);
+        transfer_callback(*record, relationship);
       } else {
         // Clone the flow file and route to the connection
         std::shared_ptr<core::FlowFile> cloneRecord = 
this->cloneDuringTransfer(record);
-        if (cloneRecord)
+        if (cloneRecord) {
           cloneRecord->setConnection(connection);
-        else
+          transfer_callback(*cloneRecord, relationship);
+        } else {
           throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow 
for transfer " + record->getUUIDStr());
+        }
       }
     }
   }
@@ -787,10 +792,15 @@ ProcessSession::RouteResult 
ProcessSession::routeFlowFile(const std::shared_ptr<
 
 void ProcessSession::commit() {
   try {
+    std::unordered_map<std::string, TransferMetrics> transfers;
+      auto increaseTransferMetrics = [&](const FlowFile& record, const 
Relationship& relationship) {
+      ++transfers[relationship.getName()].transfer_count;
+      transfers[relationship.getName()].transfer_size += record.getSize();
+    };
     // First we clone the flow record based on the transferred relationship 
for updated flow record
     for (auto && it : _updatedFlowFiles) {
       auto record = it.second.modified;
-      if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
+      if (routeFlowFile(record, increaseTransferMetrics) == 
RouteResult::Error_NoRelationship) {
         // Can not find relationship for the flow
         throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer 
relationship for the updated flow " + record->getUUIDStr());
       }
@@ -799,7 +809,7 @@ void ProcessSession::commit() {
     // Do the same thing for added flow file
     for (const auto& it : _addedFlowFiles) {
       auto record = it.second;
-      if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
+      if (routeFlowFile(record, increaseTransferMetrics) == 
RouteResult::Error_NoRelationship) {
         // Can not find relationship for the flow
         throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer 
relationship for the added flow " + record->getUUIDStr());
       }
@@ -875,6 +885,14 @@ void ProcessSession::commit() {
       }
     }
 
+    if (metrics_) {
+      for (const auto& [relationship_name, transfer_metrics] : transfers) {
+        metrics_->transferred_bytes += transfer_metrics.transfer_size;
+        metrics_->transferred_flow_files += transfer_metrics.transfer_count;
+        metrics_->increaseRelationshipTransferCount(relationship_name, 
transfer_metrics.transfer_count);
+      }
+    }
+
     // All done
     _updatedFlowFiles.clear();
     _addedFlowFiles.clear();
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 5aaffb4a0..4e73c2976 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -20,6 +20,7 @@
 #include "core/Processor.h"
 
 #include <ctime>
+#include <cctype>
 
 #include <memory>
 #include <set>
@@ -40,9 +41,10 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+Processor::Processor(const std::string& name, 
std::shared_ptr<ProcessorMetrics> metrics)
     : Connectable(name),
-      logger_(logging::LoggerFactory<Processor>::getLogger()) {
+      logger_(logging::LoggerFactory<Processor>::getLogger()),
+      metrics_(metrics ? std::move(metrics) : 
std::make_shared<ProcessorMetrics>(*this)) {
   has_work_.store(false);
   // Setup the default values
   state_ = DISABLED;
@@ -58,9 +60,10 @@ Processor::Processor(const std::string& name)
   logger_->log_debug("Processor %s created UUID %s", name_, getUUIDStr());
 }
 
-Processor::Processor(const std::string& name, const utils::Identifier& uuid)
+Processor::Processor(const std::string& name, const utils::Identifier& uuid, 
std::shared_ptr<ProcessorMetrics> metrics)
     : Connectable(name, uuid),
-      logger_(logging::LoggerFactory<Processor>::getLogger()) {
+      logger_(logging::LoggerFactory<Processor>::getLogger()),
+      metrics_(metrics ? std::move(metrics) : 
std::make_shared<ProcessorMetrics>(*this)) {
   has_work_.store(false);
   // Setup the default values
   state_ = DISABLED;
@@ -174,11 +177,15 @@ bool Processor::flowFilesOutGoingFull() const {
 }
 
 void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory 
*sessionFactory) {
+  ++metrics_->iterations;
   auto session = sessionFactory->createSession();
+  session->setMetrics(metrics_);
 
   try {
     // Call the virtual trigger function
+    const auto start = std::chrono::steady_clock::now();
     onTrigger(context, session.get());
+    
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start));
     session->commit();
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of 
processor: %s (%s)",
@@ -193,11 +200,15 @@ void Processor::onTrigger(ProcessContext *context, 
ProcessSessionFactory *sessio
 }
 
 void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, 
const std::shared_ptr<ProcessSessionFactory> &sessionFactory) {
+  ++metrics_->iterations;
   auto session = sessionFactory->createSession();
+  session->setMetrics(metrics_);
 
   try {
     // Call the virtual trigger function
+    const auto start = std::chrono::steady_clock::now();
     onTrigger(context, session);
+    
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start));
     session->commit();
   } catch (std::exception &exception) {
     logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of 
processor: %s (%s)",
diff --git a/libminifi/src/core/ProcessorMetrics.cpp 
b/libminifi/src/core/ProcessorMetrics.cpp
new file mode 100644
index 000000000..e243a0178
--- /dev/null
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/ProcessorMetrics.h"
+
+#include "core/Processor.h"
+#include "utils/gsl.h"
+#include "range/v3/numeric/accumulate.hpp"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core {
+
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor),
+      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> 
ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", 
source_processor_.getName()}, {"processor_uuid", 
source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> 
ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node {
+    .name = source_processor_.getUUIDStr(),
+    .children = {
+      {.name = "OnTriggerInvocations", .value = 
static_cast<uint32_t>(iterations.load())},
+      {.name = "AverageOnTriggerRunTime", .value = 
static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
+      {.name = "LastOnTriggerRunTime", .value = 
static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
+      {.name = "TransferredFlowFiles", .value = 
static_cast<uint32_t>(transferred_flow_files.load())},
+      {.name = "TransferredBytes", .value = transferred_bytes.load()}
+    }
+  };
+
+  {
+    std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+    for (const auto& [relationship, count] : transferred_relationships_) {
+      gsl_Expects(!relationship.empty());
+      state::response::SerializedResponseNode transferred_to_relationship_node;
+      transferred_to_relationship_node.name = 
std::string("TransferredTo").append(1, 
toupper(relationship[0])).append(relationship.substr(1));
+      transferred_to_relationship_node.value = static_cast<uint32_t>(count);
+
+      root_node.children.push_back(transferred_to_relationship_node);
+    }
+  }
+
+  resp.push_back(root_node);
+
+  return resp;
+}
+
+std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
+  std::vector<state::PublishedMetric> metrics = {
+    {"onTrigger_invocations", static_cast<double>(iterations.load()), 
getCommonLabels()},
+    {"average_onTrigger_runtime_milliseconds", 
static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
+    {"last_onTrigger_runtime_milliseconds", 
static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
+    {"transferred_flow_files", 
static_cast<double>(transferred_flow_files.load()), getCommonLabels()},
+    {"transferred_bytes", static_cast<double>(transferred_bytes.load()), 
getCommonLabels()}
+  };
+
+  {
+    std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+    for (const auto& [relationship, count] : transferred_relationships_) {
+      metrics.push_back({"transferred_to_" + relationship, 
static_cast<double>(count),
+        {{"metric_class", getName()}, {"processor_name", 
source_processor_.getName()}, {"processor_uuid", 
source_processor_.getUUIDStr()}}});
+    }
+  }
+
+  return metrics;
+}
+
+void ProcessorMetrics::increaseRelationshipTransferCount(const std::string& 
relationship, size_t count) {
+  std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+  transferred_relationships_[relationship] += count;
+}
+
+std::chrono::milliseconds ProcessorMetrics::getAverageOnTriggerRuntime() const 
{
+  return on_trigger_runtime_averager_.getAverage();
+}
+
+void ProcessorMetrics::addLastOnTriggerRuntime(std::chrono::milliseconds 
runtime) {
+  on_trigger_runtime_averager_.addValue(runtime);
+}
+
+std::chrono::milliseconds ProcessorMetrics::getLastOnTriggerRuntime() const {
+  return on_trigger_runtime_averager_.getLastValue();
+}
+
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  return ranges::accumulate(values_, ValueType{}) / values_.size();
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+void ProcessorMetrics::Averager<ValueType>::addValue(ValueType runtime) {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.size() < SAMPLE_SIZE_) {
+    values_.push_back(runtime);
+  } else {
+    if (next_average_index_ >= values_.size()) {
+      next_average_index_ = 0;
+    }
+    values_[next_average_index_] = runtime;
+    ++next_average_index_;
+  }
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType ProcessorMetrics::Averager<ValueType>::getLastValue() const {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.empty()) {
+    return {};
+  } else if (values_.size() < SAMPLE_SIZE_) {
+    return values_[values_.size() - 1];
+  } else {
+    return values_[next_average_index_ - 1];
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp 
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index feb44a00e..ad1389223 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -56,12 +56,9 @@ void 
ResponseNodeLoader::initializeComponentMetrics(core::ProcessGroup* root) {
       continue;
     }
     // we have a metrics source.
-    std::vector<std::shared_ptr<ResponseNode>> metric_vector;
-    node_source->getResponseNodes(metric_vector);
+    auto metric = node_source->getResponseNodes();
     std::lock_guard<std::mutex> guard(component_metrics_mutex_);
-    for (const auto& metric : metric_vector) {
-      component_metrics_[metric->getName()].push_back(metric);
-    }
+    component_metrics_[metric->getName()].push_back(metric);
   }
 }
 
diff --git a/libminifi/test/DummyProcessor.cpp 
b/libminifi/test/DummyProcessor.cpp
new file mode 100644
index 000000000..783475a7c
--- /dev/null
+++ b/libminifi/test/DummyProcessor.cpp
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DummyProcessor.h"
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::test {
+
+REGISTER_RESOURCE(DummyProcessor, Processor);
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/DummyProcessor.h b/libminifi/test/DummyProcessor.h
new file mode 100644
index 000000000..a5eceecc4
--- /dev/null
+++ b/libminifi/test/DummyProcessor.h
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <array>
+#include <string>
+
+#include "core/Processor.h"
+#include "agent/agent_docs.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class DummyProcessor : public minifi::core::Processor {
+  using minifi::core::Processor::Processor;
+
+ public:
+  DummyProcessor(const std::string& name, const minifi::utils::Identifier& 
uuid) : Processor(name, uuid) {}
+  explicit DummyProcessor(const std::string& name) : Processor(name) {}
+  static constexpr const char* Description = "A processor that does nothing.";
+  static auto properties() { return std::array<core::Property, 0>{}; }
+  static auto relationships() { return std::array<core::Relationship, 0>{}; }
+  static constexpr bool SupportsDynamicProperties = false;
+  static constexpr bool SupportsDynamicRelationships = false;
+  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
+  static constexpr bool IsSingleThreaded = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+};
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h 
b/libminifi/test/unit/ContentRepositoryDependentTests.h
index a476b4558..6674729e8 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -27,6 +27,7 @@
 #include "core/Resource.h"
 #include "../TestBase.h"
 #include "../Catch.h"
+#include "../DummyProcessor.h"
 #include "StreamPipe.h"
 
 #pragma once
@@ -53,22 +54,6 @@ struct ReadUntilItCan {
   }
 };
 
-class DummyProcessor : public core::Processor {
- public:
-  using core::Processor::Processor;
-
-  static constexpr const char* Description = "A processor that does nothing.";
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  static auto relationships() { return std::array<core::Relationship, 0>{}; }
-  static constexpr bool SupportsDynamicProperties = false;
-  static constexpr bool SupportsDynamicRelationships = false;
-  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
-  static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);
-
 class Fixture {
  public:
   const core::Relationship Success{"success", "everything is fine"};
diff --git a/libminifi/test/unit/MetricsTests.cpp 
b/libminifi/test/unit/MetricsTests.cpp
index a3c96128b..20aeb7c6e 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -25,6 +25,11 @@
 #include "core/ClassLoader.h"
 #include "repository/VolatileContentRepository.h"
 #include "ProvenanceTestHelper.h"
+#include "../DummyProcessor.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
 
 TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") {
   minifi::state::response::QueueMetrics metrics;
@@ -203,3 +208,44 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     REQUIRE("0" == size.value);
   }
 }
+
+TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  metrics.addLastOnTriggerRuntime(20ms);
+  metrics.addLastOnTriggerRuntime(30ms);
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);
+
+  for (auto i = 0; i < 7; ++i) {
+    metrics.addLastOnTriggerRuntime(50ms);
+  }
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 41ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+  for (auto i = 0; i < 3; ++i) {
+    metrics.addLastOnTriggerRuntime(50ms);
+  }
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastOnTriggerRuntime(40ms);
+  }
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 40ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 40ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 37ms);
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp 
b/libminifi/test/unit/ProcessSessionTests.cpp
index f6df20829..55a39b272 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -28,22 +28,6 @@
 
 namespace {
 
-class DummyProcessor : public minifi::core::Processor {
-  using minifi::core::Processor::Processor;
-
- public:
-  static constexpr const char* Description = "A processor that does nothing.";
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  static auto relationships() { return std::array<core::Relationship, 0>{}; }
-  static constexpr bool SupportsDynamicProperties = false;
-  static constexpr bool SupportsDynamicRelationships = false;
-  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
-  static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);
-
 class Fixture {
  public:
   minifi::core::ProcessSession &processSession() { return *process_session_; }

Reply via email to