This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 05ae25c MINIFICPP-1568 Retry PDH processor run to fix test flakiness
05ae25c is described below
commit 05ae25c1aae280767206b641f8c35d485aa85245
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed May 26 17:05:47 2021 +0200
MINIFICPP-1568 Retry PDH processor run to fix test flakiness
Signed-off-by: Arpad Boda <[email protected]>
This closes #1085
---
.../pdh/tests/PerformanceDataMonitorTests.cpp | 283 +++++++++++----------
1 file changed, 148 insertions(+), 135 deletions(-)
diff --git a/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
index 7160bc5..b3d5acd 100644
--- a/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
+++ b/extensions/pdh/tests/PerformanceDataMonitorTests.cpp
@@ -20,6 +20,7 @@
#include <vector>
#include <set>
#include <fstream>
+#include <functional>
#include "TestBase.h"
#include "processors/PutFile.h"
@@ -43,12 +44,18 @@ class PerformanceDataMonitorTester {
plan_->setProperty(putfile_, PutFile::Directory.getName(), dir_);
}
- void runProcessors() {
- plan_->runNextProcessor(); // PerformanceMonitor
- std::this_thread::sleep_for(std::chrono::milliseconds(200));
- plan_->runCurrentProcessor(); // PerformanceMonitor
- plan_->runNextProcessor(); // PutFile
- plan_->runCurrentProcessor(); // PutFile
+ bool runWithRetries(std::function<bool()>&& assertions, uint32_t max_tries =
10) {
+ uint32_t tries = 0;
+ for (uint32_t tries = 0; tries < max_tries; ++tries) {
+ test_controller_.runSession(plan_);
+ if (assertions()) {
+ return true;
+ }
+ plan_->reset();
+ LogTestController::getInstance().reset();
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ return false;
}
void setPerformanceMonitorProperty(const core::Property& property, const
std::string& value) {
@@ -65,8 +72,7 @@ class PerformanceDataMonitorTester {
TEST_CASE("PerformanceDataMonitorEmptyPropertiesTest",
"[performancedatamonitoremptypropertiestest]") {
PerformanceDataMonitorTester tester;
- tester.runProcessors();
-
+ tester.test_controller_.runSession(tester.plan_);
REQUIRE(tester.test_controller_.getLog().getInstance().contains("No valid
counters for PerformanceDataMonitor", std::chrono::seconds(0)));
auto created_flow_files = utils::file::FileUtils::list_dir_all(tester.dir_,
tester.plan_->getLogger(), false);
@@ -77,61 +83,64 @@
TEST_CASE("PerformanceDataMonitorPartiallyInvalidGroupPropertyTest", "[performan
PerformanceDataMonitorTester tester;
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups,
"Disk,CPU,Asd");
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters,
"\\Invalid\\Counter,\\System\\Processes");
- tester.runProcessors();
-
- REQUIRE(tester.test_controller_.getLog().getInstance().contains("Asd is not
a valid predefined group", std::chrono::seconds(0)));
- REQUIRE(tester.test_controller_.getLog().getInstance().contains("Error
adding \\Invalid\\Counter to query", std::chrono::seconds(0)));
-
- uint32_t number_of_flowfiles = 0;
-
- auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
- ++number_of_flowfiles;
- FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
- REQUIRE(fp != nullptr);
- char readBuffer[500];
- rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
- rapidjson::Document document;
- document.ParseStream(is);
- fclose(fp);
- REQUIRE(document.IsObject());
- REQUIRE(document.HasMember("LogicalDisk"));
- REQUIRE(document["LogicalDisk"].HasMember("_Total"));
- REQUIRE(document["LogicalDisk"]["_Total"].HasMember("Free Megabytes"));
- REQUIRE(document["System"].HasMember("Processes"));
- return true;
+ const auto assertions = [&tester]() {
+ REQUIRE(tester.test_controller_.getLog().getInstance().contains("Asd is
not a valid predefined group", std::chrono::seconds(0)));
+ REQUIRE(tester.test_controller_.getLog().getInstance().contains("Error
adding \\Invalid\\Counter to query", std::chrono::seconds(0)));
+
+ bool ff_contains_all_data = false;
+
+ const auto lambda = [&ff_contains_all_data](const std::string& path, const
std::string& filename) -> bool {
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[500];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ ff_contains_all_data =
+ document.IsObject() &&
+ document.HasMember("LogicalDisk") &&
+ document["LogicalDisk"].HasMember("_Total") &&
+ document["LogicalDisk"]["_Total"].HasMember("Free Megabytes") &&
+ document["System"].HasMember("Processes");
+ return !ff_contains_all_data;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ return ff_contains_all_data;
};
-
- utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
- REQUIRE(number_of_flowfiles == 2);
+ REQUIRE(tester.runWithRetries(assertions));
}
TEST_CASE("PerformanceDataMonitorCustomPDHCountersTest",
"[performancedatamonitorcustompdhcounterstest]") {
PerformanceDataMonitorTester tester;
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters,
"\\System\\Processes,\\Process(*)\\% Processor Time");
- tester.runProcessors();
-
- uint32_t number_of_flowfiles = 0;
-
- auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
- ++number_of_flowfiles;
- FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
- REQUIRE(fp != nullptr);
- char readBuffer[50000];
- rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
- rapidjson::Document document;
- document.ParseStream(is);
- fclose(fp);
- REQUIRE(document.IsObject());
- REQUIRE(document.HasMember("System"));
- REQUIRE(document["System"].HasMember("Processes"));
- REQUIRE(document.HasMember("Process"));
- REQUIRE(document["Process"].HasMember("PerformanceDataMonitorTests"));
- REQUIRE(document["Process"]["PerformanceDataMonitorTests"].HasMember("%
Processor Time"));
- return true;
+
+ const auto assertions = [&tester]() {
+ bool ff_contains_all_data = false;
+ const auto lambda = [&ff_contains_all_data](const std::string& path, const
std::string& filename) -> bool {
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[50000];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ ff_contains_all_data =
+ document.IsObject() &&
+ document.HasMember("System") &&
+ document["System"].HasMember("Processes") &&
+ document.HasMember("Process") &&
+ document["Process"].HasMember("PerformanceDataMonitorTests") &&
+ document["Process"]["PerformanceDataMonitorTests"].HasMember("%
Processor Time");
+ return !ff_contains_all_data;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ return ff_contains_all_data;
};
- utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
- REQUIRE(number_of_flowfiles == 2);
+ REQUIRE(tester.runWithRetries(assertions));
}
TEST_CASE("PerformanceDataMonitorCustomPDHCountersTestOpenTelemetry",
"[performancedatamonitorcustompdhcounterstestopentelemetry]") {
@@ -140,33 +149,35 @@
TEST_CASE("PerformanceDataMonitorCustomPDHCountersTestOpenTelemetry", "[performa
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters,
"\\System\\Processes,\\Process(*)\\ID Process,\\Process(*)\\Private Bytes");
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty,
"OpenTelemetry");
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputCompactness,
"Compact");
- tester.runProcessors();
-
- uint32_t number_of_flowfiles = 0;
-
- auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
- ++number_of_flowfiles;
- FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
- REQUIRE(fp != nullptr);
- char readBuffer[50000];
- rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
- rapidjson::Document document;
- document.ParseStream(is);
- fclose(fp);
- REQUIRE(document.IsObject());
- REQUIRE(document.HasMember("Name"));
- REQUIRE(document.HasMember("Timestamp"));
- REQUIRE(document.HasMember("Body"));
- REQUIRE(document["Body"].HasMember("System"));
- REQUIRE(document["Body"]["System"].HasMember("Processes"));
- REQUIRE(document["Body"].HasMember("Process"));
-
REQUIRE(document["Body"]["Process"].HasMember("PerformanceDataMonitorTests"));
-
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID
Process"));
- return true;
+
+ const auto assertions = [&tester]() {
+ bool ff_contains_all_data = false;
+ const auto lambda = [&ff_contains_all_data](const std::string& path, const
std::string& filename) -> bool {
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[50000];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ ff_contains_all_data =
+ document.IsObject() &&
+ document.HasMember("Name") &&
+ document.HasMember("Timestamp") &&
+ document.HasMember("Body") &&
+ document["Body"].HasMember("System") &&
+ document["Body"]["System"].HasMember("Processes") &&
+ document["Body"].HasMember("Process") &&
+ document["Body"]["Process"].HasMember("PerformanceDataMonitorTests") &&
+
document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID
Process");
+ return !ff_contains_all_data;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ return ff_contains_all_data;
};
- utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
- REQUIRE(number_of_flowfiles == 2);
+ REQUIRE(tester.runWithRetries(assertions));
}
TEST_CASE("PerformanceDataMonitorAllPredefinedGroups",
"[performancedatamonitorallpredefinedgroups]") {
@@ -174,63 +185,65 @@ TEST_CASE("PerformanceDataMonitorAllPredefinedGroups",
"[performancedatamonitora
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups,
"CPU,Disk,Network,Memory,IO,System,Process");
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputFormatProperty,
"OpenTelemetry");
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::OutputCompactness,
"Pretty");
- tester.runProcessors();
-
- uint32_t number_of_flowfiles = 0;
-
- auto lambda = [&number_of_flowfiles](const std::string& path, const
std::string& filename) -> bool {
- ++number_of_flowfiles;
- FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
- REQUIRE(fp != nullptr);
- char readBuffer[50000];
- rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
- rapidjson::Document document;
- document.ParseStream(is);
- fclose(fp);
- REQUIRE(document.IsObject());
- REQUIRE(document.HasMember("Name"));
- REQUIRE(document.HasMember("Timestamp"));
- REQUIRE(document.HasMember("Body"));
- REQUIRE(document["Body"].HasMember("PhysicalDisk"));
- REQUIRE(document["Body"]["PhysicalDisk"].HasMember("_Total"));
- REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Read
Time"));
- REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk
Time"));
- REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Write
Time"));
- REQUIRE(document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Idle
Time"));
-
- REQUIRE(document["Body"].HasMember("LogicalDisk"));
- REQUIRE(document["Body"]["LogicalDisk"].HasMember("_Total"));
- REQUIRE(document["Body"]["LogicalDisk"]["_Total"].HasMember("Free
Megabytes"));
- REQUIRE(document["Body"]["LogicalDisk"]["_Total"].HasMember("% Free
Space"));
-
- REQUIRE(document["Body"].HasMember("Processor"));
- REQUIRE(document["Body"]["Processor"].HasMember("_Total"));
-
- REQUIRE(document["Body"].HasMember("Network Interface"));
-
- REQUIRE(document["Body"].HasMember("Memory"));
- REQUIRE(document["Body"]["Memory"].HasMember("% Committed Bytes In Use"));
- REQUIRE(document["Body"]["Memory"].HasMember("Available MBytes"));
- REQUIRE(document["Body"]["Memory"].HasMember("Page Faults/sec"));
- REQUIRE(document["Body"]["Memory"].HasMember("Pages/sec"));
-
- REQUIRE(document["Body"].HasMember("System"));
- REQUIRE(document["Body"]["System"].HasMember("% Registry Quota In Use"));
- REQUIRE(document["Body"]["System"].HasMember("Context Switches/sec"));
- REQUIRE(document["Body"]["System"].HasMember("File Control Bytes/sec"));
- REQUIRE(document["Body"]["System"].HasMember("File Control
Operations/sec"));
-
- REQUIRE(document["Body"].HasMember("Process"));
-
REQUIRE(document["Body"]["Process"].HasMember("PerformanceDataMonitorTests"));
-
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("%
Processor Time"));
-
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Elapsed
Time"));
-
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID
Process"));
-
REQUIRE(document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Private
Bytes"));
- return true;
+
+ const auto assertions = [&tester]() {
+ bool ff_contains_all_data = false;
+ const auto lambda = [&ff_contains_all_data](const std::string& path, const
std::string& filename) -> bool {
+ FILE* fp = fopen((path + utils::file::FileUtils::get_separator() +
filename).c_str(), "r");
+ REQUIRE(fp != nullptr);
+ char readBuffer[50000];
+ rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+ rapidjson::Document document;
+ document.ParseStream(is);
+ fclose(fp);
+ ff_contains_all_data =
+ document.IsObject() &&
+ document.HasMember("Name") &&
+ document.HasMember("Timestamp") &&
+ document.HasMember("Body") &&
+ document["Body"].HasMember("PhysicalDisk") &&
+ document["Body"]["PhysicalDisk"].HasMember("_Total") &&
+ document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Read
Time") &&
+ document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Time") &&
+ document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Disk Write
Time") &&
+ document["Body"]["PhysicalDisk"]["_Total"].HasMember("% Idle Time") &&
+
+ document["Body"].HasMember("LogicalDisk") &&
+ document["Body"]["LogicalDisk"].HasMember("_Total") &&
+ document["Body"]["LogicalDisk"]["_Total"].HasMember("Free Megabytes")
&&
+ document["Body"]["LogicalDisk"]["_Total"].HasMember("% Free Space") &&
+
+ document["Body"].HasMember("Processor") &&
+ document["Body"]["Processor"].HasMember("_Total") &&
+
+ document["Body"].HasMember("Network Interface") &&
+
+ document["Body"].HasMember("Memory") &&
+ document["Body"]["Memory"].HasMember("% Committed Bytes In Use") &&
+ document["Body"]["Memory"].HasMember("Available MBytes") &&
+ document["Body"]["Memory"].HasMember("Page Faults/sec") &&
+ document["Body"]["Memory"].HasMember("Pages/sec") &&
+
+ document["Body"].HasMember("System") &&
+ document["Body"]["System"].HasMember("% Registry Quota In Use") &&
+ document["Body"]["System"].HasMember("Context Switches/sec") &&
+ document["Body"]["System"].HasMember("File Control Bytes/sec") &&
+ document["Body"]["System"].HasMember("File Control Operations/sec") &&
+
+ document["Body"].HasMember("Process") &&
+ document["Body"]["Process"].HasMember("PerformanceDataMonitorTests") &&
+
document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("%
Processor Time") &&
+
document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Elapsed
Time") &&
+
document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("ID
Process") &&
+
document["Body"]["Process"]["PerformanceDataMonitorTests"].HasMember("Private
Bytes");
+ return !ff_contains_all_data;
+ };
+
+ utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
+ return ff_contains_all_data;
};
- utils::file::FileUtils::list_dir(tester.dir_, lambda,
tester.plan_->getLogger(), false);
- REQUIRE(number_of_flowfiles == 2);
+ REQUIRE(tester.runWithRetries(assertions));
}
TEST_CASE("PerformanceDataMonitorDecimalPlacesPropertyTest",
"[performancedatamonitordecimalplacespropertytest]") {