szaszm commented on code in PR #1909:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1909#discussion_r1918034022
##########
libminifi/include/core/state/nodes/FlowInformation.h:
##########
@@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode {
connection_store_.updateConnection(connection);
}
- protected:
- std::shared_ptr<state::response::FlowVersion> flow_version_;
- ConnectionStore connection_store_;
-};
-
-/**
- * Justification and Purpose: Provides flow version Information
- */
-class FlowInformation : public FlowMonitor {
- public:
- FlowInformation(std::string_view name, const utils::Identifier &uuid)
- : FlowMonitor(name, uuid) {
- }
-
- explicit FlowInformation(std::string_view name)
- : FlowMonitor(name) {
- }
-
- MINIFIAPI static constexpr const char* Description = "Metric node that
defines the flow ID and flow URL deployed to this agent";
-
- std::string getName() const override {
- return "flowInfo";
+ void setProcessors(std::vector<core::Processor*> processors) {
Review Comment:
Can the pointers be null? If so, are they checked in the right places? If not,
consider wrapping them in `not_null`.
##########
libminifi/src/core/ProcessorMetrics.cpp:
##########
@@ -122,6 +132,14 @@ std::chrono::milliseconds
ProcessorMetrics::getLastSessionCommitRuntime() const
return session_commit_runtime_averager_.getLastValue();
}
+std::optional<size_t>
ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string&
relationship) const {
+ std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
+ if (transferred_relationships_.contains(relationship)) {
+ return transferred_relationships_.at(relationship);
Review Comment:
This does a double lookup (`contains` followed by `at`); a single `find` is enough:
```suggestion
const auto relationship_it = transferred_relationships_.find(relationship);
if (relationship_it != std::end(transferred_relationships_)) {
return *relationship_it;
```
##########
C2.md:
##########
@@ -271,3 +284,340 @@ in minifi.properties to activate the file update trigger
specify
# specifying a trigger
nifi.c2.agent.trigger.classes=FileUpdateTrigger
nifi.c2.file.watch=<full path of file to monitor>
+
+## C2 Response Nodes
+
+The following is a list of nodes that can be defined in the minifi.properties
file for the C2 heartbeat response as part of the C2 root nodes defined in the
`nifi.c2.root.classes` property or in the metrics nodes defined in the tree
under `nifi.c2.root.class.definitions` as stated above.
+
+### AgentInformation
+
+Contains information about the agent's build, extensions, supported C2
operations and status of its components.
+
+```
+"agentInfo": {
+ "agentManifest": {
+ "buildInfo": {
+ "compiler": "/usr/lib/ccache/g++",
+ "flags": "
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+ "revision": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+ "timestamp": 1734001238,
+ "version": "0.99.1"
+ },
+ "bundles": [
+ {
+ "componentManifest": {
+ "processors": [
+ ...
+ ]
+ },
+ "artifact": "minifi-civet-extensions",
+ "group": "org.apache.nifi.minifi",
+ "version": "0.99.1"
+ }
+ ],
+ "schedulingDefaults": {
+ "defaultMaxConcurrentTasks": 1,
+ "defaultRunDurationNanos": 0,
+ "defaultSchedulingPeriodMillis": 1000,
+ "defaultSchedulingStrategy": "TIMER_DRIVEN",
+ "penalizationPeriodMillis": 30000,
+ "yieldDurationMillis": 1000
+ },
+ "supportedOperations": [
+ {
+ "type": "acknowledge"
+ }
+ ...
+ ],
+ "agentType": "cpp",
+ "identifier": "bH77vXakM0Lkgt8VcDOGZVW3"
+ },
+ "status": {
+ "repositories": {
+ "content_repo": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "flow_file_repo": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+ "entryCount": 4,
+ "full": false,
+ "maxSize": 7864320,
+ "running": true,
+ "size": 40
+ }
+ },
+ "components": {
+ "LogAttribute": {
+ "running": true,
+ "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+ },
+ "GenerateFlowFile": {
+ "running": true,
+ "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+ },
+ "FlowController": {
+ "running": true,
+ "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+ }
+ },
+ "resourceConsumption": {
+ "cpuUtilization": 0.05,
+ "memoryUsage": 97955840
+ },
+ "uptime": 1025
+ },
+ "agentClass": "test",
+ "agentManifestHash":
"9FFC8326121A816E5B2FD674CE9A34321F89CC690AD0D1FD79DFB5969B3B523D6570520382E82C68CFA347FBD9897FC027E518E98CFA229C18617B062E1C9E77",
+ "identifier": "9628acfe-b9fe-11ef-a0c0-10f60a596f64"
+}
+```
+
+### AgentStatus
+
+Contains information about the agent's status, including the status of its
components, repositories, and resource consumption.
+
+```
+"AgentStatus": {
+ "repositories": {
+ "repo_name": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "ff": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+
"org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+ "entryCount": 4,
+ "full": false,
+ "maxSize": 7864320,
+ "running": true,
+ "size": 40
+ }
+ },
+ "components": {
+ "LogAttribute": {
+ "running": true,
+ "uuid": "5128e3c8-015a-1000-79ca-83af40ec1990"
+ },
+ "GenerateFlowFile": {
+ "running": true,
+ "uuid": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f"
+ },
+ "FlowController": {
+ "running": true,
+ "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+ }
+ },
+ "resourceConsumption": {
+ "cpuUtilization": 0.0028846153846153849,
+ "memoryUsage": 97955840
+ },
+ "uptime": 995
+}
+```
+
+### AssetInformation
+
+Contains the calculated hash of the assets.
+
+```
+"resourceInfo": {
+ "hash": "null"
+}
+```
+
+### BuildInformation
+
+Contains information about the agent's build.
+
+```
+"BuildInformation": {
+ "compiler": {
+ "compiler_command": "/usr/lib/ccache/g++",
+ "compiler_flags": "
-std=c++20;-Wall;-Wextra;-Werror;-Wno-error=restrict;SODIUM_STATIC=1",
+ "compiler_version": "11.4.0"
+ },
+ "build_date": "1734001238",
+ "build_rev": "cc9aaac37a9a6b7efeb3c4394a97522a600a1758",
+ "build_version": "0.99.1",
+ "device_id": "bH77vXakM0Lkgt8VcDOGZVW3"
+}
+```
+
+### ConfigurationChecksums
+
+Metric node that defines checksums of configuration files in the C2 protocol.
+
+```
+"configurationChecksums": {
+ "SHA256": {
+ "TestC2Metrics.yml":
"9af6589bf7729bb88857aafe98cea4f41df049725401b5f0ded0a7b949d9b90c",
+ "minifi.properties":
"06fb9f4730e3db7d0a0a1ee606a7de3fee5813edf42eab140616e8a2995072df"
+ }
+},
+```
+
+### DeviceInfoNode
+
+Contains information about the device the agent is running on.
+
+```
+"deviceInfo": {
+ "systemInfo": {
+ "cpuLoadAverage": 1.271484375,
+ "cpuUtilization": 0.06179499754781756,
+ "machineArch": "x86_64",
+ "memoryUsage": 12681670656,
+ "operatingSystem": "Linux",
+ "physicalMem": 67081129984,
+ "vCores": 20
+ },
+ "networkInfo": {
+ "hostname": "ggyimesi-5570-ubuntu",
+ "ipAddress": "10.255.0.1"
+ },
+ "identifier": "16475557466943148337"
+}
+```
+
+### FlowInformation
+
+Contains information about the flow the agent is running, including the
versioned flow snapshot URI, queues, components, and processor statuses.
+
+```
+"flowInfo": {
+ "versionedFlowSnapshotURI": {
+ "bucketId": "default",
+ "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
+ },
+ "queues": {
+ "8368e3c8-015a-1003-52ca-83af40ec1332": {
+ "dataSize": 40,
+ "dataSizeMax": 1048576,
+ "name": "GenerateFlowFile/success/LogAttribute",
+ "size": 4,
+ "sizeMax": 0,
+ "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
+ }
+ },
+ "processorStatuses": [
+ {
+ "id": "5128e3c8-015a-1000-79ca-83af40ec1990",
+ "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+ "bytesRead": 0,
+ "bytesWritten": 0,
+ "flowFilesIn": 0,
+ "flowFilesOut": 0,
+ "bytesIn": 0,
+ "bytesOut": 0,
+ "invocations": 0,
+ "processingNanos": 0,
+ "activeThreadCount": -1,
+ "terminatedThreadCount": -1,
+ "running": true
+ },
+ {
+ "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
+ "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
+ "bytesRead": 0,
+ "bytesWritten": 40,
+ "flowFilesIn": 0,
+ "flowFilesOut": 4,
+ "bytesIn": 0,
+ "bytesOut": 40,
+ "invocations": 4,
+ "processingNanos": 2119148,
+ "activeThreadCount": -1,
+ "terminatedThreadCount": -1,
+ "running": true
+ }
+ ],
+ "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64"
+}
+```
+
+### QueueMetrics
+
+Contains information about the queues in the flow, including the contained
data and number of flow files.
+
+```
+"QueueMetrics": {
+ "GenerateFlowFile/success/LogAttribute": {
+ "datasize": "40",
+ "datasizemax": "1048576",
+ "queued": "4",
+ "queuedmax": "0"
+ }
+}
+```
+
+### RepositoryMetrics
+
+Contains information about the repositories in the agent, including the number
of entries, size, and whether the repository is full.
+
+
+```
+"RepositoryMetrics": {
+ "repo_name": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "ff": {
+ "entryCount": 0,
+ "full": false,
+ "maxSize": 0,
+ "running": true,
+ "size": 0
+ },
+ "org::apache::nifi::minifi::core::repository::VolatileContentRepository": {
+ "entryCount": 4,
+ "full": false,
+ "maxSize": 7864320,
+ "running": true,
+ "size": 40
+ }
+}
+```
+
+### Processor Metric Response Nodes
+
+Each processor can have its own metrics. These metric nodes can be configured
in the minifi.properties by requesting metrics in the <ProcessorTye>Metric
format, for example GetTCPMetrics to request metrics for the GetTCP processors.
Besides configuring processor metrics directly, they can also be configured
using regular expressions with the `processorMetrics/` prefix. For example
`processorMetrics/Get.*Metrics` will match all processor metrics that start
with Get.
Review Comment:
```suggestion
Each processor can have its own metrics. These metric nodes can be
configured in the minifi.properties by requesting metrics in the
\<ProcessorType\>Metrics format, for example GetTCPMetrics to request metrics
for the GetTCP processors. Besides configuring processor metrics directly, they
can also be configured using regular expressions with the `processorMetrics/`
prefix. For example `processorMetrics/Get.*Metrics` will match all processor
metrics that start with Get.
```
##########
libminifi/test/integration/C2MetricsTest.cpp:
##########
@@ -131,23 +134,67 @@ class MetricsHandler: public HeartbeatHandler {
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
- runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997")
&&
- runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
- runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
- runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+ runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+ [&]() {
+ if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size()
!= 2) {
+ return false;
+ }
+ for (const auto& processor :
runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) { //
NOLINT(readability-use-anyofallof)
+ if (processor["id"].GetString() != std::string(GETTCP_UUID) &&
processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) {
+ throw std::runtime_error(std::string("Unexpected processor id in
processorStatuses: ") + processor["id"].GetString());
+ }
+
+ if (processor["bytesRead"].GetInt() < 0 ||
+ processor["bytesWritten"].GetInt() < 0 ||
+ processor["flowFilesIn"].GetInt() < 0 ||
+ processor["flowFilesOut"].GetInt() < 0 ||
+ processor["bytesIn"].GetInt() < 0 ||
+ processor["bytesOut"].GetInt() < 0 ||
+ processor["invocations"].GetInt() < 0 ||
+ processor["processingNanos"].GetInt() < 0 ||
+ processor["activeThreadCount"].GetInt() != -1 ||
+ processor["terminatedThreadCount"].GetInt() != -1 ||
+ !processor["running"].GetBool()) {
+ return false;
+ }
+ }
+ return true;
+ }();
}
static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value&
runtime_metrics) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
- runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332")
&&
- runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
- runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile")
&&
- runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+ runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+ [&]() {
+ if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size()
!= 2) {
+ return false;
+ }
+ for (const auto& processor :
runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) { //
NOLINT(readability-use-anyofallof)
Review Comment:
Same as for the first loop: prefer `all_of` here too. This also looks like
duplicated code; consider extracting the common part into a local helper function.
##########
libminifi/test/integration/C2MetricsTest.cpp:
##########
@@ -131,23 +134,67 @@ class MetricsHandler: public HeartbeatHandler {
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
- runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997")
&&
- runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
- runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
- runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+ runtime_metrics["flowInfo"].HasMember("processorStatuses") &&
+ [&]() {
+ if (runtime_metrics["flowInfo"]["processorStatuses"].GetArray().Size()
!= 2) {
+ return false;
+ }
+ for (const auto& processor :
runtime_metrics["flowInfo"]["processorStatuses"].GetArray()) { //
NOLINT(readability-use-anyofallof)
Review Comment:
I side with the linter here: we should use `all_of`. The predicate callback
can still throw if necessary.
##########
libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp:
##########
@@ -26,27 +26,36 @@
LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
: callback_(std::move(callback)) {
}
-int64_t LineByLineInputOutputStreamCallback::operator()(const
std::shared_ptr<io::InputStream>& input, const
std::shared_ptr<io::OutputStream>& output) {
+io::ReadWriteResult LineByLineInputOutputStreamCallback::operator()(const
std::shared_ptr<io::InputStream>& input, const
std::shared_ptr<io::OutputStream>& output) {
gsl_Expects(input);
gsl_Expects(output);
+ io::ReadWriteResult result;
+
if (int64_t status = readInput(*input); status <= 0) {
- return status;
+ result.bytes_read = status;
+ return result;
}
- std::size_t total_bytes_written_ = 0;
+ result.bytes_read = gsl::narrow<int64_t>(input_.size());
+
+ std::size_t total_bytes_written = 0;
bool is_first_line = true;
readLine();
do {
readLine();
std::string output_line = callback_(*current_line_, is_first_line,
isLastLine());
const auto bytes_written = output->write(reinterpret_cast<const uint8_t
*>(output_line.data()), output_line.size());
- if (io::isError(bytes_written)) { return -1; }
- total_bytes_written_ += bytes_written;
+ if (io::isError(bytes_written)) {
+ result.bytes_written = gsl::narrow<int64_t>(bytes_written);
+ return result;
+ }
Review Comment:
Did you mean to return the error code in `result.bytes_written`? It used to
return -1.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]