martinzink commented on a change in pull request #1066: URL: https://github.com/apache/nifi-minifi-cpp/pull/1066#discussion_r632397255
########## File path: extensions/pdh/PerformanceDataMonitor.cpp ########## @@ -0,0 +1,285 @@ +/** + * @file GenerateFlowFile.cpp + * GenerateFlowFile class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PerformanceDataMonitor.h" +#include "PDHCounters.h" +#include "MemoryConsumptionCounter.h" +#include "utils/StringUtils.h" +#include "utils/JsonCallback.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Relationship PerformanceDataMonitor::Success("success", "All files are routed to success"); + +core::Property PerformanceDataMonitor::PredefinedGroups( + core::PropertyBuilder::createProperty("Predefined Groups")-> + withDescription("Comma separated list from the allowable values, to monitor multiple common Windows Performance counters related to these groups")-> + withDefaultValue("")->build()); + +core::Property PerformanceDataMonitor::CustomPDHCounters( + core::PropertyBuilder::createProperty("Custom PDH Counters")-> + withDescription("Comma separated list of Windows Performance Counters to monitor")-> + withDefaultValue("")->build()); + +core::Property PerformanceDataMonitor::OutputFormatProperty( + core::PropertyBuilder::createProperty("Output Format")-> + withDescription("Format of the created flowfiles")-> + withAllowableValue<std::string>(JSON_FORMAT_STR)-> + withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)-> + withDefaultValue(JSON_FORMAT_STR)->build()); + +PerformanceDataMonitor::~PerformanceDataMonitor() { + PdhCloseQuery(pdh_query_); +} + +void PerformanceDataMonitor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) { + setupMembersFromProperties(context); + + PdhOpenQueryA(nullptr, 0, &pdh_query_); + + for (auto it = resource_consumption_counters_.begin(); it != resource_consumption_counters_.end();) { + PDHCounter* pdh_counter = dynamic_cast<PDHCounter*> (it->get()); + if (pdh_counter != nullptr) { + PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_); + if (add_to_query_result != ERROR_SUCCESS) { + logger_->log_error("Error adding %s to query, error code: 0x%x", pdh_counter->getName(), add_to_query_result); + it = resource_consumption_counters_.erase(it); + continue; + } + } + ++it; + } + + PDH_STATUS collect_query_data_result = PdhCollectQueryData(pdh_query_); + if (ERROR_SUCCESS != collect_query_data_result) { + logger_->log_error("Error during PdhCollectQueryData, error code: 0x%x", collect_query_data_result); + } +} + +void PerformanceDataMonitor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) { + if (resource_consumption_counters_.empty()) { + logger_->log_error("No valid counters for PerformanceDataMonitor"); + yield(); + return; + } + + std::shared_ptr<core::FlowFile> flowFile = session->create(); + if (!flowFile) { + logger_->log_error("Failed to create flowfile!"); + yield(); + return; + } + + PDH_STATUS collect_query_data_result = PdhCollectQueryData(pdh_query_); + if (ERROR_SUCCESS != collect_query_data_result) { + logger_->log_error("Error during PdhCollectQueryData, error code: 0x%x", collect_query_data_result); + yield(); + return; + } + + rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType); + rapidjson::Value& body = prepareJSONBody(root); + for (auto& counter : resource_consumption_counters_) { + if (counter->collectData()) + counter->addToJson(body, root.GetAllocator()); + } + utils::JsonOutputCallback callback(std::move(root)); + session->write(flowFile, &callback); + session->transfer(flowFile, Success); +} + +void PerformanceDataMonitor::initialize() { + setSupportedProperties({ CustomPDHCounters, PredefinedGroups, OutputFormatProperty }); + setSupportedRelationships({ PerformanceDataMonitor::Success }); +} + +rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document& root) { + switch (output_format_) { + case OutputFormat::OPENTELEMETRY: + root.AddMember("Name", "PerformanceData", root.GetAllocator()); + root.AddMember("Timestamp", std::time(0), root.GetAllocator()); + root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType }, root.GetAllocator()); + return root["Body"]; + case OutputFormat::JSON: + return root; + default: + return root; + } +} + +void add_cpu_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) { + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\% Processor Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\% User Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\% Privileged Time")); +} + +void add_io_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) { + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(_Total)\\IO Read Bytes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(_Total)\\IO Write Bytes/sec")); +} + +void add_disk_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) { + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\LogicalDisk(*)\\% Free Space")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\LogicalDisk(*)\\Free Megabytes", false)); + + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\% Disk Read Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\% Disk Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\% Disk Write Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\% Idle Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk Bytes/Transfer")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk Bytes/Read")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk Bytes/Write")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk Write Queue Length")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk Read Queue Length")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk Queue Length")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk sec/Transfer")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk sec/Read")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg. Disk sec/Write")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Current Disk Queue Length", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk Transfers/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk Reads/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk Writes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk Bytes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk Read Bytes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk Write Bytes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Split IO/Sec")); +} + +void add_network_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) { + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Bytes Received/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Bytes Sent/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Bytes Total/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Current Bandwidth", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Received/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Sent/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Received Discarded", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Received Errors", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Received Unknown", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Received Non-Unicast/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Received Unicast/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Sent Unicast/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network Interface(*)\\Packets Sent Non-Unicast/sec")); +} + +void add_memory_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) { + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\% Committed Bytes In Use")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\Available MBytes", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\Page Faults/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\Pages/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Paging File(_Total)\\% Usage")); + + resource_consumption_counters.push_back(std::make_unique<MemoryConsumptionCounter>()); +} + +void add_process_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) { + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\% Processor Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\Elapsed Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\ID Process", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\Private Bytes", false)); +} + +void add_system_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) { + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\% Registry Quota In Use")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Context Switches/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File Control Bytes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File Control Operations/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File Read Bytes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File Read Operations/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File Write Bytes/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File Write Operations/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File Data Operations/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Processes", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Processor Queue Length", false)); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\System Calls/sec")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\System Up Time")); + resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Threads", false)); +} + +void PerformanceDataMonitor::addCountersFromPredefinedGroupsProperty(const std::string& predefined_groups) { + auto groups = utils::StringUtils::split(predefined_groups, ","); Review comment: awesome, replaced it looks much cleaner [a1e5bdf](https://github.com/martinzink/nifi-minifi-cpp/commit/a1e5bdfae1284b3308e315df28b02a1e246ffaa4) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org