This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9c5e2996b3e8cb7fb4aadf9c29bf6970af6fa618 Author: Gabor Gyimesi <[email protected]> AuthorDate: Fri Mar 3 14:12:32 2023 +0100 MINIFICPP-2041 Fix transient failure of HttpPostIntegrationTestChunked There was a concurrency issue between the test logger on one thread and the verifying thread which read the output stream of the logger. Adding a mutex that guards ostringstream and the logger sink that writes ostringstream solves this issue. Closes #1512 Signed-off-by: Marton Szasz <[email protected]> --- extensions/civetweb/processors/ListenHTTP.cpp | 1 + .../tests/C2ClearCoreComponentStateTest.cpp | 2 +- extensions/http-curl/tests/C2LogHeartbeatTest.cpp | 2 +- .../http-curl/tests/HttpPostIntegrationTest.cpp | 2 +- extensions/sftp/tests/ListSFTPTests.cpp | 42 +++++++++--------- .../tests/unit/ListFileTests.cpp | 2 +- .../tests/unit/RetryFlowFileTests.cpp | 4 +- .../tests/unit/TailFileTests.cpp | 50 +++++++++++----------- extensions/windows-event-log/tests/CWELTestUtils.h | 2 +- .../tests/ConsumeWindowsEventLogTests.cpp | 14 +++--- libminifi/include/utils/IntegrationTestUtils.h | 2 +- libminifi/test/LogUtils.h | 12 +++++- libminifi/test/TestBase.cpp | 21 ++++++--- libminifi/test/TestBase.h | 15 ++++--- .../test/azure-tests/ListAzureBlobStorageTests.cpp | 4 +- .../azure-tests/ListAzureDataLakeStorageTests.cpp | 4 +- .../integration/OnScheduleErrorHandlingTests.cpp | 8 ++-- .../integration/StateTransactionalityTests.cpp | 6 +-- 18 files changed, 109 insertions(+), 84 deletions(-) diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp index 127ac332a..02e8d543e 100644 --- a/extensions/civetweb/processors/ListenHTTP.cpp +++ b/extensions/civetweb/processors/ListenHTTP.cpp @@ -262,6 +262,7 @@ bool ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) { /// @return Whether there was a request processed bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) { + gsl_Expects(handler_); std::size_t flow_file_count = 0; for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) { FlowFileBufferPair flow_file_buffer_pair; diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp index ebdfd3e20..181a0f28a 100644 --- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp +++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp @@ -137,7 +137,7 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler { } case FlowState::CLEAR_SENT: { auto tail_file_ran_again_checker = [this] { - const auto log_contents = LogTestController::getInstance().log_output.str(); + const auto log_contents = LogTestController::getInstance().getLogs(); const std::string tailing_file_pattern = "[debug] Tailing file " + file_1_location_.string(); const std::string tail_file_committed_pattern = "[trace] ProcessSession committed for TailFile1"; const std::vector<std::string> patterns = {tailing_file_pattern, tailing_file_pattern, tail_file_committed_pattern}; diff --git a/extensions/http-curl/tests/C2LogHeartbeatTest.cpp b/extensions/http-curl/tests/C2LogHeartbeatTest.cpp index 624185d70..1e50c7e54 100644 --- a/extensions/http-curl/tests/C2LogHeartbeatTest.cpp +++ b/extensions/http-curl/tests/C2LogHeartbeatTest.cpp @@ -54,7 +54,7 @@ class VerifyLogC2Heartbeat : public VerifyC2Base { std::chrono::milliseconds(wait_time_), "\"operation\": \"heartbeat\"")); - const auto log = LogTestController::getInstance().log_output.str(); + const auto log = LogTestController::getInstance().getLogs(); auto types_in_heartbeat = log | ranges::views::split('\n') | ranges::views::transform([](auto&& rng) { return rng | ranges::to<std::string>; }) | ranges::views::filter([](auto&& line) { return utils::StringUtils::startsWith(line, " \"type\":"); }) diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp index d4271a3e9..dfd6d3d18 100644 --- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp +++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp @@ -37,7 +37,7 @@ using namespace std::literals::chrono_literals; class HttpTestHarness : public HTTPIntegrationBase { public: - HttpTestHarness() : HTTPIntegrationBase(4s) { + HttpTestHarness() { dir_ = test_controller_.createTempDirectory(); } diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp index 68faaab04..6a993a2db 100644 --- a/extensions/sftp/tests/ListSFTPTests.cpp +++ b/extensions/sftp/tests/ListSFTPTests.cpp @@ -416,7 +416,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file", REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); @@ -435,7 +435,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file on REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min); @@ -456,7 +456,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file on REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6min); @@ -479,7 +479,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file an REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); /* We must sleep to avoid triggering the listing lag. */ std::this_thread::sleep_for(std::chrono::milliseconds(1500)); @@ -506,7 +506,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file ti REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); REQUIRE(utils::file::set_last_write_time(file, mtime + 1s)); @@ -516,7 +516,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file ti REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); /* We must sleep to avoid triggering the listing lag. */ std::this_thread::sleep_for(std::chrono::milliseconds(1500)); @@ -551,7 +551,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Timestamps r REQUIRE(list_sftp_uuid); createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>()); plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps"); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min); @@ -594,7 +594,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Timestamps r createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>()); plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps"); plan->setProperty(list_sftp, "Remote Path", "/nifi_test"); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min); @@ -627,7 +627,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed con REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); plan->setProperty(list_sftp, "Remote Path", "/nifi_test"); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min); @@ -655,7 +655,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file", "[ REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); @@ -674,7 +674,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min); @@ -695,7 +695,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6min); @@ -718,7 +718,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6h); @@ -741,7 +741,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file anot REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFile("nifi_test/file2.ext", "Test content 2", mtime); @@ -767,7 +767,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file time REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); REQUIRE(utils::file::set_last_write_time(file, mtime + 1s)); @@ -778,7 +778,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file time REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); @@ -799,7 +799,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file size REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFile("nifi_test/file1.ext", "Longer test content 1", mtime); @@ -810,7 +810,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file size REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); @@ -832,7 +832,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Entities res REQUIRE(list_sftp_uuid); createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>()); plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities"); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); std::unordered_map<std::string, std::string> state; REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state)); @@ -893,7 +893,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Entities res createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>()); plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities"); plan->setProperty(list_sftp, "Remote Path", "/nifi_test"); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min); @@ -926,7 +926,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed confi REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); plan->setProperty(list_sftp, "Remote Path", "/nifi_test"); createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min); diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp b/extensions/standard-processors/tests/unit/ListFileTests.cpp index 46ce99b47..32172c05f 100644 --- a/extensions/standard-processors/tests/unit/ListFileTests.cpp +++ b/extensions/standard-processors/tests/unit/ListFileTests.cpp @@ -140,7 +140,7 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing files only once with default REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime value:" + *utils::file::FileUtils::get_last_modified_time_formatted_string(first_sub_file_abs_path_, FORMAT_STRING))); REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime value:" + *utils::file::FileUtils::get_last_modified_time_formatted_string(second_sub_file_abs_path_, FORMAT_STRING))); plan_->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); test_controller_.runSession(plan_, true); REQUIRE_FALSE(LogTestController::getInstance().contains("key:file.size", 0s, 0ms)); } diff --git a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp index daf2f1abe..9dfd3f53c 100644 --- a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp +++ b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp @@ -163,13 +163,13 @@ class RetryFlowFileTest { } static bool logContainsText(const std::string& pattern) { - const std::string logs = LogTestController::getInstance().log_output.str(); + const std::string logs = LogTestController::getInstance().getLogs(); return logs.find(pattern) != std::string::npos; } static bool flowfileWasPenalizedARetryflowfile() { std::regex re(R"(\[org::apache::nifi::minifi::core::ProcessSession\] \[info\] Penalizing [0-9a-z\-]+ for [0-9]*ms at retryflowfile)"); - return std::regex_search(LogTestController::getInstance().log_output.str(), re); + return std::regex_search(LogTestController::getInstance().getLogs(), re); } static bool retryFlowfileWarnedForReuse() { diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp index fd82b81c9..dd295a9ef 100644 --- a/extensions/standard-processors/tests/unit/TailFileTests.cpp +++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp @@ -140,7 +140,7 @@ TEST_CASE("TailFile picks up the second line if a delimiter is written between r REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt")); plan->reset(true); // start a new but with state file - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); std::ofstream appendStream; appendStream.open(temp_file_path, std::ios_base::app | std::ios_base::binary); @@ -182,7 +182,7 @@ TEST_CASE("TailFile re-reads the file if the state is deleted between runs", "[s REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt")); plan->reset(true); // start a new but with state file - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); plan->getProcessContextForProcessor(tailfile)->getStateManager()->clear(); @@ -230,7 +230,7 @@ TEST_CASE("TailFile picks up the state correctly if it is rewritten between runs // should stay the same for (int i = 0; i < 5; i++) { plan->reset(true); // start a new but with state file - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); plan->getProcessContextForProcessor(tailfile)->getStateManager()->set({{"file.0.name", temp_file_path.filename().string()}, {"file.0.position", "14"}, @@ -392,7 +392,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs" auto second_test_file = createTempFile(directory, "second.log", "my second log line\n"); plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), second_test_file.string()); plan->reset(true); // clear the memory, but keep the state file - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); REQUIRE(LogTestController::getInstance().contains("key:filename value:second.0-18.log")); @@ -403,7 +403,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs" auto second_test_file = createTempFile(directory, "second.log", ""); plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), second_test_file.string()); plan->reset(true); // clear the memory, but keep the state file - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); } @@ -441,7 +441,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs SECTION("If a file no longer matches the new regex, then we stop tailing it") { plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "first\\.f.*\\.log"); plan->reset(true); // clear the memory, but keep the state file - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.6-12.log")); @@ -450,7 +450,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs SECTION("If a new file matches the new regex, we start tailing it") { plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*\\.fruit\\.log"); plan->reset(true); // clear the memory, but keep the state file - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); REQUIRE(LogTestController::getInstance().contains("Logged 2 flow file")); REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.6-12.log")); @@ -527,7 +527,7 @@ TEST_CASE("TailFile picks up new files created between runs", "[multiple_file]") createTempFile(dir, "another.log", "some more content\n"); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); @@ -563,7 +563,7 @@ TEST_CASE("TailFile can handle input files getting removed", "[multiple_file]") REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); appendTempFile(dir, "one.log", "line two\nline three\nline four\n"); std::filesystem::remove(dir / "two.log"); @@ -830,7 +830,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n new_in_file_stream.close(); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); plan->runNextProcessor(); // Tail plan->runNextProcessor(); // Log @@ -891,7 +891,7 @@ TEST_CASE("TailFile finds and finishes multiple rotated files and continues with test_file_stream_2.close(); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); @@ -936,7 +936,7 @@ TEST_CASE("TailFile ignores old rotated files", "[rotation]") { createTempFile(dir, "test.log", "line8\nline9\n"); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); @@ -996,7 +996,7 @@ TEST_CASE("TailFile rotation works with multiple input files", "[rotation][multi appendTempFile(dir, "color.log", "turquoise\n"); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); @@ -1063,7 +1063,7 @@ TEST_CASE("TailFile handles the Rolling Filename Pattern property correctly", "[ createTempFile(dir, "other_rolled.log", "some stuff\none more line\n"); // same contents as test.rolled.log plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); testController.runSession(plan, true); @@ -1104,7 +1104,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files")); } - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -1466,7 +1466,7 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createTempFile(directory, "test.blue.log", "sky\n"); createTempFile(directory, "test.green.log", "grass\n"); @@ -1482,7 +1482,7 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createTempFile(directory, "test.blue.log", "sky\n"); createTempFile(directory, "test.green.log", "grass\n"); @@ -1491,7 +1491,7 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); plan->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); std::this_thread::sleep_for(std::chrono::milliseconds(550)); testController.runSession(plan, true); @@ -1531,7 +1531,7 @@ TEST_CASE("TailFile reads from a single file when Initial Start Position is set" REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0")); plan->reset(true); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); @@ -1551,7 +1551,7 @@ TEST_CASE("TailFile reads from a single file when Initial Start Position is set" REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(ROLLED_OVER_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0")); plan->reset(true); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); @@ -1569,7 +1569,7 @@ TEST_CASE("TailFile reads from a single file when Initial Start Position is set" REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); plan->reset(true); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); @@ -1604,7 +1604,7 @@ TEST_CASE("TailFile reads from a single file when Initial Start Position is set REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); plan->reset(true); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); const std::string DATA_IN_NEW_FILE = "data in new file\n"; std::this_thread::sleep_for(std::chrono::milliseconds(100)); // make sure the new file gets newer modification time @@ -1652,7 +1652,7 @@ TEST_CASE("TailFile reads multiple files when Initial Start Position is set", "[ REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(TMP_FILE_2_DATA.find_first_of('\n') + 1) + " Offset:0")); plan->reset(true); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createTempFile(dir, "minifi-tmpfile-3.txt", ADDITIONALY_CREATED_FILE_CONTENT); appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); @@ -1675,7 +1675,7 @@ TEST_CASE("TailFile reads multiple files when Initial Start Position is set", "[ REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(TMP_FILE_2_DATA.find_first_of('\n') + 1) + " Offset:0")); plan->reset(true); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createTempFile(dir, "minifi-tmpfile-3.txt", ADDITIONALY_CREATED_FILE_CONTENT); appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); @@ -1695,7 +1695,7 @@ TEST_CASE("TailFile reads multiple files when Initial Start Position is set", "[ REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); plan->reset(true); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); createTempFile(dir, "minifi-tmpfile-3.txt", ADDITIONALY_CREATED_FILE_CONTENT); appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); diff --git a/extensions/windows-event-log/tests/CWELTestUtils.h b/extensions/windows-event-log/tests/CWELTestUtils.h index 0a0bd85ee..efe240141 100644 --- a/extensions/windows-event-log/tests/CWELTestUtils.h +++ b/extensions/windows-event-log/tests/CWELTestUtils.h @@ -69,7 +69,7 @@ class OutputFormatTestController : public TestController { } test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); { diff --git a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp index 44a2f125f..ee9020161 100644 --- a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp +++ b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp @@ -152,7 +152,7 @@ TEST_CASE("ConsumeWindowsEventLog can consume new events", "[onTrigger]") { // later runs will start with a bookmark saved in the state manager test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); SECTION("Read one event") { reportEvent(APPLICATION_CHANNEL, "Event one"); @@ -198,7 +198,7 @@ TEST_CASE("ConsumeWindowsEventLog bookmarking works", "[onTrigger]") { CHECK(LogTestController::getInstance().contains("processed 0 Events")); test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); SECTION("Read in one go") { reportEvent(APPLICATION_CHANNEL, "Event one"); @@ -219,7 +219,7 @@ TEST_CASE("ConsumeWindowsEventLog bookmarking works", "[onTrigger]") { reportEvent(APPLICATION_CHANNEL, "Event three"); test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); TestController::runSession(test_plan); CHECK(LogTestController::getInstance().contains("processed 2 Events")); @@ -259,7 +259,7 @@ TEST_CASE("ConsumeWindowsEventLog extracts some attributes by default", "[onTrig } test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); // 1st event, on Info level { @@ -272,7 +272,7 @@ TEST_CASE("ConsumeWindowsEventLog extracts some attributes by default", "[onTrig } test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); // 2st event, on Warning level { @@ -308,7 +308,7 @@ void outputFormatSetterTestHelper(const std::string &output_format, int expected } test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); { reportEvent(APPLICATION_CHANNEL, "Event one"); @@ -442,7 +442,7 @@ void batchCommitSizeTestHelper(std::size_t num_events_read, std::size_t batch_co } test_plan->reset(); - LogTestController::resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); auto generate_events = [](const std::size_t event_count) { std::vector<std::string> events; diff --git a/libminifi/include/utils/IntegrationTestUtils.h b/libminifi/include/utils/IntegrationTestUtils.h index 4d96132b7..1799a6b87 100644 --- a/libminifi/include/utils/IntegrationTestUtils.h +++ b/libminifi/include/utils/IntegrationTestUtils.h @@ -44,7 +44,7 @@ bool verifyEventHappenedInPollTime( template <class Rep, class Period, typename ...String> bool verifyLogLinePresenceInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, String&&... patterns) { auto check = [&patterns...] { - const std::string logs = LogTestController::getInstance().log_output.str(); + const std::string logs = LogTestController::getInstance().getLogs(); return ((logs.find(patterns) != std::string::npos) && ...); }; return verifyEventHappenedInPollTime(wait_duration, check); diff --git a/libminifi/test/LogUtils.h b/libminifi/test/LogUtils.h index 942775535..5fedd0a75 100644 --- a/libminifi/test/LogUtils.h +++ b/libminifi/test/LogUtils.h @@ -21,27 +21,34 @@ #include <memory> #include <string> #include <utility> +#include <mutex> #include "spdlog/sinks/sink.h" #include "spdlog/sinks/ostream_sink.h" class StringStreamSink : public spdlog::sinks::sink { public: - explicit StringStreamSink(std::shared_ptr<std::ostringstream> stream, bool force_flush = false) - : stream_(std::move(stream)), sink_(*stream_, force_flush) {} + explicit StringStreamSink(std::shared_ptr<std::ostringstream> stream, std::shared_ptr<std::mutex> log_output_mutex, bool force_flush = false) + : stream_(std::move(stream)), + log_output_mutex_(std::move(log_output_mutex)), + sink_(*stream_, force_flush) {} ~StringStreamSink() override = default; void log(const spdlog::details::log_msg &msg) override { + std::lock_guard<std::mutex> guard(*log_output_mutex_); sink_.log(msg); } void flush() override { + std::lock_guard<std::mutex> guard(*log_output_mutex_); sink_.flush(); } void set_pattern(const std::string &pattern) override { + std::lock_guard<std::mutex> guard(*log_output_mutex_); sink_.set_pattern(pattern); } void set_formatter(std::unique_ptr<spdlog::formatter> sink_formatter) override { + std::lock_guard<std::mutex> guard(*log_output_mutex_); sink_.set_formatter(std::move(sink_formatter)); } @@ -51,5 +58,6 @@ class StringStreamSink : public spdlog::sinks::sink { // static storage duration, thus they might outlive the provider of // the stream std::shared_ptr<std::ostringstream> stream_; + std::shared_ptr<std::mutex> log_output_mutex_; spdlog::sinks::ostream_sink_mt sink_; }; diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index ba2cb6d92..cb61f540f 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -92,6 +92,14 @@ void LogTestController::setLevelByClassName(spdlog::level::level_enum level, con } bool LogTestController::contains(const std::ostringstream& stream, const std::string& ending, std::chrono::milliseconds timeout, std::chrono::milliseconds sleep_interval) const { + return contains([&stream](){ return stream.str(); }, ending, timeout, sleep_interval); +} + +bool LogTestController::contains(const std::string& ending, std::chrono::milliseconds timeout, std::chrono::milliseconds sleep_interval) const { + return contains([this](){ return getLogs(); }, ending, timeout, sleep_interval); +} + +bool LogTestController::contains(const std::function<std::string()>& log_string_getter, const std::string& ending, std::chrono::milliseconds timeout, std::chrono::milliseconds sleep_interval) const { if (ending.length() == 0) { return false; } @@ -99,7 +107,7 @@ bool LogTestController::contains(const std::ostringstream& stream, const std::st bool found = false; bool timed_out = false; do { - std::string str = stream.str(); + std::string str = log_string_getter(); found = (str.find(ending) != std::string::npos); auto now = std::chrono::steady_clock::now(); timed_out = (now - start > timeout); @@ -122,7 +130,7 @@ std::optional<std::smatch> LogTestController::matchesRegex(const std::string& re std::regex matcher_regex(regex_str); std::smatch match; do { - std::string str = log_output.str(); + std::string str = getLogs(); found = std::regex_search(str, match, matcher_regex); auto now = std::chrono::steady_clock::now(); timed_out = (now - start > timeout); @@ -136,7 +144,7 @@ std::optional<std::smatch> LogTestController::matchesRegex(const std::string& re } int LogTestController::countOccurrences(const std::string& pattern) const { - return minifi::utils::StringUtils::countOccurrences(log_output.str(), pattern).second; + return minifi::utils::StringUtils::countOccurrences(getLogs(), pattern).second; } void LogTestController::reset() { @@ -150,7 +158,9 @@ void LogTestController::reset() { } void LogTestController::clear() { - resetStream(log_output); + std::lock_guard<std::mutex> guard(*log_output_mutex_); + gsl_Expects(log_output_ptr_); + resetStream(*log_output_ptr_); } void LogTestController::resetStream(std::ostringstream& stream) { @@ -159,6 +169,7 @@ void LogTestController::resetStream(std::ostringstream& stream) { } LogTestController::LogTestController(const std::shared_ptr<logging::LoggerProperties>& loggerProps) { + gsl_Expects(log_output_ptr_); my_properties_ = loggerProps; bool initMain = false; if (nullptr == my_properties_) { @@ -169,7 +180,7 @@ LogTestController::LogTestController(const std::shared_ptr<logging::LoggerProper my_properties_->set("logger." + minifi::core::getClassName<LogTestController>(), "INFO"); my_properties_->set("logger." + minifi::core::getClassName<logging::LoggerConfiguration>(), "INFO"); std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>(); - dist_sink->add_sink(std::make_shared<StringStreamSink>(log_output_ptr, true)); + dist_sink->add_sink(std::make_shared<StringStreamSink>(log_output_ptr_, log_output_mutex_, true)); dist_sink->add_sink(std::make_shared<spdlog::sinks::stderr_sink_mt>()); my_properties_->add_sink("ostream", dist_sink); if (initMain) { diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 147a91911..e2118144e 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -26,6 +26,7 @@ #include <string> #include <utility> #include <vector> +#include <mutex> #include "spdlog/common.h" @@ -129,9 +130,7 @@ class LogTestController { void setLevelByClassName(spdlog::level::level_enum level, const std::string& class_name); - bool contains(const std::string &ending, std::chrono::milliseconds timeout = std::chrono::seconds(3), std::chrono::milliseconds sleep_interval = std::chrono::milliseconds(200)) const { - return contains(log_output, ending, timeout, sleep_interval); - } + bool contains(const std::string &ending, std::chrono::milliseconds timeout = std::chrono::seconds(3), std::chrono::milliseconds sleep_interval = std::chrono::milliseconds(200)) const; bool contains(const std::ostringstream &stream, const std::string &ending, std::chrono::milliseconds timeout = std::chrono::seconds(3), @@ -149,8 +148,11 @@ class LogTestController { static void resetStream(std::ostringstream &stream); - std::shared_ptr<std::ostringstream> log_output_ptr = std::make_shared<std::ostringstream>(); - std::ostringstream& log_output = *log_output_ptr; + std::string getLogs() const { + std::lock_guard<std::mutex> guard(*log_output_mutex_); + gsl_Expects(log_output_ptr_); + return log_output_ptr_->str(); + } std::shared_ptr<logging::Logger> logger_; @@ -162,7 +164,10 @@ class LogTestController { explicit LogTestController(const std::shared_ptr<logging::LoggerProperties> &loggerProps); void setLevel(const std::string& name, spdlog::level::level_enum level); + bool contains(const std::function<std::string()>& log_string_getter, const std::string& ending, std::chrono::milliseconds timeout, std::chrono::milliseconds sleep_interval) const; + mutable std::shared_ptr<std::mutex> log_output_mutex_ = std::make_shared<std::mutex>(); + std::shared_ptr<std::ostringstream> log_output_ptr_ = std::make_shared<std::ostringstream>(); std::shared_ptr<logging::LoggerProperties> my_properties_; std::unique_ptr<logging::LoggerConfiguration> config; std::vector<std::string> modified_loggers; diff --git a/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp b/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp index 9fec6d41e..7164becb1 100644 --- a/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp +++ b/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp @@ -246,7 +246,7 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "List all files every time", }; run_assertions(); plan_->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); test_controller_.runSession(plan_, true); run_assertions(); } @@ -279,7 +279,7 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Do not list same files the s CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:en-US")); CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:de-DE")); plan_->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); test_controller_.runSession(plan_, true); REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 0ms)); } diff --git a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp index d91771ccd..d6682a710 100644 --- a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp +++ b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp @@ -118,7 +118,7 @@ TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "List all files every tim }; run_assertions(); plan_->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); test_controller_.runSession(plan_, true); run_assertions(); } @@ -147,7 +147,7 @@ TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list same files t CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED + "\n")); CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED + "\n")); plan_->reset(); - LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + LogTestController::getInstance().clear(); test_controller_.runSession(plan_, true); REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 0ms)); } diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp index b20d34164..78526ce7b 100644 --- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp +++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp @@ -35,7 +35,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase { void runAssertions() override { using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; assert(verifyEventHappenedInPollTime(wait_time_, [&] { - const std::string logs = LogTestController::getInstance().log_output.str(); + const std::string logs = LogTestController::getInstance().getLogs(); const auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr); const int occurrences = result.second; return 1 < occurrences; @@ -48,7 +48,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase { "[warning] ProcessSession rollback for kamikaze executed"}; const bool test_success = verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] { - const std::string logs = LogTestController::getInstance().log_output.str(); + const std::string logs = LogTestController::getInstance().getLogs(); const auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr); size_t last_pos = result.first; for (const std::string& msg : must_appear_byorder_msgs) { @@ -61,7 +61,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase { }); assert(test_success); - assert(LogTestController::getInstance().log_output.str().find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos); + assert(LogTestController::getInstance().getLogs().find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos); } void testSetup() override { @@ -104,7 +104,7 @@ class EventDriverScheduleErrorHandlingTests: public IntegrationBase { } void runAssertions() override { - std::string logs = LogTestController::getInstance().log_output.str(); + std::string logs = LogTestController::getInstance().getLogs(); assert(logs.find("EventDrivenSchedulingAgent cannot schedule processor without incoming connection!") != std::string::npos); } diff --git a/libminifi/test/integration/StateTransactionalityTests.cpp b/libminifi/test/integration/StateTransactionalityTests.cpp index 97602a07c..ff3d86e95 100644 --- a/libminifi/test/integration/StateTransactionalityTests.cpp +++ b/libminifi/test/integration/StateTransactionalityTests.cpp @@ -104,14 +104,14 @@ const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1 const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}}; auto standardLogChecker = [] { - const std::string logs = LogTestController::getInstance().log_output.str(); + const std::string logs = LogTestController::getInstance().getLogs(); const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); const auto warningResult = utils::StringUtils::countOccurrences(logs, "[warning]"); return errorResult.second == 0 && warningResult.second == 0; }; auto commitAndRollbackWarnings = [] { - const std::string logs = LogTestController::getInstance().log_output.str(); + const std::string logs = LogTestController::getInstance().getLogs(); const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Process Session Operation: State manager commit failed.\""); const auto rollbackWarningFirst = utils::StringUtils::countOccurrences(logs, "[warning] Caught Exception during process session rollback"); @@ -120,7 +120,7 @@ auto commitAndRollbackWarnings = [] { }; auto exceptionRollbackWarnings = [] { - const std::string logs = LogTestController::getInstance().log_output.str(); + const std::string logs = LogTestController::getInstance().getLogs(); const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); const auto exceptionWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering rollback\""); const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed");
