This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 98b9055 MINIFICPP-1532: Create a processor to capture resource
consumption metrics on Windows
98b9055 is described below
commit 98b9055fda485f87cdc6e161c29ed557f063846a
Author: Martin Zink <[email protected]>
AuthorDate: Wed Mar 24 12:29:15 2021 +0100
MINIFICPP-1532: Create a processor to capture resource consumption metrics
on Windows
Updated documentation to include information about the new processor
Signed-off-by: Arpad Boda <[email protected]>
This closes #1066
---
.github/workflows/ci.yml | 4 +-
CMakeLists.txt | 18 +-
PROCESSORS.md | 22 ++
README.md | 1 +
Windows.md | 1 +
extensions/pdh/CMakeLists.txt | 32 +++
extensions/pdh/MemoryConsumptionCounter.h | 70 +++++
extensions/pdh/PDHCounters.cpp | 132 ++++++++++
extensions/pdh/PDHCounters.h | 99 +++++++
extensions/pdh/PerformanceDataCounter.h | 51 ++++
extensions/pdh/PerformanceDataMonitor.cpp | 284 +++++++++++++++++++++
extensions/pdh/PerformanceDataMonitor.h | 90 +++++++
extensions/pdh/tests/CMakeLists.txt | 37 +++
.../pdh/tests/PerformanceDataCounterTests.cpp | 187 ++++++++++++++
.../pdh/tests/PerformanceDataMonitorTests.cpp | 231 +++++++++++++++++
libminifi/include/utils/JsonCallback.h | 54 ++++
libminifi/include/utils/OsUtils.h | 5 +
libminifi/src/utils/OsUtils.cpp | 10 +
libminifi/test/unit/MemoryUsageTest.cpp | 35 ++-
win_build_vs.bat | 3 +-
20 files changed, 1347 insertions(+), 19 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 83e7e86..916e6fa 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -102,7 +102,7 @@ jobs:
run: |
PATH %PATH%;C:\Program Files (x86)\Windows
Kits\10\bin\10.0.17763.0\x86
PATH %PATH%;C:\Program Files (x86)\Microsoft Visual
Studio\2017\Enterprise\MSBuild\15.0\Bin\Roslyn
- win_build_vs.bat build /CI /S /A /Z
+ win_build_vs.bat build /CI /S /A /Z /PDH
shell: cmd
windows_VS2019:
name: "windows-vs2019"
@@ -141,7 +141,7 @@ jobs:
run: |
PATH %PATH%;C:\Program Files (x86)\Windows
Kits\10\bin\10.0.19041.0\x64
PATH %PATH%;C:\Program Files (x86)\Microsoft Visual
Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
- win_build_vs.bat build /2019 /64 /CI /S /A
+ win_build_vs.bat build /2019 /64 /CI /S /A /PDH
shell: cmd
ubuntu_16_04:
name: "ubuntu-16.04"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6a222db..136796d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -493,7 +493,7 @@ if (ENABLE_TENSORFLOW)
createExtension(TENSORFLOW-EXTENSIONS "TENSORFLOW EXTENSIONS" "This
enables TensorFlow support" "extensions/tensorflow"
"${TEST_DIR}/tensorflow-tests")
endif()
-## AWS Extentions
+## AWS Extensions
option(ENABLE_AWS "Enables AWS support." OFF)
if (ENABLE_ALL OR ENABLE_AWS)
include(BundledAwsSdkCpp)
@@ -501,7 +501,15 @@ if (ENABLE_ALL OR ENABLE_AWS)
createExtension(AWS-EXTENSIONS "AWS EXTENSIONS" "This enables AWS
support" "extensions/aws" "${TEST_DIR}/aws-tests")
endif()
-## OpenCV Extesions
+## PDH Extentsions
+if (WIN32)
+ option(ENABLE_PDH "Enables PDH support." OFF)
+ if (ENABLE_PDH)
+ createExtension(PDH-EXTENSIONS "PDH EXTENSIONS" "This enables
PDH support" "extensions/pdh" "extensions/pdh/tests")
+ endif()
+endif()
+
+## OpenCV Extensions
option(ENABLE_OPENCV "Disables the OpenCV extensions." OFF)
if (ENABLE_OPENCV)
createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled
OpenCV support" "extensions/opencv" "extensions/opencv/tests")
@@ -515,7 +523,7 @@ if (ENABLE_BUSTACHE)
createExtension(BUSTACHE-EXTENSIONS "BUSTACHE EXTENSIONS" "This enables
bustache functionality including ApplyTemplate." "extensions/bustache"
"${TEST_DIR}/bustache-tests")
endif()
-## OPC Extentions
+## OPC Extensions
if (ENABLE_OPC)
include(BundledMbedTLS)
use_bundled_mbedtls(${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR})
@@ -546,7 +554,7 @@ if ((ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT
DISABLE_CURL) OR ENABLE_ALL
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/libxml2/dummy")
endif()
-## Openwsman Extesions
+## Openwsman Extensions
if (ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT DISABLE_CURL)
include(BundledOpenWSMAN)
use_bundled_openwsman(${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR})
@@ -554,7 +562,7 @@ if (ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT
DISABLE_CURL)
createExtension(OPENWSMAN-EXTENSIONS "OPENWSMAN EXTENSIONS" "This
enables Openwsman support" "extensions/openwsman")
endif()
-## Azure Extentions
+## Azure Extensions
if (ENABLE_ALL OR ENABLE_AZURE)
include(NlohmannJson)
list(APPEND CMAKE_MODULE_PATH
"${CMAKE_SOURCE_DIR}/cmake/nlohmann_json/dummy")
diff --git a/PROCESSORS.md b/PROCESSORS.md
index d90ff6e..4820510 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -36,6 +36,7 @@
- [ManipulateArchive](#manipulatearchive)
- [MergeContent](#mergecontent)
- [MotionDetector](#motiondetector)
+- [PerformanceDataMonitor](#performancedatamonitor)
- [PublishKafka](#publishkafka)
- [PublishMQTT](#publishmqtt)
- [PutAzureBlobStorage](#putazureblobstorage)
@@ -955,6 +956,27 @@ In the list below, the names of required properties appear
in bold. Any other pr
|success|Successful to detect motion|
+## PerformanceDataMonitor
+
+### Description
+This Windows only processor can create FlowFiles populated with various
performance data with the help of Windows Performance Counters.
+Windows Performance Counters provide a high-level abstraction layer with a
consistent interface for collecting various kinds of system data such as CPU,
memory, and disk usage statistics.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Predefined
Groups||CPU<br>IO<br>Disk<br>Network<br>Memory<br>System<br>Process|Comma
separated list from the allowable values, to monitor multiple common Windows
Performance counters related to these groups. (e.g. "CPU,Network")|
+|Custom PDH Counters|||Comma separated list of Windows Performance Counters to
monitor. (e.g. "\\System\\Threads,\\Process(*)\\ID Process")|
+|**Output Format**|JSON|JSON<br>OpenTelemetry|The output format of the new
flowfile|
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|All files are routed to success|
+
+
## PublishKafka
### Description
diff --git a/README.md b/README.md
index 43ee8e2..b1c97be 100644
--- a/README.md
+++ b/README.md
@@ -87,6 +87,7 @@ Through JNI extensions you can run NiFi processors using
NARs. The JNI extension
| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#captureRTSPFrame) |
-DENABLE_OPENCV=ON |
| OpenWSMAN | SourceInitiatedSubscriptionListener | -DENABLE_OPENWSMAN=ON |
| PCAP | [CapturePacket](PROCESSORS.md#capturepacket) |
-DENABLE_PCAP=ON |
+| PDH (Windows only) |
[PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) |
-DENABLE_PDH=ON |
| Scripting | [ExecuteScript](PROCESSORS.md#executescript)<br/>**Custom Python
Processors** | -DDISABLE_SCRIPTING=ON |
| Sensors | GetEnvironmentalSensors<br/>GetMovementSensors |
-DENABLE_SENSORS=ON |
| SFTP |
[FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp)
| -DENABLE_SFTP=ON |
diff --git a/Windows.md b/Windows.md
index f32a851..76314c6 100644
--- a/Windows.md
+++ b/Windows.md
@@ -64,6 +64,7 @@ After the build directory it will take optional parameters
modifying the CMake c
| /C | Enables CoAP |
| /A | Enables AWS |
| /Z | Enables Azure |
+| /PDH | Enables Performance Monitor |
| /M | Creates installer with merge modules |
| /64 | Creates 64-bit build instead of a 32-bit one |
| /D | Builds RelWithDebInfo build instead of Release |
diff --git a/extensions/pdh/CMakeLists.txt b/extensions/pdh/CMakeLists.txt
new file mode 100644
index 0000000..d6c644e
--- /dev/null
+++ b/extensions/pdh/CMakeLists.txt
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES "*.cpp")
+
+add_library(minifi-pdh STATIC ${SOURCES})
+set_property(TARGET minifi-pdh PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+target_link_libraries(minifi-pdh ${LIBMINIFI} pdh)
+
+SET(PDH-EXTENSION minifi-pdh PARENT_SCOPE)
+register_extension(minifi-pdh)
+
+register_extension_linter(minifi-pdh-extensions-linter)
diff --git a/extensions/pdh/MemoryConsumptionCounter.h
b/extensions/pdh/MemoryConsumptionCounter.h
new file mode 100644
index 0000000..f04fbcc
--- /dev/null
+++ b/extensions/pdh/MemoryConsumptionCounter.h
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "PerformanceDataCounter.h"
+#include <string>
+#include "utils/OsUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class MemoryConsumptionCounter : public PerformanceDataCounter {
+ public:
+ MemoryConsumptionCounter() : PerformanceDataCounter(),
total_physical_memory_(-1), available_physical_memory_(-1),
total_paging_file_size_(-1) {
+ }
+
+ bool dataIsValid() {
+ return total_physical_memory_ > 0 && available_physical_memory_ > 0 &&
total_paging_file_size_ > 0;
+ }
+
+ bool collectData() override {
+ total_physical_memory_ = utils::OsUtils::getSystemTotalPhysicalMemory();
+ available_physical_memory_ =
utils::OsUtils::getSystemTotalPhysicalMemory() -
utils::OsUtils::getSystemPhysicalMemoryUsage();
+ total_paging_file_size_ = utils::OsUtils::getTotalPagingFileSize();
+ return dataIsValid();
+ }
+
+ void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType&
alloc) const override {
+ rapidjson::Value& group_node = acquireNode(std::string("Memory"), body,
alloc);
+
+ rapidjson::Value total_physical_memory_value;
+ total_physical_memory_value.SetInt64(total_physical_memory_);
+ group_node.AddMember(rapidjson::Value("Total Physical Memory", alloc),
total_physical_memory_value, alloc);
+
+ rapidjson::Value available_physical_memory_value;
+ available_physical_memory_value.SetInt64(available_physical_memory_);
+ group_node.AddMember(rapidjson::Value("Available Physical Memory", alloc),
available_physical_memory_value, alloc);
+
+ rapidjson::Value total_paging_file_size_value;
+ total_paging_file_size_value.SetInt64(total_paging_file_size_);
+ group_node.AddMember(rapidjson::Value("Total paging file size", alloc),
total_paging_file_size_value, alloc);
+ }
+
+ int64_t total_physical_memory_;
+ int64_t available_physical_memory_;
+ int64_t total_paging_file_size_;
+};
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/pdh/PDHCounters.cpp b/extensions/pdh/PDHCounters.cpp
new file mode 100644
index 0000000..03a839f
--- /dev/null
+++ b/extensions/pdh/PDHCounters.cpp
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PDHCounters.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+DWORD PDHCounter::getDWFormat() const {
+ return is_double_format_ ? PDH_FMT_DOUBLE : PDH_FMT_LARGE;
+}
+
+std::unique_ptr<PDHCounter> PDHCounter::createPDHCounter(const std::string&
query_name, bool is_double) {
+ auto groups = utils::StringUtils::split(query_name, "\\");
+ if (groups.size() != 2 || query_name.substr(0, 1) != "\\")
+ return nullptr;
+ if (query_name.find("(*)") != std::string::npos) {
+ return std::unique_ptr<PDHCounter> { new PDHCounterArray(query_name,
is_double) };
+ } else {
+ return std::unique_ptr<SinglePDHCounter> { new
SinglePDHCounter(query_name, is_double) };
+ }
+}
+
+const std::string& PDHCounter::getName() const {
+ return pdh_english_counter_name_;
+}
+
+std::string PDHCounter::getObjectName() const {
+ auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+ return groups[0];
+}
+
+std::string PDHCounter::getCounterName() const {
+ auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+ return groups[1];
+}
+
+void SinglePDHCounter::addToJson(rapidjson::Value& body,
rapidjson::Document::AllocatorType& alloc) const {
+ rapidjson::Value key(getCounterName().c_str(), getCounterName().length(),
alloc);
+ rapidjson::Value& group_node = acquireNode(getObjectName(), body, alloc);
+ group_node.AddMember(key, getValue(), alloc);
+}
+
+PDH_STATUS SinglePDHCounter::addToQuery(PDH_HQUERY& pdh_query) {
+ return PdhAddEnglishCounterA(pdh_query, pdh_english_counter_name_.c_str(),
0, &counter_);
+}
+
+bool SinglePDHCounter::collectData() {
+ return PdhGetFormattedCounterValue(counter_, getDWFormat(), nullptr,
¤t_value_) == ERROR_SUCCESS;
+}
+
+rapidjson::Value SinglePDHCounter::getValue() const {
+ rapidjson::Value value;
+ if (is_double_format_)
+ value.SetDouble(current_value_.doubleValue);
+ else
+ value.SetInt64(current_value_.largeValue);
+ return value;
+}
+
+std::string PDHCounterArray::getObjectName() const {
+ std::string group_name_with_wildcard = PDHCounter::getObjectName();
+ return group_name_with_wildcard.substr(0,
group_name_with_wildcard.find("(*)"));
+}
+
+void PDHCounterArray::addToJson(rapidjson::Value& body,
rapidjson::Document::AllocatorType& alloc) const {
+ rapidjson::Value& group_node = acquireNode(getObjectName(), body, alloc);
+ for (DWORD i = 0; i < item_count_; ++i) {
+ rapidjson::Value& counter_node =
acquireNode(std::string(values_[i].szName), group_node, alloc);
+ rapidjson::Value value;
+ if (is_double_format_)
+ value.SetDouble(values_[i].FmtValue.doubleValue);
+ else
+ value.SetInt64(values_[i].FmtValue.largeValue);
+ rapidjson::Value key;
+ key.SetString(getCounterName().c_str(), getCounterName().length(), alloc);
+ counter_node.AddMember(key, value, alloc);
+ }
+}
+
+PDH_STATUS PDHCounterArray::addToQuery(PDH_HQUERY& pdh_query) {
+ return PdhAddEnglishCounterA(pdh_query, pdh_english_counter_name_.c_str(),
0, &counter_);
+}
+
+bool PDHCounterArray::collectData() {
+ clearCurrentData();
+ PDH_STATUS status = PdhGetFormattedCounterArrayA(counter_, getDWFormat(),
&buffer_size_, &item_count_, values_);
+ if (PDH_MORE_DATA == status) {
+ values_ =
reinterpret_cast<PDH_FMT_COUNTERVALUE_ITEM*>(malloc(buffer_size_));
+ status = PdhGetFormattedCounterArrayA(counter_, getDWFormat(),
&buffer_size_, &item_count_, values_);
+ }
+ return status == ERROR_SUCCESS;
+}
+
+void PDHCounterArray::clearCurrentData() {
+ free(values_);
+ values_ = nullptr;
+ buffer_size_ = item_count_ = 0;
+}
+
+rapidjson::Value PDHCounterArray::getValue(const DWORD i) const {
+ rapidjson::Value value;
+ if (is_double_format_)
+ value.SetDouble(values_[i].FmtValue.doubleValue);
+ else
+ value.SetInt64(values_[i].FmtValue.largeValue);
+ return value;
+}
+
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/pdh/PDHCounters.h b/extensions/pdh/PDHCounters.h
new file mode 100644
index 0000000..2a0d3e7
--- /dev/null
+++ b/extensions/pdh/PDHCounters.h
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <TCHAR.h>
+#include <pdh.h>
+#include <pdhmsg.h>
+#include <string>
+
+#include "PerformanceDataCounter.h"
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PDHCounter : public PerformanceDataCounter {
+ public:
+ virtual ~PDHCounter() {}
+ static std::unique_ptr<PDHCounter> createPDHCounter(const std::string&
query_name, bool is_double = true);
+
+ const std::string& getName() const;
+ virtual std::string getObjectName() const;
+ virtual std::string getCounterName() const;
+ virtual PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) = 0;
+ protected:
+ PDHCounter(const std::string& query_name, bool is_double)
+ : PerformanceDataCounter(), is_double_format_(is_double),
pdh_english_counter_name_(query_name) {}
+
+ DWORD getDWFormat() const;
+
+ const bool is_double_format_;
+ std::string pdh_english_counter_name_;
+};
+
+class SinglePDHCounter : public PDHCounter {
+ public:
+ void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType&
alloc) const override;
+
+ PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) override;
+ bool collectData() override;
+
+ protected:
+ friend std::unique_ptr<PDHCounter> PDHCounter::createPDHCounter(const
std::string& query_name, bool is_double);
+ explicit SinglePDHCounter(const std::string& query_name, bool is_double)
+ : PDHCounter(query_name, is_double), counter_(nullptr), current_value_()
{}
+
+ rapidjson::Value getValue() const;
+
+ PDH_HCOUNTER counter_;
+ PDH_FMT_COUNTERVALUE current_value_;
+};
+
+class PDHCounterArray : public PDHCounter {
+ public:
+ virtual ~PDHCounterArray() { clearCurrentData(); }
+ void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType&
alloc) const override;
+
+ std::string getObjectName() const override;
+ PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) override;
+ bool collectData() override;
+
+ protected:
+ friend std::unique_ptr<PDHCounter> PDHCounter::createPDHCounter(const
std::string& query_name, bool is_double);
+ explicit PDHCounterArray(const std::string& query_name, bool is_double)
+ : PDHCounter(query_name, is_double), counter_(nullptr), buffer_size_(0),
item_count_(0), values_(nullptr) {}
+
+ void clearCurrentData();
+ rapidjson::Value getValue(const DWORD i) const;
+
+ PDH_HCOUNTER counter_;
+ DWORD buffer_size_; // Size of the values_ array in bytes
+ DWORD item_count_; // Number of items in values_ array
+ PDH_FMT_COUNTERVALUE_ITEM* values_; // Array of PDH_FMT_COUNTERVALUE_ITEM
structures
+};
+
+
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/pdh/PerformanceDataCounter.h
b/extensions/pdh/PerformanceDataCounter.h
new file mode 100644
index 0000000..b7f69d3
--- /dev/null
+++ b/extensions/pdh/PerformanceDataCounter.h
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include "rapidjson/document.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PerformanceDataCounter {
+ public:
+ PerformanceDataCounter() = default;
+ virtual ~PerformanceDataCounter() = default;
+
+ virtual bool collectData() = 0;
+ virtual void addToJson(rapidjson::Value& body,
rapidjson::Document::AllocatorType& alloc) const = 0;
+
+ protected:
+ static rapidjson::Value& acquireNode(const std::string& node_name,
rapidjson::Value& parent_node, rapidjson::Document::AllocatorType& alloc) {
+ if (!parent_node.HasMember(node_name.c_str())) {
+ rapidjson::Value value(rapidjson::kObjectType);
+ rapidjson::Value key(node_name.c_str(), alloc);
+ parent_node.AddMember(key, value, alloc);
+ }
+ return parent_node[node_name.c_str()];
+ }
+};
+
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/pdh/PerformanceDataMonitor.cpp
b/extensions/pdh/PerformanceDataMonitor.cpp
new file mode 100644
index 0000000..f25517c
--- /dev/null
+++ b/extensions/pdh/PerformanceDataMonitor.cpp
@@ -0,0 +1,284 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+#include "utils/StringUtils.h"
+#include "utils/JsonCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+ core::PropertyBuilder::createProperty("Predefined Groups")->
+ withDescription("Comma separated list from the allowable values, to
monitor multiple common Windows Performance counters related to these groups")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+ core::PropertyBuilder::createProperty("Custom PDH Counters")->
+ withDescription("Comma separated list of Windows Performance Counters to
monitor")->
+ withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+ core::PropertyBuilder::createProperty("Output Format")->
+ withDescription("Format of the created flowfiles")->
+ withAllowableValue<std::string>(JSON_FORMAT_STR)->
+ withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+ withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+ PdhCloseQuery(pdh_query_);
+}
+
+void PerformanceDataMonitor::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ setupMembersFromProperties(context);
+
+ PdhOpenQueryA(nullptr, 0, &pdh_query_);
+
+ for (auto it = resource_consumption_counters_.begin(); it !=
resource_consumption_counters_.end();) {
+ PDHCounter* pdh_counter = dynamic_cast<PDHCounter*> (it->get());
+ if (pdh_counter != nullptr) {
+ PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+ if (add_to_query_result != ERROR_SUCCESS) {
+ logger_->log_error("Error adding %s to query, error code: 0x%x",
pdh_counter->getName(), add_to_query_result);
+ it = resource_consumption_counters_.erase(it);
+ continue;
+ }
+ }
+ ++it;
+ }
+
+ PDH_STATUS collect_query_data_result = PdhCollectQueryData(pdh_query_);
+ if (ERROR_SUCCESS != collect_query_data_result) {
+ logger_->log_error("Error during PdhCollectQueryData, error code: 0x%x",
collect_query_data_result);
+ }
+}
+
+void PerformanceDataMonitor::onTrigger(core::ProcessContext* context,
core::ProcessSession* session) {
+ if (resource_consumption_counters_.empty()) {
+ logger_->log_error("No valid counters for PerformanceDataMonitor");
+ yield();
+ return;
+ }
+
+ std::shared_ptr<core::FlowFile> flowFile = session->create();
+ if (!flowFile) {
+ logger_->log_error("Failed to create flowfile!");
+ yield();
+ return;
+ }
+
+ PDH_STATUS collect_query_data_result = PdhCollectQueryData(pdh_query_);
+ if (ERROR_SUCCESS != collect_query_data_result) {
+ logger_->log_error("Error during PdhCollectQueryData, error code: 0x%x",
collect_query_data_result);
+ yield();
+ return;
+ }
+
+ rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType);
+ rapidjson::Value& body = prepareJSONBody(root);
+ for (auto& counter : resource_consumption_counters_) {
+ if (counter->collectData())
+ counter->addToJson(body, root.GetAllocator());
+ }
+ utils::JsonOutputCallback callback(std::move(root));
+ session->write(flowFile, &callback);
+ session->transfer(flowFile, Success);
+}
+
+void PerformanceDataMonitor::initialize() {
+ setSupportedProperties({ CustomPDHCounters, PredefinedGroups,
OutputFormatProperty });
+ setSupportedRelationships({ PerformanceDataMonitor::Success });
+}
+
+rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document&
root) {
+ switch (output_format_) {
+ case OutputFormat::OPENTELEMETRY:
+ root.AddMember("Name", "PerformanceData", root.GetAllocator());
+ root.AddMember("Timestamp", std::time(0), root.GetAllocator());
+ root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType },
root.GetAllocator());
+ return root["Body"];
+ case OutputFormat::JSON:
+ return root;
+ default:
+ return root;
+ }
+}
+
+void
add_cpu_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\%
Processor Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\%
User Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\%
Privileged Time"));
+}
+
+void
add_io_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(_Total)\\IO
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(_Total)\\IO
Write Bytes/sec"));
+}
+
+void
add_disk_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\LogicalDisk(*)\\%
Free Space"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\LogicalDisk(*)\\Free
Megabytes", false));
+
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Read Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
Disk Write Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
Idle Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Transfer"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Read"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Bytes/Write"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Write Queue Length"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Read Queue Length"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk Queue Length"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Transfer"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Read"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
Disk sec/Write"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Current
Disk Queue Length", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
Transfers/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
Reads/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
Writes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
Write Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Split
IO/Sec"));
+}
+
+void
add_network_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Bytes Received/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Bytes Sent/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Bytes Total/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Current Bandwidth", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Received/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Sent/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Received Discarded", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Received Errors", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Received Unknown", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Received Non-Unicast/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Received Unicast/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Sent Unicast/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network
Interface(*)\\Packets Sent Non-Unicast/sec"));
+}
+
+void
add_memory_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\%
Committed Bytes In Use"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\Available
MBytes", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\Page
Faults/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\Pages/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Paging
File(_Total)\\% Usage"));
+
+
resource_consumption_counters.push_back(std::make_unique<MemoryConsumptionCounter>());
+}
+
+void
add_process_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\%
Processor Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\Elapsed
Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\ID
Process", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(*)\\Private
Bytes", false));
+}
+
+void
add_system_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
resource_consumption_counters) {
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\%
Registry Quota In Use"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Context
Switches/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
Control Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
Control Operations/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
Read Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
Read Operations/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
Write Bytes/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
Write Operations/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
Data Operations/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Processes",
false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Processor
Queue Length", false));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\System
Calls/sec"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\System
Up Time"));
+
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Threads",
false));
+}
+
+void PerformanceDataMonitor::addCountersFromPredefinedGroupsProperty(const
std::string& predefined_groups) {
+ auto groups = utils::StringUtils::splitAndTrim(predefined_groups, ",");
+ for (const auto& group : groups) {
+ if (group == "CPU") {
+ add_cpu_related_counters(resource_consumption_counters_);
+ } else if (group == "IO") {
+ add_io_related_counters(resource_consumption_counters_);
+ } else if (group == "Disk") {
+ add_disk_related_counters(resource_consumption_counters_);
+ } else if (group == "Network") {
+ add_network_related_counters(resource_consumption_counters_);
+ } else if (group == "Memory") {
+ add_memory_related_counters(resource_consumption_counters_);
+ } else if (group == "System") {
+ add_system_related_counters(resource_consumption_counters_);
+ } else if (group == "Process") {
+ add_process_related_counters(resource_consumption_counters_);
+ } else {
+ logger_->log_error((group + " is not a valid predefined group for
PerformanceDataMonitor").c_str());
+ }
+ }
+}
+
+void PerformanceDataMonitor::addCustomPDHCountersFromProperty(const
std::string& custom_pdh_counters) {
+ const auto custom_counters =
utils::StringUtils::splitAndTrim(custom_pdh_counters, ",");
+ for (const auto& custom_counter : custom_counters) {
+ auto counter = PDHCounter::createPDHCounter(custom_counter);
+ if (counter != nullptr)
+ resource_consumption_counters_.push_back(std::move(counter));
+ }
+}
+
+void PerformanceDataMonitor::setupMembersFromProperties(const
std::shared_ptr<core::ProcessContext>& context) {
+ std::string predefined_groups;
+ if (context->getProperty(PredefinedGroups.getName(), predefined_groups)) {
+ logger_->log_trace("Predefined group configured to be %s",
predefined_groups);
+ addCountersFromPredefinedGroupsProperty(predefined_groups);
+ }
+
+ std::string custom_pdh_counters;
+ if (context->getProperty(CustomPDHCounters.getName(), custom_pdh_counters)) {
+ logger_->log_trace("Custom PDH counters configured to be %s",
custom_pdh_counters);
+ addCustomPDHCountersFromProperty(custom_pdh_counters);
+ }
+
+ std::string output_format_string;
+ if (context->getProperty(OutputFormatProperty.getName(),
output_format_string)) {
+ if (output_format_string == OPEN_TELEMETRY_FORMAT_STR) {
+ logger_->log_trace("OutputFormat is configured to be OpenTelemetry");
+ output_format_ = OutputFormat::OPENTELEMETRY;
+ } else if (output_format_string == JSON_FORMAT_STR) {
+ logger_->log_trace("OutputFormat is configured to be JSON");
+ output_format_ = OutputFormat::JSON;
+ } else {
+ logger_->log_error("Invalid OutputFormat, defaulting to JSON");
+ output_format_ = OutputFormat::JSON;
+ }
+ }
+}
+
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/pdh/PerformanceDataMonitor.h
b/extensions/pdh/PerformanceDataMonitor.h
new file mode 100644
index 0000000..9cc37d2
--- /dev/null
+++ b/extensions/pdh/PerformanceDataMonitor.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <pdh.h>
+#include <string>
+#include <vector>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/prettywriter.h"
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PerformanceDataMonitor Class
+class PerformanceDataMonitor : public core::Processor {
+ public:
+ static constexpr const char* JSON_FORMAT_STR = "JSON";
+ static constexpr const char* OPEN_TELEMETRY_FORMAT_STR = "OpenTelemetry";
+
+ explicit PerformanceDataMonitor(const std::string& name, utils::Identifier
uuid = utils::Identifier())
+ : Processor(name, uuid), output_format_(OutputFormat::JSON),
+ logger_(logging::LoggerFactory<PerformanceDataMonitor>::getLogger()),
+ pdh_query_(nullptr), resource_consumption_counters_() {}
+
+ ~PerformanceDataMonitor() override;
+ static constexpr char const* ProcessorName = "PerformanceDataMonitor";
+ // Supported Properties
+ static core::Property PredefinedGroups;
+ static core::Property CustomPDHCounters;
+ static core::Property OutputFormatProperty;
+ // Supported Relationships
+ static core::Relationship Success;
+
+
+ public:
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session)
override;
+ void initialize() override;
+
+ protected:
+ enum class OutputFormat {
+ JSON, OPENTELEMETRY
+ };
+
+ rapidjson::Value& prepareJSONBody(rapidjson::Document& root);
+
+ void setupMembersFromProperties(const std::shared_ptr<core::ProcessContext>&
context);
+ void addCountersFromPredefinedGroupsProperty(const std::string&
custom_pdh_counters);
+ void addCustomPDHCountersFromProperty(const std::string&
custom_pdh_counters);
+
+ OutputFormat output_format_;
+
+ std::shared_ptr<logging::Logger> logger_;
+ PDH_HQUERY pdh_query_;
+ std::vector<std::unique_ptr<PerformanceDataCounter>>
resource_consumption_counters_;
+};
+
+REGISTER_RESOURCE(PerformanceDataMonitor, "This processor can create FlowFiles
with various performance data through Performance Data Helper. (Windows only)");
+
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/pdh/tests/CMakeLists.txt
b/extensions/pdh/tests/CMakeLists.txt
new file mode 100644
index 0000000..29d144b
--- /dev/null
+++ b/extensions/pdh/tests/CMakeLists.txt
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB PDH_TESTS "*.cpp")
+
+SET(PDH_TEST_COUNT 0)
+FOREACH(testfile ${PDH_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+ target_include_directories(${testfilename} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/extensions/pdh")
+ target_include_directories(${testfilename} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/libminifi/test/")
+ target_include_directories(${testfilename} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+ createTests("${testfilename}")
+ target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+ target_wholearchive_library(${testfilename} minifi-pdh)
+ target_wholearchive_library(${testfilename} minifi-standard-processors)
+ MATH(EXPR PDH_TEST_COUNT "${PDH_TEST_COUNT}+1")
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}"
WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${PDH_TEST_COUNT} PDH related test file(s)...")
diff --git a/extensions/pdh/tests/PerformanceDataCounterTests.cpp
b/extensions/pdh/tests/PerformanceDataCounterTests.cpp
new file mode 100644
index 0000000..879228c
--- /dev/null
+++ b/extensions/pdh/tests/PerformanceDataCounterTests.cpp
@@ -0,0 +1,187 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+#include "gsl/gsl-lite.hpp"
+
+using org::apache::nifi::minifi::processors::SinglePDHCounter;
+using org::apache::nifi::minifi::processors::PDHCounterArray;
+using org::apache::nifi::minifi::processors::PDHCounter;
+using org::apache::nifi::minifi::processors::MemoryConsumptionCounter;
+
+
+TEST_CASE("PDHCounterNameTests", "[pdhcounternametests]") {
+ std::unique_ptr<PDHCounter> test_counter =
PDHCounter::createPDHCounter("\\System\\Threads");
+ REQUIRE(nullptr != dynamic_cast<SinglePDHCounter*> (test_counter.get()));
+ REQUIRE("\\System\\Threads" == test_counter->getName());
+ REQUIRE("System" == test_counter->getObjectName());
+ REQUIRE("Threads" == test_counter->getCounterName());
+}
+
+TEST_CASE("PDHCounterArrayNameTests", "[pdhcounterarraytests]") {
+ std::unique_ptr<PDHCounter> test_counter_array =
PDHCounter::createPDHCounter("\\LogicalDisk(*)\\% Free Space");
+ REQUIRE(nullptr != dynamic_cast<PDHCounterArray*>
(test_counter_array.get()));
+ REQUIRE("\\LogicalDisk(*)\\% Free Space" == test_counter_array->getName());
+ REQUIRE("LogicalDisk" == test_counter_array->getObjectName());
+ REQUIRE("% Free Space" == test_counter_array->getCounterName());
+}
+
+TEST_CASE("PDHCountersInvalidNameTests", "[pdhcountersinvalidnametests]") {
+ REQUIRE(nullptr == PDHCounter::createPDHCounter("Invalid Name"));
+ REQUIRE(nullptr == PDHCounter::createPDHCounter(""));
+ REQUIRE(nullptr == PDHCounter::createPDHCounter("Something\\Counter"));
+ REQUIRE(nullptr == PDHCounter::createPDHCounter("\\Too\\Many\\Separators"));
+ REQUIRE(nullptr == PDHCounter::createPDHCounter("Too\\Many\\Separators"));
+ REQUIRE(nullptr != PDHCounter::createPDHCounter("\\Valid\\Counter"));
+}
+
+class TestablePDHCounter : public SinglePDHCounter {
+ public:
+ explicit TestablePDHCounter(const std::string& query_name, bool is_double =
true) : SinglePDHCounter(query_name, is_double) {
+ }
+};
+
+class TestablePDHCounterArray : public PDHCounterArray {
+ public:
+ explicit TestablePDHCounterArray(const std::string& query_name, bool
is_double = true) : PDHCounterArray(query_name, is_double) {
+ }
+};
+
+TEST_CASE("PDHCountersAddingToQueryTests", "[pdhcountersaddingtoquerytests]") {
+ PDH_HQUERY pdh_query;
+ PdhOpenQueryA(nullptr, 0, &pdh_query);
+ auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+ PDH_HQUERY unopened_pdh_query;
+
+ TestablePDHCounter valid_counter("\\System\\Threads");
+ REQUIRE(ERROR_SUCCESS == valid_counter.addToQuery(pdh_query));
+ REQUIRE_FALSE(ERROR_SUCCESS == valid_counter.addToQuery(unopened_pdh_query));
+
+ TestablePDHCounter counter_with_invalid_object_name("\\Invalid\\Threads");
+ REQUIRE(PDH_CSTATUS_NO_OBJECT ==
counter_with_invalid_object_name.addToQuery(pdh_query));
+
+ TestablePDHCounter counter_with_invalid_counter_name("\\System\\Invalid");
+ REQUIRE(PDH_CSTATUS_NO_COUNTER ==
counter_with_invalid_counter_name.addToQuery(pdh_query));
+
+ TestablePDHCounter unparsable_counter("asd"); // Unparsable names are also
filtered when using PDHCounter::createPDHCounter
+ REQUIRE(PDH_CSTATUS_BAD_COUNTERNAME ==
unparsable_counter.addToQuery(pdh_query));
+}
+
+TEST_CASE("PDHCounterArraysAddingToQueryTests",
"[pdhcounterarraysaddingtoquerytests]") {
+ PDH_HQUERY pdh_query;
+ PdhOpenQueryA(nullptr, 0, &pdh_query);
+ auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+ PDH_HQUERY unopened_pdh_query;
+
+ TestablePDHCounterArray valid_counter("\\Processor(*)\\% Processor Time");
+ REQUIRE(ERROR_SUCCESS == valid_counter.addToQuery(pdh_query));
+ REQUIRE_FALSE(ERROR_SUCCESS == valid_counter.addToQuery(unopened_pdh_query));
+
+ TestablePDHCounterArray
counter_with_invalid_object_name("\\SomethingInvalid(*)\\% Processor Time");
+ REQUIRE(PDH_CSTATUS_NO_OBJECT ==
counter_with_invalid_object_name.addToQuery(pdh_query));
+
+ TestablePDHCounterArray
counter_with_invalid_counter_name("\\Processor(*)\\SomethingInvalid");
+ REQUIRE(PDH_CSTATUS_NO_COUNTER ==
counter_with_invalid_counter_name.addToQuery(pdh_query));
+
+ TestablePDHCounterArray unparsable_counter("asd"); // Unparsable names are
also filtered when using PDHCounter::createPDHCounter
+ REQUIRE(PDH_CSTATUS_BAD_COUNTERNAME ==
unparsable_counter.addToQuery(pdh_query));
+}
+
+TEST_CASE("PDHCounterDataCollectionTest", "[pdhcounterdatacollectiontest]") {
+ PDH_HQUERY pdh_query;
+ PdhOpenQueryA(nullptr, 0, &pdh_query);
+ auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+ TestablePDHCounter double_counter("\\System\\Threads");
+ TestablePDHCounter int_counter("\\System\\Processes", false);
+ REQUIRE(ERROR_SUCCESS == double_counter.addToQuery(pdh_query));
+ REQUIRE(ERROR_SUCCESS == int_counter.addToQuery(pdh_query));
+
+ PdhCollectQueryData(pdh_query);
+
+ REQUIRE(double_counter.collectData());
+ REQUIRE(int_counter.collectData());
+
+ rapidjson::Document document(rapidjson::kObjectType);
+ double_counter.addToJson(document, document.GetAllocator());
+ int_counter.addToJson(document, document.GetAllocator());
+
+ REQUIRE(document.HasMember("System"));
+ REQUIRE(document["System"].HasMember("Threads"));
+ REQUIRE(document["System"]["Threads"].IsDouble());
+ REQUIRE(document["System"]["Threads"].GetDouble() > 0);
+ REQUIRE(document["System"]["Processes"].IsInt64());
+ REQUIRE(document["System"]["Processes"].GetInt64() > 0);
+}
+
+TEST_CASE("PDHCounterArrayDataCollectionTest",
"[pdhcounterarraydatacollectiontest]") {
+ PDH_HQUERY pdh_query;
+ PdhOpenQueryA(nullptr, 0, &pdh_query);
+ auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+ TestablePDHCounterArray double_counter_array("\\Process(*)\\Thread Count");
+ TestablePDHCounterArray int_counter_array("\\Process(*)\\ID Process", false);
+ REQUIRE(ERROR_SUCCESS == double_counter_array.addToQuery(pdh_query));
+ REQUIRE(ERROR_SUCCESS == int_counter_array.addToQuery(pdh_query));
+
+ PdhCollectQueryData(pdh_query);
+
+ double_counter_array.collectData();
+ int_counter_array.collectData();
+ std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+ REQUIRE(double_counter_array.collectData());
+ REQUIRE(int_counter_array.collectData());
+
+ rapidjson::Document document(rapidjson::kObjectType);
+ double_counter_array.addToJson(document, document.GetAllocator());
+ int_counter_array.addToJson(document, document.GetAllocator());
+
+ REQUIRE(document.HasMember("Process"));
+ REQUIRE(document["Process"].HasMember("PerformanceDataCounterTests"));
+ REQUIRE(document["Process"]["PerformanceDataCounterTests"].HasMember("Thread
Count"));
+ REQUIRE(document["Process"]["PerformanceDataCounterTests"].HasMember("ID
Process"));
+ REQUIRE(document["Process"]["PerformanceDataCounterTests"]["Thread
Count"].IsDouble());
+ REQUIRE(document["Process"]["PerformanceDataCounterTests"]["ID
Process"].IsInt64());
+}
+
+TEST_CASE("MemoryConsumptionCounterTest", "[memoryconsumptioncountertest]") {
+ MemoryConsumptionCounter memory_counter;
+ REQUIRE_FALSE(memory_counter.dataIsValid());
+ memory_counter.collectData();
+ REQUIRE(memory_counter.dataIsValid());
+
+ rapidjson::Document document(rapidjson::kObjectType);
+ memory_counter.addToJson(document, document.GetAllocator());
+
+ REQUIRE(document.HasMember("Memory"));
+ REQUIRE(document["Memory"].HasMember("Total Physical Memory"));
+ REQUIRE(document["Memory"].HasMember("Available Physical Memory"));
+ REQUIRE(document["Memory"].HasMember("Total paging file size"));
+
+ REQUIRE(document["Memory"]["Total Physical Memory"].IsInt64());
+ REQUIRE(document["Memory"]["Available Physical Memory"].IsInt64());
+ REQUIRE(document["Memory"]["Total paging file size"].IsInt64());
+
+ REQUIRE(document["Memory"]["Total Physical Memory"].GetInt64() > 0);
+ REQUIRE(document["Memory"]["Available Physical Memory"].GetInt64() > 0);
+ REQUIRE(document["Memory"]["Total paging file size"].GetInt64() > 0);
+}
diff --git a/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
new file mode 100644
index 0000000..2c08ced
--- /dev/null
+++ b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
@@ -0,0 +1,231 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "TestBase.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+#include "PerformanceDataMonitor.h"
+#include "rapidjson/filereadstream.h"
+
+using org::apache::nifi::minifi::processors::PutFile;
+using org::apache::nifi::minifi::processors::PerformanceDataMonitor;
+using org::apache::nifi::minifi::processors::PerformanceDataCounter;
+
+class PerformanceDataMonitorTester {
+ public:
+ PerformanceDataMonitorTester() {
+ LogTestController::getInstance().setTrace<TestPlan>();
+ dir_ = utils::createTempDir(&test_controller_);
+ plan_ = test_controller_.createPlan();
+ performance_monitor_ = plan_->addProcessor("PerformanceDataMonitor",
"pdhsys");
+ putfile_ = plan_->addProcessor("PutFile", "putfile",
core::Relationship("success", "description"), true);
+ plan_->setProperty(putfile_, PutFile::Directory.getName(), dir_);
+ }
+
+ void runProcessors() {
+ plan_->runNextProcessor(); // PerformanceMonitor
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ plan_->runCurrentProcessor(); // PerformanceMonitor
+ plan_->runNextProcessor(); // PutFile
+ plan_->runCurrentProcessor(); // PutFile
+ }
+
+ void setPerformanceMonitorProperty(const core::Property& property, const
std::string& value) {
+ plan_->setProperty(performance_monitor_, property.getName(), value);
+ }
+
+ TestController test_controller_;
+ std::string dir_;
+ std::shared_ptr<TestPlan> plan_;
+ std::shared_ptr<core::Processor> performance_monitor_;
+ std::shared_ptr<core::Processor> putfile_;
+};
+
+
+TEST_CASE("PerformanceDataMonitorEmptyPropertiesTest",
"[performancedatamonitoremptypropertiestest]") {
+ PerformanceDataMonitorTester tester;
+ tester.runProcessors();
+
+ REQUIRE(tester.test_controller_.getLog().getInstance().contains("No valid
counters for PerformanceDataMonitor", std::chrono::seconds(0)));
+
+ auto created_flow_files = utils::file::FileUtils::list_dir_all(tester.dir_,
tester.plan_->getLogger(), false);
+ REQUIRE(created_flow_files.size() == 0);
+}
+
+TEST_CASE("PerformanceDataMonitorPartiallyInvalidGroupPropertyTest",
"[performancedatamonitorpartiallyinvalidgrouppropertytest]") {
+ PerformanceDataMonitorTester tester;
+
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups,
"Disk,CPU,Asd");
+
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters,
"\\Invalid\\Counter,\\System\\Processes");
+ tester.runProcessors();
+
+ REQUIRE(tester.test_controller_.getLog().getInstance().contains("Asd is not
a valid predefined group", std::chrono::seconds(0)));
+ REQUIRE(tester.test_controller_.getLog().getInstance().contains("Error
adding \\Invalid\\Counter to query", std::chrono::seconds(0)));
+
+ uint32_t number_of_flowfiles = 0;
+
+ auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
+ ++number_of_flowfiles;
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[500];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ REQUIRE(document.IsObject());
+ REQUIRE(document.HasMember("LogicalDisk"));
+ REQUIRE(document["LogicalDisk"].HasMember("_Total"));
+ REQUIRE(document["LogicalDisk"]["_Total"].HasMember("Free Megabytes"));
+ REQUIRE(document["System"].HasMember("Processes"));
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ REQUIRE(number_of_flowfiles == 2);
+}
+
+TEST_CASE("PerformanceDataMonitorCustomPDHCountersTest",
"[performancedatamonitorcustompdhcounterstest]") {
+ PerformanceDataMonitorTester tester;
+
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters,
"\\System\\Processes,\\Process(*)\\% Processor Time");
+ tester.runProcessors();
+
+ uint32_t number_of_flowfiles = 0;
+
+ auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
+ ++number_of_flowfiles;
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[50000];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ REQUIRE(document.IsObject());
+ REQUIRE(document.HasMember("System"));
+ REQUIRE(document["System"].HasMember("Processes"));
+ REQUIRE(document.HasMember("Process"));
+ REQUIRE(document["Process"].HasMember("PerformanceDataMonitorTests"));
+ REQUIRE(document["Process"]["PerformanceDataMonitorTests"].HasMember("%
Processor Time"));
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ REQUIRE(number_of_flowfiles == 2);
+}
+
+TEST_CASE("PerformanceDataMonitorCustomPDHCountersTestOpenTelemetry",
"[performancedatamonitorcustompdhcounterstestopentelemetry]") {
+ PerformanceDataMonitorTester tester;
+
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters,
"\\System\\Processes,\\Process(*)\\ID Process");
+
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty,
"OpenTelemetry");
+ tester.runProcessors();
+
+ uint32_t number_of_flowfiles = 0;
+
+ auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
+ ++number_of_flowfiles;
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[50000];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ REQUIRE(document.IsObject());
+ REQUIRE(document.HasMember("Name"));
+ REQUIRE(document.HasMember("Timestamp"));
+ REQUIRE(document.HasMember("Body"));
+ REQUIRE(document["Body"].HasMember("System"));
+ REQUIRE(document["Body"]["System"].HasMember("Processes"));
+ REQUIRE(document["Body"].HasMember("Process"));
+
REQUIRE(document["Body"]["Process"].HasMember("PerformanceDataMonitorTests"));
+
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID
Process"));
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ REQUIRE(number_of_flowfiles == 2);
+}
+
+TEST_CASE("PerformanceDataMonitorAllPredefinedGroups",
"[performancedatamonitorallpredefinedgroups]") {
+ PerformanceDataMonitorTester tester;
+
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups,
"CPU,Disk,Network,Memory,IO,System,Process");
+
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty,
"OpenTelemetry");
+ tester.runProcessors();
+
+ uint32_t number_of_flowfiles = 0;
+
+ auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
+ ++number_of_flowfiles;
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[50000];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ REQUIRE(document.IsObject());
+ REQUIRE(document.HasMember("Name"));
+ REQUIRE(document.HasMember("Timestamp"));
+ REQUIRE(document.HasMember("Body"));
+ REQUIRE(document["Body"].HasMember("PhysicalDisk"));
+ REQUIRE(document["Body"]["PhysicalDisk"].HasMember("_Total"));
+ REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Read
Time"));
+ REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk
Time"));
+ REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Write
Time"));
+ REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Idle
Time"));
+
+ REQUIRE(document["Body"].HasMember("LogicalDisk"));
+ REQUIRE(document["Body"]["LogicalDisk"].HasMember("_Total"));
+ REQUIRE(document["Body"]["LogicalDisk"]["_Total"].HasMember("Free
Megabytes"));
+ REQUIRE(document["Body"]["LogicalDisk"]["_Total"].HasMember("% Free
Space"));
+
+ REQUIRE(document["Body"].HasMember("Processor"));
+ REQUIRE(document["Body"]["Processor"].HasMember("_Total"));
+
+ REQUIRE(document["Body"].HasMember("Network Interface"));
+
+ REQUIRE(document["Body"].HasMember("Memory"));
+ REQUIRE(document["Body"]["Memory"].HasMember("% Committed Bytes In Use"));
+ REQUIRE(document["Body"]["Memory"].HasMember("Available MBytes"));
+ REQUIRE(document["Body"]["Memory"].HasMember("Page Faults/sec"));
+ REQUIRE(document["Body"]["Memory"].HasMember("Pages/sec"));
+
+ REQUIRE(document["Body"].HasMember("System"));
+ REQUIRE(document["Body"]["System"].HasMember("% Registry Quota In Use"));
+ REQUIRE(document["Body"]["System"].HasMember("Context Switches/sec"));
+ REQUIRE(document["Body"]["System"].HasMember("File Control Bytes/sec"));
+ REQUIRE(document["Body"]["System"].HasMember("File Control
Operations/sec"));
+
+ REQUIRE(document["Body"].HasMember("Process"));
+
REQUIRE(document["Body"]["Process"].HasMember("PerformanceDataMonitorTests"));
+
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("%
Processor Time"));
+
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Elapsed
Time"));
+
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID
Process"));
+
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Private
Bytes"));
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ REQUIRE(number_of_flowfiles == 2);
+}
diff --git a/libminifi/include/utils/JsonCallback.h
b/libminifi/include/utils/JsonCallback.h
new file mode 100644
index 0000000..9446fc4
--- /dev/null
+++ b/libminifi/include/utils/JsonCallback.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/prettywriter.h"
+
+#include "io/StreamPipe.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+class JsonOutputCallback : public OutputStreamCallback {
+ public:
+ explicit JsonOutputCallback(rapidjson::Document&& root) :
root_(std::move(root)) {}
+
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+ rapidjson::StringBuffer buffer;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+ root_.Accept(writer);
+ return stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()),
gsl::narrow<int>(buffer.GetSize()));
+ }
+
+ protected:
+ rapidjson::Document root_;
+};
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/include/utils/OsUtils.h
b/libminifi/include/utils/OsUtils.h
index 387fc03..82c69fc 100644
--- a/libminifi/include/utils/OsUtils.h
+++ b/libminifi/include/utils/OsUtils.h
@@ -38,6 +38,11 @@ int64_t getSystemPhysicalMemoryUsage();
/// Returns the total physical memory in the system in bytes
int64_t getSystemTotalPhysicalMemory();
+#ifdef WIN32
+/// Returns the total paging file size in bytes
+int64_t getTotalPagingFileSize();
+#endif
+
/// Returns the host architecture (e.g. x32, arm64)
std::string getMachineArchitecture();
diff --git a/libminifi/src/utils/OsUtils.cpp b/libminifi/src/utils/OsUtils.cpp
index 437442a..5d38279 100644
--- a/libminifi/src/utils/OsUtils.cpp
+++ b/libminifi/src/utils/OsUtils.cpp
@@ -277,6 +277,16 @@ int64_t OsUtils::getSystemTotalPhysicalMemory() {
#endif
}
+#ifdef WIN32
+int64_t OsUtils::getTotalPagingFileSize() {
+ MEMORYSTATUSEX memory_info;
+ memory_info.dwLength = sizeof(MEMORYSTATUSEX);
+ GlobalMemoryStatusEx(&memory_info);
+ DWORDLONG total_paging_file_size = memory_info.ullTotalPageFile;
+ return total_paging_file_size;
+}
+#endif
+
std::string OsUtils::getMachineArchitecture() {
#if defined(WIN32)
SYSTEM_INFO system_information;
diff --git a/libminifi/test/unit/MemoryUsageTest.cpp
b/libminifi/test/unit/MemoryUsageTest.cpp
index 9fd08a5..3e6726b 100644
--- a/libminifi/test/unit/MemoryUsageTest.cpp
+++ b/libminifi/test/unit/MemoryUsageTest.cpp
@@ -20,22 +20,22 @@
#include "utils/OsUtils.h"
#include "../TestBase.h"
-TEST_CASE("Test memory usage", "[testmemoryusage]") {
+TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
constexpr bool cout_enabled = true;
std::vector<char> v(30000000);
- const auto RAMUsagebyProcess =
utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
- const auto RAMUsagebySystem = utils::OsUtils::getSystemPhysicalMemoryUsage();
- const auto RAMTotal = utils::OsUtils::getSystemTotalPhysicalMemory();
+ const auto ram_usage_by_process =
utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+ const auto ram_usage_by_system =
utils::OsUtils::getSystemPhysicalMemoryUsage();
+ const auto ram_total = utils::OsUtils::getSystemTotalPhysicalMemory();
if (cout_enabled) {
- std::cout << "Physical Memory used by this process: " << RAMUsagebyProcess
<< " bytes" << std::endl;
- std::cout << "Physical Memory used by the system: " << RAMUsagebySystem <<
" bytes" << std::endl;
- std::cout << "Total Physical Memory in the system: " << RAMTotal << "
bytes" << std::endl;
+ std::cout << "Physical Memory used by this process: " <<
ram_usage_by_process << " bytes" << std::endl;
+ std::cout << "Physical Memory used by the system: " << ram_usage_by_system
<< " bytes" << std::endl;
+ std::cout << "Total Physical Memory in the system: " << ram_total << "
bytes" << std::endl;
}
- REQUIRE(RAMUsagebyProcess >= v.size());
- REQUIRE(v.size()*2 >= RAMUsagebyProcess);
- REQUIRE(RAMUsagebySystem >= RAMUsagebyProcess);
- REQUIRE(RAMTotal >= RAMUsagebySystem);
+ REQUIRE(ram_usage_by_process >= v.size());
+ REQUIRE(v.size()*2 >= ram_usage_by_process);
+ REQUIRE(ram_usage_by_system >= ram_usage_by_process);
+ REQUIRE(ram_total >= ram_usage_by_system);
}
#ifndef WIN32
@@ -47,3 +47,16 @@ TEST_CASE("Test new and legacy total system memory query
equivalency") {
REQUIRE(GetTotalMemoryLegacy() ==
utils::OsUtils::getSystemTotalPhysicalMemory());
}
#endif
+
+
+#ifdef WIN32
+TEST_CASE("Test Paging file size", "[testpagingfile]") {
+ constexpr bool cout_enabled = true;
+ const auto total_paging_file_size = utils::OsUtils::getTotalPagingFileSize();
+
+ if (cout_enabled) {
+ std::cout << "Total Paging file size: " << total_paging_file_size << "
bytes" << std::endl;
+ }
+ REQUIRE(total_paging_file_size > 0);
+}
+#endif
diff --git a/win_build_vs.bat b/win_build_vs.bat
index cfef2c0..952e3af 100755
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -49,6 +49,7 @@ for %%x in (%*) do (
if [%%~x] EQU [/C] set build_coap=ON
if [%%~x] EQU [/A] set build_AWS=ON
if [%%~x] EQU [/SFTP] set build_SFTP=ON
+ if [%%~x] EQU [/PDH] set build_PDH=ON
if [%%~x] EQU [/M] set installer_merge_modules=ON
if [%%~x] EQU [/Z] set build_azure=ON
if [%%~x] EQU [/2019] set generator="Visual Studio 16 2019"
@@ -62,7 +63,7 @@ for %%x in (%*) do (
mkdir %builddir%
pushd %builddir%\
-cmake -G %generator% -A %build_platform%
-DINSTALLER_MERGE_MODULES=%installer_merge_modules%
-DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL%
-DCMAKE_BUILD_TYPE_INIT=%cmake_build_type%
-DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32
-DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF
-DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS%
-DENABLE_AZURE=%build_azure% -DENABLE_SFTP=%build_SFTP% -DUSE_SHARED_LIBS=OFF
-DDISABLE_CONTROL [...]
+cmake -G %generator% -A %build_platform%
-DINSTALLER_MERGE_MODULES=%installer_merge_modules%
-DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL%
-DCMAKE_BUILD_TYPE_INIT=%cmake_build_type%
-DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32
-DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF
-DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH%
-DENABLE_AZURE=%build_azure% -DENABLE_SFTP=%build_SFTP% -DUSE_SHARED_L [...]
IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%
if [%cpack%] EQU [ON] (
cpack -C %cmake_build_type%