fgerlits commented on a change in pull request #1066:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1066#discussion_r629991357
##########
File path: CMakeLists.txt
##########
@@ -501,6 +501,14 @@ if (ENABLE_ALL OR ENABLE_AWS)
createExtension(AWS-EXTENSIONS "AWS EXTENSIONS" "This enables AWS
support" "extensions/aws" "${TEST_DIR}/aws-tests")
endif()
+## PDH Extentions
Review comment:
typo: Exten(t -> s)ions
##########
File path: extensions/pdh/MemoryConsumptionCounter.h
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "PerformanceDataCounter.h"
+#include <string>
+#include "utils/OsUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class MemoryConsumptionCounter : public PerformanceDataCounter {
+ public:
+ MemoryConsumptionCounter() {
+ }
+
+ void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType&
alloc) const override {
+ rapidjson::Value& group_node = acquireNode(std::string("Memory"), body,
alloc);
+
+ rapidjson::Value total_pysical_memory;
Review comment:
typo: pysical -> physical
##########
File path: extensions/pdh/PerformanceDataMonitor.h
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <pdh.h>
+#include <string>
+#include <vector>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/prettywriter.h"
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PerformanceDataMonitor Class
+class PerformanceDataMonitor : public core::Processor {
+ public:
+ static constexpr char const* JSON_FORMAT_STR = "JSON";
+ static constexpr char const* OPEN_TELEMETRY_FORMAT_STR = "OpenTelemetry";
+
+ explicit PerformanceDataMonitor(const std::string& name, utils::Identifier
uuid = utils::Identifier())
+ : Processor(name, uuid), output_format_(OutputFormat::kJSON),
+ logger_(logging::LoggerFactory<PerformanceDataMonitor>::getLogger()),
+ pdh_query_(nullptr), resource_consumption_counters_() {
+ }
+ ~PerformanceDataMonitor() override;
+ static constexpr char const* ProcessorName = "PerformanceDataMonitor";
+ // Supported Properties
+ static core::Property PredefinedGroups;
+ static core::Property CustomPDHCounters;
+ static core::Property OutputFormatProperty;
+ // Supported Relationships
+ static core::Relationship Success;
+ // Nest Callback Class for write stream
+ class WriteCallback : public OutputStreamCallback {
Review comment:
This class could be useful elsewhere, too. If you gave it a better name
(e.g. `JSONFlowFileWriterCallback`), I think it could live in `libminifi/utils`.
##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list to which predefined groups should be
included")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of PDHCounters to collect from")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ if (pdh_query_ != nullptr)
+ PdhCloseQuery(pdh_query_);
+ for (PerformanceDataCounter* resource_consumption_counter :
resource_consumption_counters_) {
+ delete resource_consumption_counter;
+ }
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ std::vector<PerformanceDataCounter*> valid_counters;
+
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+ if (pdh_counter != nullptr) {
+ if (pdh_query_ == nullptr)
+ PdhOpenQuery(NULL, NULL, &pdh_query_);
+ PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+ if (add_to_query_result != ERROR_SUCCESS) {
+ logger_->log_error(("Error adding " + pdh_counter->getName() + " to
query: " + std::to_string(add_to_query_result)).c_str());
+ delete counter;
+ } else {
+ valid_counters.push_back(counter);
+ }
+ } else {
+ valid_counters.push_back(counter);
+ }
+ }
+ resource_consumption_counters_ = valid_counters;
+ PdhCollectQueryData(pdh_query_);
+}
+
+void PerformanceDataMonitor::onTrigger(core::ProcessContext* context,
core::ProcessSession* session) {
+ if (resource_consumption_counters_.empty()) {
+ logger_->log_error("No valid counters for PerformanceDataMonitor");
+ return;
+ }
+
+ std::shared_ptr<core::FlowFile> flowFile = session->create();
+ if (!flowFile) {
+ logger_->log_error("Failed to create flowfile!");
+ return;
+ }
+
+ if (pdh_query_ != nullptr)
+ PdhCollectQueryData(pdh_query_);
+
+ rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType);
+ rapidjson::Value& body = prepareJSONBody(root);
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*>(counter);
+ if (pdh_counter != nullptr && pdh_counter->collectData() != ERROR_SUCCESS)
+ continue;
+ counter->addToJson(body, root.GetAllocator());
+ }
+ PerformanceDataMonitor::WriteCallback callback(std::move(root));
+ session->write(flowFile, &callback);
+ session->transfer(flowFile, Success);
+}
+
+void PerformanceDataMonitor::initialize(void) {
+ setSupportedProperties({ CustomPDHCounters, PredefinedGroups,
OutputFormatProperty });
+ setSupportedRelationships({ PerformanceDataMonitor::Success });
+}
+
+rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document&
root) {
+ switch (output_format_) {
+ case (OutputFormat::kOpenTelemetry):
+ root.AddMember("Name", "PerformanceData", root.GetAllocator());
+ root.AddMember("Timestamp", std::time(0), root.GetAllocator());
+ root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType },
root.GetAllocator());
+ return root["Body"];
+ case (OutputFormat::kJSON):
+ return root;
+ default:
+ return root;
+ }
+}
+
+void add_cpu_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
Processor Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
User Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
Privileged Time"));
+}
+
+void add_io_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Process(_Total)\\IO
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Process(_Total)\\IO
Write Bytes/sec"));
+}
+
+void add_disk_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\LogicalDisk(*)\\%
Free Space"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\LogicalDisk(*)\\Free
Megabytes", false));
+
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Read Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Write Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Idle Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Transfer"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Read"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Write"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Write Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Read Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Transfer"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Read"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Write"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Current
Disk Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Transfers/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Reads/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Writes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Write Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Split
IO/Sec"));
+}
+
+void add_network_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Bytes Received/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Bytes Sent/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Bytes Total/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Current Bandwidth"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Received/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Sent/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Received Discarded", false));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Received Errors", false));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Received Unknown", false));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Received Non-Unicast/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Received Unicast/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Sent Unicast/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Packets Sent Non-Unicast/sec"));
+}
+
+void add_memory_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Memory\\%
Committed Bytes In Use"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Memory\\Available
MBytes"));
Review comment:
Should this be an integer value (i.e. created with `..., false`)?
##########
File path: extensions/pdh/PerformanceDataMonitor.h
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <pdh.h>
+#include <string>
+#include <vector>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/prettywriter.h"
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PerformanceDataMonitor Class
+class PerformanceDataMonitor : public core::Processor {
+ public:
+ static constexpr char const* JSON_FORMAT_STR = "JSON";
+ static constexpr char const* OPEN_TELEMETRY_FORMAT_STR = "OpenTelemetry";
Review comment:
very minor, but why `char const` instead of the more usual `const char`?
##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list to which predefined groups should be
included")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of PDHCounters to collect from")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ if (pdh_query_ != nullptr)
+ PdhCloseQuery(pdh_query_);
+ for (PerformanceDataCounter* resource_consumption_counter :
resource_consumption_counters_) {
+ delete resource_consumption_counter;
+ }
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ std::vector<PerformanceDataCounter*> valid_counters;
+
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+ if (pdh_counter != nullptr) {
+ if (pdh_query_ == nullptr)
+ PdhOpenQuery(NULL, NULL, &pdh_query_);
+ PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+ if (add_to_query_result != ERROR_SUCCESS) {
+ logger_->log_error(("Error adding " + pdh_counter->getName() + " to
query: " + std::to_string(add_to_query_result)).c_str());
+ delete counter;
+ } else {
+ valid_counters.push_back(counter);
+ }
+ } else {
+ valid_counters.push_back(counter);
+ }
+ }
+ resource_consumption_counters_ = valid_counters;
+ PdhCollectQueryData(pdh_query_);
+}
+
+void PerformanceDataMonitor::onTrigger(core::ProcessContext* context,
core::ProcessSession* session) {
+ if (resource_consumption_counters_.empty()) {
+ logger_->log_error("No valid counters for PerformanceDataMonitor");
+ return;
Review comment:
We could `yield()` here, and also before line 92 (the `return` after the
failed flow file creation), so that a misconfigured or failing processor
gets called less frequently.
##########
File path: extensions/pdh/PerformanceDataMonitor.h
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <pdh.h>
+#include <string>
+#include <vector>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/prettywriter.h"
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PerformanceDataMonitor Class
+class PerformanceDataMonitor : public core::Processor {
+ public:
+ static constexpr char const* JSON_FORMAT_STR = "JSON";
+ static constexpr char const* OPEN_TELEMETRY_FORMAT_STR = "OpenTelemetry";
+
+ explicit PerformanceDataMonitor(const std::string& name, utils::Identifier
uuid = utils::Identifier())
+ : Processor(name, uuid), output_format_(OutputFormat::kJSON),
+ logger_(logging::LoggerFactory<PerformanceDataMonitor>::getLogger()),
+ pdh_query_(nullptr), resource_consumption_counters_() {
+ }
+ ~PerformanceDataMonitor() override;
+ static constexpr char const* ProcessorName = "PerformanceDataMonitor";
+ // Supported Properties
+ static core::Property PredefinedGroups;
+ static core::Property CustomPDHCounters;
+ static core::Property OutputFormatProperty;
+ // Supported Relationships
+ static core::Relationship Success;
+ // Nest Callback Class for write stream
+ class WriteCallback : public OutputStreamCallback {
+ public:
+ explicit WriteCallback(rapidjson::Document&& root) :
root_(std::move(root)) {
+ }
+ rapidjson::Document root_;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ rapidjson::StringBuffer buffer;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+ root_.Accept(writer);
+ return stream->write(reinterpret_cast<const
uint8_t*>(buffer.GetString()), gsl::narrow<int>(buffer.GetSize()));
+ }
+ };
+
+ public:
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session)
override;
+ void initialize(void) override;
Review comment:
we usually have `()` instead of the C-style `(void)`
##########
File path: extensions/pdh/PDHCounters.cpp
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PDHCounters.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+DWORD PDHCounterBase::getDWFormat() const {
+ return is_double_format_ ? PDH_FMT_DOUBLE : PDH_FMT_LARGE;
+}
+
+PDHCounterBase* PDHCounterBase::createPDHCounter(const std::string&
query_name, bool is_double) {
+ auto groups = utils::StringUtils::split(query_name, "\\");
+ if (groups.size() != 2 || query_name.substr(0, 1) != "\\")
+ return nullptr;
+ if (query_name.find("(*)") != std::string::npos) {
+ return new PDHCounterArray(query_name, is_double);
+ } else {
+ return new PDHCounter(query_name, is_double);
+ }
+}
+
+const std::string& PDHCounterBase::getName() const {
+ return pdh_english_counter_name_;
+}
+
+std::string PDHCounterBase::getObjectName() const {
+ auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+ return groups[0];
+}
+
+std::string PDHCounterBase::getCounterName() const {
+ auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+ return groups[1];
+}
+
+void PDHCounter::addToJson(rapidjson::Value& body,
rapidjson::Document::AllocatorType& alloc) const {
+ rapidjson::Value key(getCounterName().c_str(), getCounterName().length(),
alloc);
+ rapidjson::Value& group_node = acquireNode(getObjectName(), body, alloc);
+ group_node.AddMember(key, getValue(), alloc);
+}
+
+PDH_STATUS PDHCounter::addToQuery(PDH_HQUERY& pdh_query) {
+ return PdhAddEnglishCounter(pdh_query, pdh_english_counter_name_.c_str(),
NULL, &counter_);
Review comment:
I think it is better to explicitly use the ANSI (narrow-string) version of the
Windows system calls (`PdhAddEnglishCounterA` in this case) and not to let Visual
Studio choose based on the `UNICODE` compile flag. (Except if we handled the
flag and used wide strings when it is set, which we don't do here, and probably
shouldn't.)
##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list to which predefined groups should be
included")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of PDHCounters to collect from")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ if (pdh_query_ != nullptr)
+ PdhCloseQuery(pdh_query_);
+ for (PerformanceDataCounter* resource_consumption_counter :
resource_consumption_counters_) {
+ delete resource_consumption_counter;
+ }
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ std::vector<PerformanceDataCounter*> valid_counters;
+
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+ if (pdh_counter != nullptr) {
+ if (pdh_query_ == nullptr)
+ PdhOpenQuery(NULL, NULL, &pdh_query_);
+ PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+ if (add_to_query_result != ERROR_SUCCESS) {
+ logger_->log_error(("Error adding " + pdh_counter->getName() + " to
query: " + std::to_string(add_to_query_result)).c_str());
+ delete counter;
+ } else {
+ valid_counters.push_back(counter);
+ }
+ } else {
+ valid_counters.push_back(counter);
+ }
+ }
+ resource_consumption_counters_ = valid_counters;
+ PdhCollectQueryData(pdh_query_);
+}
+
+void PerformanceDataMonitor::onTrigger(core::ProcessContext* context,
core::ProcessSession* session) {
+ if (resource_consumption_counters_.empty()) {
+ logger_->log_error("No valid counters for PerformanceDataMonitor");
+ return;
+ }
+
+ std::shared_ptr<core::FlowFile> flowFile = session->create();
+ if (!flowFile) {
+ logger_->log_error("Failed to create flowfile!");
+ return;
+ }
+
+ if (pdh_query_ != nullptr)
+ PdhCollectQueryData(pdh_query_);
+
+ rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType);
+ rapidjson::Value& body = prepareJSONBody(root);
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*>(counter);
+ if (pdh_counter != nullptr && pdh_counter->collectData() != ERROR_SUCCESS)
+ continue;
+ counter->addToJson(body, root.GetAllocator());
+ }
+ PerformanceDataMonitor::WriteCallback callback(std::move(root));
+ session->write(flowFile, &callback);
+ session->transfer(flowFile, Success);
+}
+
+void PerformanceDataMonitor::initialize(void) {
+ setSupportedProperties({ CustomPDHCounters, PredefinedGroups,
OutputFormatProperty });
+ setSupportedRelationships({ PerformanceDataMonitor::Success });
+}
+
+rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document&
root) {
+ switch (output_format_) {
+ case (OutputFormat::kOpenTelemetry):
+ root.AddMember("Name", "PerformanceData", root.GetAllocator());
+ root.AddMember("Timestamp", std::time(0), root.GetAllocator());
+ root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType },
root.GetAllocator());
+ return root["Body"];
+ case (OutputFormat::kJSON):
+ return root;
+ default:
+ return root;
+ }
+}
+
+void add_cpu_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
Processor Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
User Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
Privileged Time"));
+}
+
+void add_io_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Process(_Total)\\IO
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Process(_Total)\\IO
Write Bytes/sec"));
+}
+
+void add_disk_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\LogicalDisk(*)\\%
Free Space"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\LogicalDisk(*)\\Free
Megabytes", false));
+
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Read Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Write Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Idle Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Transfer"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Read"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Write"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Write Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Read Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Transfer"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Read"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Write"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Current
Disk Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Transfers/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Reads/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Writes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Disk
Write Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Split
IO/Sec"));
+}
+
+void add_network_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Bytes Received/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Bytes Sent/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Bytes Total/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Network
Interface(*)\\Current Bandwidth"));
Review comment:
Should this be an integer value (i.e. created with `..., false`)?
##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list to which predefined groups should be
included")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of PDHCounters to collect from")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ if (pdh_query_ != nullptr)
+ PdhCloseQuery(pdh_query_);
+ for (PerformanceDataCounter* resource_consumption_counter :
resource_consumption_counters_) {
+ delete resource_consumption_counter;
+ }
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ std::vector<PerformanceDataCounter*> valid_counters;
+
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+ if (pdh_counter != nullptr) {
+ if (pdh_query_ == nullptr)
+ PdhOpenQuery(NULL, NULL, &pdh_query_);
+ PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+ if (add_to_query_result != ERROR_SUCCESS) {
+ logger_->log_error(("Error adding " + pdh_counter->getName() + " to
query: " + std::to_string(add_to_query_result)).c_str());
+ delete counter;
+ } else {
+ valid_counters.push_back(counter);
+ }
+ } else {
+ valid_counters.push_back(counter);
+ }
+ }
+ resource_consumption_counters_ = valid_counters;
+ PdhCollectQueryData(pdh_query_);
Review comment:
Does it make sense to call `PdhCollectQueryData` with a null argument?
I think we should check `pdh_query_`, and throw here in `onSchedule()` if it is
null. Then the null check could be removed from `onTrigger()`.
##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list to which predefined groups should be
included")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of PDHCounters to collect from")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ if (pdh_query_ != nullptr)
+ PdhCloseQuery(pdh_query_);
+ for (PerformanceDataCounter* resource_consumption_counter :
resource_consumption_counters_) {
+ delete resource_consumption_counter;
+ }
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ std::vector<PerformanceDataCounter*> valid_counters;
+
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+ if (pdh_counter != nullptr) {
+ if (pdh_query_ == nullptr)
+ PdhOpenQuery(NULL, NULL, &pdh_query_);
+ PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+ if (add_to_query_result != ERROR_SUCCESS) {
+ logger_->log_error(("Error adding " + pdh_counter->getName() + " to
query: " + std::to_string(add_to_query_result)).c_str());
+ delete counter;
+ } else {
+ valid_counters.push_back(counter);
+ }
+ } else {
+ valid_counters.push_back(counter);
+ }
+ }
+ resource_consumption_counters_ = valid_counters;
+ PdhCollectQueryData(pdh_query_);
+}
+
+void PerformanceDataMonitor::onTrigger(core::ProcessContext* context,
core::ProcessSession* session) {
+ if (resource_consumption_counters_.empty()) {
+ logger_->log_error("No valid counters for PerformanceDataMonitor");
+ return;
+ }
+
+ std::shared_ptr<core::FlowFile> flowFile = session->create();
+ if (!flowFile) {
+ logger_->log_error("Failed to create flowfile!");
+ return;
+ }
+
+ if (pdh_query_ != nullptr)
+ PdhCollectQueryData(pdh_query_);
+
+ rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType);
+ rapidjson::Value& body = prepareJSONBody(root);
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*>(counter);
+ if (pdh_counter != nullptr && pdh_counter->collectData() != ERROR_SUCCESS)
+ continue;
+ counter->addToJson(body, root.GetAllocator());
+ }
+ PerformanceDataMonitor::WriteCallback callback(std::move(root));
+ session->write(flowFile, &callback);
+ session->transfer(flowFile, Success);
+}
+
+void PerformanceDataMonitor::initialize(void) {
+ setSupportedProperties({ CustomPDHCounters, PredefinedGroups,
OutputFormatProperty });
+ setSupportedRelationships({ PerformanceDataMonitor::Success });
+}
+
+rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document&
root) {
+ switch (output_format_) {
+ case (OutputFormat::kOpenTelemetry):
+ root.AddMember("Name", "PerformanceData", root.GetAllocator());
+ root.AddMember("Timestamp", std::time(0), root.GetAllocator());
+ root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType },
root.GetAllocator());
+ return root["Body"];
+ case (OutputFormat::kJSON):
+ return root;
+ default:
+ return root;
+ }
+}
+
+void add_cpu_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
Processor Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
User Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Processor(*)\\%
Privileged Time"));
+}
+
+void add_io_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Process(_Total)\\IO
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\Process(_Total)\\IO
Write Bytes/sec"));
+}
+
+void add_disk_related_counters(std::vector<PerformanceDataCounter*>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\LogicalDisk(*)\\%
Free Space"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\LogicalDisk(*)\\Free
Megabytes", false));
+
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Read Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Write Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\%
Idle Time"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Transfer"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Read"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Write"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Write Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Read Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Queue Length"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Transfer"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Read"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Write"));
+
resource_consumption_counters.push_back(PDHCounterBase::createPDHCounter("\\PhysicalDisk(*)\\Current
Disk Queue Length"));
Review comment:
Should this be an integer value (i.e. created with `..., false`)?
##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list to which predefined groups should be
included")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of PDHCounters to collect from")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ if (pdh_query_ != nullptr)
+ PdhCloseQuery(pdh_query_);
+ for (PerformanceDataCounter* resource_consumption_counter :
resource_consumption_counters_) {
+ delete resource_consumption_counter;
+ }
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ std::vector<PerformanceDataCounter*> valid_counters;
+
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
Review comment:
I think this is done because of `MemoryConsumptionCounter`, which
inherits from `PerformanceDataCounter` but not from `PDHCounterBase`. Could we
make `MemoryConsumptionCounter` inherit from `PDHCounterBase` and implement
the unnecessary virtual methods as no-ops? That would make the code easier to
understand.
##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list to which predefined groups should be
included")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of PDHCounters to collect from")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ if (pdh_query_ != nullptr)
+ PdhCloseQuery(pdh_query_);
+ for (PerformanceDataCounter* resource_consumption_counter :
resource_consumption_counters_) {
+ delete resource_consumption_counter;
+ }
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ std::vector<PerformanceDataCounter*> valid_counters;
+
+ for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+ PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+ if (pdh_counter != nullptr) {
+ if (pdh_query_ == nullptr)
+ PdhOpenQuery(NULL, NULL, &pdh_query_);
+ PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+ if (add_to_query_result != ERROR_SUCCESS) {
+ logger_->log_error(("Error adding " + pdh_counter->getName() + " to
query: " + std::to_string(add_to_query_result)).c_str());
Review comment:
I would write this as
```suggestion
logger_->log_error("Error adding %s to query, error code: 0x%x",
pdh_counter->getName(), add_to_query_result);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]