This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d357fcc69 MINIFICPP-1848 Create a generic solution for processor
metrics
d357fcc69 is described below
commit d357fcc69b6e78553ae18dd4200f982421f86159
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Nov 7 17:38:54 2022 +0100
MINIFICPP-1848 Create a generic solution for processor metrics
Generate common metrics for all MiNiFi processors to be available for
publishing on C2 and Prometheus interfaces.
Closes #1400
Signed-off-by: Marton Szasz <[email protected]>
---
METRICS.md | 105 +++++++++------
cmake/BuildTests.cmake | 2 +-
.../test/integration/features/prometheus.feature | 5 +-
.../integration/minifi/core/DockerTestCluster.py | 77 +----------
docker/test/integration/minifi/core/ImageStore.py | 2 +-
.../integration/minifi/core/PrometheusChecker.py | 81 ++++++++++++
.../tests/GCPCredentialsControllerServiceTests.cpp | 17 +--
extensions/http-curl/tests/C2MetricsTest.cpp | 6 +-
extensions/libarchive/BinFiles.cpp | 1 +
.../standard-processors/processors/GetFile.cpp | 16 +--
.../standard-processors/processors/GetFile.h | 71 +++-------
.../standard-processors/processors/GetTCP.cpp | 8 +-
extensions/standard-processors/processors/GetTCP.h | 51 +------
.../tests/unit/ProcessorTests.cpp | 8 ++
libminifi/include/core/ProcessSession.h | 21 ++-
libminifi/include/core/Processor.h | 28 +++-
libminifi/include/core/ProcessorMetrics.h | 85 ++++++++++++
libminifi/include/core/state/nodes/MetricsBase.h | 40 +-----
libminifi/src/core/ProcessSession.cpp | 34 +++--
libminifi/src/core/Processor.cpp | 19 ++-
libminifi/src/core/ProcessorMetrics.cpp | 147 +++++++++++++++++++++
.../src/core/state/nodes/ResponseNodeLoader.cpp | 7 +-
libminifi/test/DummyProcessor.cpp | 25 ++++
libminifi/test/DummyProcessor.h | 43 ++++++
.../test/unit/ContentRepositoryDependentTests.h | 17 +--
libminifi/test/unit/MetricsTests.cpp | 46 +++++++
libminifi/test/unit/ProcessSessionTests.cpp | 16 ---
27 files changed, 626 insertions(+), 352 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index fa4b4c7b5..543b67f4c 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -20,9 +20,18 @@ This readme defines the metrics published by Apache NiFi.
All options defined ar
## Table of Contents
-- [Description](#description)
-- [Configuration](#configuration)
-- [Metrics](#metrics)
+- [Apache NiFi - MiNiFi - C++ Metrics
Readme.](#apache-nifi---minifi---c-metrics-readme)
+ - [Table of Contents](#table-of-contents)
+ - [Description](#description)
+ - [Configuration](#configuration)
+ - [System Metrics](#system-metrics)
+ - [QueueMetrics](#queuemetrics)
+ - [RepositoryMetrics](#repositorymetrics)
+ - [DeviceInfoNode](#deviceinfonode)
+ - [FlowInformation](#flowinformation)
+ - [Processor Metrics](#processor-metrics)
+ - [General Metrics](#general-metrics)
+ - [GetFileMetrics](#getfilemetrics)
## Description
@@ -35,22 +44,22 @@ Aside from the publisher exposed metrics, metrics are also
sent through C2 proto
To configure the a metrics publisher first we have to set which publisher
class should be used:
- # in minifi.properties
+ # in minifi.properties
- nifi.metrics.publisher.class=PrometheusMetricsPublisher
+ nifi.metrics.publisher.class=PrometheusMetricsPublisher
Currently PrometheusMetricsPublisher is the only available publisher in MiNiFi
C++ which publishes metrics to a Prometheus server.
To use the publisher a port should also be configured where the metrics will
be available to be scraped through:
- # in minifi.properties
+ # in minifi.properties
- nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
+ nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
The following option defines which metric classes should be exposed through
the metrics publisher in configured with a comma separated value:
- # in minifi.properties
+ # in minifi.properties
-
nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+
nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
An agent identifier should also be defined to identify which agent the metric
is exposed from. If not set, the hostname is used as the identifier.
@@ -58,7 +67,7 @@ An agent identifier should also be defined to identify which
agent the metric is
nifi.metrics.publisher.agent.identifier=Agent1
-## Metrics
+## System Metrics
The following section defines the currently available metrics to be published
by the MiNiFi C++ agent.
@@ -103,43 +112,15 @@ RepositoryMetrics is a system level metric that reports
metrics for the register
|--------------------------|-----------------------------------------------------------------|
| repository_name | Name of the reported repository
|
-### GetFileMetrics
-
-Processor level metric that reports metrics for the GetFile processor if
defined in the flow configuration
-
-| Metric name | Labels | Description
|
-|-----------------------|--------------------------------|------------------------------------------------|
-| onTrigger_invocations | processor_name, processor_uuid | Number of times the
processor was triggered |
-| accepted_files | processor_name, processor_uuid | Number of files
that matched the set criterias |
-| input_bytes | processor_name, processor_uuid | Sum of file sizes
processed |
-
-| Label | Description
|
-|----------------|----------------------------------------------------------------|
-| processor_name | Name of the processor
|
-| processor_uuid | UUID of the processor
|
-
-### GetTCPMetrics
-
-Processor level metric that reports metrics for the GetTCPMetrics processor if
defined in the flow configuration
-
-| Metric name | Labels | Description
|
-|-----------------------|--------------------------------|------------------------------------------------|
-| onTrigger_invocations | processor_name, processor_uuid | Number of times the
processor was triggered |
-
-| Label | Description
|
-|----------------|----------------------------------------------------------------|
-| processor_name | Name of the processor
|
-| processor_uuid | UUID of the processor
|
-
### DeviceInfoNode
DeviceInfoNode is a system level metric that reports metrics about the system
resources used and available
-| Metric name | Labels | Description |
-|-----------------|--------------|---------------------------|
-| physical_mem | - | Physical memory available |
-| memory_usage | - | Memory used by the agent |
-| cpu_utilization | - | CPU utilized by the agent |
+| Metric name | Labels | Description |
+|-----------------|--------------|-------------------------------------|
+| physical_mem | - | Physical memory available |
+| memory_usage | - | Physical memory usage of the system |
+| cpu_utilization | - | CPU utilized by the system |
### FlowInformation
@@ -159,3 +140,41 @@ DeviceInfoNode is a system level metric that reports
metrics about the system re
| connection_name | Name of the connection defined in the flow configuration
|
| component_uuid | UUID of the component
|
| component_name | Name of the component
|
+
+## Processor Metrics
+
+Processor level metrics can be accessed for any processor provided by MiNiFi.
These metrics correspond to the name of the processor appended by the "Metrics"
suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
+
+### General Metrics
+
+There are general metrics that are available for all processors. Besides these
metrics processors can implement additional metrics that are speicific to that
processor.
+
+| Metric name | Labels
| Description
|
+|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
+| onTrigger_invocations | metric_class, processor_name,
processor_uuid | The number of processor onTrigger calls
|
+| average_onTrigger_runtime_milliseconds | metric_class, processor_name,
processor_uuid | The average runtime in milliseconds of the last 10 onTrigger
calls of the processor |
+| last_onTrigger_runtime_milliseconds | metric_class, processor_name,
processor_uuid | The runtime in milliseconds of the last onTrigger call of the
processor |
+| transferred_flow_files | metric_class, processor_name,
processor_uuid | Number of flow files transferred to a relationship
|
+| transferred_bytes | metric_class, processor_name,
processor_uuid | Number of bytes transferred to a relationship
|
+| transferred_to_\<relationship\> | metric_class, processor_name,
processor_uuid | Number of flow files transferred to a specific relationship
|
+
+| Label | Description
|
+|----------------|------------------------------------------------------------------------|
+| metric_class | Class name to filter for this metric, set to \<processor
type\>Metrics |
+| processor_name | Name of the processor
|
+| processor_uuid | UUID of the processor
|
+
+### GetFileMetrics
+
+Processor level metric that reports metrics for the GetFile processor if
defined in the flow configuration.
+
+| Metric name | Labels |
Description |
+|-----------------------|----------------------------------------------|------------------------------------------------|
+| accepted_files | metric_class, processor_name, processor_uuid |
Number of files that matched the set criterias |
+| input_bytes | metric_class, processor_name, processor_uuid | Sum
of file sizes processed |
+
+| Label | Description
|
+|----------------|----------------------------------------------------------------|
+| metric_class | Class name to filter for this metric, set to GetFileMetrics
|
+| processor_name | Name of the processor
|
+| processor_uuid | UUID of the processor
|
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index a0346f41c..62fc03ebf 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -83,7 +83,7 @@ endfunction()
enable_testing(test)
SET(TEST_BASE_LIB test_base)
-set(TEST_BASE_SOURCES "TestBase.cpp" "RandomServerSocket.cpp"
"StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp"
"ReadFromFlowFileTestProcessor.cpp")
+set(TEST_BASE_SOURCES "TestBase.cpp" "RandomServerSocket.cpp"
"StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp"
"ReadFromFlowFileTestProcessor.cpp" "DummyProcessor.cpp")
list(TRANSFORM TEST_BASE_SOURCES PREPEND "${TEST_DIR}/")
add_library(${TEST_BASE_LIB} STATIC "${TEST_BASE_SOURCES}")
target_link_libraries(${TEST_BASE_LIB} core-minifi)
diff --git a/docker/test/integration/features/prometheus.feature
b/docker/test/integration/features/prometheus.feature
index e22aa5faf..8c37a0109 100644
--- a/docker/test/integration/features/prometheus.feature
+++ b/docker/test/integration/features/prometheus.feature
@@ -13,12 +13,15 @@ Feature: MiNiFi can publish metrics to Prometheus server
Then "RepositoryMetrics" is published to the Prometheus server in less
than 60 seconds
And "QueueMetrics" is published to the Prometheus server in less than 60
seconds
And "GetFileMetrics" processor metric is published to the Prometheus
server in less than 60 seconds for "GetFile1" processor
+ And "PutFileMetrics" processor metric is published to the Prometheus
server in less than 60 seconds for "PutFile" processor
And "FlowInformation" is published to the Prometheus server in less than
60 seconds
And "DeviceInfoNode" is published to the Prometheus server in less than 60
seconds
- Scenario: Multiple GetFile metrics are reported by Prometheus
+ Scenario: Multiple GetFile metrics are reported by Prometheus
Given a GetFile processor with the name "GetFile1" and the "Input
Directory" property set to "/tmp/input"
And a GetFile processor with the name "GetFile2" and the "Input Directory"
property set to "/tmp/input"
+ And the "Keep Source File" property of the GetFile1 processor is set to
"true"
+ And the "Keep Source File" property of the GetFile2 processor is set to
"true"
And "GetFile2" processor is a start node
And a file with the content "test" is present in "/tmp/input"
And a PutFile processor with the "Directory" property set to "/tmp/output"
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py
b/docker/test/integration/minifi/core/DockerTestCluster.py
index 51fa9fd30..e89895dd0 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -24,10 +24,10 @@ import tempfile
from .LogSource import LogSource
from .SingleNodeDockerCluster import SingleNodeDockerCluster
+from .PrometheusChecker import PrometheusChecker
from .utils import retry_check
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
-from prometheus_api_client import PrometheusConnect
class DockerTestCluster(SingleNodeDockerCluster):
@@ -314,78 +314,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
return container.put_archive(os.path.dirname(dst_path),
data.read())
def wait_for_metric_class_on_prometheus(self, metric_class,
timeout_seconds):
- start_time = time.perf_counter()
- while (time.perf_counter() - start_time) < timeout_seconds:
- if self.verify_metric_class(metric_class):
- return True
- time.sleep(1)
- return False
+ return
PrometheusChecker().wait_for_metric_class_on_prometheus(metric_class,
timeout_seconds)
def wait_for_processor_metric_on_prometheus(self, metric_class,
timeout_seconds, processor_name):
- start_time = time.perf_counter()
- while (time.perf_counter() - start_time) < timeout_seconds:
- if self.verify_processor_metric(metric_class, processor_name):
- return True
- time.sleep(1)
- return False
-
- def verify_processor_metric(self, metric_class, processor_name):
- if metric_class == "GetFileMetrics":
- return self.verify_getfile_metrics(processor_name)
- else:
- raise Exception("Metric class '%s' verification is not
implemented" % metric_class)
-
- def verify_metric_class(self, metric_class):
- if metric_class == "RepositoryMetrics":
- return self.verify_repository_metrics()
- elif metric_class == "QueueMetrics":
- return self.verify_queue_metrics()
- elif metric_class == "FlowInformation":
- return self.verify_flow_information_metrics()
- elif metric_class == "DeviceInfoNode":
- return self.verify_device_info_node_metrics()
- else:
- raise Exception("Metric class '%s' verification is not
implemented" % metric_class)
-
- def verify_repository_metrics(self):
- prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
- label_list = [{'repository_name': 'provenance'}, {'repository_name':
'flowfile'}]
- for labels in label_list:
- if not (self.verify_metric_exists(prom, 'minifi_is_running',
'RepositoryMetrics', labels)
- and self.verify_metric_exists(prom, 'minifi_is_full',
'RepositoryMetrics', labels)
- and self.verify_metric_exists(prom,
'minifi_repository_size', 'RepositoryMetrics', labels)):
- return False
- return True
-
- def verify_queue_metrics(self):
- prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
- return self.verify_metric_exists(prom, 'minifi_queue_data_size',
'QueueMetrics') and \
- self.verify_metric_exists(prom, 'minifi_queue_data_size_max',
'QueueMetrics') and \
- self.verify_metric_exists(prom, 'minifi_queue_size',
'QueueMetrics') and \
- self.verify_metric_exists(prom, 'minifi_queue_size_max',
'QueueMetrics')
-
- def verify_getfile_metrics(self, processor_name):
- prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
- labels = {'processor_name': processor_name}
- return self.verify_metric_exists(prom, 'minifi_onTrigger_invocations',
'GetFileMetrics', labels) and \
- self.verify_metric_exists(prom, 'minifi_accepted_files',
'GetFileMetrics', labels) and \
- self.verify_metric_exists(prom, 'minifi_input_bytes',
'GetFileMetrics', labels)
-
- def verify_flow_information_metrics(self):
- prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
- return self.verify_metric_exists(prom, 'minifi_queue_data_size',
'FlowInformation') and \
- self.verify_metric_exists(prom, 'minifi_queue_data_size_max',
'FlowInformation') and \
- self.verify_metric_exists(prom, 'minifi_queue_size',
'FlowInformation') and \
- self.verify_metric_exists(prom, 'minifi_queue_size_max',
'FlowInformation') and \
- self.verify_metric_exists(prom, 'minifi_is_running',
'FlowInformation', {'component_name': 'FlowController'})
-
- def verify_device_info_node_metrics(self):
- prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
- return self.verify_metric_exists(prom, 'minifi_physical_mem',
'DeviceInfoNode') and \
- self.verify_metric_exists(prom, 'minifi_memory_usage',
'DeviceInfoNode') and \
- self.verify_metric_exists(prom, 'minifi_cpu_utilization',
'DeviceInfoNode')
-
- def verify_metric_exists(self, prometheus_client, metric_name,
metric_class, labels={}):
- labels['metric_class'] = metric_class
- labels['agent_identifier'] = "Agent1"
- return
len(prometheus_client.get_current_metric_value(metric_name=metric_name,
label_config=labels)) > 0
+ return
PrometheusChecker().wait_for_processor_metric_on_prometheus(metric_class,
timeout_seconds, processor_name)
diff --git a/docker/test/integration/minifi/core/ImageStore.py
b/docker/test/integration/minifi/core/ImageStore.py
index 23d46afaa..7f45e8273 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -105,7 +105,7 @@ class ImageStore:
RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >>
{minifi_root}/conf/minifi.properties
RUN echo
nifi.metrics.publisher.class=PrometheusMetricsPublisher >>
{minifi_root}/conf/minifi.properties
RUN echo
nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >>
{minifi_root}/conf/minifi.properties
- RUN echo
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,FlowInformation,DeviceInfoNode
>> {minifi_root}/conf/minifi.properties
+ RUN echo
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode
>> {minifi_root}/conf/minifi.properties
USER minificpp
""".format(base_image='apacheminificpp:' +
MinifiContainer.MINIFI_VERSION,
minifi_root=MinifiContainer.MINIFI_ROOT))
diff --git a/docker/test/integration/minifi/core/PrometheusChecker.py
b/docker/test/integration/minifi/core/PrometheusChecker.py
new file mode 100644
index 000000000..936ae4178
--- /dev/null
+++ b/docker/test/integration/minifi/core/PrometheusChecker.py
@@ -0,0 +1,81 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+ def __init__(self):
+ self.prometheus_client =
PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+
+ def wait_for_metric_class_on_prometheus(self, metric_class,
timeout_seconds):
+ start_time = time.perf_counter()
+ while (time.perf_counter() - start_time) < timeout_seconds:
+ if self.verify_metric_class(metric_class):
+ return True
+ time.sleep(1)
+ return False
+
+ def wait_for_processor_metric_on_prometheus(self, metric_class,
timeout_seconds, processor_name):
+ start_time = time.perf_counter()
+ while (time.perf_counter() - start_time) < timeout_seconds:
+ if self.verify_processor_metric(metric_class, processor_name):
+ return True
+ time.sleep(1)
+ return False
+
+ def verify_processor_metric(self, metric_class, processor_name):
+ if metric_class == "GetFileMetrics":
+ return self.verify_getfile_metrics(metric_class, processor_name)
+ else:
+ return self.verify_general_processor_metrics(metric_class,
processor_name)
+
+ def verify_metric_class(self, metric_class):
+ if metric_class == "RepositoryMetrics":
+ return self.verify_repository_metrics()
+ elif metric_class == "QueueMetrics":
+ return self.verify_queue_metrics()
+ elif metric_class == "FlowInformation":
+ return self.verify_flow_information_metrics()
+ elif metric_class == "DeviceInfoNode":
+ return self.verify_device_info_node_metrics()
+ else:
+ raise Exception("Metric class '%s' verification is not
implemented" % metric_class)
+
+ def verify_repository_metrics(self):
+ label_list = [{'repository_name': 'provenance'}, {'repository_name':
'flowfile'}]
+ return all((self.verify_metrics_exist(['minifi_is_running',
'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels) for
labels in label_list))
+
+ def verify_queue_metrics(self):
+ return self.verify_metrics_exist(['minifi_queue_data_size',
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'],
'QueueMetrics')
+
+ def verify_general_processor_metrics(self, metric_class, processor_name):
+ labels = {'processor_name': processor_name}
+ return
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds',
'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations',
'minifi_transferred_flow_files', 'minifi_transferred_to_success',
'minifi_transferred_bytes'], metric_class, labels)
+
+ def verify_getfile_metrics(self, metric_class, processor_name):
+ labels = {'processor_name': processor_name}
+ return self.verify_general_processor_metrics(metric_class,
processor_name) and \
+ self.verify_metrics_exist(['minifi_input_bytes',
'minifi_accepted_files'], metric_class, labels)
+
+ def verify_flow_information_metrics(self):
+ return self.verify_metrics_exist(['minifi_queue_data_size',
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'],
'FlowInformation') and \
+ self.verify_metric_exists('minifi_is_running', 'FlowInformation',
{'component_name': 'FlowController'})
+
+ def verify_device_info_node_metrics(self):
+ return self.verify_metrics_exist(['minifi_physical_mem',
'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
+
+ def verify_metric_exists(self, metric_name, metric_class, labels={}):
+ labels['metric_class'] = metric_class
+ labels['agent_identifier'] = "Agent1"
+ return
len(self.prometheus_client.get_current_metric_value(metric_name=metric_name,
label_config=labels)) > 0
+
+ def verify_metrics_exist(self, metric_names, metric_class, labels={}):
+ return all((self.verify_metric_exists(metric_name, metric_class,
labels) for metric_name in metric_names))
+
+ def verify_metric_larger_than_zero(self, metric_name, metric_class,
labels={}):
+ labels['metric_class'] = metric_class
+ result =
self.prometheus_client.get_current_metric_value(metric_name=metric_name,
label_config=labels)
+ return len(result) > 0 and int(result[0]['value'][1]) > 0
+
+ def verify_metrics_larger_than_zero(self, metric_names, metric_class,
labels={}):
+ return all((self.verify_metric_larger_than_zero(metric_name,
metric_class, labels) for metric_name in metric_names))
diff --git a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
index 9254d0902..b26fd7155 100644
--- a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
+++ b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
@@ -26,6 +26,7 @@
#include "rapidjson/stream.h"
#include "rapidjson/writer.h"
#include "google/cloud/internal/setenv.h"
+#include "DummyProcessor.h"
using GCPCredentialsControllerService =
org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
@@ -62,22 +63,6 @@ std::optional<std::filesystem::path>
create_mock_json_file(const std::filesystem
p.close();
return path;
}
-
-class DummyProcessor : public org::apache::nifi::minifi::core::Processor {
- public:
- using minifi::core::Processor::Processor;
-
- static constexpr const char* Description = "A processor that does nothing.";
- static auto properties() { return std::array<core::Property, 0>{}; }
- static auto relationships() { return std::array<core::Relationship, 0>{}; }
- static constexpr bool SupportsDynamicProperties = false;
- static constexpr bool SupportsDynamicRelationships = false;
- static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_ALLOWED;
- static constexpr bool IsSingleThreaded = false;
- ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);
} // namespace
class GCPCredentialsTests : public ::testing::Test {
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp
b/extensions/http-curl/tests/C2MetricsTest.cpp
index 99fe46e52..0da989316 100644
--- a/extensions/http-curl/tests/C2MetricsTest.cpp
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -173,7 +173,11 @@ class MetricsHandler: public HeartbeatHandler {
return processor_metrics.HasMember("GetTCPMetrics") &&
processor_metrics["GetTCPMetrics"].HasMember(GETTCP1_UUID) &&
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("OnTriggerInvocations")
&&
-
processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint()
> 0;
+
processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint()
> 0 &&
+
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredFlowFiles")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("AverageOnTriggerRunTime")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("LastOnTriggerRunTime")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes");
}
[[nodiscard]] static std::string getReplacementConfigAsJsonValue(const
std::string& replacement_config_path) {
diff --git a/extensions/libarchive/BinFiles.cpp
b/extensions/libarchive/BinFiles.cpp
index 82b31c282..cf2fd4446 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -266,6 +266,7 @@ void BinFiles::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, c
// for each merge as a rollback erases all
// previously added files
core::ProcessSession mergeSession(context);
+ mergeSession.setMetrics(metrics_);
std::unique_ptr<Bin> bin = std::move(readyBins.front());
readyBins.pop_front();
// add bin's flows to the session
diff --git a/extensions/standard-processors/processors/GetFile.cpp
b/extensions/standard-processors/processors/GetFile.cpp
index c1451d917..10480b518 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -136,8 +136,6 @@ void GetFile::onSchedule(core::ProcessContext *context,
core::ProcessSessionFact
}
void GetFile::onTrigger(core::ProcessContext* /*context*/,
core::ProcessSession* session) {
- metrics_->iterations_++;
-
const bool is_dir_empty_before_poll = isListingEmpty();
logger_->log_debug("Listing is %s before polling directory",
is_dir_empty_before_poll ? "empty" : "not empty");
if (is_dir_empty_before_poll) {
@@ -194,7 +192,7 @@ bool GetFile::isListingEmpty() const {
return directory_listing_.empty();
}
-void GetFile::putListing(std::string fileName) {
+void GetFile::putListing(const std::string& fileName) {
logger_->log_trace("Adding file to queue: %s", fileName);
std::lock_guard<std::mutex> lock(directory_listing_mutex_);
@@ -213,7 +211,7 @@ std::queue<std::string> GetFile::pollListing(uint64_t
batch_size) {
return list;
}
-bool GetFile::fileMatchesRequestCriteria(std::string fullName, std::string
name, const GetFileRequest &request) {
+bool GetFile::fileMatchesRequestCriteria(const std::string& fullName, const
std::string& name, const GetFileRequest &request) {
logger_->log_trace("Checking file: %s", fullName);
std::error_code ec;
@@ -248,8 +246,9 @@ bool GetFile::fileMatchesRequestCriteria(std::string
fullName, std::string name,
return false;
}
- metrics_->input_bytes_ += file_size;
- metrics_->accepted_files_++;
+ auto* const getfile_metrics = static_cast<GetFileMetrics*>(metrics_.get());
+ getfile_metrics->input_bytes += file_size;
+ ++getfile_metrics->accepted_files;
return true;
}
@@ -264,11 +263,6 @@ void GetFile::performListing(const GetFileRequest
&request) {
utils::file::list_dir(request.inputDirectory, callback, logger_,
request.recursive);
}
-int16_t
GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
&metric_vector) {
- metric_vector.push_back(metrics_);
- return 0;
-}
-
REGISTER_RESOURCE(GetFile, Processor);
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/GetFile.h
b/extensions/standard-processors/processors/GetFile.h
index 086ec41bd..9e7d9ca9c 100644
--- a/extensions/standard-processors/processors/GetFile.h
+++ b/extensions/standard-processors/processors/GetFile.h
@@ -48,70 +48,40 @@ struct GetFileRequest {
std::string inputDirectory;
};
-class GetFileMetrics : public state::response::ResponseNode {
+class GetFileMetrics : public core::ProcessorMetrics {
public:
- explicit GetFileMetrics(const CoreComponent& source_component)
- : state::response::ResponseNode("GetFileMetrics"),
- source_component_(source_component) {
- }
-
- std::string getName() const override {
- return core::Connectable::getName();
+ explicit GetFileMetrics(const core::Processor& source_processor)
+ : core::ProcessorMetrics(source_processor) {
}
std::vector<state::response::SerializedResponseNode> serialize() override {
- std::vector<state::response::SerializedResponseNode> resp;
-
- state::response::SerializedResponseNode root_node;
- root_node.name = source_component_.getUUIDStr();
-
- state::response::SerializedResponseNode iter;
- iter.name = "OnTriggerInvocations";
- iter.value = (uint32_t)iterations_.load();
-
- root_node.children.push_back(iter);
+ auto resp = core::ProcessorMetrics::serialize();
+ auto& root_node = resp[0];
- state::response::SerializedResponseNode accepted_files;
- accepted_files.name = "AcceptedFiles";
- accepted_files.value = (uint32_t)accepted_files_.load();
+ state::response::SerializedResponseNode
accepted_files_node{"AcceptedFiles", accepted_files.load()};
+ root_node.children.push_back(accepted_files_node);
- root_node.children.push_back(accepted_files);
-
- state::response::SerializedResponseNode input_bytes;
- input_bytes.name = "InputBytes";
- input_bytes.value = (uint32_t)input_bytes_.load();
-
- root_node.children.push_back(input_bytes);
- resp.push_back(root_node);
+ state::response::SerializedResponseNode input_bytes_node{"InputBytes",
input_bytes.load()};
+ root_node.children.push_back(input_bytes_node);
return resp;
}
std::vector<state::PublishedMetric> calculateMetrics() override {
- return {
- {"onTrigger_invocations", static_cast<double>(iterations_.load()),
- {{"metric_class", "GetFileMetrics"}, {"processor_name",
source_component_.getName()}, {"processor_uuid",
source_component_.getUUIDStr()}}},
- {"accepted_files", static_cast<double>(accepted_files_.load()),
- {{"metric_class", "GetFileMetrics"}, {"processor_name",
source_component_.getName()}, {"processor_uuid",
source_component_.getUUIDStr()}}},
- {"input_bytes", static_cast<double>(input_bytes_.load()),
- {{"metric_class", "GetFileMetrics"}, {"processor_name",
source_component_.getName()}, {"processor_uuid",
source_component_.getUUIDStr()}}}
- };
+ auto metrics = core::ProcessorMetrics::calculateMetrics();
+ metrics.push_back({"accepted_files",
static_cast<double>(accepted_files.load()), getCommonLabels()});
+ metrics.push_back({"input_bytes", static_cast<double>(input_bytes.load()),
getCommonLabels()});
+ return metrics;
}
- protected:
- friend class GetFile;
-
- const CoreComponent& source_component_;
- std::atomic<size_t> iterations_{0};
- std::atomic<size_t> accepted_files_{0};
- std::atomic<size_t> input_bytes_{0};
+ std::atomic<uint32_t> accepted_files{0};
+ std::atomic<uint64_t> input_bytes{0};
};
-class GetFile : public core::Processor, public
state::response::MetricsNodeSource {
+class GetFile : public core::Processor {
public:
explicit GetFile(const std::string& name, const utils::Identifier& uuid = {})
- : Processor(name, uuid),
- metrics_(std::make_shared<GetFileMetrics>(*this)) {
+ : Processor(name, uuid, std::make_shared<GetFileMetrics>(*this)) {
}
~GetFile() override = default;
@@ -164,16 +134,13 @@ class GetFile : public core::Processor, public
state::response::MetricsNodeSourc
*/
void performListing(const GetFileRequest &request);
- int16_t
getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
&metric_vector) override;
-
private:
bool isListingEmpty() const;
- void putListing(std::string fileName);
+ void putListing(const std::string& fileName);
std::queue<std::string> pollListing(uint64_t batch_size);
- bool fileMatchesRequestCriteria(std::string fullName, std::string name,
const GetFileRequest &request);
+ bool fileMatchesRequestCriteria(const std::string& fullName, const
std::string& name, const GetFileRequest &request);
void getSingleFile(core::ProcessSession& session, const std::string&
file_name) const;
- std::shared_ptr<GetFileMetrics> metrics_;
GetFileRequest request_;
std::queue<std::string> directory_listing_;
mutable std::mutex directory_listing_mutex_;
diff --git a/extensions/standard-processors/processors/GetTCP.cpp
b/extensions/standard-processors/processors/GetTCP.cpp
index d4d7ac93d..4eed2d13d 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -77,7 +77,7 @@ const core::Property GetTCP::EndOfMessageByte(
const core::Relationship GetTCP::Success("success", "All files are routed to
success");
const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete
message as a result of encountering the end of message byte trigger");
-int16_t DataHandler::handle(std::string source, uint8_t *message, size_t size,
bool partial) {
+int16_t DataHandler::handle(const std::string& source, uint8_t *message,
size_t size, bool partial) {
std::shared_ptr<core::ProcessSession> my_session =
sessionFactory_->createSession();
std::shared_ptr<core::FlowFile> flowFile = my_session->create();
@@ -224,7 +224,6 @@ void GetTCP::notifyStop() {
}
void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSession>& /*session*/) {
// Perform directory list
- metrics_->iterations_++;
std::lock_guard<std::mutex> lock(mutex_);
// check if the futures are valid. If they've terminated remove it from the
map.
@@ -289,11 +288,6 @@ void GetTCP::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, con
context->yield();
}
-int16_t
GetTCP::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
&metric_vector) {
- metric_vector.push_back(metrics_);
- return 0;
-}
-
REGISTER_RESOURCE(GetTCP, Processor);
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/GetTCP.h
b/extensions/standard-processors/processors/GetTCP.h
index 7e326eb19..c2085ce37 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -91,54 +91,13 @@ class DataHandler {
}
static const char *SOURCE_ENDPOINT_ATTRIBUTE;
- int16_t handle(std::string source, uint8_t *message, size_t size, bool
partial);
+ int16_t handle(const std::string& source, uint8_t *message, size_t size,
bool partial);
private:
std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
};
-class GetTCPMetrics : public state::response::ResponseNode {
- public:
- explicit GetTCPMetrics(const CoreComponent& source_component)
- : state::response::ResponseNode("GetTCPMetrics"),
- source_component_(source_component) {
- }
-
- std::string getName() const override {
- return core::Connectable::getName();
- }
-
- std::vector<state::response::SerializedResponseNode> serialize() override {
- std::vector<state::response::SerializedResponseNode> resp;
-
- state::response::SerializedResponseNode root_node;
- root_node.name = source_component_.getUUIDStr();
-
- state::response::SerializedResponseNode iter;
- iter.name = "OnTriggerInvocations";
- iter.value = (uint32_t)iterations_.load();
-
- root_node.children.push_back(iter);
- resp.push_back(root_node);
-
- return resp;
- }
-
- std::vector<state::PublishedMetric> calculateMetrics() override {
- return {
- {"onTrigger_invocations", static_cast<double>(iterations_.load()),
- {{"metric_class", getName()}, {"processor_name",
source_component_.getName()}, {"processor_uuid",
source_component_.getUUIDStr()}}}
- };
- }
-
- protected:
- friend class GetTCP;
-
- const CoreComponent& source_component_;
- std::atomic<size_t> iterations_{0};
-};
-
-class GetTCP : public core::Processor, public
state::response::MetricsNodeSource {
+class GetTCP : public core::Processor {
public:
explicit GetTCP(const std::string& name, const utils::Identifier& uuid = {})
: Processor(name, uuid),
@@ -147,8 +106,7 @@ class GetTCP : public core::Processor, public
state::response::MetricsNodeSource
concurrent_handlers_(2),
endOfMessageByte(static_cast<std::byte>(13)),
receive_buffer_size_(16 * 1024 * 1024),
- connection_attempt_limit_(3),
- metrics_(std::make_shared<GetTCPMetrics>(*this)) {
+ connection_attempt_limit_(3) {
}
~GetTCP() override {
@@ -200,8 +158,6 @@ class GetTCP : public core::Processor, public
state::response::MetricsNodeSource
}
void initialize() override;
- int16_t
getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
&metric_vector) override;
-
protected:
void notifyStop() override;
@@ -218,7 +174,6 @@ class GetTCP : public core::Processor, public
state::response::MetricsNodeSource
std::chrono::milliseconds reconnect_interval_{5000};
uint64_t receive_buffer_size_;
uint16_t connection_attempt_limit_;
- std::shared_ptr<GetTCPMetrics> metrics_;
// Mutex for ensuring clients are running
std::mutex mutex_;
std::shared_ptr<minifi::controllers::SSLContextService> ssl_service_;
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index d563baf59..3b4d49079 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -822,3 +822,11 @@ TEST_CASE("isSingleThreaded - two threads for a single
threaded processor", "[is
REQUIRE(LogTestController::getInstance().contains("[warning] Processor
myProc can not be run in parallel, its "
"\"max concurrent tasks\"
value is too high. It was set to 1 from 2."));
}
+
+TEST_CASE("Test getProcessorType", "[getProcessorType]") {
+ TestController testController;
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ auto processor = plan->addProcessor("GenerateFlowFile", "myProc");
+ REQUIRE(processor->getProcessorType() == "GenerateFlowFile");
+}
diff --git a/libminifi/include/core/ProcessSession.h
b/libminifi/include/core/ProcessSession.h
index 470df5801..702e35231 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -27,6 +27,7 @@
#include <atomic>
#include <algorithm>
#include <set>
+#include <unordered_map>
#include "ProcessContext.h"
#include "FlowFileRecord.h"
@@ -37,6 +38,7 @@
#include "WeakReference.h"
#include "provenance/Provenance.h"
#include "utils/gsl.h"
+#include "ProcessorMetrics.h"
namespace org::apache::nifi::minifi::core {
namespace detail {
@@ -82,11 +84,11 @@ class ProcessSession : public ReferenceContainer {
// Clone a new UUID FlowFile from parent for attributes and sub set of
parent content resource claim
std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile>
&parent, int64_t offset, int64_t size);
// Transfer the FlowFile to the relationship
- virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, const
Relationship& relationship);
+ virtual void transfer(const std::shared_ptr<core::FlowFile>& flow, const
Relationship& relationship);
// Put Attribute
- void putAttribute(const std::shared_ptr<core::FlowFile> &flow, const
std::string& key, const std::string& value);
+ void putAttribute(const std::shared_ptr<core::FlowFile>& flow, const
std::string& key, const std::string& value);
// Remove Attribute
- void removeAttribute(const std::shared_ptr<core::FlowFile> &flow, const
std::string& key);
+ void removeAttribute(const std::shared_ptr<core::FlowFile>& flow, const
std::string& key);
// Remove Flow File
void remove(const std::shared_ptr<core::FlowFile> &flow);
// Access the contents of the flow file as an input stream; returns null if
the flow file has no content claim
@@ -144,6 +146,10 @@ class ProcessSession : public ReferenceContainer {
bool existsFlowFileInRelationship(const Relationship &relationship);
+ void setMetrics(const std::shared_ptr<ProcessorMetrics>& metrics) {
+ metrics_ = metrics;
+ }
+
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProcessSession(const ProcessSession &parent) = delete;
@@ -174,7 +180,12 @@ class ProcessSession : public ReferenceContainer {
Error_NoRelationship
};
- RouteResult routeFlowFile(const std::shared_ptr<FlowFile>& record);
+ struct TransferMetrics {
+ size_t transfer_count = 0;
+ uint64_t transfer_size = 0;
+ };
+
+ RouteResult routeFlowFile(const std::shared_ptr<FlowFile>& record, const
std::function<void(const FlowFile&, const Relationship&)>& transfer_callback);
void persistFlowFilesBeforeTransfer(
std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile>>>&
transactionMap,
@@ -197,6 +208,8 @@ class ProcessSession : public ReferenceContainer {
CoreComponentStateManager* stateManager_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
+
+ std::shared_ptr<ProcessorMetrics> metrics_;
};
} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/Processor.h
b/libminifi/include/core/Processor.h
index 4dca4b7b0..900f5d0c5 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -29,6 +29,7 @@
#include <unordered_set>
#include <unordered_map>
#include <utility>
+#include <vector>
#include "ConfigurableComponent.h"
#include "Connectable.h"
@@ -36,12 +37,23 @@
#include "core/Annotation.h"
#include "Scheduling.h"
#include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "ProcessorMetrics.h"
+#include "utils/gsl.h"
+
+#define ADD_GET_PROCESSOR_NAME \
+ std::string getProcessorType() const override { \
+ auto class_name =
org::apache::nifi::minifi::core::getClassName<decltype(*this)>(); \
+ auto splitted =
org::apache::nifi::minifi::utils::StringUtils::split(class_name, "::"); \
+ return splitted[splitted.size() - 1]; \
+ }
#define ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS \
bool supportsDynamicProperties() const override { return
SupportsDynamicProperties; } \
bool supportsDynamicRelationships() const override { return
SupportsDynamicRelationships; } \
minifi::core::annotation::Input getInputRequirement() const override {
return InputRequirement; } \
- bool isSingleThreaded() const override { return IsSingleThreaded; }
+ bool isSingleThreaded() const override { return IsSingleThreaded; } \
+ ADD_GET_PROCESSOR_NAME
namespace org::apache::nifi::minifi {
@@ -62,10 +74,10 @@ constexpr std::chrono::nanoseconds
MINIMUM_SCHEDULING_NANOS{30000};
#define BUILDING_DLL 1
-class Processor : public Connectable, public ConfigurableComponent {
+class Processor : public Connectable, public ConfigurableComponent, public
state::response::ResponseNodeSource {
public:
- Processor(const std::string& name, const utils::Identifier& uuid);
- explicit Processor(const std::string& name);
+ Processor(const std::string& name, const utils::Identifier& uuid,
std::shared_ptr<ProcessorMetrics> metrics = nullptr);
+ explicit Processor(const std::string& name,
std::shared_ptr<ProcessorMetrics> metrics = nullptr);
Processor(const Processor& parent) = delete;
Processor& operator=(const Processor& parent) = delete;
@@ -126,6 +138,8 @@ class Processor : public Connectable, public
ConfigurableComponent {
virtual bool isSingleThreaded() const = 0;
+ virtual std::string getProcessorType() const = 0;
+
void setTriggerWhenEmpty(bool value) {
_triggerWhenEmpty = value;
}
@@ -172,7 +186,6 @@ class Processor : public Connectable, public
ConfigurableComponent {
return !isRunning();
}
- public:
virtual void onTrigger(const std::shared_ptr<ProcessContext> &context, const
std::shared_ptr<ProcessSession> &session) {
onTrigger(context.get(), session.get());
}
@@ -206,6 +219,10 @@ class Processor : public Connectable, public
ConfigurableComponent {
virtual annotation::Input getInputRequirement() const = 0;
+ std::shared_ptr<state::response::ResponseNode> getResponseNodes() override {
+ return metrics_;
+ }
+
protected:
virtual void notifyStop() {
}
@@ -222,6 +239,7 @@ class Processor : public Connectable, public
ConfigurableComponent {
std::atomic<bool> _triggerWhenEmpty;
std::string cron_period_;
+ gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
private:
mutable std::mutex mutex_;
diff --git a/libminifi/include/core/ProcessorMetrics.h
b/libminifi/include/core/ProcessorMetrics.h
new file mode 100644
index 000000000..ae1aa90dc
--- /dev/null
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <chrono>
+#include <atomic>
+#include <unordered_map>
+#include <mutex>
+#include <vector>
+
+#include "core/state/nodes/MetricsBase.h"
+#include "core/state/PublishedMetricProvider.h"
+
+namespace org::apache::nifi::minifi::core {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; }; // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };
// NOLINT(readability/braces)
+
+class Processor;
+
+class ProcessorMetrics : public state::response::ResponseNode {
+ public:
+ explicit ProcessorMetrics(const Processor& source_processor);
+
+ [[nodiscard]] std::string getName() const override;
+
+ std::vector<state::response::SerializedResponseNode> serialize() override;
+ std::vector<state::PublishedMetric> calculateMetrics() override;
+ void increaseRelationshipTransferCount(const std::string& relationship,
size_t count = 1);
+ std::chrono::milliseconds getAverageOnTriggerRuntime() const;
+ std::chrono::milliseconds getLastOnTriggerRuntime() const;
+ void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+
+ std::atomic<size_t> iterations{0};
+ std::atomic<size_t> transferred_flow_files{0};
+ std::atomic<uint64_t> transferred_bytes{0};
+
+ protected:
+ template<typename ValueType>
+ requires Summable<ValueType> && DividableByInteger<ValueType>
+ class Averager {
+ public:
+ explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size),
next_average_index_(SAMPLE_SIZE_) {
+ values_.reserve(SAMPLE_SIZE_);
+ }
+
+ ValueType getAverage() const;
+ ValueType getLastValue() const;
+ void addValue(ValueType runtime);
+
+ private:
+ const uint32_t SAMPLE_SIZE_;
+ mutable std::mutex average_value_mutex_;
+ uint32_t next_average_index_;
+ std::vector<ValueType> values_;
+ };
+
+ [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels()
const;
+ static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
+
+ std::mutex transferred_relationships_mutex_;
+ std::unordered_map<std::string, size_t> transferred_relationships_;
+ const Processor& source_processor_;
+ Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
+};
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h
b/libminifi/include/core/state/nodes/MetricsBase.h
index da545fb14..57ef26187 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -154,46 +154,8 @@ class ObjectNode : public ResponseNode {
*/
class ResponseNodeSource {
public:
- ResponseNodeSource() = default;
-
virtual ~ResponseNodeSource() = default;
-
- /**
- * Retrieves all metrics from this source.
- * @param metric_vector -- metrics will be placed in this vector.
- * @return result of the get operation.
- * 0 Success
- * 1 No error condition, but cannot obtain lock in timely manner.
- * -1 failure
- */
- virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>>
&metric_vector) = 0;
-
- virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>>
&metric_vector) = 0;
-};
-
-/**
- * Purpose: Retrieves Metrics from the defined class. The current Metric,
which is a consumable for any reader of Metrics must have the ability to set
metrics.
- *
- */
-class MetricsNodeSource : public ResponseNodeSource {
- public:
- MetricsNodeSource() = default;
-
- virtual ~MetricsNodeSource() = default;
-
- /**
- * Retrieves all metrics from this source.
- * @param metric_vector -- metrics will be placed in this vector.
- * @return result of the get operation.
- * 0 Success
- * 1 No error condition, but cannot obtain lock in timely manner.
- * -1 failure
- */
- virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>>
&metric_vector) {
- return getMetricNodes(metric_vector);
- }
-
- virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>>
&metric_vector) = 0;
+ virtual std::shared_ptr<ResponseNode> getResponseNodes() = 0;
};
class NodeReporter {
diff --git a/libminifi/src/core/ProcessSession.cpp
b/libminifi/src/core/ProcessSession.cpp
index 35dac48fb..b9ca57272 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -201,14 +201,14 @@ void ProcessSession::remove(const
std::shared_ptr<core::FlowFile> &flow) {
provenance_report_->drop(flow, reason);
}
-void ProcessSession::putAttribute(const std::shared_ptr<core::FlowFile> &flow,
const std::string& key, const std::string& value) {
+void ProcessSession::putAttribute(const std::shared_ptr<core::FlowFile>& flow,
const std::string& key, const std::string& value) {
flow->setAttribute(key, value);
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow
record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
provenance_report_->modifyAttributes(flow, details.str());
}
-void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile>
&flow, const std::string& key) {
+void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile>&
flow, const std::string& key) {
flow->removeAttribute(key);
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " remove flow
record " << flow->getUUIDStr() << " attribute " + key;
@@ -221,7 +221,7 @@ void ProcessSession::penalize(const
std::shared_ptr<core::FlowFile> &flow) {
flow->penalize(penalization_period);
}
-void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow,
const Relationship& relationship) {
+void ProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow,
const Relationship& relationship) {
logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << "
from " << process_context_->getProcessorNode()->getName() << " to relationship
" << relationship.getName();
utils::Identifier uuid = flow->getUUID();
_transferRelationship[uuid] = relationship;
@@ -743,7 +743,7 @@ void ProcessSession::restore(const std::string &key, const
std::shared_ptr<core:
flow->clearStashClaim(key);
}
-ProcessSession::RouteResult ProcessSession::routeFlowFile(const
std::shared_ptr<FlowFile> &record) {
+ProcessSession::RouteResult ProcessSession::routeFlowFile(const
std::shared_ptr<FlowFile> &record, const std::function<void(const FlowFile&,
const Relationship&)>& transfer_callback) {
if (record->isDeleted()) {
return RouteResult::Ok_Deleted;
}
@@ -764,6 +764,8 @@ ProcessSession::RouteResult
ProcessSession::routeFlowFile(const std::shared_ptr<
} else {
// Autoterminated
remove(record);
+ transfer_callback(*record, relationship);
+ return RouteResult::Ok_AutoTerminated;
}
} else {
// We connections, clone the flow and assign the connection accordingly
@@ -772,13 +774,16 @@ ProcessSession::RouteResult
ProcessSession::routeFlowFile(const std::shared_ptr<
if (itConnection == connections.begin()) {
// First connection which the flow need be routed to
record->setConnection(connection);
+ transfer_callback(*record, relationship);
} else {
// Clone the flow file and route to the connection
std::shared_ptr<core::FlowFile> cloneRecord =
this->cloneDuringTransfer(record);
- if (cloneRecord)
+ if (cloneRecord) {
cloneRecord->setConnection(connection);
- else
+ transfer_callback(*cloneRecord, relationship);
+ } else {
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow
for transfer " + record->getUUIDStr());
+ }
}
}
}
@@ -787,10 +792,15 @@ ProcessSession::RouteResult
ProcessSession::routeFlowFile(const std::shared_ptr<
void ProcessSession::commit() {
try {
+ std::unordered_map<std::string, TransferMetrics> transfers;
+ auto increaseTransferMetrics = [&](const FlowFile& record, const
Relationship& relationship) {
+ ++transfers[relationship.getName()].transfer_count;
+ transfers[relationship.getName()].transfer_size += record.getSize();
+ };
// First we clone the flow record based on the transferred relationship
for updated flow record
for (auto && it : _updatedFlowFiles) {
auto record = it.second.modified;
- if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
+ if (routeFlowFile(record, increaseTransferMetrics) ==
RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer
relationship for the updated flow " + record->getUUIDStr());
}
@@ -799,7 +809,7 @@ void ProcessSession::commit() {
// Do the same thing for added flow file
for (const auto& it : _addedFlowFiles) {
auto record = it.second;
- if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
+ if (routeFlowFile(record, increaseTransferMetrics) ==
RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer
relationship for the added flow " + record->getUUIDStr());
}
@@ -875,6 +885,14 @@ void ProcessSession::commit() {
}
}
+ if (metrics_) {
+ for (const auto& [relationship_name, transfer_metrics] : transfers) {
+ metrics_->transferred_bytes += transfer_metrics.transfer_size;
+ metrics_->transferred_flow_files += transfer_metrics.transfer_count;
+ metrics_->increaseRelationshipTransferCount(relationship_name,
transfer_metrics.transfer_count);
+ }
+ }
+
// All done
_updatedFlowFiles.clear();
_addedFlowFiles.clear();
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 5aaffb4a0..4e73c2976 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -20,6 +20,7 @@
#include "core/Processor.h"
#include <ctime>
+#include <cctype>
#include <memory>
#include <set>
@@ -40,9 +41,10 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::core {
-Processor::Processor(const std::string& name)
+Processor::Processor(const std::string& name,
std::shared_ptr<ProcessorMetrics> metrics)
: Connectable(name),
- logger_(logging::LoggerFactory<Processor>::getLogger()) {
+ logger_(logging::LoggerFactory<Processor>::getLogger()),
+ metrics_(metrics ? std::move(metrics) :
std::make_shared<ProcessorMetrics>(*this)) {
has_work_.store(false);
// Setup the default values
state_ = DISABLED;
@@ -58,9 +60,10 @@ Processor::Processor(const std::string& name)
logger_->log_debug("Processor %s created UUID %s", name_, getUUIDStr());
}
-Processor::Processor(const std::string& name, const utils::Identifier& uuid)
+Processor::Processor(const std::string& name, const utils::Identifier& uuid,
std::shared_ptr<ProcessorMetrics> metrics)
: Connectable(name, uuid),
- logger_(logging::LoggerFactory<Processor>::getLogger()) {
+ logger_(logging::LoggerFactory<Processor>::getLogger()),
+ metrics_(metrics ? std::move(metrics) :
std::make_shared<ProcessorMetrics>(*this)) {
has_work_.store(false);
// Setup the default values
state_ = DISABLED;
@@ -174,11 +177,15 @@ bool Processor::flowFilesOutGoingFull() const {
}
void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory
*sessionFactory) {
+ ++metrics_->iterations;
auto session = sessionFactory->createSession();
+ session->setMetrics(metrics_);
try {
// Call the virtual trigger function
+ const auto start = std::chrono::steady_clock::now();
onTrigger(context, session.get());
+
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start));
session->commit();
} catch (const std::exception& exception) {
logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of
processor: %s (%s)",
@@ -193,11 +200,15 @@ void Processor::onTrigger(ProcessContext *context,
ProcessSessionFactory *sessio
}
void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context,
const std::shared_ptr<ProcessSessionFactory> &sessionFactory) {
+ ++metrics_->iterations;
auto session = sessionFactory->createSession();
+ session->setMetrics(metrics_);
try {
// Call the virtual trigger function
+ const auto start = std::chrono::steady_clock::now();
onTrigger(context, session);
+
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start));
session->commit();
} catch (std::exception &exception) {
logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of
processor: %s (%s)",
diff --git a/libminifi/src/core/ProcessorMetrics.cpp
b/libminifi/src/core/ProcessorMetrics.cpp
new file mode 100644
index 000000000..e243a0178
--- /dev/null
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/ProcessorMetrics.h"
+
+#include "core/Processor.h"
+#include "utils/gsl.h"
+#include "range/v3/numeric/accumulate.hpp"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core {
+
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+ : source_processor_(source_processor),
+ on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+}
+
+std::string ProcessorMetrics::getName() const {
+ return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string>
ProcessorMetrics::getCommonLabels() const {
+ return {{"metric_class", getName()}, {"processor_name",
source_processor_.getName()}, {"processor_uuid",
source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode>
ProcessorMetrics::serialize() {
+ std::vector<state::response::SerializedResponseNode> resp;
+
+ state::response::SerializedResponseNode root_node {
+ .name = source_processor_.getUUIDStr(),
+ .children = {
+ {.name = "OnTriggerInvocations", .value =
static_cast<uint32_t>(iterations.load())},
+ {.name = "AverageOnTriggerRunTime", .value =
static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
+ {.name = "LastOnTriggerRunTime", .value =
static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
+ {.name = "TransferredFlowFiles", .value =
static_cast<uint32_t>(transferred_flow_files.load())},
+ {.name = "TransferredBytes", .value = transferred_bytes.load()}
+ }
+ };
+
+ {
+ std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+ for (const auto& [relationship, count] : transferred_relationships_) {
+ gsl_Expects(!relationship.empty());
+ state::response::SerializedResponseNode transferred_to_relationship_node;
+ transferred_to_relationship_node.name =
std::string("TransferredTo").append(1,
toupper(relationship[0])).append(relationship.substr(1));
+ transferred_to_relationship_node.value = static_cast<uint32_t>(count);
+
+ root_node.children.push_back(transferred_to_relationship_node);
+ }
+ }
+
+ resp.push_back(root_node);
+
+ return resp;
+}
+
+std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
+ std::vector<state::PublishedMetric> metrics = {
+ {"onTrigger_invocations", static_cast<double>(iterations.load()),
getCommonLabels()},
+ {"average_onTrigger_runtime_milliseconds",
static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
+ {"last_onTrigger_runtime_milliseconds",
static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
+ {"transferred_flow_files",
static_cast<double>(transferred_flow_files.load()), getCommonLabels()},
+ {"transferred_bytes", static_cast<double>(transferred_bytes.load()),
getCommonLabels()}
+ };
+
+ {
+ std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+ for (const auto& [relationship, count] : transferred_relationships_) {
+ metrics.push_back({"transferred_to_" + relationship,
static_cast<double>(count),
+ {{"metric_class", getName()}, {"processor_name",
source_processor_.getName()}, {"processor_uuid",
source_processor_.getUUIDStr()}}});
+ }
+ }
+
+ return metrics;
+}
+
+void ProcessorMetrics::increaseRelationshipTransferCount(const std::string&
relationship, size_t count) {
+ std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+ transferred_relationships_[relationship] += count;
+}
+
+std::chrono::milliseconds ProcessorMetrics::getAverageOnTriggerRuntime() const
{
+ return on_trigger_runtime_averager_.getAverage();
+}
+
+void ProcessorMetrics::addLastOnTriggerRuntime(std::chrono::milliseconds
runtime) {
+ on_trigger_runtime_averager_.addValue(runtime);
+}
+
+std::chrono::milliseconds ProcessorMetrics::getLastOnTriggerRuntime() const {
+ return on_trigger_runtime_averager_.getLastValue();
+}
+
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
+ if (values_.empty()) {
+ return {};
+ }
+ std::lock_guard<std::mutex> lock(average_value_mutex_);
+ return ranges::accumulate(values_, ValueType{}) / values_.size();
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+void ProcessorMetrics::Averager<ValueType>::addValue(ValueType runtime) {
+ std::lock_guard<std::mutex> lock(average_value_mutex_);
+ if (values_.size() < SAMPLE_SIZE_) {
+ values_.push_back(runtime);
+ } else {
+ if (next_average_index_ >= values_.size()) {
+ next_average_index_ = 0;
+ }
+ values_[next_average_index_] = runtime;
+ ++next_average_index_;
+ }
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType ProcessorMetrics::Averager<ValueType>::getLastValue() const {
+ std::lock_guard<std::mutex> lock(average_value_mutex_);
+ if (values_.empty()) {
+ return {};
+ } else if (values_.size() < SAMPLE_SIZE_) {
+ return values_[values_.size() - 1];
+ } else {
+ return values_[next_average_index_ - 1];
+ }
+}
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index feb44a00e..ad1389223 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -56,12 +56,9 @@ void
ResponseNodeLoader::initializeComponentMetrics(core::ProcessGroup* root) {
continue;
}
// we have a metrics source.
- std::vector<std::shared_ptr<ResponseNode>> metric_vector;
- node_source->getResponseNodes(metric_vector);
+ auto metric = node_source->getResponseNodes();
std::lock_guard<std::mutex> guard(component_metrics_mutex_);
- for (const auto& metric : metric_vector) {
- component_metrics_[metric->getName()].push_back(metric);
- }
+ component_metrics_[metric->getName()].push_back(metric);
}
}
diff --git a/libminifi/test/DummyProcessor.cpp
b/libminifi/test/DummyProcessor.cpp
new file mode 100644
index 000000000..783475a7c
--- /dev/null
+++ b/libminifi/test/DummyProcessor.cpp
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DummyProcessor.h"
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::test {
+
+REGISTER_RESOURCE(DummyProcessor, Processor);
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/DummyProcessor.h b/libminifi/test/DummyProcessor.h
new file mode 100644
index 000000000..a5eceecc4
--- /dev/null
+++ b/libminifi/test/DummyProcessor.h
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <array>
+#include <string>
+
+#include "core/Processor.h"
+#include "agent/agent_docs.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class DummyProcessor : public minifi::core::Processor {
+ using minifi::core::Processor::Processor;
+
+ public:
+ DummyProcessor(const std::string& name, const minifi::utils::Identifier&
uuid) : Processor(name, uuid) {}
+ explicit DummyProcessor(const std::string& name) : Processor(name) {}
+ static constexpr const char* Description = "A processor that does nothing.";
+ static auto properties() { return std::array<core::Property, 0>{}; }
+ static auto relationships() { return std::array<core::Relationship, 0>{}; }
+ static constexpr bool SupportsDynamicProperties = false;
+ static constexpr bool SupportsDynamicRelationships = false;
+ static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_ALLOWED;
+ static constexpr bool IsSingleThreaded = false;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+};
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h
b/libminifi/test/unit/ContentRepositoryDependentTests.h
index a476b4558..6674729e8 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -27,6 +27,7 @@
#include "core/Resource.h"
#include "../TestBase.h"
#include "../Catch.h"
+#include "../DummyProcessor.h"
#include "StreamPipe.h"
#pragma once
@@ -53,22 +54,6 @@ struct ReadUntilItCan {
}
};
-class DummyProcessor : public core::Processor {
- public:
- using core::Processor::Processor;
-
- static constexpr const char* Description = "A processor that does nothing.";
- static auto properties() { return std::array<core::Property, 0>{}; }
- static auto relationships() { return std::array<core::Relationship, 0>{}; }
- static constexpr bool SupportsDynamicProperties = false;
- static constexpr bool SupportsDynamicRelationships = false;
- static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_ALLOWED;
- static constexpr bool IsSingleThreaded = false;
- ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);
-
class Fixture {
public:
const core::Relationship Success{"success", "everything is fine"};
diff --git a/libminifi/test/unit/MetricsTests.cpp
b/libminifi/test/unit/MetricsTests.cpp
index a3c96128b..20aeb7c6e 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -25,6 +25,11 @@
#include "core/ClassLoader.h"
#include "repository/VolatileContentRepository.h"
#include "ProvenanceTestHelper.h"
+#include "../DummyProcessor.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") {
minifi::state::response::QueueMetrics metrics;
@@ -203,3 +208,44 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
REQUIRE("0" == size.value);
}
}
+
+TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+ DummyProcessor dummy_processor("dummy");
+ minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+ REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+ REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
+ REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);
+
+ metrics.addLastOnTriggerRuntime(10ms);
+ metrics.addLastOnTriggerRuntime(20ms);
+ metrics.addLastOnTriggerRuntime(30ms);
+
+ REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
+ REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);
+
+ for (auto i = 0; i < 7; ++i) {
+ metrics.addLastOnTriggerRuntime(50ms);
+ }
+ REQUIRE(metrics.getAverageOnTriggerRuntime() == 41ms);
+ REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+ for (auto i = 0; i < 3; ++i) {
+ metrics.addLastOnTriggerRuntime(50ms);
+ }
+ REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
+ REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+ for (auto i = 0; i < 10; ++i) {
+ metrics.addLastOnTriggerRuntime(40ms);
+ }
+ REQUIRE(metrics.getAverageOnTriggerRuntime() == 40ms);
+ REQUIRE(metrics.getLastOnTriggerRuntime() == 40ms);
+
+ metrics.addLastOnTriggerRuntime(10ms);
+ REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
+ REQUIRE(metrics.getAverageOnTriggerRuntime() == 37ms);
+}
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp
b/libminifi/test/unit/ProcessSessionTests.cpp
index f6df20829..55a39b272 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -28,22 +28,6 @@
namespace {
-class DummyProcessor : public minifi::core::Processor {
- using minifi::core::Processor::Processor;
-
- public:
- static constexpr const char* Description = "A processor that does nothing.";
- static auto properties() { return std::array<core::Property, 0>{}; }
- static auto relationships() { return std::array<core::Relationship, 0>{}; }
- static constexpr bool SupportsDynamicProperties = false;
- static constexpr bool SupportsDynamicRelationships = false;
- static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_ALLOWED;
- static constexpr bool IsSingleThreaded = false;
- ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);
-
class Fixture {
public:
minifi::core::ProcessSession &processSession() { return *process_session_; }