This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9c5e2996b3e8cb7fb4aadf9c29bf6970af6fa618
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Fri Mar 3 14:12:32 2023 +0100

    MINIFICPP-2041 Fix transient failure of HttpPostIntegrationTestChunked
    
    There was a concurrency issue between the test logger on one thread and
    the verifying thread which read the output stream of the logger. Adding
    a mutex that guards ostringstream and the logger sink that writes
    ostringstream solves this issue.
    
    Closes #1512
    Signed-off-by: Marton Szasz <[email protected]>
---
 extensions/civetweb/processors/ListenHTTP.cpp      |  1 +
 .../tests/C2ClearCoreComponentStateTest.cpp        |  2 +-
 extensions/http-curl/tests/C2LogHeartbeatTest.cpp  |  2 +-
 .../http-curl/tests/HttpPostIntegrationTest.cpp    |  2 +-
 extensions/sftp/tests/ListSFTPTests.cpp            | 42 +++++++++---------
 .../tests/unit/ListFileTests.cpp                   |  2 +-
 .../tests/unit/RetryFlowFileTests.cpp              |  4 +-
 .../tests/unit/TailFileTests.cpp                   | 50 +++++++++++-----------
 extensions/windows-event-log/tests/CWELTestUtils.h |  2 +-
 .../tests/ConsumeWindowsEventLogTests.cpp          | 14 +++---
 libminifi/include/utils/IntegrationTestUtils.h     |  2 +-
 libminifi/test/LogUtils.h                          | 12 +++++-
 libminifi/test/TestBase.cpp                        | 21 ++++++---
 libminifi/test/TestBase.h                          | 15 ++++---
 .../test/azure-tests/ListAzureBlobStorageTests.cpp |  4 +-
 .../azure-tests/ListAzureDataLakeStorageTests.cpp  |  4 +-
 .../integration/OnScheduleErrorHandlingTests.cpp   |  8 ++--
 .../integration/StateTransactionalityTests.cpp     |  6 +--
 18 files changed, 109 insertions(+), 84 deletions(-)

diff --git a/extensions/civetweb/processors/ListenHTTP.cpp 
b/extensions/civetweb/processors/ListenHTTP.cpp
index 127ac332a..02e8d543e 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -262,6 +262,7 @@ bool 
ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
 
 /// @return Whether there was a request processed
 bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) {
+  gsl_Expects(handler_);
   std::size_t flow_file_count = 0;
   for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) 
{
     FlowFileBufferPair flow_file_buffer_pair;
diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp 
b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index ebdfd3e20..181a0f28a 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -137,7 +137,7 @@ class ClearCoreComponentStateHandler: public 
HeartbeatHandler {
       }
       case FlowState::CLEAR_SENT: {
         auto tail_file_ran_again_checker = [this] {
-          const auto log_contents = 
LogTestController::getInstance().log_output.str();
+          const auto log_contents = LogTestController::getInstance().getLogs();
           const std::string tailing_file_pattern = "[debug] Tailing file " + 
file_1_location_.string();
           const std::string tail_file_committed_pattern = "[trace] 
ProcessSession committed for TailFile1";
           const std::vector<std::string> patterns = {tailing_file_pattern, 
tailing_file_pattern, tail_file_committed_pattern};
diff --git a/extensions/http-curl/tests/C2LogHeartbeatTest.cpp 
b/extensions/http-curl/tests/C2LogHeartbeatTest.cpp
index 624185d70..1e50c7e54 100644
--- a/extensions/http-curl/tests/C2LogHeartbeatTest.cpp
+++ b/extensions/http-curl/tests/C2LogHeartbeatTest.cpp
@@ -54,7 +54,7 @@ class VerifyLogC2Heartbeat : public VerifyC2Base {
         std::chrono::milliseconds(wait_time_),
         "\"operation\": \"heartbeat\""));
 
-    const auto log = LogTestController::getInstance().log_output.str();
+    const auto log = LogTestController::getInstance().getLogs();
     auto types_in_heartbeat = log | ranges::views::split('\n')
         | ranges::views::transform([](auto&& rng) { return rng | 
ranges::to<std::string>; })
         | ranges::views::filter([](auto&& line) { return 
utils::StringUtils::startsWith(line, "                                
\"type\":"); })
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp 
b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index d4271a3e9..dfd6d3d18 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -37,7 +37,7 @@ using namespace std::literals::chrono_literals;
 
 class HttpTestHarness : public HTTPIntegrationBase {
  public:
-  HttpTestHarness() : HTTPIntegrationBase(4s) {
+  HttpTestHarness() {
     dir_ = test_controller_.createTempDirectory();
   }
 
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp 
b/extensions/sftp/tests/ListSFTPTests.cpp
index 68faaab04..6a993a2db 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -416,7 +416,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Timestamps one file",
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:tstFile.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
 
@@ -435,7 +435,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Timestamps one file on
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-4min);
 
@@ -456,7 +456,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Timestamps one file on
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-6min);
 
@@ -479,7 +479,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Timestamps one file an
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   /* We must sleep to avoid triggering the listing lag. */
   std::this_thread::sleep_for(std::chrono::milliseconds(1500));
@@ -506,7 +506,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Timestamps one file ti
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   REQUIRE(utils::file::set_last_write_time(file, mtime + 1s));
 
@@ -516,7 +516,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Timestamps one file ti
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   /* We must sleep to avoid triggering the listing lag. */
   std::this_thread::sleep_for(std::chrono::milliseconds(1500));
@@ -551,7 +551,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP 
Tracking Timestamps r
   REQUIRE(list_sftp_uuid);
   createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>());
   plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-4min);
 
