szaszm commented on code in PR #1909:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1909#discussion_r1918034022


##########
libminifi/include/core/state/nodes/FlowInformation.h:
##########
@@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode {
     connection_store_.updateConnection(connection);
   }
 
- protected:
-  std::shared_ptr<state::response::FlowVersion> flow_version_;
-  ConnectionStore connection_store_;
-};
-
-/**
- * Justification and Purpose: Provides flow version Information
- */
-class FlowInformation : public FlowMonitor {
- public:
-  FlowInformation(std::string_view name, const utils::Identifier &uuid)
-      : FlowMonitor(name, uuid) {
-  }
-
-  explicit FlowInformation(std::string_view name)
-      : FlowMonitor(name) {
-  }
-
-  MINIFIAPI static constexpr const char* Description = "Metric node that 
defines the flow ID and flow URL deployed to this agent";
-
-  std::string getName() const override {
-    return "flowInfo";
+  void setProcessors(std::vector<core::Processor*> processors) {

Review Comment:
   Can the pointers be null? If yes, is it checked at the right places? If not, 
consider wrapping them in `gsl::not_null`.



##########
libminifi/src/core/ProcessorMetrics.cpp:
##########
@@ -122,6 +132,14 @@ std::chrono::milliseconds 
ProcessorMetrics::getLastSessionCommitRuntime() const
   return session_commit_runtime_averager_.getLastValue();
 }
 
+std::optional<size_t> 
ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string& 
relationship) const {
+  std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+  if (transferred_relationships_.contains(relationship)) {
+    return transferred_relationships_.at(relationship);

Review Comment:
   Double lookup: `contains()` followed by `at()` searches the map twice; a 
single `find()` is enough.
   ```suggestion
     const auto relationship_it = transferred_relationships_.find(relationship);
     if (relationship_it != std::end(transferred_relationships_)) {
       return relationship_it->second;
   ```



##########
C2.md:
##########
@@ -271,3 +284,340 @@ in minifi.properties to activate the file update trigger 
specify
     # specifying a trigger
     nifi.c2.agent.trigger.classes=FileUpdateTrigger
     nifi.c2.file.watch=<full path of file to monitor>
+
+## C2 Response Nodes
+
+The following is a list of nodes that can be defined in the minifi.properties 
file for the C2 heartbeat response as part of the C2 root nodes defined in the 
`nifi.c2.root.classes` property or in the metrics nodes defined in the tree 
under `nifi.c2.root.class.definitions` as stated above.
+
+### AgentInformation
+
+Contains information about the agent's build, extensions, supported C2 
operations and status of its components.
+
+```
+"agentInfo": {
+    "agentManifest": {
+        "buildInfo": {
+            "compiler": "/usr/lib/ccache/g++",
+            "flags": " 
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+            "revision": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+            "timestamp": 1734001238,
+            "version": "0.99.1"
+        },
+        "bundles": [
+            {
+                "componentManifest": {
+                    "processors": [
+                    ...
+                    ]
+                },
+                "artifact": "minifi-civet-extensions",
+                "group": "org.apache.nifi.minifi",
+                "version": "0.99.1"
+            }
+        ],
+        "schedulingDefaults": {
+            "defaultMaxConcurrentTasks": 1,
+            "defaultRunDurationNanos": 0,
+            "defaultSchedulingPeriodMillis": 1000,
+            "defaultSchedulingStrategy": "TIMER_DRIVEN",
+            "penalizationPeriodMillis": 30000,
+            "yieldDurationMillis": 1000
+        },
+        "supportedOperations": [
+            {
+                "type": "acknowledge"
+            }
+            ...
+        ],
+        "agentType": "cpp",
+        "identifier": "bH77vXakM0Lkgt8VcDOGZVW3"
+    },
+    "status": {
+        "repositories": {
+            "content_repo": {
+                "entryCount": 0,
+                "full": false,
+                "maxSize": 0,
+                "running": true,
+                "size": 0
+            },
+            "flow_file_repo": {
+                "entryCount": 0,
+                "full": false,
+                "maxSize": 0,
+                "running": true,
+                "size": 0
+            },
+            
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+                "entryCount": 4,
+                "full": false,
+                "maxSize": 7864320,
+                "running": true,
+                "size": 40
+            }
+        },
+        "components": {
+            "LogAttribute": {
+                "running": true,
+                "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+            },
+            "GenerateFlowFile": {
+                "running": true,
+                "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+            },
+            "FlowController": {
+                "running": true,
+                "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+            }
+        },
+        "resourceConsumption": {
+            "cpuUtilization": 0.05,
+            "memoryUsage": 97955840
+        },
+        "uptime": 1025
+    },
+    "agentClass": "test",
+    "agentManifestHash": 
"9FFC8326121A816E5B2FD674CE9A34321F89CC690AD0D1FD79DFB5969B3B523D6570520382E82C68CFA347FBD9897FC027E518E98CFA229C18617B062E1C9E77",
+    "identifier": "9628acfe-b9fe-11ef-a0c0-10f60a596f64"
+}
+```
+
+### AgentStatus
+
+Contains information about the agent's status, including the status of its 
components, repositories, and resource consumption.
+
+```
+"AgentStatus": {
+    "repositories": {
+        "repo_name": {
+            "entryCount": 0,
+            "full": false,
+            "maxSize": 0,
+            "running": true,
+            "size": 0
+        },
+        "ff": {
+            "entryCount": 0,
+            "full": false,
+            "maxSize": 0,
+            "running": true,
+            "size": 0
+        },
+        
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+            "entryCount": 4,
+            "full": false,
+            "maxSize": 7864320,
+            "running": true,
+            "size": 40
+        }
+    },
+    "components": {
+        "LogAttribute": {
+            "running": true,
+            "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+        },
+        "GenerateFlowFile": {
+            "running": true,
+            "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+        },
+        "FlowController": {
+            "running": true,
+            "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+        }
+    },
+    "resourceConsumption": {
+        "cpuUtilization": 0.0028846153846153849,
+        "memoryUsage": 97955840
+    },
+    "uptime": 995
+}
+```
+
+### AssetInformation
+
+Contains the calculated hash of the assets.
+
+```
+"resourceInfo": {
+    "hash": "null"
+}
+```
+
+### BuildInformation
+
+Contains information about the agent's build.
+
+```
+"BuildInformation": {
+    "compiler": {
+        "compiler_command": "/usr/lib/ccache/g++",
+        "compiler_flags": " 
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+        "compiler_version": "11.4.0"
+    },
+    "build_date": "1734001238",
+    "build_rev": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+    "build_version": "0.99.1",
+    "device_id": "bH77vXakM0Lkgt8VcDOGZVW3"
+}
+```
+
+### ConfigurationChecksums
+
+Metric node that defines checksums of configuration files in the C2 protocol.
+
+```
+"configurationChecksums": {
+    "SHA256": {
+        "TestC2Metrics.yml": 
"9af6589bf7729bb88857aafe98cea4f41df049725401b5f0ded0a7b949d9b90c",
+        "minifi.properties": 
"06fb9f4730e3db7d0a0a1ee606a7de3fee5813edf42eab140616e8a2995072df"
+    }
+},
+```
+
+### DeviceInfoNode
+
+Contains information about the device the agent is running on.
+
+```
+"deviceInfo": {
+    "systemInfo": {
+        "cpuLoadAverage": 1.271484375,
+        "cpuUtilization": 0.06179499754781756,
+        "machineArch": "x86_64",
+        "memoryUsage": 12681670656,
+        "operatingSystem": "Linux",
+        "physicalMem": 67081129984,
+        "vCores": 20
+    },
+    "networkInfo": {
+        "hostname": "ggyimesi-5570-ubuntu",
+        "ipAddress": "10.255.0.1"
+    },
+    "identifier": "16475557466943148337"
+}
+```
+
+### FlowInformation
+
+Contains information about the flow the agent is running, including the 
versioned flow snapshot URI, queues, components, and processor statuses.
+
+```
+"flowInfo": {
+    "versionedFlowSnapshotURI": {
+        "bucketId": "default",
+        "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
+    },
+    "queues": {
+        "8368e3c8-015a-1003-52ca-83af40ec1332": {
+            "dataSize": 40,
+            "dataSizeMax": 1048576,
+            "name": "GenerateFlowFile/success/LogAttribute",
+            "size": 4,
+            "sizeMax": 0,
+            "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
+        }
+    },
+    "processorStatuses": [
+        {
+            "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
+            "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+            "bytesRead": 0,
+            "bytesWritten": 0,
+            "flowFilesIn": 0,
+            "flowFilesOut": 0,
+            "bytesIn": 0,
+            "bytesOut": 0,
+            "invocations": 0,
+            "processingNanos": 0,
+            "activeThreadCount": -1,
+            "terminatedThreadCount": -1,
+            "running": true
+        },
+        {
+            "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
+            "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+            "bytesRead": 0,
+            "bytesWritten": 40,
+            "flowFilesIn": 0,
+            "flowFilesOut": 4,
+            "bytesIn": 0,
+            "bytesOut": 40,
+            "invocations": 4,
+            "processingNanos": 2119148,
+            "activeThreadCount": -1,
+            "terminatedThreadCount": -1,
+            "running": true
+        }
+    ],
+    "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
+}
+```
+
+### QueueMetrics
+
+Contains information about the queues in the flow, including the contained 
data and number of flow files.
+
+```
+"QueueMetrics": {
+    "GenerateFlowFile/success/LogAttribute": {
+        "datasize": "40",
+        "datasizemax": "1048576",
+        "queued": "4",
+        "queuedmax": "0"
+    }
+}
+```
+
+### RepositoryMetrics
+
+Contains information about the repositories in the agent, including the number 
of entries, size, and whether the repository is full.
+
+
+```
+"RepositoryMetrics": {
+    "repo_name": {
+        "entryCount": 0,
+        "full": false,
+        "maxSize": 0,
+        "running": true,
+        "size": 0
+    },
+    "ff": {
+        "entryCount": 0,
+        "full": false,
+        "maxSize": 0,
+        "running": true,
+        "size": 0
+    },
+    "org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+        "entryCount": 4,
+        "full": false,
+        "maxSize": 7864320,
+        "running": true,
+        "size": 40
+    }
+}
+```
+
+### Processor Metric Response Nodes
+
+Each processor can have its own metrics. These metric nodes can be configured 
in the minifi.properties by requesting metrics in the <ProcessorTye>Metric 
format, for example GetTCPMetrics to request metrics for the GetTCP processors. 
Besides configuring processor metrics directly, they can also be configured 
using regular expressions with the `processorMetrics/` prefix. For example 
`processorMetrics/Get.*Metrics` will match all processor metrics that start 
with Get.

Review Comment:
   ```suggestion
   Each processor can have its own metrics. These metric nodes can be 
configured in the minifi.properties by requesting metrics in the 
\<ProcessorType\>Metrics format, for example GetTCPMetrics to request metrics 
for the GetTCP processors. Besides configuring processor metrics directly, they 
can also be configured using regular expressions with the `processorMetrics/` 
prefix. For example `processorMetrics/Get.*Metrics` will match all processor 
metrics that start with Get.
   ```



##########
libminifi/test/integration/C2MetricsTest.cpp:
##########
@@ -131,23 +134,67 @@ class MetricsHandler: public HeartbeatHandler {
       runtime_metrics.HasMember("flowInfo") &&
       runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
       runtime_metrics["flowInfo"].HasMember("queues") &&
-      runtime_metrics["flowInfo"].HasMember("components") &&
       
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997")
 &&
-      runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+      runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+      [&]() {
+        if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() 
!= 2) {
+          return false;
+        }
+        for (const auto& processor : 
runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) {  // 
NOLINT(readability-use-anyofallof)
+          if (processor["id"].GetString() != std::string(GETTCP_UUID) && 
processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) {
+            throw std::runtime_error(std::string("Unexpected processor id in 
processorStatuses: ") + processor["id"].GetString());
+          }
+
+          if (processor["bytesRead"].GetInt() < 0 ||
+              processor["bytesWritten"].GetInt() < 0 ||
+              processor["flowFilesIn"].GetInt() < 0 ||
+              processor["flowFilesOut"].GetInt() < 0 ||
+              processor["bytesIn"].GetInt() < 0 ||
+              processor["bytesOut"].GetInt() < 0 ||
+              processor["invocations"].GetInt() < 0 ||
+              processor["processingNanos"].GetInt() < 0 ||
+              processor["activeThreadCount"].GetInt() != -1 ||
+              processor["terminatedThreadCount"].GetInt() != -1 ||
+              !processor["running"].GetBool()) {
+            return false;
+          }
+        }
+        return true;
+      }();
   }
 
   static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& 
runtime_metrics) {
     return runtime_metrics.HasMember("deviceInfo") &&
       runtime_metrics.HasMember("flowInfo") &&
       runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
       runtime_metrics["flowInfo"].HasMember("queues") &&
-      runtime_metrics["flowInfo"].HasMember("components") &&
       
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332")
 &&
-      runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") 
&&
-      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+      runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+      [&]() {
+        if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() 
!= 2) {
+          return false;
+        }
+        for (const auto& processor : 
runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) {  // 
NOLINT(readability-use-anyofallof)

Review Comment:
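   Same as my comment on the identical loop in the other verify function: 
prefer `all_of` over suppressing the linter. This also looks like code 
duplication; consider extracting the common code to a local helper function.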



##########
libminifi/test/integration/C2MetricsTest.cpp:
##########
@@ -131,23 +134,67 @@ class MetricsHandler: public HeartbeatHandler {
       runtime_metrics.HasMember("flowInfo") &&
       runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
       runtime_metrics["flowInfo"].HasMember("queues") &&
-      runtime_metrics["flowInfo"].HasMember("components") &&
       
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997")
 &&
-      runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+      runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+      [&]() {
+        if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size() 
!= 2) {
+          return false;
+        }
+        for (const auto& processor : 
runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) {  // 
NOLINT(readability-use-anyofallof)

Review Comment:
   I side with the linter here: we should use `all_of`. The predicate callback 
can still throw if necessary.



##########
libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp:
##########
@@ -26,27 +26,36 @@ 
LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
   : callback_(std::move(callback)) {
 }
 
-int64_t LineByLineInputOutputStreamCallback::operator()(const 
std::shared_ptr<io::InputStream>& input, const 
std::shared_ptr<io::OutputStream>& output) {
+io::ReadWriteResult LineByLineInputOutputStreamCallback::operator()(const 
std::shared_ptr<io::InputStream>& input, const 
std::shared_ptr<io::OutputStream>& output) {
   gsl_Expects(input);
   gsl_Expects(output);
 
+  io::ReadWriteResult result;
+
   if (int64_t status = readInput(*input); status <= 0) {
-    return status;
+    result.bytes_read = status;
+    return result;
   }
 
-  std::size_t total_bytes_written_ = 0;
+  result.bytes_read = gsl::narrow<int64_t>(input_.size());
+
+  std::size_t total_bytes_written = 0;
   bool is_first_line = true;
   readLine();
   do {
     readLine();
     std::string output_line = callback_(*current_line_, is_first_line, 
isLastLine());
     const auto bytes_written = output->write(reinterpret_cast<const uint8_t 
*>(output_line.data()), output_line.size());
-    if (io::isError(bytes_written)) { return -1; }
-    total_bytes_written_ += bytes_written;
+    if (io::isError(bytes_written)) {
+      result.bytes_written = gsl::narrow<int64_t>(bytes_written);
+      return result;
+    }

Review Comment:
   Did you mean to return the raw error code as `result.bytes_written`? It used 
to return -1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to