This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 07b9641ff MINIFICPP-2501 Add processorStatuses C2 metric node to 
FlowInformation
07b9641ff is described below

commit 07b9641ff033c1efb214d050079f4451df4e3eb9
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Jan 21 20:23:00 2025 +0100

    MINIFICPP-2501 Add processorStatuses C2 metric node to FlowInformation
    
    Closes #1909
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 C2.md                                              | 400 ++++++++++++++++++++-
 METRICS.md                                         |  34 +-
 .../cluster/checkers/PrometheusChecker.py          |  11 +-
 libminifi/include/core/Processor.h                 |  10 +
 libminifi/include/core/ProcessorMetrics.h          |  10 +-
 .../include/core/state/nodes/FlowInformation.h     |  42 +--
 .../include/core/state/nodes/ResponseNodeLoader.h  |   2 +-
 libminifi/include/io/StreamCallback.h              |   8 +-
 .../utils/LineByLineInputOutputStreamCallback.h    |   2 +-
 libminifi/src/core/ProcessGroup.cpp                |   1 +
 libminifi/src/core/ProcessSession.cpp              |  45 ++-
 libminifi/src/core/Processor.cpp                   |   2 +-
 libminifi/src/core/ProcessorMetrics.cpp            |  29 +-
 libminifi/src/core/state/nodes/FlowInformation.cpp |  69 +++-
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  20 +-
 .../utils/LineByLineInputOutputStreamCallback.cpp  |  22 +-
 libminifi/test/integration/C2MetricsTest.cpp       |  96 +++--
 .../libtest/unit/SingleProcessorTestController.h   |   2 +-
 libminifi/test/libtest/unit/TestBase.cpp           |   2 +
 libminifi/test/unit/MetricsTests.cpp               |  73 ++++
 20 files changed, 756 insertions(+), 124 deletions(-)

