This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 98b9055  MINIFICPP-1532: Create a processor to capture resource 
consumption metrics on Windows
98b9055 is described below

commit 98b9055fda485f87cdc6e161c29ed557f063846a
Author: Martin Zink <[email protected]>
AuthorDate: Wed Mar 24 12:29:15 2021 +0100

    MINIFICPP-1532: Create a processor to capture resource consumption metrics 
on Windows
    
    Updated documentation to include information about the new processor
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #1066
---
 .github/workflows/ci.yml                           |   4 +-
 CMakeLists.txt                                     |  18 +-
 PROCESSORS.md                                      |  22 ++
 README.md                                          |   1 +
 Windows.md                                         |   1 +
 extensions/pdh/CMakeLists.txt                      |  32 +++
 extensions/pdh/MemoryConsumptionCounter.h          |  70 +++++
 extensions/pdh/PDHCounters.cpp                     | 132 ++++++++++
 extensions/pdh/PDHCounters.h                       |  99 +++++++
 extensions/pdh/PerformanceDataCounter.h            |  51 ++++
 extensions/pdh/PerformanceDataMonitor.cpp          | 284 +++++++++++++++++++++
 extensions/pdh/PerformanceDataMonitor.h            |  90 +++++++
 extensions/pdh/tests/CMakeLists.txt                |  37 +++
 .../pdh/tests/PerformanceDataCounterTests.cpp      | 187 ++++++++++++++
 .../pdh/tests/PerformanceDataMonitorTests.cpp      | 231 +++++++++++++++++
 libminifi/include/utils/JsonCallback.h             |  54 ++++
 libminifi/include/utils/OsUtils.h                  |   5 +
 libminifi/src/utils/OsUtils.cpp                    |  10 +
 libminifi/test/unit/MemoryUsageTest.cpp            |  35 ++-
 win_build_vs.bat                                   |   3 +-
 20 files changed, 1347 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 83e7e86..916e6fa 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -102,7 +102,7 @@ jobs:
         run: |
           PATH %PATH%;C:\Program Files (x86)\Windows 
Kits\10\bin\10.0.17763.0\x86
           PATH %PATH%;C:\Program Files (x86)\Microsoft Visual 
Studio\2017\Enterprise\MSBuild\15.0\Bin\Roslyn
-          win_build_vs.bat build /CI /S /A /Z
+          win_build_vs.bat build /CI /S /A /Z /PDH
         shell: cmd
   windows_VS2019:
     name: "windows-vs2019"
@@ -141,7 +141,7 @@ jobs:
         run: |
           PATH %PATH%;C:\Program Files (x86)\Windows 
Kits\10\bin\10.0.19041.0\x64
           PATH %PATH%;C:\Program Files (x86)\Microsoft Visual 
Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
-          win_build_vs.bat build /2019 /64 /CI /S /A
+          win_build_vs.bat build /2019 /64 /CI /S /A /PDH
         shell: cmd
   ubuntu_16_04:
     name: "ubuntu-16.04"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6a222db..136796d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -493,7 +493,7 @@ if (ENABLE_TENSORFLOW)
        createExtension(TENSORFLOW-EXTENSIONS "TENSORFLOW EXTENSIONS" "This 
enables TensorFlow support" "extensions/tensorflow" 
"${TEST_DIR}/tensorflow-tests")
 endif()
 
-## AWS Extentions
+## AWS Extensions
 option(ENABLE_AWS "Enables AWS support." OFF)
 if (ENABLE_ALL OR ENABLE_AWS)
        include(BundledAwsSdkCpp)