@@ -594,7 +594,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP 
Tracking Timestamps r
   createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>());
   plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
   plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-4min);
 
@@ -627,7 +627,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Timestamps changed con
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
   plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-4min);
@@ -655,7 +655,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file", "[
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:tstFile.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
 
@@ -674,7 +674,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file one
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-4min);
 
@@ -695,7 +695,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file one
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-6min);
 
@@ -718,7 +718,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file one
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-6h);
 
@@ -741,7 +741,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file anot
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFile("nifi_test/file2.ext", "Test content 2", mtime);
 
@@ -767,7 +767,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file time
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   REQUIRE(utils::file::set_last_write_time(file, mtime + 1s));
 
@@ -778,7 +778,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file time
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
 
@@ -799,7 +799,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file size
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFile("nifi_test/file1.ext", "Longer test content 1", mtime);
 
@@ -810,7 +810,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities one file size
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
 
@@ -832,7 +832,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP 
Tracking Entities res
   REQUIRE(list_sftp_uuid);
   createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>());
   plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   std::unordered_map<std::string, std::string> state;
   
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
@@ -893,7 +893,7 @@ TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP 
Tracking Entities res
   createPlan(&list_sftp_uuid, std::make_shared<minifi::Configure>());
   plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
   plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-4min);
 
@@ -926,7 +926,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking 
Entities changed confi
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:file1.ext"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
   plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
 
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", 
-4min);
diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp 
b/extensions/standard-processors/tests/unit/ListFileTests.cpp
index 46ce99b47..32172c05f 100644
--- a/extensions/standard-processors/tests/unit/ListFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListFileTests.cpp
@@ -140,7 +140,7 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing files 
only once with default
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime 
value:" + 
*utils::file::FileUtils::get_last_modified_time_formatted_string(first_sub_file_abs_path_,
 FORMAT_STRING)));
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime 
value:" + 
*utils::file::FileUtils::get_last_modified_time_formatted_string(second_sub_file_abs_path_,
 FORMAT_STRING)));
   plan_->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
   test_controller_.runSession(plan_, true);
   REQUIRE_FALSE(LogTestController::getInstance().contains("key:file.size", 0s, 
0ms));
 }