diff --git a/C2.md b/C2.md
index ad492ec57..91a29e030 100644
--- a/C2.md
+++ b/C2.md
@@ -26,10 +26,23 @@ options defined are located in minifi.properties.
   - [Description](#description)
   - [Configuration](#configuration)
     - [Base Options](#base-options)
+      - [Flow Id and URL](#flow-id-and-url)
+      - [Agent Identifier Fallback](#agent-identifier-fallback)
     - [Metrics](#metrics)
     - [UpdatePolicies](#updatepolicies)
     - [Triggers](#triggers)
       - [C2 File triggers](#c2-file-triggers)
+  - [C2 Response Nodes](#c2-response-nodes)
+    - [AgentInformation](#agentinformation)
+    - [AgentStatus](#agentstatus)
+    - [AssetInformation](#assetinformation)
+    - [BuildInformation](#buildinformation)
+    - [ConfigurationChecksums](#configurationchecksums)
+    - [DeviceInfoNode](#deviceinfonode)
+    - [FlowInformation](#flowinformation)
+    - [QueueMetrics](#queuemetrics)
+    - [RepositoryMetrics](#repositorymetrics)
+    - [Processor Metric Response Nodes](#processor-metric-response-nodes)
 
 ## Description
 
@@ -181,21 +194,40 @@ configuration produces the following JSON:
                       "uuid": "2438e3c8-015a-1000-79ca-83af40ec1997"
                   }
               },
-              "components": {
-                  "FlowController": {
-                      "running": true,
-                      "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
-                  },
-                  "GetFile": {
-                      "running": false,
-                      "uuid": "2438e3c8-015a-1000-79ca-83af40ec1991"
-                  },
-                  "LogAttribute": {
-                      "running": true,
-                      "uuid": "2438e3c8-015a-1000-79ca-83af40ec1992"
-                  }
-              },
-              "flowId": "8db40550-db5d-11ec-95d7-0433c2c9832b"
+              "processorStatuses": [
+                {
+                    "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
+                    "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+                    "bytesRead": 0,
+                    "bytesWritten": 0,
+                    "flowFilesIn": 0,
+                    "flowFilesOut": 0,
+                    "bytesIn": 0,
+                    "bytesOut": 0,
+                    "invocations": 0,
+                    "processingNanos": 0,
+                    "activeThreadCount": -1,
+                    "terminatedThreadCount": -1,
+                    "runStatus": "Running"
+                },
+                {
+                    "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
+                    "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+                    "bytesRead": 0,
+                    "bytesWritten": 40,
+                    "flowFilesIn": 0,
+                    "flowFilesOut": 4,
+                    "bytesIn": 0,
+                    "bytesOut": 40,
+                    "invocations": 4,
+                    "processingNanos": 2119148,
+                    "activeThreadCount": -1,
+                    "terminatedThreadCount": -1,
+                    "runStatus": "Running"
+                }
+            ],
+            "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64",
+            "runStatus": "Running"
           }
       },
       "LoadMetrics": {
@@ -271,3 +303,341 @@ in minifi.properties to activate the file update trigger 
specify
     # specifying a trigger
     nifi.c2.agent.trigger.classes=FileUpdateTrigger
     nifi.c2.file.watch=<full path of file to monitor>
+
+## C2 Response Nodes
+
+The following is a list of nodes that can be defined in the minifi.properties 
file for the C2 heartbeat response as part of the C2 root nodes defined in the 
`nifi.c2.root.classes` property or in the metrics nodes defined in the tree 
under `nifi.c2.root.class.definitions` as stated above.
+
+### AgentInformation
+
+Contains information about the agent's build, extensions, supported C2 
operations and status of its components.
+
+```
+"agentInfo": {
+    "agentManifest": {
+        "buildInfo": {
+            "compiler": "/usr/lib/ccache/g++",
+            "flags": " 
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+            "revision": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+            "timestamp": 1734001238,
+            "version": "0.99.1"
+        },
+        "bundles": [
+            {
+                "componentManifest": {
+                    "processors": [
+                    ...
+                    ]
+                },
+                "artifact": "minifi-civet-extensions",
+                "group": "org.apache.nifi.minifi",
+                "version": "0.99.1"
+            }
+        ],
+        "schedulingDefaults": {
+            "defaultMaxConcurrentTasks": 1,
+            "defaultRunDurationNanos": 0,
+            "defaultSchedulingPeriodMillis": 1000,
+            "defaultSchedulingStrategy": "TIMER_DRIVEN",
+            "penalizationPeriodMillis": 30000,
+            "yieldDurationMillis": 1000
+        },
+        "supportedOperations": [
+            {
+                "type": "acknowledge"
+            }
+            ...
+        ],
+        "agentType": "cpp",
+        "identifier": "bH77vXakM0Lkgt8VcDOGZVW3"
+    },
+    "status": {
+        "repositories": {
+            "content_repo": {
+                "entryCount": 0,
+                "full": false,
+                "maxSize": 0,
+                "running": true,
+                "size": 0
+            },
+            "flow_file_repo": {
+                "entryCount": 0,
+                "full": false,
+                "maxSize": 0,
+                "running": true,
+                "size": 0
+            },
+            
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+                "entryCount": 4,
+                "full": false,
+                "maxSize": 7864320,
+                "running": true,
+                "size": 40
+            }
+        },
+        "components": {
+            "LogAttribute": {
+                "running": true,
+                "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+            },
+            "GenerateFlowFile": {
+                "running": true,
+                "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+            },
+            "FlowController": {
+                "running": true,
+                "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+            }
+        },
+        "resourceConsumption": {
+            "cpuUtilization": 0.05,
+            "memoryUsage": 97955840
+        },
+        "uptime": 1025
+    },
+    "agentClass": "test",
+    "agentManifestHash": 
"9FFC8326121A816E5B2FD674CE9A34321F89CC690AD0D1FD79DFB5969B3B523D6570520382E82C68CFA347FBD9897FC027E518E98CFA229C18617B062E1C9E77",
+    "identifier": "9628acfe-b9fe-11ef-a0c0-10f60a596f64"
+}
+```
+
+### AgentStatus
+
+Contains information about the agent's status, including the status of its 
components, repositories, and resource consumption.
+
+```
+"AgentStatus": {
+    "repositories": {
+        "repo_name": {
+            "entryCount": 0,
+            "full": false,
+            "maxSize": 0,
+            "running": true,
+            "size": 0
+        },
+        "ff": {
+            "entryCount": 0,
+            "full": false,
+            "maxSize": 0,
+            "running": true,
+            "size": 0
+        },
+        
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+            "entryCount": 4,
+            "full": false,
+            "maxSize": 7864320,
+            "running": true,
+            "size": 40
+        }
+    },
+    "components": {
+        "LogAttribute": {
+            "running": true,
+            "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+        },
+        "GenerateFlowFile": {
+            "running": true,
+            "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+        },
+        "FlowController": {
+            "running": true,
+            "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+        }
+    },
+    "resourceConsumption": {
+        "cpuUtilization": 0.0028846153846153849,
+        "memoryUsage": 97955840
+    },
+    "uptime": 995
+}
+```
+
+### AssetInformation
+
+Contains the calculated hash of the assets.
+
+```
+"resourceInfo": {
+    "hash": "null"
+}
+```
+
+### BuildInformation
+
+Contains information about the agent's build.
+
+```
+"BuildInformation": {
+    "compiler": {
+        "compiler_command": "/usr/lib/ccache/g++",
+        "compiler_flags": " 
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+        "compiler_version": "11.4.0"
+    },
+    "build_date": "1734001238",
+    "build_rev": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+    "build_version": "0.99.1",
+    "device_id": "bH77vXakM0Lkgt8VcDOGZVW3"
+}
+```
+
+### ConfigurationChecksums
+
+Metric node that defines checksums of configuration files in the C2 protocol.
+
+```
+"configurationChecksums": {
+    "SHA256": {
+        "TestC2Metrics.yml": 
"9af6589bf7729bb88857aafe98cea4f41df049725401b5f0ded0a7b949d9b90c",
+        "minifi.properties": 
"06fb9f4730e3db7d0a0a1ee606a7de3fee5813edf42eab140616e8a2995072df"
+    }
+},
+```
+
+### DeviceInfoNode
+
+Contains information about the device the agent is running on.
+
+```
+"deviceInfo": {
+    "systemInfo": {
+        "cpuLoadAverage": 1.271484375,
+        "cpuUtilization": 0.06179499754781756,
+        "machineArch": "x86_64",
+        "memoryUsage": 12681670656,
+        "operatingSystem": "Linux",
+        "physicalMem": 67081129984,
+        "vCores": 20
+    },
+    "networkInfo": {
+        "hostname": "ggyimesi-5570-ubuntu",
+        "ipAddress": "10.255.0.1"
+    },
+    "identifier": "16475557466943148337"
+}
+```
+
+### FlowInformation
+
+Contains information about the flow the agent is running, including the 
versioned flow snapshot URI, queues, components, and processor statuses.
+
+```
+"flowInfo": {
+    "versionedFlowSnapshotURI": {
+        "bucketId": "default",
+        "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
+    },
+    "queues": {
+        "8368e3c8-015a-1003-52ca-83af40ec1332": {
+            "dataSize": 40,
+            "dataSizeMax": 1048576,
+            "name": "GenerateFlowFile/success/LogAttribute",
+            "size": 4,
+            "sizeMax": 0,
+            "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
+        }
+    },
+    "processorStatuses": [
+        {
+            "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
+            "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+            "bytesRead": 0,
+            "bytesWritten": 0,
+            "flowFilesIn": 0,
+            "flowFilesOut": 0,
+            "bytesIn": 0,
+            "bytesOut": 0,
+            "invocations": 0,
+            "processingNanos": 0,
+            "activeThreadCount": -1,
+            "terminatedThreadCount": -1,
+            "running": true
+        },
+        {
+            "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
+            "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+            "bytesRead": 0,
+            "bytesWritten": 40,
+            "flowFilesIn": 0,
+            "flowFilesOut": 4,
+            "bytesIn": 0,
+            "bytesOut": 40,
+            "invocations": 4,
+            "processingNanos": 2119148,
+            "activeThreadCount": -1,
+            "terminatedThreadCount": -1,
+            "running": true
+        }
+    ],
+    "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64",
+    "running": true
+}
+```
+
+### QueueMetrics
+
+Contains information about the queues in the flow, including the contained 
data and number of flow files.
+
+```
+"QueueMetrics": {
+    "GenerateFlowFile/success/LogAttribute": {
+        "datasize": "40",
+        "datasizemax": "1048576",
+        "queued": "4",
+        "queuedmax": "0"
+    }
+}
+```
+
+### RepositoryMetrics
+
+Contains information about the repositories in the agent, including the number 
of entries, size, and whether the repository is full.
+
+
+```
+"RepositoryMetrics": {
+    "repo_name": {
+        "entryCount": 0,
+        "full": false,
+        "maxSize": 0,
+        "running": true,
+        "size": 0
+    },
+    "ff": {
+        "entryCount": 0,
+        "full": false,
+        "maxSize": 0,
+        "running": true,
+        "size": 0
+    },
+    "org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+        "entryCount": 4,
+        "full": false,
+        "maxSize": 7864320,
+        "running": true,
+        "size": 40
+    }
+}
+```
+
+### Processor Metric Response Nodes
+
+Each processor can have its own metrics. These metric nodes can be configured 
in the minifi.properties by requesting metrics in the \<ProcessorType\>Metrics 
format, for example GetTCPMetrics to request metrics for the GetTCP processors. 
Besides configuring processor metrics directly, they can also be configured 
using regular expressions with the `processorMetrics/` prefix. For example 
`processorMetrics/Get.*Metrics` will match all processor metrics that start 
with Get.
+
+```
+"GetTCPMetrics": {
+    "2438e3c8-015a-1000-79ca-83af40ec1991": {
+        "AverageOnTriggerRunTime": 0,
+        "AverageSessionCommitRunTime": 0,
+        "BytesRead": 0,
+        "BytesWritten": 0,
+        "IncomingBytes": 0,
+        "IncomingFlowFiles": 0,
+        "LastOnTriggerRunTime": 0,
+        "LastSessionCommitRunTime": 0,
+        "OnTriggerInvocations": 11,
+        "ProcessingNanos": 729328,
+        "TransferredBytes": 0,
+        "TransferredFlowFiles": 0
+    }
+}
+```
diff --git a/METRICS.md b/METRICS.md
index 7aa53fda7..e28f77a40 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -162,8 +162,8 @@ RepositoryMetrics is a system level metric that reports 
metrics for the register
 | rocksdb_table_readers_size_bytes     | repository_name | RocksDB's estimated 
memory used for reading SST tables (only present if repository uses RocksDB)    
             |
 | rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's 
approximate size of active and unflushed immutable memtables (only present if 
repository uses RocksDB) |
 
-| Label                    | Description                                       
                                                                                
    |
-|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
+| Label                    | Description                                       
                                                                                
     |
+|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
 | repository_name          | Name of the reported repository. There are three 
repositories present with the following names: `flowfile`, `content` and 
`provenance` |
 
 ### DeviceInfoNode
@@ -181,13 +181,22 @@ DeviceInfoNode is a system level metric that reports 
metrics about the system re
 
 FlowInformation is a system level metric that reports component and queue 
related metrics.
 
-| Metric name          | Labels                           | Description        
                        |
-|----------------------|----------------------------------|--------------------------------------------|
-| queue_data_size      | connection_uuid, connection_name | Current queue data 
size                    |
-| queue_data_size_max  | connection_uuid, connection_name | Max queue data 
size to apply back pressure |
-| queue_size           | connection_uuid, connection_name | Current queue size 
                        |
-| queue_size_max       | connection_uuid, connection_name | Max queue size to 
apply back pressure      |
-| is_running           | component_uuid, component_name   | Check if the 
component is running (1 or 0) |
+| Metric name          | Labels                           | Description        
                                                        |
+|----------------------|----------------------------------|----------------------------------------------------------------------------|
+| queue_data_size      | connection_uuid, connection_name | Current queue data 
size                                                    |
+| queue_data_size_max  | connection_uuid, connection_name | Max queue data 
size to apply back pressure                                 |
+| queue_size           | connection_uuid, connection_name | Current queue size 
                                                        |
+| queue_size_max       | connection_uuid, connection_name | Max queue size to 
apply back pressure                                      |
+| is_running           | component_uuid, component_name   | Check if the 
component is running (1 or 0)                                 |
+| bytes_read           | processor_uuid, processor_name   | Number of bytes 
read by the processor                                      |
+| bytes_written        | processor_uuid, processor_name   | Number of bytes 
written by the processor                                   |
+| flow_files_in        | processor_uuid, processor_name   | Number of flow 
files from the incoming queue processed by the processor    |
+| flow_files_out       | processor_uuid, processor_name   | Number of flow 
files transferred to outgoing relationship by the processor |
+| bytes_in             | processor_uuid, processor_name   | Sum of data from 
the incoming queue processed by the processor             |
+| bytes_out            | processor_uuid, processor_name   | Sum of data 
transferred to outgoing relationship by the processor          |
+| invocations          | processor_uuid, processor_name   | Number of times 
the processor was triggered                                |
+| processing_nanos     | processor_uuid, processor_name   | Sum of the runtime 
spent in the processor in nanoseconds                   |
+
 
 | Label           | Description                                                
  |
 
|-----------------|--------------------------------------------------------------|
@@ -195,6 +204,8 @@ FlowInformation is a system level metric that reports 
component and queue relate
 | connection_name | Name of the connection defined in the flow configuration   
  |
 | component_uuid  | UUID of the component                                      
  |
 | component_name  | Name of the component                                      
  |
+| processor_uuid  | UUID of the processor                                      
  |
+| processor_name  | Name of the processor                                      
  |
 
 ### AgentStatus
 
@@ -251,6 +262,11 @@ There are general metrics that are available for all 
processors. Besides these m
 | transferred_flow_files                      | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a relationship             
                          |
 | transferred_bytes                           | metric_class, processor_name, 
processor_uuid | Number of bytes transferred to a relationship                  
                          |
 | transferred_to_\<relationship\>             | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a specific relationship    
                          |
+| incoming_flow_files                         | metric_class, processor_name, 
processor_uuid | Number of flow files from the incoming queue processed by the 
processor                  |
+| incoming_bytes                              | metric_class, processor_name, 
processor_uuid | Sum of data from the incoming queue processed by the processor 
                          |
+| bytes_read                                  | metric_class, processor_name, 
processor_uuid | Number of bytes read by the processor                          
                          |
+| bytes_written                               | metric_class, processor_name, 
processor_uuid | Number of bytes written by the processor                       
                          |
+| processing_nanos                            | metric_class, processor_name, 
processor_uuid | Sum of the runtime spent in the processor in nanoseconds       
                          |
 
 | Label          | Description                                                 
           |
 
|----------------|------------------------------------------------------------------------|
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py 
b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index 8c12bc25f..d73859f58 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -60,8 +60,11 @@ class PrometheusChecker:
     def verify_general_processor_metrics(self, metric_class, processor_name):
         labels = {'processor_name': processor_name}
         return 
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 
'minifi_last_onTrigger_runtime_milliseconds',
-                                          
'minifi_average_session_commit_runtime_milliseconds', 
'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \
-            
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 
'minifi_transferred_flow_files', 'minifi_transferred_to_success', 
'minifi_transferred_bytes'], metric_class, labels)
+                                          
'minifi_average_session_commit_runtime_milliseconds', 
'minifi_last_session_commit_runtime_milliseconds',
+                                          'minifi_incoming_flow_files', 
'minifi_incoming_bytes', 'minifi_bytes_read', 'minifi_bytes_written'], 
metric_class, labels) and \
+            
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 
'minifi_transferred_flow_files', 'minifi_transferred_to_success',
+                                                  'minifi_transferred_bytes', 
'minifi_processing_nanos'],
+                                                 metric_class, labels)
 
     def verify_getfile_metrics(self, metric_class, processor_name):
         labels = {'processor_name': processor_name}
@@ -69,7 +72,9 @@ class PrometheusChecker:
             self.verify_metrics_exist(['minifi_input_bytes', 
'minifi_accepted_files'], metric_class, labels)
 
     def verify_flow_information_metrics(self):
-        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'FlowInformation') and \
+        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max',
+                                          'minifi_bytes_read', 
'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out', 
'minifi_bytes_in', 'minifi_bytes_out',
+                                          'minifi_invocations', 
'minifi_processing_nanos'], 'FlowInformation') and \
             self.verify_metric_exists('minifi_is_running', 'FlowInformation', 
{'component_name': 'FlowController'})
 
     def verify_device_info_node_metrics(self):
diff --git a/libminifi/include/core/Processor.h 
b/libminifi/include/core/Processor.h
index 6f25c176c..8dfc3db77 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -165,6 +165,14 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
     active_tasks_ = 0;
   }
 
+  std::string getProcessGroupUUIDStr() const {
+    return process_group_uuid_;
+  }
+
+  void setProcessGroupUUIDStr(const std::string &uuid) {
+    process_group_uuid_ = uuid;
+  }
+
   void yield() override;
 
   void yield(std::chrono::steady_clock::duration delta_time);
@@ -256,6 +264,8 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
 
   // an outgoing connection allows us to reach these nodes
   std::unordered_map<Connection*, std::unordered_set<Processor*>> 
reachable_processors_;
+
+  std::string process_group_uuid_;
 };
 
 }  // namespace core
diff --git a/libminifi/include/core/ProcessorMetrics.h 
b/libminifi/include/core/ProcessorMetrics.h
index de5c3d157..231f2efb4 100644
--- a/libminifi/include/core/ProcessorMetrics.h
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -52,10 +52,16 @@ class ProcessorMetrics : public 
state::response::ResponseNode {
   std::chrono::milliseconds getAverageSessionCommitRuntime() const;
   std::chrono::milliseconds getLastSessionCommitRuntime() const;
   void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
+  std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const 
std::string& relationship) const;
 
-  std::atomic<size_t> iterations{0};
+  std::atomic<size_t> invocations{0};
+  std::atomic<size_t> incoming_flow_files{0};
   std::atomic<size_t> transferred_flow_files{0};
+  std::atomic<uint64_t> incoming_bytes{0};
   std::atomic<uint64_t> transferred_bytes{0};
+  std::atomic<uint64_t> bytes_read{0};
+  std::atomic<uint64_t> bytes_written{0};
+  std::atomic<uint64_t> processing_nanos{0};
 
  protected:
   template<typename ValueType>
@@ -80,7 +86,7 @@ class ProcessorMetrics : public state::response::ResponseNode 
{
   [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() 
const;
   static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
 
-  std::mutex transferred_relationships_mutex_;
+  mutable std::mutex transferred_relationships_mutex_;
   std::unordered_map<std::string, size_t> transferred_relationships_;
   const Processor& source_processor_;
   Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h 
b/libminifi/include/core/state/nodes/FlowInformation.h
index a605bd8b2..5bfc257e1 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -26,6 +26,7 @@
 #include "core/state/nodes/StateMonitor.h"
 #include "Connection.h"
 #include "core/state/ConnectionStore.h"
+#include "core/Processor.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation {
   std::shared_ptr<FlowIdentifier> identifier;
 };
 
-class FlowMonitor : public StateMonitorNode {
+class FlowInformation : public StateMonitorNode {
  public:
-  FlowMonitor(std::string_view name, const utils::Identifier &uuid)
+  FlowInformation(std::string_view name, const utils::Identifier &uuid)
       : StateMonitorNode(name, uuid) {
   }
 
-  explicit FlowMonitor(std::string_view name)
+  explicit FlowInformation(std::string_view name)
       : StateMonitorNode(name) {
   }
 
+  MINIFIAPI static constexpr const char* Description = "Metric node that 
defines the flow ID and flow URL deployed to this agent";
+
+  std::string getName() const override {
+    return "flowInfo";
+  }
+
   void setFlowVersion(std::shared_ptr<state::response::FlowVersion> 
flow_version) {
     flow_version_ = std::move(flow_version);
   }
@@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode {
     connection_store_.updateConnection(connection);
   }
 
- protected:
-  std::shared_ptr<state::response::FlowVersion> flow_version_;
-  ConnectionStore connection_store_;
-};
-
-/**
- * Justification and Purpose: Provides flow version Information
- */
-class FlowInformation : public FlowMonitor {
- public:
-  FlowInformation(std::string_view name, const utils::Identifier &uuid)
-      : FlowMonitor(name, uuid) {
-  }
-
-  explicit FlowInformation(std::string_view name)
-      : FlowMonitor(name) {
-  }
-
-  MINIFIAPI static constexpr const char* Description = "Metric node that 
defines the flow ID and flow URL deployed to this agent";
-
-  std::string getName() const override {
-    return "flowInfo";
+  void setProcessors(std::vector<core::Processor*> processors) {
+    processors_ = std::move(processors);
   }
 
   std::vector<SerializedResponseNode> serialize() override;
   std::vector<PublishedMetric> calculateMetrics() override;
+
+ private:
+  std::shared_ptr<state::response::FlowVersion> flow_version_;
+  ConnectionStore connection_store_;
+  std::vector<core::Processor*> processors_;
 };
 
 }  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h 
b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 31e8ac93f..c94743431 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -62,7 +62,7 @@ class ResponseNodeLoader {
   void initializeAgentNode(const SharedResponseNode& response_node) const;
   void initializeAgentStatus(const SharedResponseNode& response_node) const;
   void initializeConfigurationChecksums(const SharedResponseNode& 
response_node) const;
-  void initializeFlowMonitor(const SharedResponseNode& response_node) const;
+  void initializeFlowInformation(const SharedResponseNode& response_node) 
const;
   void initializeAssetInformation(const SharedResponseNode& response_node) 
const;
   std::vector<SharedResponseNode> getMatchingComponentMetricsNodes(const 
std::string& regex_str) const;
 
diff --git a/libminifi/include/io/StreamCallback.h 
b/libminifi/include/io/StreamCallback.h
index 6d4e6c00a..8467c0f96 100644
--- a/libminifi/include/io/StreamCallback.h
+++ b/libminifi/include/io/StreamCallback.h
@@ -18,16 +18,22 @@
 
 #include <functional>
 #include <memory>
+#include <optional>
 
 namespace org::apache::nifi::minifi::io {
 
 class InputStream;
 class OutputStream;
 
+struct ReadWriteResult {
+  int64_t bytes_written = 0;
+  int64_t bytes_read = 0;
+};
+
 // FlowFile IO Callback functions for input and output
 // throw exception for error
 using InputStreamCallback = std::function<int64_t(const 
std::shared_ptr<InputStream>& input_stream)>;
 using OutputStreamCallback = std::function<int64_t(const 
std::shared_ptr<OutputStream>& output_stream)>;
-using InputOutputStreamCallback = std::function<int64_t(const 
std::shared_ptr<InputStream>& input_stream, const 
std::shared_ptr<OutputStream>& output_stream)>;
+using InputOutputStreamCallback = 
std::function<std::optional<ReadWriteResult>(const 
std::shared_ptr<InputStream>& input_stream, const 
std::shared_ptr<OutputStream>& output_stream)>;
 
 }  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h 
b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
index 0d68fe568..b52821d8e 100644
--- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
+++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
@@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback {
  public:
   using CallbackType = std::function<std::string(const std::string& 
input_line, bool is_first_line, bool is_last_line)>;
   explicit LineByLineInputOutputStreamCallback(CallbackType callback);
-  int64_t operator()(const std::shared_ptr<io::InputStream>& input, const 
std::shared_ptr<io::OutputStream>& output);
+  std::optional<io::ReadWriteResult> operator()(const 
std::shared_ptr<io::InputStream>& input, const 
std::shared_ptr<io::OutputStream>& output);
 
  private:
   int64_t readInput(io::InputStream& stream);
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 7fdcf175d..fcf916ceb 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -86,6 +86,7 @@ std::tuple<Processor*, bool> 
ProcessGroup::addProcessor(std::unique_ptr<Processo
   gsl_Expects(processor);
   const auto name = processor->getName();
   std::lock_guard<std::recursive_mutex> lock(mutex_);
+  processor->setProcessGroupUUIDStr(getUUIDStr());
   const auto [iter, inserted] = processors_.insert(std::move(processor));
   if (inserted) {
     logger_->log_debug("Add processor {} into process group {}", name, name_);
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 79c02ff63..15941f0b7 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -268,6 +268,9 @@ void ProcessSession::write(core::FlowFile &flow, const 
io::OutputStreamCallback&
     std::string details = process_context_->getProcessorNode()->getName() + " 
modify flow record content " + flow.getUUIDStr();
     auto duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start_time);
     provenance_report_->modifyContent(flow, details, duration);
+    if (metrics_) {
+      metrics_->bytes_written += stream->size();
+    }
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught Exception during process session write, type: 
{}, what: {}", typeid(exception).name(), exception.what());
     throw;
@@ -280,6 +283,7 @@ void ProcessSession::write(core::FlowFile &flow, const 
io::OutputStreamCallback&
 void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& 
flow_file, std::span<const char> buffer) {
   writeBuffer(flow_file, as_bytes(buffer));
 }
+
 void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& 
flow_file, std::span<const std::byte> buffer) {
   write(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& 
output_stream) {
     const auto write_status = output_stream->write(buffer);
@@ -316,6 +320,9 @@ void ProcessSession::append(const 
std::shared_ptr<core::FlowFile> &flow, const i
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile 
content");
     }
     flow->setSize(flow_file_size + (stream->size() - 
stream_size_before_callback));
+    if (metrics_) {
+      metrics_->bytes_written += stream->size() - stream_size_before_callback;
+    }
 
     std::stringstream details;
     details << process_context_->getProcessorNode()->getName() << " modify 
flow record content " << flow->getUUIDStr();
@@ -373,6 +380,9 @@ int64_t ProcessSession::read(const core::FlowFile& 
flow_file, const io::InputStr
     if (ret < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile 
content");
     }
+    if (metrics_) {
+      metrics_->bytes_read += ret;
+    }
     return ret;
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught Exception {}", exception.what());
@@ -383,7 +393,6 @@ int64_t ProcessSession::read(const core::FlowFile& 
flow_file, const io::InputStr
   }
 }
 
-
 int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, 
const io::InputOutputStreamCallback& callback) {
   gsl_Expects(callback);
 
@@ -409,19 +418,23 @@ int64_t ProcessSession::readWrite(const 
std::shared_ptr<core::FlowFile> &flow, c
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile 
content for write");
     }
 
-    int64_t bytes_written = callback(input_stream, output_stream);
-    if (bytes_written < 0) {
+    auto read_write_result = callback(input_stream, output_stream);
+    if (!read_write_result) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile 
content");
     }
 
     input_stream->close();
     output_stream->close();
 
-    flow->setSize(gsl::narrow<uint64_t>(bytes_written));
+    flow->setSize(gsl::narrow<uint64_t>(read_write_result->bytes_written));
     flow->setOffset(0);
     flow->setResourceClaim(output_claim);
+    if (metrics_) {
+      metrics_->bytes_written += read_write_result->bytes_written;
+      metrics_->bytes_read += read_write_result->bytes_read;
+    }
 
-    return bytes_written;
+    return read_write_result->bytes_written;
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught exception during process session readWrite, 
type: {}, what: {}", typeid(exception).name(), exception.what());
     throw;
@@ -486,6 +499,9 @@ void ProcessSession::importFrom(io::InputStream &stream, 
const std::shared_ptr<c
         flow->getOffset(), flow->getSize(), 
flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr());
 
     content_stream->close();
+    if (metrics_) {
+      metrics_->bytes_written += content_stream->size();
+    }
     std::stringstream details;
     details << process_context_->getProcessorNode()->getName() << " modify 
flow record content " << flow->getUUIDStr();
     auto duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start_time);
@@ -547,6 +563,9 @@ void ProcessSession::import(const std::string& source, 
const std::shared_ptr<Flo
                            flow->getUUIDStr());
 
         stream->close();
+        if (metrics_) {
+          metrics_->bytes_written += stream->size();
+        }
         input.close();
         if (!keepSource) {
           (void)std::remove(source.c_str());
@@ -649,6 +668,9 @@ void ProcessSession::import(const std::string& source, 
std::vector<std::shared_p
         logger_->log_debug("Import offset {} length {} into content {}, 
FlowFile UUID {}",
             flowFile->getOffset(), flowFile->getSize(), 
flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
         stream->close();
+        if (metrics_) {
+          metrics_->bytes_written += stream->size();
+        }
         std::string details = process_context_->getProcessorNode()->getName() 
+ " modify flow record content " + flowFile->getUUIDStr();
         auto duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start_time);
         provenance_report_->modifyContent(*flowFile, details, duration);
@@ -819,7 +841,7 @@ void ProcessSession::commit() {
   const auto commit_start_time = std::chrono::steady_clock::now();
   try {
     std::unordered_map<std::string, TransferMetrics> transfers;
-      auto increaseTransferMetrics = [&](const FlowFile& record, const 
Relationship& relationship) {
+    auto increaseTransferMetrics = [&](const FlowFile& record, const 
Relationship& relationship) {
       ++transfers[relationship.getName()].transfer_count;
       transfers[relationship.getName()].transfer_size += record.getSize();
     };
@@ -930,8 +952,11 @@ void ProcessSession::commit() {
     // persistent the provenance report
     this->provenance_report_->commit();
     logger_->log_debug("ProcessSession committed for {}", 
process_context_->getProcessorNode()->getName());
-    if (metrics_)
-      
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - commit_start_time));
+    if (metrics_) {
+      auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
+      
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
+      metrics_->processing_nanos += 
std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
+    }
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught Exception during process session commit, type: 
{}, what: {}", typeid(exception).name(), exception.what());
     throw;
@@ -1141,6 +1166,10 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       if (flow_version != nullptr) {
         ret->setAttribute(SpecialFlowAttribute::FLOW_ID, 
flow_version->getFlowId());
       }
+      if (metrics_) {
+        metrics_->incoming_bytes += ret->getSize();
+        ++metrics_->incoming_flow_files;
+      }
       return ret;
     }
     current = 
dynamic_cast<Connection*>(process_context_->getProcessorNode()->pickIncomingConnection());
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index b5cf616e2..77b8d3fa1 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -198,7 +198,7 @@ void Processor::triggerAndCommit(const 
std::shared_ptr<ProcessContext>& context,
 }
 
 void Processor::trigger(const std::shared_ptr<ProcessContext>& context, const 
std::shared_ptr<ProcessSession>& process_session) {
-  ++metrics_->iterations;
+  ++metrics_->invocations;
   const auto start = std::chrono::steady_clock::now();
   onTrigger(*context, *process_session);
   
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start));
diff --git a/libminifi/src/core/ProcessorMetrics.cpp 
b/libminifi/src/core/ProcessorMetrics.cpp
index 8cd054888..95e00c487 100644
--- a/libminifi/src/core/ProcessorMetrics.cpp
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -44,13 +44,18 @@ std::vector<state::response::SerializedResponseNode> 
ProcessorMetrics::serialize
   state::response::SerializedResponseNode root_node {
     .name = source_processor_.getUUIDStr(),
     .children = {
-      {.name = "OnTriggerInvocations", .value = 
static_cast<uint32_t>(iterations.load())},
+      {.name = "OnTriggerInvocations", .value = 
static_cast<uint32_t>(invocations.load())},
       {.name = "AverageOnTriggerRunTime", .value = 
static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
       {.name = "LastOnTriggerRunTime", .value = 
static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
       {.name = "AverageSessionCommitRunTime", .value = 
static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
       {.name = "LastSessionCommitRunTime", .value = 
static_cast<uint64_t>(getLastSessionCommitRuntime().count())},
       {.name = "TransferredFlowFiles", .value = 
static_cast<uint32_t>(transferred_flow_files.load())},
-      {.name = "TransferredBytes", .value = transferred_bytes.load()}
+      {.name = "TransferredBytes", .value = 
static_cast<uint64_t>(transferred_bytes.load())},
+      {.name = "IncomingFlowFiles", .value = 
static_cast<uint32_t>(incoming_flow_files.load())},
+      {.name = "IncomingBytes", .value = 
static_cast<uint64_t>(incoming_bytes.load())},
+      {.name = "BytesRead", .value = static_cast<uint64_t>(bytes_read.load())},
+      {.name = "BytesWritten", .value = 
static_cast<uint64_t>(bytes_written.load())},
+      {.name = "ProcessingNanos", .value = 
static_cast<uint64_t>(processing_nanos.load())}
     }
   };
 
@@ -59,7 +64,7 @@ std::vector<state::response::SerializedResponseNode> 
ProcessorMetrics::serialize
     for (const auto& [relationship, count] : transferred_relationships_) {
       gsl_Expects(!relationship.empty());
       state::response::SerializedResponseNode transferred_to_relationship_node;
-      transferred_to_relationship_node.name = 
std::string("TransferredTo").append(1, 
toupper(relationship[0])).append(relationship.substr(1));
+      transferred_to_relationship_node.name = 
std::string("TransferredTo").append(1, 
gsl::narrow<char>(toupper(relationship[0]))).append(relationship.substr(1));
       transferred_to_relationship_node.value = static_cast<uint32_t>(count);
 
       root_node.children.push_back(transferred_to_relationship_node);
@@ -73,13 +78,18 @@ std::vector<state::response::SerializedResponseNode> 
ProcessorMetrics::serialize
 
 std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
   std::vector<state::PublishedMetric> metrics = {
-    {"onTrigger_invocations", static_cast<double>(iterations.load()), 
getCommonLabels()},
+    {"onTrigger_invocations", static_cast<double>(invocations.load()), 
getCommonLabels()},
     {"average_onTrigger_runtime_milliseconds", 
static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
     {"last_onTrigger_runtime_milliseconds", 
static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
     {"average_session_commit_runtime_milliseconds", 
static_cast<double>(getAverageSessionCommitRuntime().count()), 
getCommonLabels()},
     {"last_session_commit_runtime_milliseconds", 
static_cast<double>(getLastSessionCommitRuntime().count()), getCommonLabels()},
     {"transferred_flow_files", 
static_cast<double>(transferred_flow_files.load()), getCommonLabels()},
-    {"transferred_bytes", static_cast<double>(transferred_bytes.load()), 
getCommonLabels()}
+    {"transferred_bytes", static_cast<double>(transferred_bytes.load()), 
getCommonLabels()},
+    {"incoming_flow_files", static_cast<double>(incoming_flow_files.load()), 
getCommonLabels()},
+    {"incoming_bytes", static_cast<double>(incoming_bytes.load()), 
getCommonLabels()},
+    {"bytes_read", static_cast<double>(bytes_read.load()), getCommonLabels()},
+    {"bytes_written", static_cast<double>(bytes_written.load()), 
getCommonLabels()},
+    {"processing_nanos", static_cast<double>(processing_nanos.load()), 
getCommonLabels()}
   };
 
   {
@@ -122,6 +132,15 @@ std::chrono::milliseconds 
ProcessorMetrics::getLastSessionCommitRuntime() const
   return session_commit_runtime_averager_.getLastValue();
 }
 
+std::optional<size_t> 
ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string& 
relationship) const {
+  std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+  const auto relationship_it = transferred_relationships_.find(relationship);
+  if (relationship_it != std::end(transferred_relationships_)) {
+    return relationship_it->second;
+  }
+  return {};
+}
+
 template<typename ValueType>
 requires Summable<ValueType> && DividableByInteger<ValueType>
 ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp 
b/libminifi/src/core/state/nodes/FlowInformation.cpp
index 5756072ac..f31b26c29 100644
--- a/libminifi/src/core/state/nodes/FlowInformation.cpp
+++ b/libminifi/src/core/state/nodes/FlowInformation.cpp
@@ -34,6 +34,12 @@ std::vector<SerializedResponseNode> 
FlowInformation::serialize() {
     {.name = "flowId", .value = flow_version_->getFlowId()}
   };
 
+  if (nullptr != monitor_) {
+    monitor_->executeOnComponent("FlowController", 
[&serialized](StateController& component) {
+      serialized.push_back({.name = "runStatus", .value = 
(component.isRunning() ? "Running" : "Stopped")});
+    });
+  }
+
   SerializedResponseNode uri;
   uri.name = "versionedFlowSnapshotURI";
   for (auto &entry : flow_version_->serialize()) {
@@ -61,19 +67,35 @@ std::vector<SerializedResponseNode> 
FlowInformation::serialize() {
     serialized.push_back(queues);
   }
 
-  if (nullptr != monitor_) {
-    SerializedResponseNode componentsNode{.name = "components", .collapsible = 
false};
-    monitor_->executeOnAllComponents([&componentsNode](StateController& 
component){
-      componentsNode.children.push_back({
-        .name = component.getComponentName(),
+  if (!processors_.empty()) {
+    SerializedResponseNode processorsStatusesNode{.name = "processorStatuses", 
.array = true, .collapsible = false};
+    for (const auto processor : processors_) {
+      if (!processor) {
+        continue;
+      }
+
+      auto metrics = processor->getMetrics();
+      processorsStatusesNode.children.push_back({
+        .name = processor->getName(),
         .collapsible = false,
         .children = {
-          {.name = "running", .value = component.isRunning()},
-          {.name = "uuid", .value = 
std::string{component.getComponentUUID().to_string()}}
+          {.name = "id", .value = std::string{processor->getUUIDStr()}},
+          {.name = "groupId", .value = processor->getProcessGroupUUIDStr()},
+          {.name = "bytesRead", .value = metrics->bytes_read.load()},
+          {.name = "bytesWritten", .value = metrics->bytes_written.load()},
+          {.name = "flowFilesIn", .value = 
metrics->incoming_flow_files.load()},
+          {.name = "flowFilesOut", .value = 
metrics->transferred_flow_files.load()},
+          {.name = "bytesIn", .value = metrics->incoming_bytes.load()},
+          {.name = "bytesOut", .value = metrics->transferred_bytes.load()},
+          {.name = "invocations", .value = metrics->invocations.load()},
+          {.name = "processingNanos", .value = 
metrics->processing_nanos.load()},
+          {.name = "activeThreadCount", .value = -1},
+          {.name = "terminatedThreadCount", .value = -1},
+          {.name = "runStatus", .value = (processor->isRunning() ? "Running" : 
"Stopped")}
         }
       });
-    });
-    serialized.push_back(componentsNode);
+    }
+    serialized.push_back(processorsStatusesNode);
   }
 
   return serialized;
@@ -81,13 +103,38 @@ std::vector<SerializedResponseNode> 
FlowInformation::serialize() {
 
 std::vector<PublishedMetric> FlowInformation::calculateMetrics() {
   std::vector<PublishedMetric> metrics = 
connection_store_.calculateConnectionMetrics("FlowInformation");
-
   if (nullptr != monitor_) {
-    monitor_->executeOnAllComponents([&metrics](StateController& component){
+    monitor_->executeOnComponent("FlowController", [&metrics](StateController& 
component) {
       metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0),
         {{"component_uuid", component.getComponentUUID().to_string()}, 
{"component_name", component.getComponentName()}, {"metric_class", 
"FlowInformation"}}});
     });
   }
+
+  for (const auto& processor : processors_) {
+    if (!processor) {
+      continue;
+    }
+    auto processor_metrics = processor->getMetrics();
+    metrics.push_back({"bytes_read", 
gsl::narrow<double>(processor_metrics->bytes_read.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"bytes_written", 
gsl::narrow<double>(processor_metrics->bytes_written.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"flow_files_in", 
gsl::narrow<double>(processor_metrics->incoming_flow_files.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"flow_files_out", 
gsl::narrow<double>(processor_metrics->transferred_flow_files.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"bytes_in", 
gsl::narrow<double>(processor_metrics->incoming_bytes.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"bytes_out", 
gsl::narrow<double>(processor_metrics->transferred_bytes.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"invocations", 
gsl::narrow<double>(processor_metrics->invocations.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"processing_nanos", 
gsl::narrow<double>(processor_metrics->processing_nanos.load()),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+    metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0),
+        {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", 
processor->getName()}, {"metric_class", "FlowInformation"}}});
+  }
+
   return metrics;
 }
 
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp 
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 2d892a60b..37337e3b0 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -213,9 +213,9 @@ void ResponseNodeLoader::initializeAssetInformation(const 
SharedResponseNode& re
   }
 }
 
-void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& 
response_node) const {
-  auto flowMonitor = 
dynamic_cast<state::response::FlowMonitor*>(response_node.get());
-  if (flowMonitor == nullptr) {
+void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& 
response_node) const {
+  auto flow_information = 
dynamic_cast<state::response::FlowInformation*>(response_node.get());
+  if (flow_information == nullptr) {
     return;
   }
 
@@ -226,11 +226,17 @@ void ResponseNodeLoader::initializeFlowMonitor(const 
SharedResponseNode& respons
   }
 
   for (auto &con : connections) {
-    flowMonitor->updateConnection(con.second);
+    flow_information->updateConnection(con.second);
   }
-  flowMonitor->setStateMonitor(update_sink_);
+  flow_information->setStateMonitor(update_sink_);
   if (flow_configuration_) {
-    flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
+    flow_information->setFlowVersion(flow_configuration_->getFlowVersion());
+  }
+
+  if (root_) {
+    std::vector<core::Processor*> processors;
+    root_->getAllProcessors(processors);
+    flow_information->setProcessors(processors);
   }
 }
 
@@ -253,7 +259,7 @@ std::vector<SharedResponseNode> 
ResponseNodeLoader::loadResponseNodes(const std:
     initializeAgentNode(response_node);
     initializeAgentStatus(response_node);
     initializeConfigurationChecksums(response_node);
-    initializeFlowMonitor(response_node);
+    initializeFlowInformation(response_node);
     initializeAssetInformation(response_node);
     initialized_metrics_.insert(response_node->getName());
   }
diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp 
b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
index af2307e5c..6be8b7fd3 100644
--- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
+++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
@@ -26,27 +26,37 @@ 
LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
   : callback_(std::move(callback)) {
 }
 
-int64_t LineByLineInputOutputStreamCallback::operator()(const 
std::shared_ptr<io::InputStream>& input, const 
std::shared_ptr<io::OutputStream>& output) {
+std::optional<io::ReadWriteResult> 
LineByLineInputOutputStreamCallback::operator()(const 
std::shared_ptr<io::InputStream>& input, const 
std::shared_ptr<io::OutputStream>& output) {
   gsl_Expects(input);
   gsl_Expects(output);
 
+  io::ReadWriteResult result;
+
   if (int64_t status = readInput(*input); status <= 0) {
-    return status;
+    if (status < 0) {
+      return std::nullopt;
+    }
+    return result;
   }
 
-  std::size_t total_bytes_written_ = 0;
+  result.bytes_read = gsl::narrow<int64_t>(input_.size());
+
+  std::size_t total_bytes_written = 0;
   bool is_first_line = true;
   readLine();
   do {
     readLine();
     std::string output_line = callback_(*current_line_, is_first_line, 
isLastLine());
     const auto bytes_written = output->write(reinterpret_cast<const uint8_t 
*>(output_line.data()), output_line.size());
-    if (io::isError(bytes_written)) { return -1; }
-    total_bytes_written_ += bytes_written;
+    if (io::isError(bytes_written)) {
+      return std::nullopt;
+    }
+    total_bytes_written += bytes_written;
     is_first_line = false;
   } while (!isLastLine());
 
-  return gsl::narrow<int64_t>(total_bytes_written_);
+  result.bytes_written = gsl::narrow<int64_t>(total_bytes_written);
+  return result;
 }
 
 int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream& 
stream) {
diff --git a/libminifi/test/integration/C2MetricsTest.cpp 
b/libminifi/test/integration/C2MetricsTest.cpp
index 6065b5ecf..6f6ec14ca 100644
--- a/libminifi/test/integration/C2MetricsTest.cpp
+++ b/libminifi/test/integration/C2MetricsTest.cpp
@@ -18,6 +18,7 @@
 #include <string>
 #include <iostream>
 #include <filesystem>
+#include <algorithm>
 
 #include "unit/TestBase.h"
 #include "integration/HTTPIntegrationBase.h"
@@ -92,7 +93,10 @@ class MetricsHandler: public HeartbeatHandler {
     VERIFY_UPDATED_METRICS
   };
 
-  static constexpr const char* GETTCP1_UUID = 
"2438e3c8-015a-1000-79ca-83af40ec1991";
+  static constexpr const char* GETTCP_UUID = 
"2438e3c8-015a-1000-79ca-83af40ec1991";
+  static constexpr const char* LOGATTRIBUTE1_UUID = 
"2438e3c8-015a-1000-79ca-83af40ec1992";
+  static constexpr const char* LOGATTRIBUTE2_UUID = 
"5128e3c8-015a-1000-79ca-83af40ec1990";
+  static constexpr const char* GENERATE_FLOWFILE_UUID = 
"4fe2d51d-076a-49b0-88de-5cf5adf52b8f";
 
   static void sendEmptyHeartbeatResponse(struct mg_connection* conn) {
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: 
text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -126,36 +130,67 @@ class MetricsHandler: public HeartbeatHandler {
     }
   }
 
-  static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
+  static bool processorMetricsAreValid(const auto& processor) {
+    return processor["bytesRead"].GetInt() >= 0 &&
+      processor["bytesWritten"].GetInt() >= 0 &&
+      processor["flowFilesIn"].GetInt() >= 0 &&
+      processor["flowFilesOut"].GetInt() >= 0 &&
+      processor["bytesIn"].GetInt() >= 0 &&
+      processor["bytesOut"].GetInt() >= 0 &&
+      processor["invocations"].GetInt() >= 0 &&
+      processor["processingNanos"].GetInt() >= 0 &&
+      processor["activeThreadCount"].GetInt() == -1 &&
+      processor["terminatedThreadCount"].GetInt() == -1 &&
+      processor["runStatus"].GetString() == std::string("Running");
+  }
+
+  static bool verifyCommonRuntimeMetricNodes(const rapidjson::Value& 
runtime_metrics, const std::string& queue_id) {
     return runtime_metrics.HasMember("deviceInfo") &&
       runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") 
&&
       runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
       runtime_metrics.HasMember("flowInfo") &&
+      runtime_metrics["flowInfo"].HasMember("flowId") &&
+      runtime_metrics["flowInfo"].HasMember("runStatus") &&
+      runtime_metrics["flowInfo"]["runStatus"].GetString() == 
std::string("Running") &&
       runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
       runtime_metrics["flowInfo"].HasMember("queues") &&
-      runtime_metrics["flowInfo"].HasMember("components") &&
-      
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997")
 &&
-      runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
+      runtime_metrics["flowInfo"]["queues"].HasMember(queue_id) &&
       runtime_metrics.HasMember("agentInfo") &&
-      
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
+      
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size") 
&&
+      runtime_metrics["flowInfo"].HasMember("processorStatuses");
+  }
+
+  static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
+    return verifyCommonRuntimeMetricNodes(runtime_metrics, 
"2438e3c8-015a-1000-79ca-83af40ec1997") &&
+      [&]() {
+        const auto processor_statuses = 
runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
+        if (processor_statuses.Size() != 2) {
+          return false;
+        }
+        return std::all_of(processor_statuses.begin(), 
processor_statuses.end(), [&](const auto& processor) {
+          if (processor["id"].GetString() != std::string(GETTCP_UUID) && 
processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) {
+            throw std::runtime_error(std::string("Unexpected processor id in 
processorStatuses: ") + processor["id"].GetString());
+          }
+          return processorMetricsAreValid(processor);
+        });
+      }();
   }
 
   static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& 
runtime_metrics) {
-    return runtime_metrics.HasMember("deviceInfo") &&
-      runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") 
&&
-      runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
-      runtime_metrics.HasMember("flowInfo") &&
-      runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
-      runtime_metrics["flowInfo"].HasMember("queues") &&
-      runtime_metrics["flowInfo"].HasMember("components") &&
-      
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332")
 &&
-      runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") 
&&
-      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
-      runtime_metrics.HasMember("agentInfo") &&
-      
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
+    return verifyCommonRuntimeMetricNodes(runtime_metrics, 
"8368e3c8-015a-1003-52ca-83af40ec1332") &&
+      runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+      [&]() {
+        const auto processor_statuses = 
runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
+        if (processor_statuses.Size() != 2) {
+          return false;
+        }
+        return std::all_of(processor_statuses.begin(), 
processor_statuses.end(), [&](const auto& processor) {
+          if (processor["id"].GetString() != 
std::string(GENERATE_FLOWFILE_UUID) && processor["id"].GetString() != 
std::string(LOGATTRIBUTE2_UUID)) {
+            throw std::runtime_error(std::string("Unexpected processor id in 
processorStatuses: ") + processor["id"].GetString());
+          }
+          return processorMetricsAreValid(processor);
+        });
+      }();
   }
 
   static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
@@ -177,13 +212,18 @@ class MetricsHandler: public HeartbeatHandler {
 
   static bool verifyProcessorMetrics(const rapidjson::Value& 
processor_metrics) {
     return processor_metrics.HasMember("GetTCPMetrics") &&
-      processor_metrics["GetTCPMetrics"].HasMember(GETTCP1_UUID) &&
-      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("OnTriggerInvocations")
 &&
-      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID]["OnTriggerInvocations"].GetUint()
 > 0 &&
-      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredFlowFiles")
 &&
-      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("AverageOnTriggerRunTime")
 &&
-      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("LastOnTriggerRunTime")
 &&
-      
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes");
+      processor_metrics["GetTCPMetrics"].HasMember(GETTCP_UUID) &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("OnTriggerInvocations")
 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID]["OnTriggerInvocations"].GetUint()
 > 0 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredFlowFiles")
 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("AverageOnTriggerRunTime")
 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("LastOnTriggerRunTime")
 &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("TransferredBytes") &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingFlowFiles") 
&&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("IncomingBytes") &&
+      processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesRead") &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("BytesWritten") &&
+      
processor_metrics["GetTCPMetrics"][GETTCP_UUID].HasMember("ProcessingNanos");
   }
 
   std::atomic_bool& metrics_updated_successfully_;
diff --git a/libminifi/test/libtest/unit/SingleProcessorTestController.h 
b/libminifi/test/libtest/unit/SingleProcessorTestController.h
index 0245e3314..5928c6bc5 100644
--- a/libminifi/test/libtest/unit/SingleProcessorTestController.h
+++ b/libminifi/test/libtest/unit/SingleProcessorTestController.h
@@ -39,7 +39,7 @@ class SingleProcessorTestController : public TestController {
  public:
   explicit SingleProcessorTestController(std::unique_ptr<core::Processor> 
processor) {
     auto name = processor->getName();
-    processor_ = plan->addProcessor(std::move(processor), name);
+    processor_ = plan->addProcessor(std::move(processor), name, {});
     input_ = plan->addConnection(nullptr, core::Relationship{"success", 
"success"}, processor_);
     outgoing_connections_ = [this] {
       std::unordered_map<core::Relationship, Connection*> result;
diff --git a/libminifi/test/libtest/unit/TestBase.cpp 
b/libminifi/test/libtest/unit/TestBase.cpp
index ee337986e..19f5de798 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -539,11 +539,13 @@ bool TestPlan::runProcessor(size_t target_location, const 
PreTriggerVerifier& ve
 
   if (verify) {
     auto current_session = 
std::make_shared<minifi::core::ProcessSession>(context);
+    current_session->setMetrics(processor->getMetrics());
     process_sessions_.push_back(current_session);
     verify(context, current_session);
     current_session->commit();
   } else {
     auto session_factory = std::make_shared<TestSessionFactory>(context, [&] 
(auto current_session) {
+      current_session->setMetrics(processor->getMetrics());
       process_sessions_.push_back(current_session);
     });
     logger_->log_info("Running {}", processor->getName());
diff --git a/libminifi/test/unit/MetricsTests.cpp 
b/libminifi/test/unit/MetricsTests.cpp
index 16e732c18..a8c4ad5ee 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -27,6 +27,7 @@
 #include "unit/ProvenanceTestHelper.h"
 #include "unit/DummyProcessor.h"
 #include "range/v3/algorithm/find_if.hpp"
+#include "unit/SingleProcessorTestController.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -285,4 +286,76 @@ TEST_CASE("Test commit runtime processor metrics", 
"[ProcessorMetrics]") {
   REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
 }
 
+class DuplicateContentProcessor : public minifi::core::Processor {
+  using minifi::core::Processor::Processor;
+
+ public:
+  DuplicateContentProcessor(std::string_view name, const 
minifi::utils::Identifier& uuid) : Processor(name, uuid) {}
+  explicit DuplicateContentProcessor(std::string_view name) : Processor(name) 
{}
+  static constexpr const char* Description = "A processor that creates two 
more of the same flow file.";
+  static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
+  static constexpr auto Success = core::RelationshipDefinition{"success", 
"Newly created FlowFiles"};
+  static constexpr auto Original = core::RelationshipDefinition{"original", 
"Original FlowFile"};
+  static constexpr auto Relationships = std::array{Success, Original};
+  static constexpr bool SupportsDynamicProperties = false;
+  static constexpr bool SupportsDynamicRelationships = false;
+  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_REQUIRED;
+  static constexpr bool IsSingleThreaded = false;
+  void initialize() override {
+    setSupportedRelationships(Relationships);
+  }
+  void onTrigger(core::ProcessContext& /*context*/, core::ProcessSession& 
session) override {
+    auto flow_file = session.get();
+    if (!flow_file) {
+      return;
+    }
+
+    auto flow_file_copy = session.create();
+    std::vector<std::byte> buffer;
+    session.read(flow_file, [&](const std::shared_ptr<io::InputStream>& 
stream) -> int64_t {
+      buffer.resize(stream->size());
+      return gsl::narrow<int64_t>(stream->read(buffer));
+    });
+    session.write(flow_file_copy, [&](const std::shared_ptr<io::OutputStream>& 
stream) -> int64_t {
+      return gsl::narrow<int64_t>(stream->write(buffer));
+    });
+    session.append(flow_file_copy, [&](const 
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
+      return gsl::narrow<int64_t>(stream->write(buffer));
+    });
+    session.transfer(flow_file_copy, Success);
+    session.transfer(flow_file, Original);
+  }
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+};
+
+TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") 
{
+  minifi::test::SingleProcessorTestController 
test_controller(std::make_unique<DuplicateContentProcessor>("DuplicateContentProcessor"));
+  test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}});
+  auto metrics = test_controller.getProcessor()->getMetrics();
+  CHECK(metrics->invocations == 1);
+  CHECK(metrics->incoming_flow_files == 1);
+  CHECK(metrics->transferred_flow_files == 2);
+  CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1);
+  CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1);
+  CHECK(metrics->incoming_bytes == 10);
+  CHECK(metrics->transferred_bytes == 30);
+  CHECK(metrics->bytes_read == 10);
+  CHECK(metrics->bytes_written == 20);
+  auto old_nanos = metrics->processing_nanos.load();
+  CHECK(metrics->processing_nanos > 0);
+
+  test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", 
{}}});
+  CHECK(metrics->invocations == 2);
+  CHECK(metrics->incoming_flow_files == 2);
+  CHECK(metrics->transferred_flow_files == 4);
+  CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2);
+  CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2);
+  CHECK(metrics->incoming_bytes == 24);
+  CHECK(metrics->transferred_bytes == 72);
+  CHECK(metrics->bytes_read == 24);
+  CHECK(metrics->bytes_written == 48);
+  CHECK(metrics->processing_nanos > old_nanos);
+}
+
+
 }  // namespace org::apache::nifi::minifi::test

Reply via email to