@@ -501,7 +501,15 @@ if (ENABLE_ALL OR ENABLE_AWS)
        createExtension(AWS-EXTENSIONS "AWS EXTENSIONS" "This enables AWS 
support" "extensions/aws" "${TEST_DIR}/aws-tests")
 endif()
 
-## OpenCV Extesions
+## PDH Extensions
+if (WIN32)
+       option(ENABLE_PDH "Enables PDH support." OFF)
+       if (ENABLE_PDH)
+               createExtension(PDH-EXTENSIONS "PDH EXTENSIONS" "This enables 
PDH support" "extensions/pdh" "extensions/pdh/tests")
+       endif()
+endif()
+
+## OpenCV Extensions
 option(ENABLE_OPENCV "Disables the OpenCV extensions." OFF)
 if (ENABLE_OPENCV)
        createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled 
OpenCV support" "extensions/opencv" "extensions/opencv/tests")
@@ -515,7 +523,7 @@ if (ENABLE_BUSTACHE)
        createExtension(BUSTACHE-EXTENSIONS "BUSTACHE EXTENSIONS" "This enables 
bustache functionality including ApplyTemplate." "extensions/bustache" 
"${TEST_DIR}/bustache-tests")
 endif()
 
-## OPC Extentions
+## OPC Extensions
 if (ENABLE_OPC)
        include(BundledMbedTLS)
        use_bundled_mbedtls(${CMAKE_CURRENT_SOURCE_DIR} 
${CMAKE_CURRENT_BINARY_DIR})
@@ -546,7 +554,7 @@ if ((ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT 
DISABLE_CURL) OR ENABLE_ALL
        list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/libxml2/dummy")
 endif()
 
-## Openwsman Extesions
+## Openwsman Extensions
 if (ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT DISABLE_CURL)
        include(BundledOpenWSMAN)
        use_bundled_openwsman(${CMAKE_CURRENT_SOURCE_DIR} 
${CMAKE_CURRENT_BINARY_DIR})
@@ -554,7 +562,7 @@ if (ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT 
DISABLE_CURL)
        createExtension(OPENWSMAN-EXTENSIONS "OPENWSMAN EXTENSIONS" "This 
enables Openwsman support" "extensions/openwsman")
 endif()
 
-## Azure Extentions
+## Azure Extensions
 if (ENABLE_ALL OR ENABLE_AZURE)
        include(NlohmannJson)
        list(APPEND CMAKE_MODULE_PATH 
"${CMAKE_SOURCE_DIR}/cmake/nlohmann_json/dummy")
diff --git a/PROCESSORS.md b/PROCESSORS.md
index d90ff6e..4820510 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -36,6 +36,7 @@
 - [ManipulateArchive](#manipulatearchive)
 - [MergeContent](#mergecontent)
 - [MotionDetector](#motiondetector)
+- [PerformanceDataMonitor](#performancedatamonitor)
 - [PublishKafka](#publishkafka)
 - [PublishMQTT](#publishmqtt)
 - [PutAzureBlobStorage](#putazureblobstorage)
@@ -955,6 +956,27 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 |success|Successful to detect motion|
 
 
+## PerformanceDataMonitor
+
+### Description
+This Windows-only processor can create FlowFiles populated with various 
performance data with the help of Windows Performance Counters.
+Windows Performance Counters provide a high-level abstraction layer with a 
consistent interface for collecting various kinds of system data such as CPU, 
memory, and disk usage statistics.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Predefined 
Groups||CPU<br>IO<br>Disk<br>Network<br>Memory<br>System<br>Process|Comma 
separated list from the allowable values, to monitor multiple common Windows 
Performance counters related to these groups. (e.g. "CPU,Network")|
+|Custom PDH Counters|||Comma separated list of Windows Performance Counters to 
monitor. (e.g. "\\System\\Threads,\\Process(*)\\ID Process")|
+|**Output Format**|JSON|JSON<br>OpenTelemetry|The output format of the new 
flowfile|
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|All files are routed to success|
+
+
 ## PublishKafka
 
 ### Description
diff --git a/README.md b/README.md
index 43ee8e2..b1c97be 100644
--- a/README.md
+++ b/README.md
@@ -87,6 +87,7 @@ Through JNI extensions you can run NiFi processors using 
NARs. The JNI extension
 | OpenCV | [CaptureRTSPFrame](PROCESSORS.md#captureRTSPFrame)     |    
-DENABLE_OPENCV=ON  |
 | OpenWSMAN | SourceInitiatedSubscriptionListener | -DENABLE_OPENWSMAN=ON |
 | PCAP | [CapturePacket](PROCESSORS.md#capturepacket)      |    
-DENABLE_PCAP=ON  |
+| PDH (Windows only) | 
[PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor)      |    
-DENABLE_PDH=ON  |
 | Scripting | [ExecuteScript](PROCESSORS.md#executescript)<br/>**Custom Python 
Processors**     |    -DDISABLE_SCRIPTING=ON  |
 | Sensors | GetEnvironmentalSensors<br/>GetMovementSensors | 
-DENABLE_SENSORS=ON |
 | SFTP | 
[FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp)
 | -DENABLE_SFTP=ON |
diff --git a/Windows.md b/Windows.md
index f32a851..76314c6 100644
--- a/Windows.md
+++ b/Windows.md
@@ -64,6 +64,7 @@ After the build directory it will take optional parameters 
modifying the CMake c
 | /C | Enables CoAP |
 | /A | Enables AWS |
 | /Z | Enables Azure |
+| /PDH | Enables Performance Monitor |
 | /M | Creates installer with merge modules |
 | /64 | Creates 64-bit build instead of a 32-bit one |
 | /D | Builds RelWithDebInfo build instead of Release |
diff --git a/extensions/pdh/CMakeLists.txt b/extensions/pdh/CMakeLists.txt
new file mode 100644
index 0000000..d6c644e
--- /dev/null
+++ b/extensions/pdh/CMakeLists.txt
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES  "*.cpp")
+
+add_library(minifi-pdh STATIC ${SOURCES})
+set_property(TARGET minifi-pdh PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+target_link_libraries(minifi-pdh ${LIBMINIFI} pdh)
+
+SET(PDH-EXTENSION minifi-pdh PARENT_SCOPE)
+register_extension(minifi-pdh)
+
+register_extension_linter(minifi-pdh-extensions-linter)
diff --git a/extensions/pdh/MemoryConsumptionCounter.h 
b/extensions/pdh/MemoryConsumptionCounter.h
new file mode 100644
index 0000000..f04fbcc
--- /dev/null
+++ b/extensions/pdh/MemoryConsumptionCounter.h
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "PerformanceDataCounter.h"
+#include <string>
+#include "utils/OsUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class MemoryConsumptionCounter : public PerformanceDataCounter {
+ public:
+  MemoryConsumptionCounter() : PerformanceDataCounter(), 
total_physical_memory_(-1), available_physical_memory_(-1), 
total_paging_file_size_(-1) {
+  }
+
+  bool dataIsValid() {
+    return total_physical_memory_ > 0 && available_physical_memory_ > 0 && 
total_paging_file_size_ > 0;
+  }
+
+  bool collectData() override {
+    total_physical_memory_ = utils::OsUtils::getSystemTotalPhysicalMemory();
+    available_physical_memory_ = 
utils::OsUtils::getSystemTotalPhysicalMemory() - 
utils::OsUtils::getSystemPhysicalMemoryUsage();
+    total_paging_file_size_ = utils::OsUtils::getTotalPagingFileSize();
+    return dataIsValid();
+  }
+
+  void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType& 
alloc) const override {
+    rapidjson::Value& group_node = acquireNode(std::string("Memory"), body, 
alloc);
+
+    rapidjson::Value total_physical_memory_value;
+    total_physical_memory_value.SetInt64(total_physical_memory_);
+    group_node.AddMember(rapidjson::Value("Total Physical Memory", alloc), 
total_physical_memory_value, alloc);
+
+    rapidjson::Value available_physical_memory_value;
+    available_physical_memory_value.SetInt64(available_physical_memory_);
+    group_node.AddMember(rapidjson::Value("Available Physical Memory", alloc), 
available_physical_memory_value, alloc);
+
+    rapidjson::Value total_paging_file_size_value;
+    total_paging_file_size_value.SetInt64(total_paging_file_size_);
+    group_node.AddMember(rapidjson::Value("Total paging file size", alloc), 
total_paging_file_size_value, alloc);
+  }
+
+  int64_t total_physical_memory_;
+  int64_t available_physical_memory_;
+  int64_t total_paging_file_size_;
+};
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/pdh/PDHCounters.cpp b/extensions/pdh/PDHCounters.cpp
new file mode 100644
index 0000000..03a839f
--- /dev/null
+++ b/extensions/pdh/PDHCounters.cpp
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PDHCounters.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+DWORD PDHCounter::getDWFormat() const {
+  return is_double_format_ ? PDH_FMT_DOUBLE : PDH_FMT_LARGE;
+}
+
+std::unique_ptr<PDHCounter> PDHCounter::createPDHCounter(const std::string& 
query_name, bool is_double) {
+  auto groups = utils::StringUtils::split(query_name, "\\");
+  if (groups.size() != 2 || query_name.substr(0, 1) != "\\")
+    return nullptr;
+  if (query_name.find("(*)") != std::string::npos) {
+    return std::unique_ptr<PDHCounter> { new PDHCounterArray(query_name, 
is_double) };
+  } else {
+    return std::unique_ptr<SinglePDHCounter> { new 
SinglePDHCounter(query_name, is_double) };
+  }
+}
+
+const std::string& PDHCounter::getName() const {
+  return pdh_english_counter_name_;
+}
+
+std::string PDHCounter::getObjectName() const {
+  auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+  return groups[0];
+}
+
+std::string PDHCounter::getCounterName() const {
+  auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+  return groups[1];
+}
+
+void SinglePDHCounter::addToJson(rapidjson::Value& body, 
rapidjson::Document::AllocatorType& alloc) const {
+  rapidjson::Value key(getCounterName().c_str(), getCounterName().length(), 
alloc);
+  rapidjson::Value& group_node = acquireNode(getObjectName(), body, alloc);
+  group_node.AddMember(key, getValue(), alloc);
+}
+
+PDH_STATUS SinglePDHCounter::addToQuery(PDH_HQUERY& pdh_query)  {
+  return PdhAddEnglishCounterA(pdh_query, pdh_english_counter_name_.c_str(), 
0, &counter_);
+}
+
+bool SinglePDHCounter::collectData() {
+  return PdhGetFormattedCounterValue(counter_, getDWFormat(), nullptr, 
&current_value_) == ERROR_SUCCESS;
+}
+
+rapidjson::Value SinglePDHCounter::getValue() const {
+  rapidjson::Value value;
+  if (is_double_format_)
+    value.SetDouble(current_value_.doubleValue);
+  else
+    value.SetInt64(current_value_.largeValue);
+  return value;
+}
+
+std::string PDHCounterArray::getObjectName() const {
+  std::string group_name_with_wildcard = PDHCounter::getObjectName();
+  return group_name_with_wildcard.substr(0, 
group_name_with_wildcard.find("(*)"));
+}
+
+void PDHCounterArray::addToJson(rapidjson::Value& body, 
rapidjson::Document::AllocatorType& alloc) const {
+  rapidjson::Value& group_node = acquireNode(getObjectName(), body, alloc);
+  for (DWORD i = 0; i < item_count_; ++i) {
+    rapidjson::Value& counter_node = 
acquireNode(std::string(values_[i].szName), group_node, alloc);
+    rapidjson::Value value;
+    if (is_double_format_)
+      value.SetDouble(values_[i].FmtValue.doubleValue);
+    else
+      value.SetInt64(values_[i].FmtValue.largeValue);
+    rapidjson::Value key;
+    key.SetString(getCounterName().c_str(), getCounterName().length(), alloc);
+    counter_node.AddMember(key, value, alloc);
+  }
+}
+
+PDH_STATUS PDHCounterArray::addToQuery(PDH_HQUERY& pdh_query) {
+  return PdhAddEnglishCounterA(pdh_query, pdh_english_counter_name_.c_str(), 
0, &counter_);
+}
+
+bool PDHCounterArray::collectData() {
+  clearCurrentData();
+  PDH_STATUS status = PdhGetFormattedCounterArrayA(counter_, getDWFormat(), 
&buffer_size_, &item_count_, values_);
+  if (PDH_MORE_DATA == status) {
+    values_ = 
reinterpret_cast<PDH_FMT_COUNTERVALUE_ITEM*>(malloc(buffer_size_));
+    status = PdhGetFormattedCounterArrayA(counter_, getDWFormat(), 
&buffer_size_, &item_count_, values_);
+  }
+  return status == ERROR_SUCCESS;
+}
+
+void PDHCounterArray::clearCurrentData() {
+  free(values_);
+  values_ = nullptr;
+  buffer_size_ = item_count_ = 0;
+}
+
+rapidjson::Value PDHCounterArray::getValue(const DWORD i) const {
+  rapidjson::Value value;
+  if (is_double_format_)
+    value.SetDouble(values_[i].FmtValue.doubleValue);
+  else
+    value.SetInt64(values_[i].FmtValue.largeValue);
+  return value;
+}
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/pdh/PDHCounters.h b/extensions/pdh/PDHCounters.h
new file mode 100644
index 0000000..2a0d3e7
--- /dev/null
+++ b/extensions/pdh/PDHCounters.h
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <TCHAR.h>
+#include <pdh.h>
+#include <pdhmsg.h>
+#include <string>
+
+#include "PerformanceDataCounter.h"
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PDHCounter : public PerformanceDataCounter {
+ public:
+  virtual ~PDHCounter() {}
+  static std::unique_ptr<PDHCounter> createPDHCounter(const std::string& 
query_name, bool is_double = true);
+
+  const std::string& getName() const;
+  virtual std::string getObjectName() const;
+  virtual std::string getCounterName() const;
+  virtual PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) = 0;
+ protected:
+  PDHCounter(const std::string& query_name, bool is_double)
+      : PerformanceDataCounter(), is_double_format_(is_double), 
pdh_english_counter_name_(query_name) {}
+
+  DWORD getDWFormat() const;
+
+  const bool is_double_format_;
+  std::string pdh_english_counter_name_;
+};
+
+class SinglePDHCounter : public PDHCounter {
+ public:
+  void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType& 
alloc) const override;
+
+  PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) override;
+  bool collectData() override;
+
+ protected:
+  friend std::unique_ptr<PDHCounter> PDHCounter::createPDHCounter(const 
std::string& query_name, bool is_double);
+  explicit SinglePDHCounter(const std::string& query_name, bool is_double)
+      : PDHCounter(query_name, is_double), counter_(nullptr), current_value_() 
{}
+
+  rapidjson::Value getValue() const;
+
+  PDH_HCOUNTER counter_;
+  PDH_FMT_COUNTERVALUE current_value_;
+};
+
+class PDHCounterArray : public PDHCounter {
+ public:
+  virtual ~PDHCounterArray() { clearCurrentData(); }
+  void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType& 
alloc) const override;
+
+  std::string getObjectName() const override;
+  PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) override;
+  bool collectData() override;
+
+ protected:
+  friend std::unique_ptr<PDHCounter> PDHCounter::createPDHCounter(const 
std::string& query_name, bool is_double);
+  explicit PDHCounterArray(const std::string& query_name, bool is_double)
+      : PDHCounter(query_name, is_double), counter_(nullptr), buffer_size_(0), 
item_count_(0), values_(nullptr) {}
+
+  void clearCurrentData();
+  rapidjson::Value getValue(const DWORD i) const;
+
+  PDH_HCOUNTER counter_;
+  DWORD buffer_size_;         // Size of the values_ array in bytes
+  DWORD item_count_;          // Number of items in values_ array
+  PDH_FMT_COUNTERVALUE_ITEM* values_;  // Array of PDH_FMT_COUNTERVALUE_ITEM 
structures
+};
+
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/pdh/PerformanceDataCounter.h 
b/extensions/pdh/PerformanceDataCounter.h
new file mode 100644
index 0000000..b7f69d3
--- /dev/null
+++ b/extensions/pdh/PerformanceDataCounter.h
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include "rapidjson/document.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PerformanceDataCounter {
+ public:
+  PerformanceDataCounter() = default;
+  virtual ~PerformanceDataCounter() = default;
+
+  virtual bool collectData() = 0;
+  virtual void addToJson(rapidjson::Value& body, 
rapidjson::Document::AllocatorType& alloc) const = 0;
+
+ protected:
+  static rapidjson::Value& acquireNode(const std::string& node_name, 
rapidjson::Value& parent_node, rapidjson::Document::AllocatorType& alloc) {
+    if (!parent_node.HasMember(node_name.c_str())) {
+      rapidjson::Value value(rapidjson::kObjectType);
+      rapidjson::Value key(node_name.c_str(), alloc);
+      parent_node.AddMember(key, value, alloc);
+    }
+    return parent_node[node_name.c_str()];
+  }
+};
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/pdh/PerformanceDataMonitor.cpp 
b/extensions/pdh/PerformanceDataMonitor.cpp
new file mode 100644
index 0000000..f25517c
--- /dev/null
+++ b/extensions/pdh/PerformanceDataMonitor.cpp
@@ -0,0 +1,284 @@
+/**
+ * @file PerformanceDataMonitor.cpp
+ * PerformanceDataMonitor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+#include "utils/StringUtils.h"
+#include "utils/JsonCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are 
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+    core::PropertyBuilder::createProperty("Predefined Groups")->
+    withDescription("Comma separated list from the allowable values, to 
monitor multiple common Windows Performance counters related to these groups")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+    core::PropertyBuilder::createProperty("Custom PDH Counters")->
+    withDescription("Comma separated list of Windows Performance Counters to 
monitor")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+    core::PropertyBuilder::createProperty("Output Format")->
+    withDescription("Format of the created flowfiles")->
+    withAllowableValue<std::string>(JSON_FORMAT_STR)->
+    withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+    withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+  PdhCloseQuery(pdh_query_);
+}
+
+void PerformanceDataMonitor::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  setupMembersFromProperties(context);
+
+  PdhOpenQueryA(nullptr, 0, &pdh_query_);
+
+  for (auto it = resource_consumption_counters_.begin(); it != 
resource_consumption_counters_.end();) {
+    PDHCounter* pdh_counter = dynamic_cast<PDHCounter*> (it->get());
+    if (pdh_counter != nullptr) {
+      PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+      if (add_to_query_result != ERROR_SUCCESS) {
+        logger_->log_error("Error adding %s to query, error code: 0x%x", 
pdh_counter->getName(), add_to_query_result);
+        it = resource_consumption_counters_.erase(it);
+        continue;
+      }
+    }
+    ++it;
+  }
+
+  PDH_STATUS collect_query_data_result = PdhCollectQueryData(pdh_query_);
+  if (ERROR_SUCCESS != collect_query_data_result) {
+    logger_->log_error("Error during PdhCollectQueryData, error code: 0x%x", 
collect_query_data_result);
+  }
+}
+
+void PerformanceDataMonitor::onTrigger(core::ProcessContext* context, 
core::ProcessSession* session) {
+  if (resource_consumption_counters_.empty()) {
+    logger_->log_error("No valid counters for PerformanceDataMonitor");
+    yield();
+    return;
+  }
+
+  std::shared_ptr<core::FlowFile> flowFile = session->create();
+  if (!flowFile) {
+    logger_->log_error("Failed to create flowfile!");
+    yield();
+    return;
+  }
+
+  PDH_STATUS collect_query_data_result = PdhCollectQueryData(pdh_query_);
+  if (ERROR_SUCCESS != collect_query_data_result) {
+    logger_->log_error("Error during PdhCollectQueryData, error code: 0x%x", 
collect_query_data_result);
+    yield();
+    return;
+  }
+
+  rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType);
+  rapidjson::Value& body = prepareJSONBody(root);
+  for (auto& counter : resource_consumption_counters_) {
+    if (counter->collectData())
+      counter->addToJson(body, root.GetAllocator());
+  }
+  utils::JsonOutputCallback callback(std::move(root));
+  session->write(flowFile, &callback);
+  session->transfer(flowFile, Success);
+}
+
+void PerformanceDataMonitor::initialize() {
+  setSupportedProperties({ CustomPDHCounters, PredefinedGroups, 
OutputFormatProperty });
+  setSupportedRelationships({ PerformanceDataMonitor::Success });
+}
+
+rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document& 
root) {
+  switch (output_format_) {
+    case OutputFormat::OPENTELEMETRY:
+      root.AddMember("Name", "PerformanceData", root.GetAllocator());
+      root.AddMember("Timestamp", std::time(0), root.GetAllocator());
+      root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType }, 
root.GetAllocator());
+      return root["Body"];
+    case OutputFormat::JSON:
+      return root;
+    default:
+      return root;
+  }
+}
+
+void 
add_cpu_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& 
resource_consumption_counters) {
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\%
 Processor Time"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\%
 User Time"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Processor(*)\\%
 Privileged Time"));
+}
+
+void 
add_io_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& 
resource_consumption_counters) {
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(_Total)\\IO
 Read Bytes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Process(_Total)\\IO
 Write Bytes/sec"));
+}
+
+void 
add_disk_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& 
resource_consumption_counters) {
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\LogicalDisk(*)\\%
 Free Space"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\LogicalDisk(*)\\Free
 Megabytes", false));
+
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
 Disk Read Time"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
 Disk Time"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
 Disk Write Time"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\%
 Idle Time"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk Bytes/Transfer"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk Bytes/Read"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk Bytes/Write"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk Write Queue Length"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk Read Queue Length"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk Queue Length"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk sec/Transfer"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk sec/Read"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Avg.
 Disk sec/Write"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Current
 Disk Queue Length", false));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
 Transfers/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
 Reads/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
 Writes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
 Bytes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
 Read Bytes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Disk
 Write Bytes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\PhysicalDisk(*)\\Split
 IO/Sec"));
+}
+
+void 
add_network_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
 resource_consumption_counters) {
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Bytes Received/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Bytes Sent/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Bytes Total/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Current Bandwidth", false));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Received/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Sent/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Received Discarded", false));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Received Errors", false));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Received Unknown", false));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Received Non-Unicast/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Received Unicast/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Sent Unicast/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Network 
Interface(*)\\Packets Sent Non-Unicast/sec"));
+}
+
+// Appends the Memory and Paging File PDH counters, followed by one MemoryConsumptionCounter.
+void add_memory_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) {
+  // add: one-argument createPDHCounter call.
+  const auto add = [&resource_consumption_counters](const char* path) {
+    resource_consumption_counters.push_back(PDHCounter::createPDHCounter(path));
+  };
+
+  add("\\Memory\\% Committed Bytes In Use");
+  // false as the second argument: presumably is_double = false -- confirm against PDHCounters.h.
+  resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\Memory\\Available MBytes", false));
+  add("\\Memory\\Page Faults/sec");
+  add("\\Memory\\Pages/sec");
+  add("\\Paging File(_Total)\\% Usage");
+
+  // Not a PDH path: a separate PerformanceDataCounter implementation.
+  resource_consumption_counters.push_back(std::make_unique<MemoryConsumptionCounter>());
+}
+
+// Appends the per-process (wildcard instance) counters, in a fixed order.
+void add_process_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>& resource_consumption_counters) {
+  // Counters created with the one-argument createPDHCounter call.
+  for (const char* counter_path : {"\\Process(*)\\% Processor Time",
+                                   "\\Process(*)\\Elapsed Time"}) {
+    resource_consumption_counters.push_back(PDHCounter::createPDHCounter(counter_path));
+  }
+  // Counters created with false as the second argument (presumably is_double = false -- confirm).
+  for (const char* counter_path : {"\\Process(*)\\ID Process",
+                                   "\\Process(*)\\Private Bytes"}) {
+    resource_consumption_counters.push_back(PDHCounter::createPDHCounter(counter_path, false));
+  }
+}
+
+void 
add_system_related_counters(std::vector<std::unique_ptr<PerformanceDataCounter>>&
 resource_consumption_counters) {
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\%
 Registry Quota In Use"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Context
 Switches/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
 Control Bytes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
 Control Operations/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
 Read Bytes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
 Read Operations/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
 Write Bytes/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
 Write Operations/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\File
 Data Operations/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Processes",
 false));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Processor
 Queue Length", false));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\System
 Calls/sec"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\System
 Up Time"));
+  
resource_consumption_counters.push_back(PDHCounter::createPDHCounter("\\System\\Threads",
 false));
+}
+
+// Splits the comma-separated PredefinedGroups property value and registers the counters
+// of each recognised group (CPU, IO, Disk, Network, Memory, System, Process).
+// Unrecognised group names are logged as errors and skipped.
+void PerformanceDataMonitor::addCountersFromPredefinedGroupsProperty(const std::string& predefined_groups) {
+  const auto groups = utils::StringUtils::splitAndTrim(predefined_groups, ",");
+  for (const auto& group : groups) {
+    if (group == "CPU") {
+      add_cpu_related_counters(resource_consumption_counters_);
+    } else if (group == "IO") {
+      add_io_related_counters(resource_consumption_counters_);
+    } else if (group == "Disk") {
+      add_disk_related_counters(resource_consumption_counters_);
+    } else if (group == "Network") {
+      add_network_related_counters(resource_consumption_counters_);
+    } else if (group == "Memory") {
+      add_memory_related_counters(resource_consumption_counters_);
+    } else if (group == "System") {
+      add_system_related_counters(resource_consumption_counters_);
+    } else if (group == "Process") {
+      add_process_related_counters(resource_consumption_counters_);
+    } else {
+      // The group name is user configuration: hand it over as an argument instead of splicing
+      // it into the format string, so a '%' in the value is never treated as a conversion.
+      logger_->log_error("%s is not a valid predefined group for PerformanceDataMonitor", group);
+    }
+  }
+}
+
+// Creates a counter for every comma-separated entry of the CustomPDHCounters property value.
+// Entries for which PDHCounter::createPDHCounter returns nullptr are dropped.
+void PerformanceDataMonitor::addCustomPDHCountersFromProperty(const std::string& custom_pdh_counters) {
+  for (const auto& counter_path : utils::StringUtils::splitAndTrim(custom_pdh_counters, ",")) {
+    auto pdh_counter = PDHCounter::createPDHCounter(counter_path);
+    if (pdh_counter == nullptr) {
+      continue;
+    }
+    resource_consumption_counters_.push_back(std::move(pdh_counter));
+  }
+}
+
+// Reads the processor's properties from the context: registers the predefined and custom
+// counters, then selects the output format. Any OutputFormat value other than the
+// OpenTelemetry string results in JSON.
+void PerformanceDataMonitor::setupMembersFromProperties(const std::shared_ptr<core::ProcessContext>& context) {
+  std::string group_list;
+  if (context->getProperty(PredefinedGroups.getName(), group_list)) {
+    logger_->log_trace("Predefined group configured to be %s", group_list);
+    addCountersFromPredefinedGroupsProperty(group_list);
+  }
+
+  std::string custom_counter_list;
+  if (context->getProperty(CustomPDHCounters.getName(), custom_counter_list)) {
+    logger_->log_trace("Custom PDH counters configured to be %s", custom_counter_list);
+    addCustomPDHCountersFromProperty(custom_counter_list);
+  }
+
+  std::string requested_format;
+  if (!context->getProperty(OutputFormatProperty.getName(), requested_format)) {
+    return;  // property not set: keep the current output_format_
+  }
+  if (requested_format == OPEN_TELEMETRY_FORMAT_STR) {
+    logger_->log_trace("OutputFormat is configured to be OpenTelemetry");
+    output_format_ = OutputFormat::OPENTELEMETRY;
+    return;
+  }
+  if (requested_format == JSON_FORMAT_STR) {
+    logger_->log_trace("OutputFormat is configured to be JSON");
+  } else {
+    logger_->log_error("Invalid OutputFormat, defaulting to JSON");
+  }
+  output_format_ = OutputFormat::JSON;
+}
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/pdh/PerformanceDataMonitor.h 
b/extensions/pdh/PerformanceDataMonitor.h
new file mode 100644
index 0000000..9cc37d2
--- /dev/null
+++ b/extensions/pdh/PerformanceDataMonitor.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <pdh.h>
+#include <string>
+#include <vector>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/prettywriter.h"
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PerformanceDataMonitor Class
+// Processor that collects performance data through the Windows Performance Data Helper (PDH)
+// API from a configurable set of counters. Counters come from predefined groups and/or
+// custom PDH counter paths; the output format is either JSON or OpenTelemetry.
+class PerformanceDataMonitor : public core::Processor {
+ public:
+  // Accepted values of the OutputFormat property.
+  static constexpr const char* JSON_FORMAT_STR = "JSON";
+  static constexpr const char* OPEN_TELEMETRY_FORMAT_STR = "OpenTelemetry";
+
+  // Starts with JSON output, a null PDH query handle and no counters.
+  explicit PerformanceDataMonitor(const std::string& name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid), output_format_(OutputFormat::JSON),
+        logger_(logging::LoggerFactory<PerformanceDataMonitor>::getLogger()),
+        pdh_query_(nullptr), resource_consumption_counters_() {}
+
+  ~PerformanceDataMonitor() override;
+  static constexpr char const* ProcessorName = "PerformanceDataMonitor";
+  // Supported Properties
+  static core::Property PredefinedGroups;
+  static core::Property CustomPDHCounters;
+  static core::Property OutputFormatProperty;
+  // Supported Relationships
+  static core::Relationship Success;
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+  void initialize() override;
+
+ protected:
+  enum class OutputFormat {
+    JSON, OPENTELEMETRY
+  };
+
+  rapidjson::Value& prepareJSONBody(rapidjson::Document& root);
+
+  // Reads PredefinedGroups, CustomPDHCounters and OutputFormat from the context.
+  void setupMembersFromProperties(const std::shared_ptr<core::ProcessContext>& context);
+  // predefined_groups: comma-separated list of group names.
+  void addCountersFromPredefinedGroupsProperty(const std::string& predefined_groups);
+  // custom_pdh_counters: comma-separated list of PDH counter paths.
+  void addCustomPDHCountersFromProperty(const std::string& custom_pdh_counters);
+
+  OutputFormat output_format_;
+
+  std::shared_ptr<logging::Logger> logger_;
+  PDH_HQUERY pdh_query_;
+  std::vector<std::unique_ptr<PerformanceDataCounter>> resource_consumption_counters_;
+};
+
+REGISTER_RESOURCE(PerformanceDataMonitor, "This processor can create FlowFiles 
with various performance data through Performance Data Helper. (Windows only)");
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/pdh/tests/CMakeLists.txt 
b/extensions/pdh/tests/CMakeLists.txt
new file mode 100644
index 0000000..29d144b
--- /dev/null
+++ b/extensions/pdh/tests/CMakeLists.txt
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Builds one test executable per .cpp file in this directory, links it against the Catch main
+# library plus the PDH and standard-processors extensions, and registers it with CTest.
+file(GLOB PDH_TESTS  "*.cpp")
+
+set(PDH_TEST_COUNT 0)
+foreach(testfile ${PDH_TESTS})
+       get_filename_component(testfilename "${testfile}" NAME_WE)
+       add_executable("${testfilename}" "${testfile}")
+       # BEFORE prepends, so the directory added last is searched first. The
+       # standard-processors directory used to be added twice; one call gives the same
+       # effective search order once duplicates are collapsed.
+       target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/pdh")
+       target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+       target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+       createTests("${testfilename}")
+       # Plain signature kept on purpose: createTests is defined elsewhere and keyword and
+       # plain target_link_libraries signatures must not be mixed on one target.
+       target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+       target_wholearchive_library(${testfilename} minifi-pdh)
+       target_wholearchive_library(${testfilename} minifi-standard-processors)
+       math(EXPR PDH_TEST_COUNT "${PDH_TEST_COUNT}+1")
+       add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+endforeach()
+message("-- Finished building ${PDH_TEST_COUNT} PDH related test file(s)...")
diff --git a/extensions/pdh/tests/PerformanceDataCounterTests.cpp 
b/extensions/pdh/tests/PerformanceDataCounterTests.cpp
new file mode 100644
index 0000000..879228c
--- /dev/null
+++ b/extensions/pdh/tests/PerformanceDataCounterTests.cpp
@@ -0,0 +1,187 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+#include "gsl/gsl-lite.hpp"
+
+using org::apache::nifi::minifi::processors::SinglePDHCounter;
+using org::apache::nifi::minifi::processors::PDHCounterArray;
+using org::apache::nifi::minifi::processors::PDHCounter;
+using org::apache::nifi::minifi::processors::MemoryConsumptionCounter;
+
+
+// A path without an instance part yields a SinglePDHCounter exposing the full path,
+// the object name and the counter name.
+TEST_CASE("PDHCounterNameTests", "[pdhcounternametests]") {
+  std::unique_ptr<PDHCounter> counter = PDHCounter::createPDHCounter("\\System\\Threads");
+  auto* as_single_counter = dynamic_cast<SinglePDHCounter*>(counter.get());
+  REQUIRE(nullptr != as_single_counter);
+  REQUIRE("\\System\\Threads" == counter->getName());
+  REQUIRE("System" == counter->getObjectName());
+  REQUIRE("Threads" == counter->getCounterName());
+}
+
+// A path with a wildcard instance yields a PDHCounterArray; the object name excludes
+// the "(*)" instance part.
+TEST_CASE("PDHCounterArrayNameTests", "[pdhcounterarraytests]") {
+  std::unique_ptr<PDHCounter> counter_array = PDHCounter::createPDHCounter("\\LogicalDisk(*)\\% Free Space");
+  auto* as_counter_array = dynamic_cast<PDHCounterArray*>(counter_array.get());
+  REQUIRE(nullptr != as_counter_array);
+  REQUIRE("\\LogicalDisk(*)\\% Free Space" == counter_array->getName());
+  REQUIRE("LogicalDisk" == counter_array->getObjectName());
+  REQUIRE("% Free Space" == counter_array->getCounterName());
+}
+
+// createPDHCounter returns nullptr for paths that are not of the \Object\Counter form.
+TEST_CASE("PDHCountersInvalidNameTests", "[pdhcountersinvalidnametests]") {
+  for (const char* invalid_path : {"Invalid Name",
+                                   "",
+                                   "Something\\Counter",
+                                   "\\Too\\Many\\Separators",
+                                   "Too\\Many\\Separators"}) {
+    REQUIRE(nullptr == PDHCounter::createPDHCounter(invalid_path));
+  }
+  REQUIRE(nullptr != PDHCounter::createPDHCounter("\\Valid\\Counter"));
+}
+
+// Test helper: forwards its arguments to the SinglePDHCounter constructor so tests can build
+// a counter directly, without going through PDHCounter::createPDHCounter.
+// NOTE(review): presumably needed because the base constructor is not public -- confirm in PDHCounters.h.
+class TestablePDHCounter : public SinglePDHCounter {
+ public:
+  explicit TestablePDHCounter(const std::string& query_name, bool is_double = 
true) : SinglePDHCounter(query_name, is_double) {
+  }
+};
+
+// Test helper: forwards its arguments to the PDHCounterArray constructor so tests can build
+// a counter array directly, without going through PDHCounter::createPDHCounter.
+// NOTE(review): presumably needed because the base constructor is not public -- confirm in PDHCounters.h.
+class TestablePDHCounterArray : public PDHCounterArray {
+ public:
+  explicit TestablePDHCounterArray(const std::string& query_name, bool 
is_double = true) : PDHCounterArray(query_name, is_double) {
+  }
+};
+
+// Checks the status addToQuery reports for a valid counter, a query handle that was never
+// opened, an unknown object, an unknown counter and an unparsable path.
+TEST_CASE("PDHCountersAddingToQueryTests", "[pdhcountersaddingtoquerytests]") {
+  PDH_HQUERY pdh_query = nullptr;
+  // Every later expectation depends on a usable query, so fail here if it cannot be opened.
+  REQUIRE(ERROR_SUCCESS == PdhOpenQueryA(nullptr, 0, &pdh_query));
+  auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+  // Explicitly null: an uninitialized handle has an indeterminate value, so reading it is
+  // undefined behaviour and it could by chance equal a live handle.
+  PDH_HQUERY unopened_pdh_query = nullptr;
+
+  TestablePDHCounter valid_counter("\\System\\Threads");
+  REQUIRE(ERROR_SUCCESS == valid_counter.addToQuery(pdh_query));
+  REQUIRE_FALSE(ERROR_SUCCESS == valid_counter.addToQuery(unopened_pdh_query));
+
+  TestablePDHCounter counter_with_invalid_object_name("\\Invalid\\Threads");
+  REQUIRE(PDH_CSTATUS_NO_OBJECT == counter_with_invalid_object_name.addToQuery(pdh_query));
+
+  TestablePDHCounter counter_with_invalid_counter_name("\\System\\Invalid");
+  REQUIRE(PDH_CSTATUS_NO_COUNTER == counter_with_invalid_counter_name.addToQuery(pdh_query));
+
+  TestablePDHCounter unparsable_counter("asd");  // Unparsable names are also filtered when using PDHCounter::createPDHCounter
+  REQUIRE(PDH_CSTATUS_BAD_COUNTERNAME == unparsable_counter.addToQuery(pdh_query));
+}
+
+// Same status checks as for single counters, but for wildcard counter arrays.
+TEST_CASE("PDHCounterArraysAddingToQueryTests", "[pdhcounterarraysaddingtoquerytests]") {
+  PDH_HQUERY pdh_query = nullptr;
+  // Every later expectation depends on a usable query, so fail here if it cannot be opened.
+  REQUIRE(ERROR_SUCCESS == PdhOpenQueryA(nullptr, 0, &pdh_query));
+  auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+  // Explicitly null: an uninitialized handle has an indeterminate value, so reading it is
+  // undefined behaviour and it could by chance equal a live handle.
+  PDH_HQUERY unopened_pdh_query = nullptr;
+
+  TestablePDHCounterArray valid_counter("\\Processor(*)\\% Processor Time");
+  REQUIRE(ERROR_SUCCESS == valid_counter.addToQuery(pdh_query));
+  REQUIRE_FALSE(ERROR_SUCCESS == valid_counter.addToQuery(unopened_pdh_query));
+
+  TestablePDHCounterArray counter_with_invalid_object_name("\\SomethingInvalid(*)\\% Processor Time");
+  REQUIRE(PDH_CSTATUS_NO_OBJECT == counter_with_invalid_object_name.addToQuery(pdh_query));
+
+  TestablePDHCounterArray counter_with_invalid_counter_name("\\Processor(*)\\SomethingInvalid");
+  REQUIRE(PDH_CSTATUS_NO_COUNTER == counter_with_invalid_counter_name.addToQuery(pdh_query));
+
+  TestablePDHCounterArray unparsable_counter("asd");  // Unparsable names are also filtered when using PDHCounter::createPDHCounter
+  REQUIRE(PDH_CSTATUS_BAD_COUNTERNAME == unparsable_counter.addToQuery(pdh_query));
+}
+
+// Collects one sample for a double-valued and an integer-valued single counter and checks
+// the JSON they produce (nesting under the object name, value type, positive value).
+TEST_CASE("PDHCounterDataCollectionTest", "[pdhcounterdatacollectiontest]") {
+  PDH_HQUERY pdh_query = nullptr;
+  // Fail early if the query cannot be opened instead of continuing with an invalid handle.
+  REQUIRE(ERROR_SUCCESS == PdhOpenQueryA(nullptr, 0, &pdh_query));
+  auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+  TestablePDHCounter double_counter("\\System\\Threads");
+  TestablePDHCounter int_counter("\\System\\Processes", false);
+  REQUIRE(ERROR_SUCCESS == double_counter.addToQuery(pdh_query));
+  REQUIRE(ERROR_SUCCESS == int_counter.addToQuery(pdh_query));
+
+  PdhCollectQueryData(pdh_query);
+
+  REQUIRE(double_counter.collectData());
+  REQUIRE(int_counter.collectData());
+
+  rapidjson::Document document(rapidjson::kObjectType);
+  double_counter.addToJson(document, document.GetAllocator());
+  int_counter.addToJson(document, document.GetAllocator());
+
+  REQUIRE(document.HasMember("System"));
+  REQUIRE(document["System"].HasMember("Threads"));
+  REQUIRE(document["System"]["Threads"].IsDouble());
+  REQUIRE(document["System"]["Threads"].GetDouble() > 0);
+  // Check presence before indexing: indexing a missing member trips a rapidjson assertion
+  // rather than producing a test failure.
+  REQUIRE(document["System"].HasMember("Processes"));
+  REQUIRE(document["System"]["Processes"].IsInt64());
+  REQUIRE(document["System"]["Processes"].GetInt64() > 0);
+}
+
+// Collects data for a double-valued and an integer-valued counter array and checks that the
+// entry for this test process appears in the JSON with the expected value types.
+TEST_CASE("PDHCounterArrayDataCollectionTest", "[pdhcounterarraydatacollectiontest]") {
+  PDH_HQUERY pdh_query = nullptr;
+  // Fail early if the query cannot be opened instead of continuing with an invalid handle.
+  REQUIRE(ERROR_SUCCESS == PdhOpenQueryA(nullptr, 0, &pdh_query));
+  auto cleanup = gsl::finally([&pdh_query] { PdhCloseQuery(pdh_query); });
+
+  TestablePDHCounterArray double_counter_array("\\Process(*)\\Thread Count");
+  TestablePDHCounterArray int_counter_array("\\Process(*)\\ID Process", false);
+  REQUIRE(ERROR_SUCCESS == double_counter_array.addToQuery(pdh_query));
+  REQUIRE(ERROR_SUCCESS == int_counter_array.addToQuery(pdh_query));
+
+  PdhCollectQueryData(pdh_query);
+
+  // The results of the first collectData calls are deliberately not asserted; only the
+  // calls made after the pause are required to succeed.
+  double_counter_array.collectData();
+  int_counter_array.collectData();
+  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+  REQUIRE(double_counter_array.collectData());
+  REQUIRE(int_counter_array.collectData());
+
+  rapidjson::Document document(rapidjson::kObjectType);
+  double_counter_array.addToJson(document, document.GetAllocator());
+  int_counter_array.addToJson(document, document.GetAllocator());
+
+  REQUIRE(document.HasMember("Process"));
+  REQUIRE(document["Process"].HasMember("PerformanceDataCounterTests"));
+  REQUIRE(document["Process"]["PerformanceDataCounterTests"].HasMember("Thread Count"));
+  REQUIRE(document["Process"]["PerformanceDataCounterTests"].HasMember("ID Process"));
+  REQUIRE(document["Process"]["PerformanceDataCounterTests"]["Thread Count"].IsDouble());
+  REQUIRE(document["Process"]["PerformanceDataCounterTests"]["ID Process"].IsInt64());
+}
+
+// The counter reports no valid data until collectData has run; afterwards its JSON holds
+// three positive Int64 members under "Memory".
+TEST_CASE("MemoryConsumptionCounterTest", "[memoryconsumptioncountertest]") {
+  MemoryConsumptionCounter counter;
+  REQUIRE_FALSE(counter.dataIsValid());
+  counter.collectData();
+  REQUIRE(counter.dataIsValid());
+
+  rapidjson::Document document(rapidjson::kObjectType);
+  counter.addToJson(document, document.GetAllocator());
+
+  REQUIRE(document.HasMember("Memory"));
+
+  const char* const expected_members[] = {"Total Physical Memory",
+                                          "Available Physical Memory",
+                                          "Total paging file size"};
+  // Presence first, then type, then value -- the same order of checks as before.
+  for (const char* member : expected_members) {
+    REQUIRE(document["Memory"].HasMember(member));
+  }
+  for (const char* member : expected_members) {
+    REQUIRE(document["Memory"][member].IsInt64());
+  }
+  for (const char* member : expected_members) {
+    REQUIRE(document["Memory"][member].GetInt64() > 0);
+  }
+}
diff --git a/extensions/pdh/tests/PerformanceDataMonitorTests.cpp 
b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
new file mode 100644
index 0000000..2c08ced
--- /dev/null
+++ b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
@@ -0,0 +1,231 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "TestBase.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+#include "PerformanceDataMonitor.h"
+#include "rapidjson/filereadstream.h"
+
+using org::apache::nifi::minifi::processors::PutFile;
+using org::apache::nifi::minifi::processors::PerformanceDataMonitor;
+using org::apache::nifi::minifi::processors::PerformanceDataCounter;
+
+// Test fixture: builds a two-processor plan (PerformanceDataMonitor followed by PutFile)
+// whose PutFile writes into a freshly created temporary directory.
+class PerformanceDataMonitorTester {
+ public:
+  // Enables trace logging for TestPlan, creates the temp dir and the plan, and adds both
+  // processors; PutFile is connected through the "success" relationship.
+  PerformanceDataMonitorTester() {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    dir_ = utils::createTempDir(&test_controller_);
+    plan_ = test_controller_.createPlan();
+    performance_monitor_ = plan_->addProcessor("PerformanceDataMonitor", 
"pdhsys");
+    putfile_ = plan_->addProcessor("PutFile", "putfile", 
core::Relationship("success", "description"), true);
+    plan_->setProperty(putfile_, PutFile::Directory.getName(), dir_);
+  }
+
+  // Runs the monitor twice with a 200 ms pause in between, then runs PutFile twice.
+  void runProcessors() {
+    plan_->runNextProcessor();      // PerformanceMonitor
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+    plan_->runCurrentProcessor();   // PerformanceMonitor
+    plan_->runNextProcessor();      // PutFile
+    plan_->runCurrentProcessor();   // PutFile
+  }
+
+  // Sets a property on the PerformanceDataMonitor processor of the plan.
+  void setPerformanceMonitorProperty(const core::Property& property, const 
std::string& value) {
+    plan_->setProperty(performance_monitor_, property.getName(), value);
+  }
+
+  TestController test_controller_;
+  std::string dir_;  // directory PutFile writes into
+  std::shared_ptr<TestPlan> plan_;
+  std::shared_ptr<core::Processor> performance_monitor_;
+  std::shared_ptr<core::Processor> putfile_;
+};
+
+
+// With no counters configured the processor logs that it has no valid counters and
+// nothing is written to the output directory.
+TEST_CASE("PerformanceDataMonitorEmptyPropertiesTest", "[performancedatamonitoremptypropertiestest]") {
+  PerformanceDataMonitorTester tester;
+  tester.runProcessors();
+
+  const bool no_counters_logged = tester.test_controller_.getLog().getInstance().contains(
+      "No valid counters for PerformanceDataMonitor", std::chrono::seconds(0));
+  REQUIRE(no_counters_logged);
+
+  auto written_files = utils::file::FileUtils::list_dir_all(tester.dir_, tester.plan_->getLogger(), false);
+  REQUIRE(written_files.size() == 0);
+}
+
+// Invalid group names and invalid custom counters are logged and skipped, while the valid
+// ones still produce output: two files, each containing the expected members.
+TEST_CASE("PerformanceDataMonitorPartiallyInvalidGroupPropertyTest", "[performancedatamonitorpartiallyinvalidgrouppropertytest]") {
+  PerformanceDataMonitorTester tester;
+  tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups, "Disk,CPU,Asd");
+  tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters, "\\Invalid\\Counter,\\System\\Processes");
+  tester.runProcessors();
+
+  REQUIRE(tester.test_controller_.getLog().getInstance().contains("Asd is not a valid predefined group", std::chrono::seconds(0)));
+  REQUIRE(tester.test_controller_.getLog().getInstance().contains("Error adding \\Invalid\\Counter to query", std::chrono::seconds(0)));
+
+  uint32_t number_of_flowfiles = 0;
+
+  // Invoked once per file in the output directory; parses it as JSON and checks its members.
+  auto lambda = [&number_of_flowfiles](const std::string& path, const std::string& filename) -> bool {
+    ++number_of_flowfiles;
+    FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + filename).c_str(), "r");
+    REQUIRE(fp != nullptr);
+    char readBuffer[500];
+    rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+    rapidjson::Document document;
+    document.ParseStream(is);
+    fclose(fp);
+    REQUIRE(document.IsObject());
+    REQUIRE(document.HasMember("LogicalDisk"));
+    REQUIRE(document["LogicalDisk"].HasMember("_Total"));
+    REQUIRE(document["LogicalDisk"]["_Total"].HasMember("Free Megabytes"));
+    // Check presence before indexing: indexing a missing member trips a rapidjson assertion
+    // rather than producing a test failure.
+    REQUIRE(document.HasMember("System"));
+    REQUIRE(document["System"].HasMember("Processes"));
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(tester.dir_, lambda, tester.plan_->getLogger(), false);
+  REQUIRE(number_of_flowfiles == 2);
+}
+
+// Custom counters only, default output format: two files are written and each holds the
+// System counter plus the per-process entry of this test executable.
+TEST_CASE("PerformanceDataMonitorCustomPDHCountersTest", "[performancedatamonitorcustompdhcounterstest]") {
+  PerformanceDataMonitorTester tester;
+  tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters, "\\System\\Processes,\\Process(*)\\% Processor Time");
+  tester.runProcessors();
+
+  uint32_t flow_file_count = 0;
+
+  // Invoked once per file in the output directory; parses it as JSON and checks its members.
+  auto verify_flow_file = [&flow_file_count](const std::string& path, const std::string& filename) -> bool {
+    ++flow_file_count;
+    const std::string file_path = path + utils::file::FileUtils::get_separator() + filename;
+    FILE* file = fopen(file_path.c_str(), "r");
+    REQUIRE(file != nullptr);
+    char read_buffer[50000];
+    rapidjson::FileReadStream input(file, read_buffer, sizeof(read_buffer));
+    rapidjson::Document document;
+    document.ParseStream(input);
+    fclose(file);
+    REQUIRE(document.IsObject());
+    REQUIRE(document.HasMember("System"));
+    REQUIRE(document["System"].HasMember("Processes"));
+    REQUIRE(document.HasMember("Process"));
+    const auto& processes = document["Process"];
+    REQUIRE(processes.HasMember("PerformanceDataMonitorTests"));
+    REQUIRE(processes["PerformanceDataMonitorTests"].HasMember("% Processor Time"));
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(tester.dir_, verify_flow_file, tester.plan_->getLogger(), false);
+  REQUIRE(flow_file_count == 2);
+}
+
+// Custom counters with OpenTelemetry output: each file carries the Name/Timestamp/Body
+// envelope, and the counter data sits under "Body".
+TEST_CASE("PerformanceDataMonitorCustomPDHCountersTestOpenTelemetry", "[performancedatamonitorcustompdhcounterstestopentelemetry]") {
+  PerformanceDataMonitorTester tester;
+  tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters, "\\System\\Processes,\\Process(*)\\ID Process");
+  tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty, "OpenTelemetry");
+  tester.runProcessors();
+
+  uint32_t flow_file_count = 0;
+
+  // Invoked once per file in the output directory; parses it as JSON and checks its members.
+  auto verify_flow_file = [&flow_file_count](const std::string& path, const std::string& filename) -> bool {
+    ++flow_file_count;
+    const std::string file_path = path + utils::file::FileUtils::get_separator() + filename;
+    FILE* file = fopen(file_path.c_str(), "r");
+    REQUIRE(file != nullptr);
+    char read_buffer[50000];
+    rapidjson::FileReadStream input(file, read_buffer, sizeof(read_buffer));
+    rapidjson::Document document;
+    document.ParseStream(input);
+    fclose(file);
+    REQUIRE(document.IsObject());
+    for (const char* envelope_field : {"Name", "Timestamp", "Body"}) {
+      REQUIRE(document.HasMember(envelope_field));
+    }
+    const auto& body = document["Body"];
+    REQUIRE(body.HasMember("System"));
+    REQUIRE(body["System"].HasMember("Processes"));
+    REQUIRE(body.HasMember("Process"));
+    REQUIRE(body["Process"].HasMember("PerformanceDataMonitorTests"));
+    REQUIRE(body["Process"]["PerformanceDataMonitorTests"].HasMember("ID Process"));
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(tester.dir_, verify_flow_file, tester.plan_->getLogger(), false);
+  REQUIRE(flow_file_count == 2);
+}
+
+// Every predefined group enabled with OpenTelemetry output: each written file must carry the
+// envelope fields and a sample of the expected members of every group under "Body".
+TEST_CASE("PerformanceDataMonitorAllPredefinedGroups", "[performancedatamonitorallpredefinedgroups]") {
+  PerformanceDataMonitorTester tester;
+  tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups, "CPU,Disk,Network,Memory,IO,System,Process");
+  tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty, "OpenTelemetry");
+  tester.runProcessors();
+
+  uint32_t flow_file_count = 0;
+
+  // Invoked once per file in the output directory; parses it as JSON and checks its members.
+  auto verify_flow_file = [&flow_file_count](const std::string& path, const std::string& filename) -> bool {
+    ++flow_file_count;
+    const std::string file_path = path + utils::file::FileUtils::get_separator() + filename;
+    FILE* file = fopen(file_path.c_str(), "r");
+    REQUIRE(file != nullptr);
+    char read_buffer[50000];
+    rapidjson::FileReadStream input(file, read_buffer, sizeof(read_buffer));
+    rapidjson::Document document;
+    document.ParseStream(input);
+    fclose(file);
+    REQUIRE(document.IsObject());
+    for (const char* envelope_field : {"Name", "Timestamp", "Body"}) {
+      REQUIRE(document.HasMember(envelope_field));
+    }
+    const auto& body = document["Body"];
+
+    REQUIRE(body.HasMember("PhysicalDisk"));
+    REQUIRE(body["PhysicalDisk"].HasMember("_Total"));
+    for (const char* counter : {"% Disk Read Time", "% Disk Time", "% Disk Write Time", "% Idle Time"}) {
+      REQUIRE(body["PhysicalDisk"]["_Total"].HasMember(counter));
+    }
+
+    REQUIRE(body.HasMember("LogicalDisk"));
+    REQUIRE(body["LogicalDisk"].HasMember("_Total"));
+    for (const char* counter : {"Free Megabytes", "% Free Space"}) {
+      REQUIRE(body["LogicalDisk"]["_Total"].HasMember(counter));
+    }
+
+    REQUIRE(body.HasMember("Processor"));
+    REQUIRE(body["Processor"].HasMember("_Total"));
+
+    REQUIRE(body.HasMember("Network Interface"));
+
+    REQUIRE(body.HasMember("Memory"));
+    for (const char* counter : {"% Committed Bytes In Use", "Available MBytes", "Page Faults/sec", "Pages/sec"}) {
+      REQUIRE(body["Memory"].HasMember(counter));
+    }
+
+    REQUIRE(body.HasMember("System"));
+    for (const char* counter : {"% Registry Quota In Use", "Context Switches/sec", "File Control Bytes/sec", "File Control Operations/sec"}) {
+      REQUIRE(body["System"].HasMember(counter));
+    }
+
+    REQUIRE(body.HasMember("Process"));
+    REQUIRE(body["Process"].HasMember("PerformanceDataMonitorTests"));
+    for (const char* counter : {"% Processor Time", "Elapsed Time", "ID Process", "Private Bytes"}) {
+      REQUIRE(body["Process"]["PerformanceDataMonitorTests"].HasMember(counter));
+    }
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(tester.dir_, verify_flow_file, tester.plan_->getLogger(), false);
+  REQUIRE(flow_file_count == 2);
+}
diff --git a/libminifi/include/utils/JsonCallback.h 
b/libminifi/include/utils/JsonCallback.h
new file mode 100644
index 0000000..9446fc4
--- /dev/null
+++ b/libminifi/include/utils/JsonCallback.h
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "rapidjson/document.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include "io/StreamPipe.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Output stream callback that takes ownership of a rapidjson document and
+ * writes its pretty-printed JSON text to the stream handed to process().
+ */
+class JsonOutputCallback : public OutputStreamCallback {
+ public:
+  explicit JsonOutputCallback(rapidjson::Document&& root) : root_(std::move(root)) {}
+
+  /**
+   * Serializes the stored document with rapidjson::PrettyWriter and writes the resulting bytes to stream.
+   * @return the value returned by stream->write
+   */
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    rapidjson::StringBuffer buffer;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+    root_.Accept(writer);
+    // Checked narrowing of the buffer length to int instead of a silent truncating cast.
+    return stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), gsl::narrow<int>(buffer.GetSize()));
+  }
+
+ protected:
+  rapidjson::Document root_;  // document serialized by process()
+};
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/OsUtils.h 
b/libminifi/include/utils/OsUtils.h
index 387fc03..82c69fc 100644
--- a/libminifi/include/utils/OsUtils.h
+++ b/libminifi/include/utils/OsUtils.h
@@ -38,6 +38,11 @@ int64_t getSystemPhysicalMemoryUsage();
 /// Returns the total physical memory in the system in bytes
 int64_t getSystemTotalPhysicalMemory();
 
+#ifdef WIN32
+/// Returns the total paging file size in bytes
+int64_t getTotalPagingFileSize();
+#endif
+
 /// Returns the host architecture (e.g. x32, arm64)
 std::string getMachineArchitecture();
 
diff --git a/libminifi/src/utils/OsUtils.cpp b/libminifi/src/utils/OsUtils.cpp
index 437442a..5d38279 100644
--- a/libminifi/src/utils/OsUtils.cpp
+++ b/libminifi/src/utils/OsUtils.cpp
@@ -277,6 +277,18 @@ int64_t OsUtils::getSystemTotalPhysicalMemory() {
 #endif
 }
 
+#ifdef WIN32
+/// Returns the total paging file size in bytes, or -1 if the Windows query fails
+int64_t OsUtils::getTotalPagingFileSize() {
+  MEMORYSTATUSEX memory_info{};  // zero-initialised so no field is ever read uninitialised
+  memory_info.dwLength = sizeof(memory_info);  // must be set before calling GlobalMemoryStatusEx
+  if (!GlobalMemoryStatusEx(&memory_info)) {
+    return -1;
+  }
+  return static_cast<int64_t>(memory_info.ullTotalPageFile);
+}
+#endif
+
 std::string OsUtils::getMachineArchitecture() {
 #if defined(WIN32)
   SYSTEM_INFO system_information;
diff --git a/libminifi/test/unit/MemoryUsageTest.cpp 
b/libminifi/test/unit/MemoryUsageTest.cpp
index 9fd08a5..3e6726b 100644
--- a/libminifi/test/unit/MemoryUsageTest.cpp
+++ b/libminifi/test/unit/MemoryUsageTest.cpp
@@ -20,22 +20,22 @@
 #include "utils/OsUtils.h"
 #include "../TestBase.h"
 
-TEST_CASE("Test memory usage", "[testmemoryusage]") {
+TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
   constexpr bool cout_enabled = true;
   std::vector<char> v(30000000);
-  const auto RAMUsagebyProcess = 
utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
-  const auto RAMUsagebySystem = utils::OsUtils::getSystemPhysicalMemoryUsage();
-  const auto RAMTotal = utils::OsUtils::getSystemTotalPhysicalMemory();
+  const auto ram_usage_by_process = 
utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+  const auto ram_usage_by_system = 
utils::OsUtils::getSystemPhysicalMemoryUsage();
+  const auto ram_total = utils::OsUtils::getSystemTotalPhysicalMemory();
 
   if (cout_enabled) {
-    std::cout << "Physical Memory used by this process: " << RAMUsagebyProcess 
 << " bytes" << std::endl;
-    std::cout << "Physical Memory used by the system: " << RAMUsagebySystem << 
" bytes" << std::endl;
-    std::cout << "Total Physical Memory in the system: " << RAMTotal << " 
bytes" << std::endl;
+    std::cout << "Physical Memory used by this process: " << 
ram_usage_by_process << " bytes" << std::endl;
+    std::cout << "Physical Memory used by the system: " << ram_usage_by_system 
<< " bytes" << std::endl;
+    std::cout << "Total Physical Memory in the system: " << ram_total << " 
bytes" << std::endl;
   }
-  REQUIRE(RAMUsagebyProcess >= v.size());
-  REQUIRE(v.size()*2 >= RAMUsagebyProcess);
-  REQUIRE(RAMUsagebySystem >= RAMUsagebyProcess);
-  REQUIRE(RAMTotal >= RAMUsagebySystem);
+  REQUIRE(ram_usage_by_process >= v.size());
+  REQUIRE(v.size()*2 >= ram_usage_by_process);
+  REQUIRE(ram_usage_by_system >= ram_usage_by_process);
+  REQUIRE(ram_total >= ram_usage_by_system);
 }
 
 #ifndef WIN32
@@ -47,3 +47,16 @@ TEST_CASE("Test new and legacy total system memory query 
equivalency") {
   REQUIRE(GetTotalMemoryLegacy() == 
utils::OsUtils::getSystemTotalPhysicalMemory());
 }
 #endif
+
+
+#ifdef WIN32
+// Smoke test: the total paging file size reported by OsUtils must be strictly positive.
+TEST_CASE("Test Paging file size", "[testpagingfile]") {
+  constexpr bool print_diagnostics = true;
+  const int64_t paging_file_bytes = utils::OsUtils::getTotalPagingFileSize();
+  if (print_diagnostics) {
+    std::cout << "Total Paging file size: " << paging_file_bytes << " bytes" << std::endl;
+  }
+  REQUIRE(paging_file_bytes > 0);
+}
+#endif
diff --git a/win_build_vs.bat b/win_build_vs.bat
index cfef2c0..952e3af 100755
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -49,6 +49,7 @@ for %%x in (%*) do (
     if [%%~x] EQU [/C]           set build_coap=ON
     if [%%~x] EQU [/A]           set build_AWS=ON
     if [%%~x] EQU [/SFTP]        set build_SFTP=ON
+    if [%%~x] EQU [/PDH]         set build_PDH=ON
     if [%%~x] EQU [/M]           set installer_merge_modules=ON
     if [%%~x] EQU [/Z]           set build_azure=ON
     if [%%~x] EQU [/2019]        set generator="Visual Studio 16 2019"
@@ -62,7 +63,7 @@ for %%x in (%*) do (
 mkdir %builddir%
 pushd %builddir%\
 
-cmake -G %generator% -A %build_platform% 
-DINSTALLER_MERGE_MODULES=%installer_merge_modules% 
-DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% 
-DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% 
-DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 
-DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF 
-DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% 
-DENABLE_AZURE=%build_azure% -DENABLE_SFTP=%build_SFTP%  -DUSE_SHARED_LIBS=OFF 
-DDISABLE_CONTROL [...]
+cmake -G %generator% -A %build_platform% 
-DINSTALLER_MERGE_MODULES=%installer_merge_modules% 
-DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% 
-DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% 
-DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 
-DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF 
-DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% 
-DENABLE_AZURE=%build_azure% -DENABLE_SFTP=%build_SFTP%  -DUSE_SHARED_L [...]
 IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%
 if [%cpack%] EQU [ON] (
     cpack -C %cmake_build_type%

Reply via email to