This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 07b9641ff MINIFICPP-2501 Add processorStatuses C2 metric node to
FlowInformation
07b9641ff is described below
commit 07b9641ff033c1efb214d050079f4451df4e3eb9
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Jan 21 20:23:00 2025 +0100
MINIFICPP-2501 Add processorStatuses C2 metric node to FlowInformation
Closes #1909
Signed-off-by: Marton Szasz <[email protected]>
---
C2.md | 400 ++++++++++++++++++++-
METRICS.md | 34 +-
.../cluster/checkers/PrometheusChecker.py | 11 +-
libminifi/include/core/Processor.h | 10 +
libminifi/include/core/ProcessorMetrics.h | 10 +-
.../include/core/state/nodes/FlowInformation.h | 42 +--
.../include/core/state/nodes/ResponseNodeLoader.h | 2 +-
libminifi/include/io/StreamCallback.h | 8 +-
.../utils/LineByLineInputOutputStreamCallback.h | 2 +-
libminifi/src/core/ProcessGroup.cpp | 1 +
libminifi/src/core/ProcessSession.cpp | 45 ++-
libminifi/src/core/Processor.cpp | 2 +-
libminifi/src/core/ProcessorMetrics.cpp | 29 +-
libminifi/src/core/state/nodes/FlowInformation.cpp | 69 +++-
.../src/core/state/nodes/ResponseNodeLoader.cpp | 20 +-
.../utils/LineByLineInputOutputStreamCallback.cpp | 22 +-
libminifi/test/integration/C2MetricsTest.cpp | 96 +++--
.../libtest/unit/SingleProcessorTestController.h | 2 +-
libminifi/test/libtest/unit/TestBase.cpp | 2 +
libminifi/test/unit/MetricsTests.cpp | 73 ++++
20 files changed, 756 insertions(+), 124 deletions(-)
diff --git a/C2.md b/C2.md
index ad492ec57..91a29e030 100644
--- a/C2.md
+++ b/C2.md
@@ -26,10 +26,23 @@ options defined are located in minifi.properties.
- [Description](#description)
- [Configuration](#configuration)
- [Base Options](#base-options)
+ - [Flow Id and URL](#flow-id-and-url)
+ - [Agent Identifier Fallback](#agent-identifier-fallback)
- [Metrics](#metrics)
- [UpdatePolicies](#updatepolicies)
- [Triggers](#triggers)
- [C2 File triggers](#c2-file-triggers)
+ - [C2 Response Nodes](#c2-response-nodes)
+ - [AgentInformation](#agentinformation)
+ - [AgentStatus](#agentstatus)
+ - [AssetInformation](#assetinformation)
+ - [BuildInformation](#buildinformation)
+ - [ConfigurationChecksums](#configurationchecksums)
+ - [DeviceInfoNode](#deviceinfonode)
+ - [FlowInformation](#flowinformation)
+ - [QueueMetrics](#queuemetrics)
+ - [RepositoryMetrics](#repositorymetrics)
+ - [Processor Metric Response Nodes](#processor-metric-response-nodes)
## Description
@@ -181,21 +194,40 @@ configuration produces the following JSON:
"uuid": "2438e3c8-015a-1000-79ca-83af40ec1997"
}
},
- "components": {
- "FlowController": {
- "running": true,
- "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
- },
- "GetFile": {
- "running": false,
- "uuid": "2438e3c8-015a-1000-79ca-83af40ec1991"
- },
- "LogAttribute": {
- "running": true,
- "uuid": "2438e3c8-015a-1000-79ca-83af40ec1992"
- }
- },
- "flowId": "8db40550-db5d-11ec-95d7-0433c2c9832b"
+ "processorStatuses": [
+ {
+ "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
+ "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+ "bytesRead": 0,
+ "bytesWritten": 0,
+ "flowFilesIn": 0,
+ "flowFilesOut": 0,
+ "bytesIn": 0,
+ "bytesOut": 0,
+ "invocations": 0,
+ "processingNanos": 0,
+ "activeThreadCount": -1,
+ "terminatedThreadCount": -1,
+ "runStatus": "Running"
+ },
+ {
+ "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
+ "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+ "bytesRead": 0,
+ "bytesWritten": 40,
+ "flowFilesIn": 0,
+ "flowFilesOut": 4,
+ "bytesIn": 0,
+ "bytesOut": 40,
+ "invocations": 4,
+ "processingNanos": 2119148,
+ "activeThreadCount": -1,
+ "terminatedThreadCount": -1,
+ "runStatus": "Running"
+ }
+ ],
+ "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64",
+ "runStatus": "Running"
}
},
"LoadMetrics": {
@@ -271,3 +303,341 @@ in minifi.properties to activate the file update trigger
specify
# specifying a trigger
nifi.c2.agent.trigger.classes=FileUpdateTrigger
nifi.c2.file.watch=<full path of file to monitor>
+
+## C2 Response Nodes
+
+The following is a list of nodes that can be configured in the minifi.properties file for the C2 heartbeat response, either as C2 root nodes listed in the `nifi.c2.root.classes` property or as metrics nodes defined in the tree under `nifi.c2.root.class.definitions`, as described above.
+
+### AgentInformation
+
+Contains information about the agent's build, extensions, supported C2 operations, and the status of its components.
+
+```
+"agentInfo": {
+ "agentManifest": {
+ "buildInfo": {
+ "compiler": "/usr/lib/ccache/g++",
+ "flags": "
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+ "revision": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+ "timestamp": 1734001238,
+ "version": "0.99.1"
+ },
+ "bundles": [
+ {
+ "componentManifest": {
+ "processors": [
+ ...
+ ]
+ },
+ "artifact": "minifi-civet-extensions",
+ "group": "org.apache.nifi.minifi",
+ "version": "0.99.1"
+ }
+ ],
+ "schedulingDefaults": {
+ "defaultMaxConcurrentTasks": 1,
+ "defaultRunDurationNanos": 0,
+ "defaultSchedulingPeriodMillis": 1000,
+ "defaultSchedulingStrategy": "TIMER_DRIVEN",
+ "penalizationPeriodMillis": 30000,
+ "yieldDurationMillis": 1000
+ },
+ "supportedOperations": [
+ {
+ "type": "acknowledge"
+ }
+ ...
+ ],
+ "agentType": "cpp",
+ "identifier": "bH77vXakM0Lkgt8VcDOGZVW3"
+ },
+ "status": {
+ "repositories": {
+ "content_repo": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "flow_file_repo": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+ "entryCount": 4,
+ "full": false,
+ "maxSize": 7864320,
+ "running": true,
+ "size": 40
+ }
+ },
+ "components": {
+ "LogAttribute": {
+ "running": true,
+ "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+ },
+ "GenerateFlowFile": {
+ "running": true,
+ "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+ },
+ "FlowController": {
+ "running": true,
+ "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+ }
+ },
+ "resourceConsumption": {
+ "cpuUtilization": 0.05,
+ "memoryUsage": 97955840
+ },
+ "uptime": 1025
+ },
+ "agentClass": "test",
+ "agentManifestHash":
"9FFC8326121A816E5B2FD674CE9A34321F89CC690AD0D1FD79DFB5969B3B523D6570520382E82C68CFA347FBD9897FC027E518E98CFA229C18617B062E1C9E77",
+ "identifier": "9628acfe-b9fe-11ef-a0c0-10f60a596f64"
+}
+```
+
+### AgentStatus
+
+Contains information about the agent's status, including the status of its
components, repositories, and resource consumption.
+
+```
+"AgentStatus": {
+ "repositories": {
+ "repo_name": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "ff": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+ "entryCount": 4,
+ "full": false,
+ "maxSize": 7864320,
+ "running": true,
+ "size": 40
+ }
+ },
+ "components": {
+ "LogAttribute": {
+ "running": true,
+ "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+ },
+ "GenerateFlowFile": {
+ "running": true,
+ "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+ },
+ "FlowController": {
+ "running": true,
+ "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+ }
+ },
+ "resourceConsumption": {
+ "cpuUtilization": 0.0028846153846153849,
+ "memoryUsage": 97955840
+ },
+ "uptime": 995
+}
+```
+
+### AssetInformation
+
+Contains the calculated hash of the assets.
+
+```
+"resourceInfo": {
+ "hash": "null"
+}
+```
+
+### BuildInformation
+
+Contains information about the agent's build.
+
+```
+"BuildInformation": {
+ "compiler": {
+ "compiler_command": "/usr/lib/ccache/g++",
+ "compiler_flags": "
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+ "compiler_version": "11.4.0"
+ },
+ "build_date": "1734001238",
+ "build_rev": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+ "build_version": "0.99.1",
+ "device_id": "bH77vXakM0Lkgt8VcDOGZVW3"
+}
+```
+
+### ConfigurationChecksums
+
+Contains the checksums of the agent's configuration files, grouped by hash algorithm.
+
+```
+"configurationChecksums": {
+ "SHA256": {
+ "TestC2Metrics.yml":
"9af6589bf7729bb88857aafe98cea4f41df049725401b5f0ded0a7b949d9b90c",
+ "minifi.properties":
"06fb9f4730e3db7d0a0a1ee606a7de3fee5813edf42eab140616e8a2995072df"
+ }
+}
+```
+
+### DeviceInfoNode
+
+Contains information about the device the agent is running on.
+
+```
+"deviceInfo": {
+ "systemInfo": {
+ "cpuLoadAverage": 1.271484375,
+ "cpuUtilization": 0.06179499754781756,
+ "machineArch": "x86_64",
+ "memoryUsage": 12681670656,
+ "operatingSystem": "Linux",
+ "physicalMem": 67081129984,
+ "vCores": 20
+ },
+ "networkInfo": {
+ "hostname": "ggyimesi-5570-ubuntu",
+ "ipAddress": "10.255.0.1"
+ },
+ "identifier": "16475557466943148337"
+}
+```
+
+### FlowInformation
+
+Contains information about the flow the agent is running, including the
versioned flow snapshot URI, queues, components, and processor statuses.
+
+```
+"flowInfo": {
+ "versionedFlowSnapshotURI": {
+ "bucketId": "default",
+ "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
+ },
+ "queues": {
+ "8368e3c8-015a-1003-52ca-83af40ec1332": {
+ "dataSize": 40,
+ "dataSizeMax": 1048576,
+ "name": "GenerateFlowFile/success/LogAttribute",
+ "size": 4,
+ "sizeMax": 0,
+ "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
+ }
+ },
+ "processorStatuses": [
+ {
+ "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
+ "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+ "bytesRead": 0,
+ "bytesWritten": 0,
+ "flowFilesIn": 0,
+ "flowFilesOut": 0,
+ "bytesIn": 0,
+ "bytesOut": 0,
+ "invocations": 0,
+ "processingNanos": 0,
+ "activeThreadCount": -1,
+ "terminatedThreadCount": -1,
+ "running": true
+ },
+ {
+ "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
+ "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+ "bytesRead": 0,
+ "bytesWritten": 40,
+ "flowFilesIn": 0,
+ "flowFilesOut": 4,
+ "bytesIn": 0,
+ "bytesOut": 40,
+ "invocations": 4,
+ "processingNanos": 2119148,
+ "activeThreadCount": -1,
+ "terminatedThreadCount": -1,
+ "running": true
+ }
+ ],
+ "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64",
+ "running": true
+}
+```
+
+### QueueMetrics
+
+Contains information about the queues in the flow, including the size of the queued data and the number of queued flow files, along with their configured maximums.
+
+```
+"QueueMetrics": {
+ "GenerateFlowFile/success/LogAttribute": {
+ "datasize": "40",
+ "datasizemax": "1048576",
+ "queued": "4",
+ "queuedmax": "0"
+ }
+}
+```
+
+### RepositoryMetrics
+
+Contains information about each repository of the agent, including the number of entries, the size, and whether the repository is full.
+
+
+```
+"RepositoryMetrics": {
+ "repo_name": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "ff": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+ "entryCount": 4,
+ "full": false,
+ "maxSize": 7864320,
+ "running": true,
+ "size": 40
+ }
+}
+```
+
+### Processor Metric Response Nodes
+
+Each processor type can expose its own metrics. These metric nodes can be configured in minifi.properties by requesting metrics in the `<ProcessorType>Metrics` format, for example `GetTCPMetrics` to request metrics for the GetTCP processors. Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix. For example, `processorMetrics/Get.*Metrics` will match the metrics of all processor types whose names start with `Get`.
+
+```
+"GetTCPMetrics": {
+ "2438e3c8-015a-1000-79ca-83af40ec1991": {
+ "AverageOnTriggerRunTime": 0,
+ "AverageSessionCommitRunTime": 0,
+ "BytesRead": 0,
+ "BytesWritten": 0,
+ "IncomingBytes": 0,
+ "IncomingFlowFiles": 0,
+ "LastOnTriggerRunTime": 0,
+ "LastSessionCommitRunTime": 0,
+ "OnTriggerInvocations": 11,
+ "ProcessingNanos": 729328,
+ "TransferredBytes": 0,
+ "TransferredFlowFiles": 0
+ }
+}
+```
diff --git a/METRICS.md b/METRICS.md
index 7aa53fda7..e28f77a40 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -162,8 +162,8 @@ RepositoryMetrics is a system level metric that reports
metrics for the register
| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated
memory used for reading SST tables (only present if repository uses RocksDB)
|
| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's
approximate size of active and unflushed immutable memtables (only present if
repository uses RocksDB) |
-| Label | Description
|
-|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
+| Label | Description
|
+|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
| repository_name | Name of the reported repository. There are three
repositories present with the following names: `flowfile`, `content` and
`provenance` |
### DeviceInfoNode
@@ -181,13 +181,22 @@ DeviceInfoNode is a system level metric that reports
metrics about the system re
FlowInformation is a system level metric that reports component and queue
related metrics.
-| Metric name | Labels | Description
|
-|----------------------|----------------------------------|--------------------------------------------|
-| queue_data_size | connection_uuid, connection_name | Current queue data
size |
-| queue_data_size_max | connection_uuid, connection_name | Max queue data
size to apply back pressure |
-| queue_size | connection_uuid, connection_name | Current queue size
|
-| queue_size_max | connection_uuid, connection_name | Max queue size to
apply back pressure |
-| is_running | component_uuid, component_name | Check if the
component is running (1 or 0) |
+| Metric name | Labels | Description
|
+|----------------------|----------------------------------|----------------------------------------------------------------------------|
+| queue_data_size | connection_uuid, connection_name | Current queue data
size |
+| queue_data_size_max | connection_uuid, connection_name | Max queue data
size to apply back pressure |
+| queue_size | connection_uuid, connection_name | Current queue size
|
+| queue_size_max | connection_uuid, connection_name | Max queue size to
apply back pressure |
+| is_running | component_uuid, component_name | Check if the
component is running (1 or 0) |
+| bytes_read | processor_uuid, processor_name | Number of bytes
read by the processor |
+| bytes_written | processor_uuid, processor_name | Number of bytes
written by the processor |
+| flow_files_in | processor_uuid, processor_name | Number of flow
files from the incoming queue processed by the processor |
+| flow_files_out | processor_uuid, processor_name | Number of flow
files transferred to outgoing relationship by the processor |
+| bytes_in | processor_uuid, processor_name | Sum of data from
the incoming queue processed by the processor |
+| bytes_out | processor_uuid, processor_name | Sum of data
transferred to outgoing relationship by the processor |
+| invocations | processor_uuid, processor_name | Number of times
the processor was triggered |
+| processing_nanos | processor_uuid, processor_name | Sum of the runtime
spent in the processor in nanoseconds |
+
| Label | Description
|
|-----------------|--------------------------------------------------------------|
@@ -195,6 +204,8 @@ FlowInformation is a system level metric that reports
component and queue relate
| connection_name | Name of the connection defined in the flow configuration
|
| component_uuid | UUID of the component
|
| component_name | Name of the component
|
+| processor_uuid | UUID of the processor
|
+| processor_name | Name of the processor
|
### AgentStatus
@@ -251,6 +262,11 @@ There are general metrics that are available for all
processors. Besides these m
| transferred_flow_files | metric_class, processor_name,
processor_uuid | Number of flow files transferred to a relationship
|
| transferred_bytes | metric_class, processor_name,
processor_uuid | Number of bytes transferred to a relationship
|
| transferred_to_\<relationship\> | metric_class, processor_name,
processor_uuid | Number of flow files transferred to a specific relationship
|
+| incoming_flow_files | metric_class, processor_name,
processor_uuid | Number of flow files from the incoming queue processed by the
processor |
+| incoming_bytes | metric_class, processor_name,
processor_uuid | Sum of data from the incoming queue processed by the processor
|
+| bytes_read | metric_class, processor_name,
processor_uuid | Number of bytes read by the processor
|
+| bytes_written | metric_class, processor_name,
processor_uuid | Number of bytes written by the processor
|
+| processing_nanos | metric_class, processor_name,
processor_uuid | Sum of the runtime spent in the processor in nanoseconds
|
| Label | Description
|
|----------------|------------------------------------------------------------------------|
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py
b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index 8c12bc25f..d73859f58 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -60,8 +60,11 @@ class PrometheusChecker:
def verify_general_processor_metrics(self, metric_class, processor_name):
labels = {'processor_name': processor_name}
return
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds',
'minifi_last_onTrigger_runtime_milliseconds',
-
'minifi_average_session_commit_runtime_milliseconds',
'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \
-
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations',
'minifi_transferred_flow_files', 'minifi_transferred_to_success',
'minifi_transferred_bytes'], metric_class, labels)
+
'minifi_average_session_commit_runtime_milliseconds',
'minifi_last_session_commit_runtime_milliseconds',
+ 'minifi_incoming_flow_files',
'minifi_incoming_bytes', 'minifi_bytes_read', 'minifi_bytes_written'],
metric_class, labels) and \
+
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations',
'minifi_transferred_flow_files', 'minifi_transferred_to_success',
+ 'minifi_transferred_bytes',
'minifi_processing_nanos'],
+ metric_class, labels)
def verify_getfile_metrics(self, metric_class, processor_name):
labels = {'processor_name': processor_name}
@@ -69,7 +72,9 @@ class PrometheusChecker:
self.verify_metrics_exist(['minifi_input_bytes',
'minifi_accepted_files'], metric_class, labels)
def verify_flow_information_metrics(self):
- return self.verify_metrics_exist(['minifi_queue_data_size',
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'],
'FlowInformation') and \
+ return self.verify_metrics_exist(['minifi_queue_data_size',
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max',
+ 'minifi_bytes_read',
'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out',
'minifi_bytes_in', 'minifi_bytes_out',
+ 'minifi_invocations',
'minifi_processing_nanos'], 'FlowInformation') and \
self.verify_metric_exists('minifi_is_running', 'FlowInformation',
{'component_name': 'FlowController'})
def verify_device_info_node_metrics(self):
diff --git a/libminifi/include/core/Processor.h
b/libminifi/include/core/Processor.h
index 6f25c176c..8dfc3db77 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -165,6 +165,14 @@ class Processor : public Connectable, public
ConfigurableComponent, public state
active_tasks_ = 0;
}
+ std::string getProcessGroupUUIDStr() const {
+ return process_group_uuid_;
+ }
+
+ void setProcessGroupUUIDStr(const std::string &uuid) {
+ process_group_uuid_ = uuid;
+ }
+
void yield() override;
void yield(std::chrono::steady_clock::duration delta_time);
@@ -256,6 +264,8 @@ class Processor : public Connectable, public
ConfigurableComponent, public state
// an outgoing connection allows us to reach these nodes
std::unordered_map<Connection*, std::unordered_set<Processor*>>
reachable_processors_;
+
+ std::string process_group_uuid_;
};
} // namespace core
diff --git a/libminifi/include/core/ProcessorMetrics.h
b/libminifi/include/core/ProcessorMetrics.h
index de5c3d157..231f2efb4 100644
--- a/libminifi/include/core/ProcessorMetrics.h
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -52,10 +52,16 @@ class ProcessorMetrics : public
state::response::ResponseNode {
std::chrono::milliseconds getAverageSessionCommitRuntime() const;
std::chrono::milliseconds getLastSessionCommitRuntime() const;
void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
+ std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const
std::string& relationship) const;
- std::atomic<size_t> iterations{0};
+ std::atomic<size_t> invocations{0};
+ std::atomic<size_t> incoming_flow_files{0};
std::atomic<size_t> transferred_flow_files{0};
+ std::atomic<uint64_t> incoming_bytes{0};
std::atomic<uint64_t> transferred_bytes{0};
+ std::atomic<uint64_t> bytes_read{0};
+ std::atomic<uint64_t> bytes_written{0};
+ std::atomic<uint64_t> processing_nanos{0};
protected:
template<typename ValueType>
@@ -80,7 +86,7 @@ class ProcessorMetrics : public state::response::ResponseNode
{
[[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels()
const;
static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
- std::mutex transferred_relationships_mutex_;
+ mutable std::mutex transferred_relationships_mutex_;
std::unordered_map<std::string, size_t> transferred_relationships_;
const Processor& source_processor_;
Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h
b/libminifi/include/core/state/nodes/FlowInformation.h
index a605bd8b2..5bfc257e1 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -26,6 +26,7 @@
#include "core/state/nodes/StateMonitor.h"
#include "Connection.h"
#include "core/state/ConnectionStore.h"
+#include "core/Processor.h"
namespace org::apache::nifi::minifi::state::response {
@@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation {
std::shared_ptr<FlowIdentifier> identifier;
};
-class FlowMonitor : public StateMonitorNode {
+class FlowInformation : public StateMonitorNode {
public:
- FlowMonitor(std::string_view name, const utils::Identifier &uuid)
+ FlowInformation(std::string_view name, const utils::Identifier &uuid)
: StateMonitorNode(name, uuid) {
}
- explicit FlowMonitor(std::string_view name)
+ explicit FlowInformation(std::string_view name)
: StateMonitorNode(name) {
}
+ MINIFIAPI static constexpr const char* Description = "Metric node that
defines the flow ID and flow URL deployed to this agent";
+
+ std::string getName() const override {
+ return "flowInfo";
+ }
+
void setFlowVersion(std::shared_ptr<state::response::FlowVersion>
flow_version) {
flow_version_ = std::move(flow_version);
}
@@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode {
connection_store_.updateConnection(connection);
}
- protected:
- std::shared_ptr<state::response::FlowVersion> flow_version_;
- ConnectionStore connection_store_;
-};
-
-/**
- * Justification and Purpose: Provides flow version Information
- */
-class FlowInformation : public FlowMonitor {
- public:
- FlowInformation(std::string_view name, const utils::Identifier &uuid)
- : FlowMonitor(name, uuid) {
- }
-
- explicit FlowInformation(std::string_view name)
- : FlowMonitor(name) {
- }
-
- MINIFIAPI static constexpr const char* Description = "Metric node that
defines the flow ID and flow URL deployed to this agent";
-
- std::string getName() const override {
- return "flowInfo";
+ void setProcessors(std::vector<core::Processor*> processors) {
+ processors_ = std::move(processors);
}
std::vector<SerializedResponseNode> serialize() override;
std::vector<PublishedMetric> calculateMetrics() override;
+
+ private:
+ std::shared_ptr<state::response::FlowVersion> flow_version_;
+ ConnectionStore connection_store_;
+ std::vector<core::Processor*> processors_;
};
} // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 31e8ac93f..c94743431 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -62,7 +62,7 @@ class ResponseNodeLoader {
void initializeAgentNode(const SharedResponseNode& response_node) const;
void initializeAgentStatus(const SharedResponseNode& response_node) const;
void initializeConfigurationChecksums(const SharedResponseNode&
response_node) const;
- void initializeFlowMonitor(const SharedResponseNode& response_node) const;
+ void initializeFlowInformation(const SharedResponseNode& response_node)
const;
void initializeAssetInformation(const SharedResponseNode& response_node)
const;
std::vector<SharedResponseNode> getMatchingComponentMetricsNodes(const
std::string& regex_str) const;
diff --git a/libminifi/include/io/StreamCallback.h
b/libminifi/include/io/StreamCallback.h
index 6d4e6c00a..8467c0f96 100644
--- a/libminifi/include/io/StreamCallback.h
+++ b/libminifi/include/io/StreamCallback.h
@@ -18,16 +18,22 @@
#include <functional>
#include <memory>
+#include <optional>
namespace org::apache::nifi::minifi::io {
class InputStream;
class OutputStream;
+struct ReadWriteResult {
+ int64_t bytes_written = 0;
+ int64_t bytes_read = 0;
+};
+
// FlowFile IO Callback functions for input and output
// throw exception for error
using InputStreamCallback = std::function<int64_t(const
std::shared_ptr<InputStream>& input_stream)>;
using OutputStreamCallback = std::function<int64_t(const
std::shared_ptr<OutputStream>& output_stream)>;
-using InputOutputStreamCallback = std::function<int64_t(const
std::shared_ptr<InputStream>& input_stream, const
std::shared_ptr<OutputStream>& output_stream)>;
+using InputOutputStreamCallback =
std::function<std::optional<ReadWriteResult>(const
std::shared_ptr<InputStream>& input_stream, const
std::shared_ptr<OutputStream>& output_stream)>;
} // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
index 0d68fe568..b52821d8e 100644
--- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
+++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
@@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback {
public:
using CallbackType = std::function<std::string(const std::string&
input_line, bool is_first_line, bool is_last_line)>;
explicit LineByLineInputOutputStreamCallback(CallbackType callback);
- int64_t operator()(const std::shared_ptr<io::InputStream>& input, const
std::shared_ptr<io::OutputStream>& output);
+ std::optional<io::ReadWriteResult> operator()(const
std::shared_ptr<io::InputStream>& input, const
std::shared_ptr<io::OutputStream>& output);
private:
int64_t readInput(io::InputStream& stream);
diff --git a/libminifi/src/core/ProcessGroup.cpp
b/libminifi/src/core/ProcessGroup.cpp
index 7fdcf175d..fcf916ceb 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -86,6 +86,7 @@ std::tuple<Processor*, bool>
ProcessGroup::addProcessor(std::unique_ptr<Processo
gsl_Expects(processor);
const auto name = processor->getName();
std::lock_guard<std::recursive_mutex> lock(mutex_);
+ processor->setProcessGroupUUIDStr(getUUIDStr());
const auto [iter, inserted] = processors_.insert(std::move(processor));
if (inserted) {
logger_->log_debug("Add processor {} into process group {}", name, name_);
diff --git a/libminifi/src/core/ProcessSession.cpp
b/libminifi/src/core/ProcessSession.cpp
index 79c02ff63..15941f0b7 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -268,6 +268,9 @@ void ProcessSession::write(core::FlowFile &flow, const
io::OutputStreamCallback&
std::string details = process_context_->getProcessorNode()->getName() + "
modify flow record content " + flow.getUUIDStr();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start_time);
provenance_report_->modifyContent(flow, details, duration);
+ if (metrics_) {
+ metrics_->bytes_written += stream->size();
+ }
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during process session write, type:
{}, what: {}", typeid(exception).name(), exception.what());
throw;
@@ -280,6 +283,7 @@ void ProcessSession::write(core::FlowFile &flow, const
io::OutputStreamCallback&
void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, std::span<const char> buffer) {
writeBuffer(flow_file, as_bytes(buffer));
}
+
void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, std::span<const std::byte> buffer) {
write(flow_file, [buffer](const std::shared_ptr<io::OutputStream>&
output_stream) {
const auto write_status = output_stream->write(buffer);
@@ -316,6 +320,9 @@ void ProcessSession::append(const
std::shared_ptr<core::FlowFile> &flow, const i
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile
content");
}
flow->setSize(flow_file_size + (stream->size() -
stream_size_before_callback));
+ if (metrics_) {
+ metrics_->bytes_written += stream->size() - stream_size_before_callback;
+ }
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify
flow record content " << flow->getUUIDStr();
@@ -373,6 +380,9 @@ int64_t ProcessSession::read(const core::FlowFile&
flow_file, const io::InputStr
if (ret < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile
content");
}
+ if (metrics_) {
+ metrics_->bytes_read += ret;
+ }
return ret;
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception {}", exception.what());
@@ -383,7 +393,6 @@ int64_t ProcessSession::read(const core::FlowFile&
flow_file, const io::InputStr
}
}
-
int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow,
const io::InputOutputStreamCallback& callback) {
gsl_Expects(callback);
@@ -409,19 +418,23 @@ int64_t ProcessSession::readWrite(const
std::shared_ptr<core::FlowFile> &flow, c
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile
content for write");
}
- int64_t bytes_written = callback(input_stream, output_stream);
- if (bytes_written < 0) {
+ auto read_write_result = callback(input_stream, output_stream);
+ if (!read_write_result) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile
content");
}
input_stream->close();
output_stream->close();
- flow->setSize(gsl::narrow<uint64_t>(bytes_written));
+ flow->setSize(gsl::narrow<uint64_t>(read_write_result->bytes_written));
flow->setOffset(0);
flow->setResourceClaim(output_claim);
+ if (metrics_) {
+ metrics_->bytes_written += read_write_result->bytes_written;
+ metrics_->bytes_read += read_write_result->bytes_read;
+ }
- return bytes_written;
+ return read_write_result->bytes_written;
} catch (const std::exception& exception) {
logger_->log_debug("Caught exception during process session readWrite,
type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
@@ -486,6 +499,9 @@ void ProcessSession::importFrom(io::InputStream &stream,
const std::shared_ptr<c
flow->getOffset(), flow->getSize(),
flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr());
content_stream->close();
+ if (metrics_) {
+ metrics_->bytes_written += content_stream->size();
+ }
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify
flow record content " << flow->getUUIDStr();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start_time);
@@ -547,6 +563,9 @@ void ProcessSession::import(const std::string& source,
const std::shared_ptr<Flo
flow->getUUIDStr());
stream->close();
+ if (metrics_) {
+ metrics_->bytes_written += stream->size();
+ }
input.close();
if (!keepSource) {
(void)std::remove(source.c_str());
@@ -649,6 +668,9 @@ void ProcessSession::import(const std::string& source,
std::vector<std::shared_p
logger_->log_debug("Import offset {} length {} into content {},
FlowFile UUID {}",
flowFile->getOffset(), flowFile->getSize(),
flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
stream->close();
+ if (metrics_) {
+ metrics_->bytes_written += stream->size();
+ }
std::string details = process_context_->getProcessorNode()->getName()
+ " modify flow record content " + flowFile->getUUIDStr();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start_time);
provenance_report_->modifyContent(*flowFile, details, duration);
@@ -819,7 +841,7 @@ void ProcessSession::commit() {
const auto commit_start_time = std::chrono::steady_clock::now();
try {
std::unordered_map<std::string, TransferMetrics> transfers;
- auto increaseTransferMetrics = [&](const FlowFile& record, const
Relationship& relationship) {
+ auto increaseTransferMetrics = [&](const FlowFile& record, const
Relationship& relationship) {
++transfers[relationship.getName()].transfer_count;
transfers[relationship.getName()].transfer_size += record.getSize();
};
@@ -930,8 +952,11 @@ void ProcessSession::commit() {
// persistent the provenance report
this->provenance_report_->commit();
logger_->log_debug("ProcessSession committed for {}",
process_context_->getProcessorNode()->getName());
- if (metrics_)
-
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- commit_start_time));
+ if (metrics_) {
+ auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
+
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
+ metrics_->processing_nanos +=
std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
+ }
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during process session commit, type:
{}, what: {}", typeid(exception).name(), exception.what());
throw;
@@ -1141,6 +1166,10 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
if (flow_version != nullptr) {
ret->setAttribute(SpecialFlowAttribute::FLOW_ID,
flow_version->getFlowId());
}
+ if (metrics_) {
+ metrics_->incoming_bytes += ret->getSize();
+ ++metrics_->incoming_flow_files;
+ }
return ret;
}
current =
dynamic_cast<Connection*>(process_context_->getProcessorNode()->pickIncomingConnection());
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index b5cf616e2..77b8d3fa1 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -198,7 +198,7 @@ void Processor::triggerAndCommit(const
std::shared_ptr<ProcessContext>& context,
}
void Processor::trigger(const std::shared_ptr<ProcessContext>& context, const
std::shared_ptr<ProcessSession>& process_session) {
- ++metrics_->iterations;
+ ++metrics_->invocations;
const auto start = std::chrono::steady_clock::now();
onTrigger(*context, *process_session);
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- start));
diff --git a/libminifi/src/core/ProcessorMetrics.cpp
b/libminifi/src/core/ProcessorMetrics.cpp
index 8cd054888..95e00c487 100644
--- a/libminifi/src/core/ProcessorMetrics.cpp
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -44,13 +44,18 @@ std::vector<state::response::SerializedResponseNode>
ProcessorMetrics::serialize
state::response::SerializedResponseNode root_node {
.name = source_processor_.getUUIDStr(),
.children = {
- {.name = "OnTriggerInvocations", .value =
static_cast<uint32_t>(iterations.load())},
+ {.name = "OnTriggerInvocations", .value =
static_cast<uint32_t>(invocations.load())},
{.name = "AverageOnTriggerRunTime", .value =
static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
{.name = "LastOnTriggerRunTime", .value =
static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
{.name = "AverageSessionCommitRunTime", .value =
static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
{.name = "LastSessionCommitRunTime", .value =
static_cast<uint64_t>(getLastSessionCommitRuntime().count())},
{.name = "TransferredFlowFiles", .value =
static_cast<uint32_t>(transferred_flow_files.load())},
- {.name = "TransferredBytes", .value = transferred_bytes.load()}
+ {.name = "TransferredBytes", .value =
static_cast<uint64_t>(transferred_bytes.load())},
+ {.name = "IncomingFlowFiles", .value =
static_cast<uint32_t>(incoming_flow_files.load())},
+ {.name = "IncomingBytes", .value =
static_cast<uint64_t>(incoming_bytes.load())},
+ {.name = "BytesRead", .value = static_cast<uint64_t>(bytes_read.load())},
+ {.name = "BytesWritten", .value =
static_cast<uint64_t>(bytes_written.load())},
+ {.name = "ProcessingNanos", .value =
static_cast<uint64_t>(processing_nanos.load())}
}
};
@@ -59,7 +64,7 @@ std::vector<state::response::SerializedResponseNode>
ProcessorMetrics::serialize
for (const auto& [relationship, count] : transferred_relationships_) {
gsl_Expects(!relationship.empty());
state::response::SerializedResponseNode transferred_to_relationship_node;
- transferred_to_relationship_node.name =
std::string("TransferredTo").append(1,
toupper(relationship[0])).append(relationship.substr(1));
+ transferred_to_relationship_node.name =
std::string("TransferredTo").append(1,
gsl::narrow<char>(toupper(relationship[0]))).append(relationship.substr(1));
transferred_to_relationship_node.value = static_cast<uint32_t>(count);
root_node.children.push_back(transferred_to_relationship_node);
@@ -73,13 +78,18 @@ std::vector<state::response::SerializedResponseNode>
ProcessorMetrics::serialize
std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
std::vector<state::PublishedMetric> metrics = {
- {"onTrigger_invocations", static_cast<double>(iterations.load()),
getCommonLabels()},
+ {"onTrigger_invocations", static_cast<double>(invocations.load()),
getCommonLabels()},
{"average_onTrigger_runtime_milliseconds",
static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
{"last_onTrigger_runtime_milliseconds",
static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
{"average_session_commit_runtime_milliseconds",
static_cast<double>(getAverageSessionCommitRuntime().count()),
getCommonLabels()},
{"last_session_commit_runtime_milliseconds",
static_cast<double>(getLastSessionCommitRuntime().count()), getCommonLabels()},
{"transferred_flow_files",
static_cast<double>(transferred_flow_files.load()), getCommonLabels()},
- {"transferred_bytes", static_cast<double>(transferred_bytes.load()),
getCommonLabels()}
+ {"transferred_bytes", static_cast<double>(transferred_bytes.load()),
getCommonLabels()},
+ {"incoming_flow_files", static_cast<double>(incoming_flow_files.load()),
getCommonLabels()},
+ {"incoming_bytes", static_cast<double>(incoming_bytes.load()),
getCommonLabels()},
+ {"bytes_read", static_cast<double>(bytes_read.load()), getCommonLabels()},
+ {"bytes_written", static_cast<double>(bytes_written.load()),
getCommonLabels()},
+ {"processing_nanos", static_cast<double>(processing_nanos.load()),
getCommonLabels()}
};
{
@@ -122,6 +132,15 @@ std::chrono::milliseconds
ProcessorMetrics::getLastSessionCommitRuntime() const
return session_commit_runtime_averager_.getLastValue();
}
+std::optional<size_t>
ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string&
relationship) const {
+ std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+ const auto relationship_it = transferred_relationships_.find(relationship);
+ if (relationship_it != std::end(transferred_relationships_)) {
+ return relationship_it->second;
+ }
+ return {};
+}
+
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp
b/libminifi/src/core/state/nodes/FlowInformation.cpp
index 5756072ac..f31b26c29 100644
--- a/libminifi/src/core/state/nodes/FlowInformation.cpp
+++ b/libminifi/src/core/state/nodes/FlowInformation.cpp
@@ -34,6 +34,12 @@ std::vector<SerializedResponseNode>
FlowInformation::serialize() {
{.name = "flowId", .value = flow_version_->getFlowId()}
};
+ if (nullptr != monitor_) {
+ monitor_->executeOnComponent("FlowController",
[&serialized](StateController& component) {
+ serialized.push_back({.name = "runStatus", .value =
(component.isRunning() ? "Running" : "Stopped")});
+ });
+ }
+
SerializedResponseNode uri;
uri.name = "versionedFlowSnapshotURI";
for (auto &entry : flow_version_->serialize()) {
@@ -61,19 +67,35 @@ std::vector<SerializedResponseNode>
FlowInformation::serialize() {
serialized.push_back(queues);
}
- if (nullptr != monitor_) {
- SerializedResponseNode componentsNode{.name = "components", .collapsible =
false};
- monitor_->executeOnAllComponents([&componentsNode](StateController&
component){
- componentsNode.children.push_back({
- .name = component.getComponentName(),
+ if (!processors_.empty()) {
+ SerializedResponseNode processorsStatusesNode{.name = "processorStatuses",
.array = true, .collapsible = false};
+ for (const auto processor : processors_) {
+ if (!processor) {
+ continue;
+ }
+
+ auto metrics = processor->getMetrics();
+ processorsStatusesNode.children.push_back({
+ .name = processor->getName(),
.collapsible = false,
.children = {
- {.name = "running", .value = component.isRunning()},
- {.name = "uuid", .value =
std::string{component.getComponentUUID().to_string()}}
+ {.name = "id", .value = std::string{processor->getUUIDStr()}},
+ {.name = "groupId", .value = processor->getProcessGroupUUIDStr()},
+ {.name = "bytesRead", .value = metrics->bytes_read.load()},
+ {.name = "bytesWritten", .value = metrics->bytes_written.load()},
+ {.name = "flowFilesIn", .value =
metrics->incoming_flow_files.load()},
+ {.name = "flowFilesOut", .value =
metrics->transferred_flow_files.load()},
+ {.name = "bytesIn", .value = metrics->incoming_bytes.load()},
+ {.name = "bytesOut", .value = metrics->transferred_bytes.load()},
+ {.name = "invocations", .value = metrics->invocations.load()},
+ {.name = "processingNanos", .value =
metrics->processing_nanos.load()},
+ {.name = "activeThreadCount", .value = -1},
+ {.name = "terminatedThreadCount", .value = -1},
+ {.name = "runStatus", .value = (processor->isRunning() ? "Running" :
"Stopped")}
}
});
- });
- serialized.push_back(componentsNode);
+ }
+ serialized.push_back(processorsStatusesNode);
}
return serialized;
@@ -81,13 +103,38 @@ std::vector<SerializedResponseNode>
FlowInformation::serialize() {
std::vector<PublishedMetric> FlowInformation::calculateMetrics() {
std::vector<PublishedMetric> metrics =
connection_store_.calculateConnectionMetrics("FlowInformation");
-
if (nullptr != monitor_) {
- monitor_->executeOnAllComponents([&metrics](StateController& component){
+ monitor_->executeOnComponent("FlowController", [&metrics](StateController&
component) {
metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0),
{{"component_uuid", component.getComponentUUID().to_string()},
{"component_name", component.getComponentName()}, {"metric_class",
"FlowInformation"}}});
});
}
+
+ for (const auto& processor : processors_) {
+ if (!processor) {
+ continue;
+ }
+ auto processor_metrics = processor->getMetrics();
+ metrics.push_back({"bytes_read",
gsl::narrow<double>(processor_metrics->bytes_read.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"bytes_written",
gsl::narrow<double>(processor_metrics->bytes_written.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"flow_files_in",
gsl::narrow<double>(processor_metrics->incoming_flow_files.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"flow_files_out",
gsl::narrow<double>(processor_metrics->transferred_flow_files.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"bytes_in",
gsl::narrow<double>(processor_metrics->incoming_bytes.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"bytes_out",
gsl::narrow<double>(processor_metrics->transferred_bytes.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"invocations",
gsl::narrow<double>(processor_metrics->invocations.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"processing_nanos",
gsl::narrow<double>(processor_metrics->processing_nanos.load()),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0),
+ {{"processor_uuid", processor->getUUIDStr()}, {"processor_name",
processor->getName()}, {"metric_class", "FlowInformation"}}});
+ }
+
return metrics;
}
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 2d892a60b..37337e3b0 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -213,9 +213,9 @@ void ResponseNodeLoader::initializeAssetInformation(const
SharedResponseNode& re
}
}
-void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode&
response_node) const {
- auto flowMonitor =
dynamic_cast<state::response::FlowMonitor*>(response_node.get());
- if (flowMonitor == nullptr) {
+void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode&
response_node) const {
+ auto flow_information =
dynamic_cast<state::response::FlowInformation*>(response_node.get());
+ if (flow_information == nullptr) {
return;
}
@@ -226,11 +226,17 @@ void ResponseNodeLoader::initializeFlowMonitor(const
SharedResponseNode& respons
}
for (auto &con : connections) {
- flowMonitor->updateConnection(con.second);
+ flow_information->updateConnection(con.second);
}
- flowMonitor->setStateMonitor(update_sink_);
+ flow_information->setStateMonitor(update_sink_);
if (flow_configuration_) {
- flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
+ flow_information->setFlowVersion(flow_configuration_->getFlowVersion());
+ }
+
+ if (root_) {
+ std::vector<core::Processor*> processors;
+ root_->getAllProcessors(processors);
+ flow_information->setProcessors(processors);
}
}
@@ -253,7 +259,7 @@ std::vector<SharedResponseNode>
ResponseNodeLoader::loadResponseNodes(const std:
initializeAgentNode(response_node);
initializeAgentStatus(response_node);
initializeConfigurationChecksums(response_node);
- initializeFlowMonitor(response_node);
+ initializeFlowInformation(response_node);
initializeAssetInformation(response_node);
initialized_metrics_.insert(response_node->getName());
}
diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
index af2307e5c..6be8b7fd3 100644
--- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
+++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
@@ -26,27 +26,37 @@
LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
: callback_(std::move(callback)) {
}
-int64_t LineByLineInputOutputStreamCallback::operator()(const
std::shared_ptr<io::InputStream>& input, const
std::shared_ptr<io::OutputStream>& output) {
+std::optional<io::ReadWriteResult>
LineByLineInputOutputStreamCallback::operator()(const
std::shared_ptr<io::InputStream>& input, const
std::shared_ptr<io::OutputStream>& output) {
gsl_Expects(input);
gsl_Expects(output);
+ io::ReadWriteResult result;
+
if (int64_t status = readInput(*input); status <= 0) {
- return status;
+ if (status < 0) {
+ return std::nullopt;
+ }
+ return result;
}
- std::size_t total_bytes_written_ = 0;
+ result.bytes_read = gsl::narrow<int64_t>(input_.size());
+
+ std::size_t total_bytes_written = 0;
bool is_first_line = true;
readLine();
do {
readLine();
std::string output_line = callback_(*current_line_, is_first_line,
isLastLine());
const auto bytes_written = output->write(reinterpret_cast<const uint8_t
*>(output_line.data()), output_line.size());
- if (io::isError(bytes_written)) { return -1; }
- total_bytes_written_ += bytes_written;
+ if (io::isError(bytes_written)) {
+ return std::nullopt;
+ }
+ total_bytes_written += bytes_written;
is_first_line = false;
} while (!isLastLine());
- return gsl::narrow<int64_t>(total_bytes_written_);
+ result.bytes_written = gsl::narrow<int64_t>(total_bytes_written);
+ return result;
}
int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream&
stream) {
diff --git a/libminifi/test/integration/C2MetricsTest.cpp
b/libminifi/test/integration/C2MetricsTest.cpp
index 6065b5ecf..6f6ec14ca 100644
--- a/libminifi/test/integration/C2MetricsTest.cpp
+++ b/libminifi/test/integration/C2MetricsTest.cpp
@@ -18,6 +18,7 @@
#include <string>
#include <iostream>
#include <filesystem>
+#include <algorithm>
#include "unit/TestBase.h"
#include "integration/HTTPIntegrationBase.h"
@@ -92,7 +93,10 @@ class MetricsHandler: public HeartbeatHandler {
VERIFY_UPDATED_METRICS
};
- static constexpr const char* GETTCP1_UUID =
"2438e3c8-015a-1000-79ca-83af40ec1991";
+ static constexpr const char* GETTCP_UUID =
"2438e3c8-015a-1000-79ca-83af40ec1991";
+ static constexpr const char* LOGATTRIBUTE1_UUID =
"2438e3c8-015a-1000-79ca-83af40ec1992";
+ static constexpr const char* LOGATTRIBUTE2_UUID =
"5128e3c8-015a-1000-79ca-83af40ec1990";
+ static constexpr const char* GENERATE_FLOWFILE_UUID =
"4fe2d51d-076a-49b0-88de-5cf5adf52b8f";
static void sendEmptyHeartbeatResponse(struct mg_connection* conn) {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type:
text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -126,36 +130,67 @@ class MetricsHandler: public HeartbeatHandler {
}
}
- static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
+ static bool processorMetricsAreValid(const auto& processor) {
+ return processor["bytesRead"].GetInt() >= 0 &&
+ processor["bytesWritten"].GetInt() >= 0 &&
+ processor["flowFilesIn"].GetInt() >= 0 &&
+ processor["flowFilesOut"].GetInt() >= 0 &&
+ processor["bytesIn"].GetInt() >= 0 &&
+ processor["bytesOut"].GetInt() >= 0 &&
+ processor["invocations"].GetInt() >= 0 &&
+ processor["processingNanos"].GetInt() >= 0 &&
+ processor["activeThreadCount"].GetInt() == -1 &&
+ processor["terminatedThreadCount"].GetInt() == -1 &&
+ processor["runStatus"].GetString() == std::string("Running");
+ }
+
+ static bool verifyCommonRuntimeMetricNodes(const rapidjson::Value&
runtime_metrics, const std::string& queue_id) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem")
&&
runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
runtime_metrics.HasMember("flowInfo") &&
+ runtime_metrics["flowInfo"].HasMember("flowId") &&
+ runtime_metrics["flowInfo"].HasMember("runStatus") &&
+ runtime_metrics["flowInfo"]["runStatus"].GetString() ==
std::string("Running") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
- runtime_metrics["flowInfo"].HasMember("components") &&
-
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997")
&&
- runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
- runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
- runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
+ runtime_metrics["flowInfo"]["queues"].HasMember(queue_id) &&
runtime_metrics.HasMember("agentInfo") &&
-
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
+
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size")
&&
+ runtime_metrics["flowInfo"].HasMember("processorStatuses");
+ }
+
+ static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
+ return verifyCommonRuntimeMetricNodes(runtime_metrics,
"2438e3c8-015a-1000-79ca-83af40ec1997") &&
+ [&]() {
+ const auto processor_statuses =
runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
+ if (processor_statuses.Size() != 2) {
+ return false;
+ }
+ return std::all_of(processor_statuses.begin(),
processor_statuses.end(), [&](const auto& processor) {
+ if (processor["id"].GetString() != std::string(GETTCP_UUID) &&
processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) {
+ throw std::runtime_error(std::string("Unexpected processor id in
processorStatuses: ") + processor["id"].GetString());
+ }
+ return processorMetricsAreValid(processor);
+ });
+ }();
}
static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value&
runtime_metrics) {
- return runtime_metrics.HasMember("deviceInfo") &&
- runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem")
&&
- runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
- runtime_metrics.HasMember("flowInfo") &&
- runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
- runtime_metrics["flowInfo"].HasMember("queues") &&
- runtime_metrics["flowInfo"].HasMember("components") &&
-
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332")
&&
- runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
- runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile")
&&
- runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
- runtime_metrics.HasMember("agentInfo") &&
-
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
+ return verifyCommonRuntimeMetricNodes(runtime_metrics,
"8368e3c8-015a-1003-52ca-83af40ec1332") &&
+ runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+ [&]() {
+ const auto processor_statuses =
runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
+ if (processor_statuses.Size() != 2) {
+ return false;
+ }
+ return std::all_of(processor_statuses.begin(),
processor_statuses.end(), [&](const auto& processor) {
+ if (processor["id"].GetString() !=
std::string(GENERATE_FLOWFILE_UUID) && processor["id"].GetString() !=
std::string(LOGATTRIBUTE2_UUID)) {
+ throw std::runtime_error(std::string("Unexpected processor id in
processorStatuses: ") + processor["id"].GetString());
+ }
+ return processorMetricsAreValid(processor);
+ });
+ }();
}
static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
@@ -177,13 +212,18 @@ class MetricsHandler: public HeartbeatHandler {
static bool verifyProcessorMetrics(const rapidjson::Value&
processor_metrics) {
return processor_metrics.HasMember("GetTCPMetrics") &&
- processor_metrics["GetTCPMetrics"].HasMember(GETTCP1_UUID) &&
-
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("OnTriggerInvocations")
&&
-
processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint()
> 0 &&
-
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredFlowFiles")
&&
-
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("AverageOnTriggerRunTime")
&&
-
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("LastOnTriggerRunTime")
&&
-
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes");
+ processor_metrics["GetTCPMetrics"].HasMember(GETTCP_UUID) &&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("OnTriggerInvocations")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID]["OnTriggerInvocations"].GetUint()
> 0 &&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredFlowFiles")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("AverageOnTriggerRunTime")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("LastOnTriggerRunTime")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredBytes") &&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingFlowFiles")
&&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingBytes") &&
+ processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesRead") &&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesWritten") &&
+
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("ProcessingNanos");
}
std::atomic_bool& metrics_updated_successfully_;
diff --git a/libminifi/test/libtest/unit/SingleProcessorTestController.h
b/libminifi/test/libtest/unit/SingleProcessorTestController.h
index 0245e3314..5928c6bc5 100644
--- a/libminifi/test/libtest/unit/SingleProcessorTestController.h
+++ b/libminifi/test/libtest/unit/SingleProcessorTestController.h
@@ -39,7 +39,7 @@ class SingleProcessorTestController : public TestController {
public:
explicit SingleProcessorTestController(std::unique_ptr<core::Processor>
processor) {
auto name = processor->getName();
- processor_ = plan->addProcessor(std::move(processor), name);
+ processor_ = plan->addProcessor(std::move(processor), name, {});
input_ = plan->addConnection(nullptr, core::Relationship{"success",
"success"}, processor_);
outgoing_connections_ = [this] {
std::unordered_map<core::Relationship, Connection*> result;
diff --git a/libminifi/test/libtest/unit/TestBase.cpp
b/libminifi/test/libtest/unit/TestBase.cpp
index ee337986e..19f5de798 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -539,11 +539,13 @@ bool TestPlan::runProcessor(size_t target_location, const
PreTriggerVerifier& ve
if (verify) {
auto current_session =
std::make_shared<minifi::core::ProcessSession>(context);
+ current_session->setMetrics(processor->getMetrics());
process_sessions_.push_back(current_session);
verify(context, current_session);
current_session->commit();
} else {
auto session_factory = std::make_shared<TestSessionFactory>(context, [&]
(auto current_session) {
+ current_session->setMetrics(processor->getMetrics());
process_sessions_.push_back(current_session);
});
logger_->log_info("Running {}", processor->getName());
diff --git a/libminifi/test/unit/MetricsTests.cpp
b/libminifi/test/unit/MetricsTests.cpp
index 16e732c18..a8c4ad5ee 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -27,6 +27,7 @@
#include "unit/ProvenanceTestHelper.h"
#include "unit/DummyProcessor.h"
#include "range/v3/algorithm/find_if.hpp"
+#include "unit/SingleProcessorTestController.h"
using namespace std::literals::chrono_literals;
@@ -285,4 +286,76 @@ TEST_CASE("Test commit runtime processor metrics",
"[ProcessorMetrics]") {
REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
}
+class DuplicateContentProcessor : public minifi::core::Processor {
+ using minifi::core::Processor::Processor;
+
+ public:
+ DuplicateContentProcessor(std::string_view name, const
minifi::utils::Identifier& uuid) : Processor(name, uuid) {}
+ explicit DuplicateContentProcessor(std::string_view name) : Processor(name)
{}
+ static constexpr const char* Description = "A processor that creates two
more of the same flow file.";
+ static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
+ static constexpr auto Success = core::RelationshipDefinition{"success",
"Newly created FlowFiles"};
+ static constexpr auto Original = core::RelationshipDefinition{"original",
"Original FlowFile"};
+ static constexpr auto Relationships = std::array{Success, Original};
+ static constexpr bool SupportsDynamicProperties = false;
+ static constexpr bool SupportsDynamicRelationships = false;
+ static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_REQUIRED;
+ static constexpr bool IsSingleThreaded = false;
+ void initialize() override {
+ setSupportedRelationships(Relationships);
+ }
+ void onTrigger(core::ProcessContext& /*context*/, core::ProcessSession&
session) override {
+ auto flow_file = session.get();
+ if (!flow_file) {
+ return;
+ }
+
+ auto flow_file_copy = session.create();
+ std::vector<std::byte> buffer;
+ session.read(flow_file, [&](const std::shared_ptr<io::InputStream>&
stream) -> int64_t {
+ buffer.resize(stream->size());
+ return gsl::narrow<int64_t>(stream->read(buffer));
+ });
+ session.write(flow_file_copy, [&](const std::shared_ptr<io::OutputStream>&
stream) -> int64_t {
+ return gsl::narrow<int64_t>(stream->write(buffer));
+ });
+ session.append(flow_file_copy, [&](const
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
+ return gsl::narrow<int64_t>(stream->write(buffer));
+ });
+ session.transfer(flow_file_copy, Success);
+ session.transfer(flow_file, Original);
+ }
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+};
+
+TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]")
{
+ minifi::test::SingleProcessorTestController
test_controller(std::make_unique<DuplicateContentProcessor>("DuplicateContentProcessor"));
+ test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}});
+ auto metrics = test_controller.getProcessor()->getMetrics();
+ CHECK(metrics->invocations == 1);
+ CHECK(metrics->incoming_flow_files == 1);
+ CHECK(metrics->transferred_flow_files == 2);
+ CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1);
+ CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1);
+ CHECK(metrics->incoming_bytes == 10);
+ CHECK(metrics->transferred_bytes == 30);
+ CHECK(metrics->bytes_read == 10);
+ CHECK(metrics->bytes_written == 20);
+ auto old_nanos = metrics->processing_nanos.load();
+ CHECK(metrics->processing_nanos > 0);
+
+ test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2",
{}}});
+ CHECK(metrics->invocations == 2);
+ CHECK(metrics->incoming_flow_files == 2);
+ CHECK(metrics->transferred_flow_files == 4);
+ CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2);
+ CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2);
+ CHECK(metrics->incoming_bytes == 24);
+ CHECK(metrics->transferred_bytes == 72);
+ CHECK(metrics->bytes_read == 24);
+ CHECK(metrics->bytes_written == 48);
+ CHECK(metrics->processing_nanos > old_nanos);
+}
+
+
} // namespace org::apache::nifi::minifi::test