diff --git a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp 
b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
index daf2f1abe..9dfd3f53c 100644
--- a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
@@ -163,13 +163,13 @@ class RetryFlowFileTest {
   }
 
   static bool logContainsText(const std::string& pattern) {
-    const std::string logs = LogTestController::getInstance().log_output.str();
+    const std::string logs = LogTestController::getInstance().getLogs();
     return logs.find(pattern) != std::string::npos;
   }
 
   static bool flowfileWasPenalizedARetryflowfile() {
     std::regex re(R"(\[org::apache::nifi::minifi::core::ProcessSession\] 
\[info\] Penalizing [0-9a-z\-]+ for [0-9]*ms at retryflowfile)");
-    return 
std::regex_search(LogTestController::getInstance().log_output.str(), re);
+    return std::regex_search(LogTestController::getInstance().getLogs(), re);
   }
 
   static bool retryFlowfileWarnedForReuse() {
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp 
b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index fd82b81c9..dd295a9ef 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -140,7 +140,7 @@ TEST_CASE("TailFile picks up the second line if a delimiter 
is written between r
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:minifi-tmpfile.0-13.txt"));
 
   plan->reset(true);  // start a new but with state file
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   std::ofstream appendStream;
   appendStream.open(temp_file_path, std::ios_base::app | 
std::ios_base::binary);
@@ -182,7 +182,7 @@ TEST_CASE("TailFile re-reads the file if the state is 
deleted between runs", "[s
   REQUIRE(LogTestController::getInstance().contains("key:filename 
value:minifi-tmpfile.0-13.txt"));
 
   plan->reset(true);  // start a new but with state file
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   plan->getProcessContextForProcessor(tailfile)->getStateManager()->clear();
 
@@ -230,7 +230,7 @@ TEST_CASE("TailFile picks up the state correctly if it is 
rewritten between runs
   // should stay the same
   for (int i = 0; i < 5; i++) {
     plan->reset(true);  // start a new but with state file
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     
plan->getProcessContextForProcessor(tailfile)->getStateManager()->set({{"file.0.name",
 temp_file_path.filename().string()},
                                                                                
    {"file.0.position", "14"},
@@ -392,7 +392,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is 
changed between runs"
     auto second_test_file = createTempFile(directory, "second.log", "my second 
log line\n");
     plan->setProperty(tail_file, 
org::apache::nifi::minifi::processors::TailFile::FileName.getName(), 
second_test_file.string());
     plan->reset(true);  // clear the memory, but keep the state file
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
     REQUIRE(LogTestController::getInstance().contains("key:filename 
value:second.0-18.log"));
@@ -403,7 +403,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is 
changed between runs"
     auto second_test_file = createTempFile(directory, "second.log", "");
     plan->setProperty(tail_file, 
org::apache::nifi::minifi::processors::TailFile::FileName.getName(), 
second_test_file.string());
     plan->reset(true);  // clear the memory, but keep the state file
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
   }
@@ -441,7 +441,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is 
changed between runs
   SECTION("If a file no longer matches the new regex, then we stop tailing 
it") {
     plan->setProperty(tail_file, 
org::apache::nifi::minifi::processors::TailFile::FileName.getName(), 
"first\\.f.*\\.log");
     plan->reset(true);  // clear the memory, but keep the state file
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
     REQUIRE(LogTestController::getInstance().contains("key:filename 
value:first.fruit.6-12.log"));
@@ -450,7 +450,7 @@ TEST_CASE("TailFile picks up the new File to Tail if it is 
changed between runs
   SECTION("If a new file matches the new regex, we start tailing it") {
     plan->setProperty(tail_file, 
org::apache::nifi::minifi::processors::TailFile::FileName.getName(), 
".*\\.fruit\\.log");
     plan->reset(true);  // clear the memory, but keep the state file
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 2 flow file"));
     REQUIRE(LogTestController::getInstance().contains("key:filename 
value:first.fruit.6-12.log"));
@@ -527,7 +527,7 @@ TEST_CASE("TailFile picks up new files created between 
runs", "[multiple_file]")
   createTempFile(dir, "another.log", "some more content\n");
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
   REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
@@ -563,7 +563,7 @@ TEST_CASE("TailFile can handle input files getting 
removed", "[multiple_file]")
   REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   appendTempFile(dir, "one.log", "line two\nline three\nline four\n");
   std::filesystem::remove(dir / "two.log");
@@ -830,7 +830,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and 
continues with the n
   new_in_file_stream.close();
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   plan->runNextProcessor();  // Tail
   plan->runNextProcessor();  // Log
@@ -891,7 +891,7 @@ TEST_CASE("TailFile finds and finishes multiple rotated 
files and continues with
   test_file_stream_2.close();
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
 
@@ -936,7 +936,7 @@ TEST_CASE("TailFile ignores old rotated files", 
"[rotation]") {
   createTempFile(dir, "test.log", "line8\nline9\n");
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
   REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
@@ -996,7 +996,7 @@ TEST_CASE("TailFile rotation works with multiple input 
files", "[rotation][multi
   appendTempFile(dir, "color.log", "turquoise\n");
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
 
@@ -1063,7 +1063,7 @@ TEST_CASE("TailFile handles the Rolling Filename Pattern 
property correctly", "[
   createTempFile(dir, "other_rolled.log", "some stuff\none more line\n");  // 
same contents as test.rolled.log
 
   plan->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   testController.runSession(plan, true);
 
@@ -1104,7 +1104,7 @@ TEST_CASE("TailFile finds and finishes the renamed file 
and continues with the n
     REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));
   }
 
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   std::this_thread::sleep_for(std::chrono::milliseconds(100));
 
@@ -1466,7 +1466,7 @@ TEST_CASE("TailFile interprets the lookup frequency 
property correctly", "[multi
     REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
 
     plan->reset();
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     createTempFile(directory, "test.blue.log", "sky\n");
     createTempFile(directory, "test.green.log", "grass\n");
@@ -1482,7 +1482,7 @@ TEST_CASE("TailFile interprets the lookup frequency 
property correctly", "[multi
     REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
 
     plan->reset();
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     createTempFile(directory, "test.blue.log", "sky\n");
     createTempFile(directory, "test.green.log", "grass\n");
@@ -1491,7 +1491,7 @@ TEST_CASE("TailFile interprets the lookup frequency 
property correctly", "[multi
     REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
 
     plan->reset();
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     std::this_thread::sleep_for(std::chrono::milliseconds(550));
     testController.runSession(plan, true);
@@ -1531,7 +1531,7 @@ TEST_CASE("TailFile reads from a single file when Initial 
Start Position is set"
     REQUIRE(LogTestController::getInstance().contains("Size:" + 
std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0"));
 
     plan->reset(true);
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
 
@@ -1551,7 +1551,7 @@ TEST_CASE("TailFile reads from a single file when Initial 
Start Position is set"
     REQUIRE(LogTestController::getInstance().contains("Size:" + 
std::to_string(ROLLED_OVER_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0"));
 
     plan->reset(true);
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
 
@@ -1569,7 +1569,7 @@ TEST_CASE("TailFile reads from a single file when Initial 
Start Position is set"
     REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
 
     plan->reset(true);
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
 
@@ -1604,7 +1604,7 @@ TEST_CASE("TailFile reads from a single file when Initial 
Start Position is set
   REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
 
   plan->reset(true);
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   const std::string DATA_IN_NEW_FILE = "data in new file\n";
   std::this_thread::sleep_for(std::chrono::milliseconds(100));  // make sure 
the new file gets newer modification time
@@ -1652,7 +1652,7 @@ TEST_CASE("TailFile reads multiple files when Initial 
Start Position is set", "[
     REQUIRE(LogTestController::getInstance().contains("Size:" + 
std::to_string(TMP_FILE_2_DATA.find_first_of('\n') + 1) + " Offset:0"));
 
     plan->reset(true);
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     createTempFile(dir, "minifi-tmpfile-3.txt", 
ADDITIONALY_CREATED_FILE_CONTENT);
     appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
@@ -1675,7 +1675,7 @@ TEST_CASE("TailFile reads multiple files when Initial 
Start Position is set", "[
     REQUIRE(LogTestController::getInstance().contains("Size:" + 
std::to_string(TMP_FILE_2_DATA.find_first_of('\n') + 1) + " Offset:0"));
 
     plan->reset(true);
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     createTempFile(dir, "minifi-tmpfile-3.txt", 
ADDITIONALY_CREATED_FILE_CONTENT);
     appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
@@ -1695,7 +1695,7 @@ TEST_CASE("TailFile reads multiple files when Initial 
Start Position is set", "[
     REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
 
     plan->reset(true);
-    
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     createTempFile(dir, "minifi-tmpfile-3.txt", 
ADDITIONALY_CREATED_FILE_CONTENT);
     appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
diff --git a/extensions/windows-event-log/tests/CWELTestUtils.h 
b/extensions/windows-event-log/tests/CWELTestUtils.h
index 0a0bd85ee..efe240141 100644
--- a/extensions/windows-event-log/tests/CWELTestUtils.h
+++ b/extensions/windows-event-log/tests/CWELTestUtils.h
@@ -69,7 +69,7 @@ class OutputFormatTestController : public TestController {
     }
 
     test_plan->reset();
-    
LogTestController::resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
 
     {
diff --git a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp 
b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
index 44a2f125f..ee9020161 100644
--- a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
+++ b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
@@ -152,7 +152,7 @@ TEST_CASE("ConsumeWindowsEventLog can consume new events", 
"[onTrigger]") {
   // later runs will start with a bookmark saved in the state manager
 
   test_plan->reset();
-  LogTestController::resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   SECTION("Read one event") {
     reportEvent(APPLICATION_CHANNEL, "Event one");
@@ -198,7 +198,7 @@ TEST_CASE("ConsumeWindowsEventLog bookmarking works", 
"[onTrigger]") {
   CHECK(LogTestController::getInstance().contains("processed 0 Events"));
 
   test_plan->reset();
-  LogTestController::resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   SECTION("Read in one go") {
     reportEvent(APPLICATION_CHANNEL, "Event one");
@@ -219,7 +219,7 @@ TEST_CASE("ConsumeWindowsEventLog bookmarking works", 
"[onTrigger]") {
     reportEvent(APPLICATION_CHANNEL, "Event three");
 
     test_plan->reset();
-    
LogTestController::resetStream(LogTestController::getInstance().log_output);
+    LogTestController::getInstance().clear();
 
     TestController::runSession(test_plan);
     CHECK(LogTestController::getInstance().contains("processed 2 Events"));
@@ -259,7 +259,7 @@ TEST_CASE("ConsumeWindowsEventLog extracts some attributes 
by default", "[onTrig
   }
 
   test_plan->reset();
-  LogTestController::resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   // 1st event, on Info level
   {
@@ -272,7 +272,7 @@ TEST_CASE("ConsumeWindowsEventLog extracts some attributes 
by default", "[onTrig
   }
 
   test_plan->reset();
-  LogTestController::resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   // 2st event, on Warning level
   {
@@ -308,7 +308,7 @@ void outputFormatSetterTestHelper(const std::string 
&output_format, int expected
   }
 
   test_plan->reset();
-  LogTestController::resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   {
     reportEvent(APPLICATION_CHANNEL, "Event one");
@@ -442,7 +442,7 @@ void batchCommitSizeTestHelper(std::size_t num_events_read, 
std::size_t batch_co
   }
 
   test_plan->reset();
-  LogTestController::resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
 
   auto generate_events = [](const std::size_t event_count) {
     std::vector<std::string> events;
diff --git a/libminifi/include/utils/IntegrationTestUtils.h 
b/libminifi/include/utils/IntegrationTestUtils.h
index 4d96132b7..1799a6b87 100644
--- a/libminifi/include/utils/IntegrationTestUtils.h
+++ b/libminifi/include/utils/IntegrationTestUtils.h
@@ -44,7 +44,7 @@ bool verifyEventHappenedInPollTime(
 template <class Rep, class Period, typename ...String>
 bool verifyLogLinePresenceInPollTime(const std::chrono::duration<Rep, Period>& 
wait_duration, String&&... patterns) {
   auto check = [&patterns...] {
-    const std::string logs = LogTestController::getInstance().log_output.str();
+    const std::string logs = LogTestController::getInstance().getLogs();
     return ((logs.find(patterns) != std::string::npos) && ...);
   };
   return verifyEventHappenedInPollTime(wait_duration, check);
diff --git a/libminifi/test/LogUtils.h b/libminifi/test/LogUtils.h
index 942775535..5fedd0a75 100644
--- a/libminifi/test/LogUtils.h
+++ b/libminifi/test/LogUtils.h
@@ -21,27 +21,34 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <mutex>
 
 #include "spdlog/sinks/sink.h"
 #include "spdlog/sinks/ostream_sink.h"
 
 class StringStreamSink : public spdlog::sinks::sink {
  public:
-  explicit StringStreamSink(std::shared_ptr<std::ostringstream> stream, bool 
force_flush = false)
-    : stream_(std::move(stream)), sink_(*stream_, force_flush) {}
+  explicit StringStreamSink(std::shared_ptr<std::ostringstream> stream, 
std::shared_ptr<std::mutex> log_output_mutex, bool force_flush = false)
+      : stream_(std::move(stream)),
+        log_output_mutex_(std::move(log_output_mutex)),
+        sink_(*stream_, force_flush) {}
 
   ~StringStreamSink() override = default;
 
   void log(const spdlog::details::log_msg &msg) override {
+    std::lock_guard<std::mutex> guard(*log_output_mutex_);
     sink_.log(msg);
   }
   void flush() override {
+    std::lock_guard<std::mutex> guard(*log_output_mutex_);
     sink_.flush();
   }
   void set_pattern(const std::string &pattern) override {
+    std::lock_guard<std::mutex> guard(*log_output_mutex_);
     sink_.set_pattern(pattern);
   }
   void set_formatter(std::unique_ptr<spdlog::formatter> sink_formatter) 
override {
+    std::lock_guard<std::mutex> guard(*log_output_mutex_);
     sink_.set_formatter(std::move(sink_formatter));
   }
 
@@ -51,5 +58,6 @@ class StringStreamSink : public spdlog::sinks::sink {
   // static storage duration, thus they might outlive the provider of
   // the stream
   std::shared_ptr<std::ostringstream> stream_;
+  std::shared_ptr<std::mutex> log_output_mutex_;
   spdlog::sinks::ostream_sink_mt sink_;
 };
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index ba2cb6d92..cb61f540f 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -92,6 +92,14 @@ void 
LogTestController::setLevelByClassName(spdlog::level::level_enum level, con
 }
 
 bool LogTestController::contains(const std::ostringstream& stream, const 
std::string& ending, std::chrono::milliseconds timeout, 
std::chrono::milliseconds sleep_interval) const {
+  return contains([&stream](){ return stream.str(); }, ending, timeout, 
sleep_interval);
+}
+
+bool LogTestController::contains(const std::string& ending, 
std::chrono::milliseconds timeout, std::chrono::milliseconds sleep_interval) 
const {
+  return contains([this](){ return getLogs(); }, ending, timeout, 
sleep_interval);
+}
+
+bool LogTestController::contains(const std::function<std::string()>& 
log_string_getter, const std::string& ending, std::chrono::milliseconds 
timeout, std::chrono::milliseconds sleep_interval) const {
   if (ending.length() == 0) {
     return false;
   }
@@ -99,7 +107,7 @@ bool LogTestController::contains(const std::ostringstream& 
stream, const std::st
   bool found = false;
   bool timed_out = false;
   do {
-    std::string str = stream.str();
+    std::string str = log_string_getter();
     found = (str.find(ending) != std::string::npos);
     auto now = std::chrono::steady_clock::now();
     timed_out = (now - start > timeout);
@@ -122,7 +130,7 @@ std::optional<std::smatch> 
LogTestController::matchesRegex(const std::string& re
   std::regex matcher_regex(regex_str);
   std::smatch match;
   do {
-    std::string str = log_output.str();
+    std::string str = getLogs();
     found = std::regex_search(str, match, matcher_regex);
     auto now = std::chrono::steady_clock::now();
     timed_out = (now - start > timeout);
@@ -136,7 +144,7 @@ std::optional<std::smatch> 
LogTestController::matchesRegex(const std::string& re
 }
 
 int LogTestController::countOccurrences(const std::string& pattern) const {
-  return minifi::utils::StringUtils::countOccurrences(log_output.str(), 
pattern).second;
+  return minifi::utils::StringUtils::countOccurrences(getLogs(), 
pattern).second;
 }
 
 void LogTestController::reset() {
@@ -150,7 +158,9 @@ void LogTestController::reset() {
 }
 
 void LogTestController::clear() {
-  resetStream(log_output);
+  std::lock_guard<std::mutex> guard(*log_output_mutex_);
+  gsl_Expects(log_output_ptr_);
+  resetStream(*log_output_ptr_);
 }
 
 void LogTestController::resetStream(std::ostringstream& stream) {
@@ -159,6 +169,7 @@ void LogTestController::resetStream(std::ostringstream& 
stream) {
 }
 
 LogTestController::LogTestController(const 
std::shared_ptr<logging::LoggerProperties>& loggerProps) {
+  gsl_Expects(log_output_ptr_);
   my_properties_ = loggerProps;
   bool initMain = false;
   if (nullptr == my_properties_) {
@@ -169,7 +180,7 @@ LogTestController::LogTestController(const 
std::shared_ptr<logging::LoggerProper
   my_properties_->set("logger." + 
minifi::core::getClassName<LogTestController>(), "INFO");
   my_properties_->set("logger." + 
minifi::core::getClassName<logging::LoggerConfiguration>(), "INFO");
   std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = 
std::make_shared<spdlog::sinks::dist_sink_mt>();
-  dist_sink->add_sink(std::make_shared<StringStreamSink>(log_output_ptr, 
true));
+  dist_sink->add_sink(std::make_shared<StringStreamSink>(log_output_ptr_, 
log_output_mutex_, true));
   dist_sink->add_sink(std::make_shared<spdlog::sinks::stderr_sink_mt>());
   my_properties_->add_sink("ostream", dist_sink);
   if (initMain) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 147a91911..e2118144e 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -26,6 +26,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <mutex>
 
 #include "spdlog/common.h"
 
@@ -129,9 +130,7 @@ class LogTestController {
 
   void setLevelByClassName(spdlog::level::level_enum level, const std::string& 
class_name);
 
-  bool contains(const std::string &ending, std::chrono::milliseconds timeout = 
std::chrono::seconds(3), std::chrono::milliseconds sleep_interval = 
std::chrono::milliseconds(200)) const {
-    return contains(log_output, ending, timeout, sleep_interval);
-  }
+  bool contains(const std::string &ending, std::chrono::milliseconds timeout = 
std::chrono::seconds(3), std::chrono::milliseconds sleep_interval = 
std::chrono::milliseconds(200)) const;
 
   bool contains(const std::ostringstream &stream, const std::string &ending,
                 std::chrono::milliseconds timeout = std::chrono::seconds(3),
@@ -149,8 +148,11 @@ class LogTestController {
 
   static void resetStream(std::ostringstream &stream);
 
-  std::shared_ptr<std::ostringstream> log_output_ptr = 
std::make_shared<std::ostringstream>();
-  std::ostringstream& log_output = *log_output_ptr;
+  std::string getLogs() const {
+    std::lock_guard<std::mutex> guard(*log_output_mutex_);
+    gsl_Expects(log_output_ptr_);
+    return log_output_ptr_->str();
+  }
 
   std::shared_ptr<logging::Logger> logger_;
 
@@ -162,7 +164,10 @@ class LogTestController {
   explicit LogTestController(const std::shared_ptr<logging::LoggerProperties> 
&loggerProps);
 
   void setLevel(const std::string& name, spdlog::level::level_enum level);
+  bool contains(const std::function<std::string()>& log_string_getter, const 
std::string& ending, std::chrono::milliseconds timeout, 
std::chrono::milliseconds sleep_interval) const;
 
+  mutable std::shared_ptr<std::mutex> log_output_mutex_ = 
std::make_shared<std::mutex>();
+  std::shared_ptr<std::ostringstream> log_output_ptr_ = 
std::make_shared<std::ostringstream>();
   std::shared_ptr<logging::LoggerProperties> my_properties_;
   std::unique_ptr<logging::LoggerConfiguration> config;
   std::vector<std::string> modified_loggers;
diff --git a/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp 
b/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp
index 9fec6d41e..7164becb1 100644
--- a/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp
+++ b/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp
@@ -246,7 +246,7 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "List 
all files every time",
   };
   run_assertions();
   plan_->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
   test_controller_.runSession(plan_, true);
   run_assertions();
 }
@@ -279,7 +279,7 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Do not 
list same files the s
   CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:en-US"));
   CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:de-DE"));
   plan_->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
   test_controller_.runSession(plan_, true);
   REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 
0ms));
 }
diff --git a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp 
b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
index d91771ccd..d6682a710 100644
--- a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
+++ b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
@@ -118,7 +118,7 @@ TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, 
"List all files every tim
   };
   run_assertions();
   plan_->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
   test_controller_.runSession(plan_, true);
   run_assertions();
 }
@@ -147,7 +147,7 @@ TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do 
not list same files t
   CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + 
mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED + "\n"));
   CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + 
mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED + "\n"));
   plan_->reset();
-  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  LogTestController::getInstance().clear();
   test_controller_.runSession(plan_, true);
   REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 
0ms));
 }
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp 
b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index b20d34164..78526ce7b 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -35,7 +35,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase {
   void runAssertions() override {
     using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
     assert(verifyEventHappenedInPollTime(wait_time_, [&] {
-      const std::string logs = 
LogTestController::getInstance().log_output.str();
+      const std::string logs = LogTestController::getInstance().getLogs();
       const auto result = utils::StringUtils::countOccurrences(logs, 
minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
       const int occurrences = result.second;
       return 1 < occurrences;
@@ -48,7 +48,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase {
                                                  "[warning] ProcessSession 
rollback for kamikaze executed"};
 
     const bool test_success = 
verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
-      const std::string logs = 
LogTestController::getInstance().log_output.str();
+      const std::string logs = LogTestController::getInstance().getLogs();
       const auto result = utils::StringUtils::countOccurrences(logs, 
minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
       size_t last_pos = result.first;
       for (const std::string& msg : must_appear_byorder_msgs) {
@@ -61,7 +61,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase {
     });
     assert(test_success);
 
-    
assert(LogTestController::getInstance().log_output.str().find(minifi::processors::KamikazeProcessor::OnTriggerLogStr)
 == std::string::npos);
+    
assert(LogTestController::getInstance().getLogs().find(minifi::processors::KamikazeProcessor::OnTriggerLogStr)
 == std::string::npos);
   }
 
   void testSetup() override {
@@ -104,7 +104,7 @@ class EventDriverScheduleErrorHandlingTests: public 
IntegrationBase {
   }
 
   void runAssertions() override {
-    std::string logs = LogTestController::getInstance().log_output.str();
+    std::string logs = LogTestController::getInstance().getLogs();
     assert(logs.find("EventDrivenSchedulingAgent cannot schedule processor 
without incoming connection!") != std::string::npos);
   }
 
diff --git a/libminifi/test/integration/StateTransactionalityTests.cpp 
b/libminifi/test/integration/StateTransactionalityTests.cpp
index 97602a07c..ff3d86e95 100644
--- a/libminifi/test/integration/StateTransactionalityTests.cpp
+++ b/libminifi/test/integration/StateTransactionalityTests.cpp
@@ -104,14 +104,14 @@ const std::unordered_map<std::string, std::string> 
exampleState{{"key1", "value1
 const std::unordered_map<std::string, std::string> exampleState2{{"key3", 
"value3"}, {"key4", "value4"}};
 
 auto standardLogChecker = [] {
-  const std::string logs = LogTestController::getInstance().log_output.str();
+  const std::string logs = LogTestController::getInstance().getLogs();
   const auto errorResult = utils::StringUtils::countOccurrences(logs, 
"[error]");
   const auto warningResult = utils::StringUtils::countOccurrences(logs, 
"[warning]");
   return errorResult.second == 0 && warningResult.second == 0;
 };
 
 auto commitAndRollbackWarnings = [] {
-  const std::string logs = LogTestController::getInstance().log_output.str();
+  const std::string logs = LogTestController::getInstance().getLogs();
   const auto errorResult = utils::StringUtils::countOccurrences(logs, 
"[error]");
   const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, 
"[warning] Caught \"Process Session Operation: State manager commit failed.\"");
   const auto rollbackWarningFirst = utils::StringUtils::countOccurrences(logs, 
"[warning] Caught Exception during process session rollback");
@@ -120,7 +120,7 @@ auto commitAndRollbackWarnings = [] {
 };
 
 auto exceptionRollbackWarnings = [] {
-  const std::string logs = LogTestController::getInstance().log_output.str();
+  const std::string logs = LogTestController::getInstance().getLogs();
   const auto errorResult = utils::StringUtils::countOccurrences(logs, 
"[error]");
   const auto exceptionWarningResult = 
utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering 
rollback\"");
   const auto rollbackWarningResult = 
utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback 
for statefulProcessor executed");


Reply via email to