This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a328a01d288d021a19023facd79a76b2a7d2709f
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Aug 9 18:24:24 2023 +0200

    MINIFICPP-2174 Send all cached compressed log files through C2
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    This closes #1631
---
 .../include/core/logging/LoggerConfiguration.h     |  8 +--
 .../core/logging/internal/CompressionManager.h     |  7 +-
 .../core/logging/internal/LogCompressorSink.h      | 33 ++++++---
 libminifi/include/utils/StagingQueue.h             |  8 +++
 libminifi/src/FlowController.cpp                   |  6 +-
 .../core/logging/internal/CompressionManager.cpp   |  3 +-
 libminifi/test/unit/LoggerTests.cpp                | 78 ++++++++++++++++------
 7 files changed, 104 insertions(+), 39 deletions(-)

diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h
index 6dfd7effb..c62ee74b4 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -98,15 +98,15 @@ class LoggerConfiguration {
    */
   void initialize(const std::shared_ptr<LoggerProperties> &logger_properties);
 
-  static std::unique_ptr<io::InputStream> getCompressedLog(bool flush = false) 
{
-    return getCompressedLog(std::chrono::milliseconds{0}, flush);
+  static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs() {
+    return getCompressedLogs(std::chrono::milliseconds{0});
   }
 
   void initializeAlertSinks(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id);
 
   template<class Rep, class Period>
-  static std::unique_ptr<io::InputStream> getCompressedLog(const 
std::chrono::duration<Rep, Period>& time, bool flush = false) {
-    return getConfiguration().compression_manager_.getCompressedLog(time, 
flush);
+  static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const 
std::chrono::duration<Rep, Period>& time) {
+    return getConfiguration().compression_manager_.getCompressedLogs(time);
   }
 
   /**
diff --git a/libminifi/include/core/logging/internal/CompressionManager.h b/libminifi/include/core/logging/internal/CompressionManager.h
index b1ea9fac9..b881a5a3f 100644
--- a/libminifi/include/core/logging/internal/CompressionManager.h
+++ b/libminifi/include/core/logging/internal/CompressionManager.h
@@ -24,6 +24,7 @@
 #include <functional>
 #include <utility>
 #include <string>
+#include <vector>
 
 #include "core/logging/Logger.h"
 #include "LogCompressorSink.h"
@@ -50,12 +51,12 @@ class CompressionManager {
   std::shared_ptr<LogCompressorSink> initialize(const 
std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& 
error_logger, const LoggerFactory& logger_factory);
 
   template<class Rep, class Period>
-  std::unique_ptr<io::InputStream> getCompressedLog(const 
std::chrono::duration<Rep, Period>& time, bool flush = false) {
+  std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const 
std::chrono::duration<Rep, Period>& time) {
     std::shared_ptr<internal::LogCompressorSink> sink = getSink();
     if (sink) {
-      return sink->getContent(time, flush);
+      return sink->getContent(time);
     }
-    return nullptr;
+    return {};
   }
 
   static constexpr const char* compression_cached_log_max_size_ = 
"compression.cached.log.max.size";
diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h
index fd02521ad..009fc1235 100644
--- a/libminifi/include/core/logging/internal/LogCompressorSink.h
+++ b/libminifi/include/core/logging/internal/LogCompressorSink.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <atomic>
 #include <utility>
+#include <vector>
 
 #include "spdlog/common.h"
 #include "spdlog/details/log_msg.h"
@@ -57,26 +58,42 @@ class LogCompressorSink : public spdlog::sinks::base_sink<std::mutex> {
   ~LogCompressorSink() override;
 
   template<class Rep, class Period>
-  std::unique_ptr<io::InputStream> getContent(const std::chrono::duration<Rep, 
Period>& time, bool flush = false) {
-    if (flush) {
-      cached_logs_.commit();
-      compress(true);
+  std::vector<std::unique_ptr<io::InputStream>> getContent(const 
std::chrono::duration<Rep, Period>& time) {
+    cached_logs_.commit();
+    compress(true);
+
+    std::vector<std::unique_ptr<io::InputStream>> log_segments;
+    const auto segment_count = compressed_logs_.itemCount();
+    for (size_t i = 0; i < segment_count; ++i) {
+      LogBuffer compressed;
+      if (!compressed_logs_.tryDequeue(compressed, time)) {
+        break;
+      }
+      log_segments.push_back(std::move(compressed.buffer_));
     }
-    LogBuffer compressed;
-    if (!compressed_logs_.tryDequeue(compressed, time) && flush) {
-      return createEmptyArchive();
+
+    if (log_segments.empty()) {
+      log_segments.push_back(createEmptyArchive());
     }
-    return std::move(compressed.buffer_);
+    return log_segments;
   }
 
   size_t getMaxCacheSize() const {
     return cached_logs_.getMaxSize();
   }
 
+  size_t getMaxCacheSegmentSize() const {
+    return cached_logs_.getMaxItemSize();
+  }
+
   size_t getMaxCompressedSize() const {
     return compressed_logs_.getMaxSize();
   }
 
+  size_t getMaxCompressedSegmentSize() const {
+    return compressed_logs_.getMaxItemSize();
+  }
+
  private:
   enum class CompressionResult {
     Success,
diff --git a/libminifi/include/utils/StagingQueue.h b/libminifi/include/utils/StagingQueue.h
index 854ea3430..6751c5301 100644
--- a/libminifi/include/utils/StagingQueue.h
+++ b/libminifi/include/utils/StagingQueue.h
@@ -143,6 +143,10 @@ class StagingQueue {
     return max_size_;
   }
 
+  size_t getMaxItemSize() const {
+    return max_item_size_;
+  }
+
   void discardOverflow() {
     while (total_size_ > max_size_) {
       Item item;
@@ -157,6 +161,10 @@ class StagingQueue {
     return total_size_;
   }
 
+  size_t itemCount() const {
+    return queue_.size();
+  }
+
  private:
   void commit(std::unique_lock<std::mutex>& /*lock*/) {
     queue_.enqueue(active_item_.commit());
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 451ac2d18..59b3b9261 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -473,8 +473,10 @@ std::vector<BackTrace> FlowController::getTraces() {
 
 std::map<std::string, std::unique_ptr<io::InputStream>> 
FlowController::getDebugInfo() {
   std::map<std::string, std::unique_ptr<io::InputStream>> debug_info;
-  if (auto logs = core::logging::LoggerConfiguration::getCompressedLog(true)) {
-    debug_info["minifi.log.gz"] = std::move(logs);
+  auto logs = core::logging::LoggerConfiguration::getCompressedLogs();
+  for (size_t i = 0; i < logs.size(); ++i) {
+    std::string index_str = i == logs.size() - 1 ? "" : "." + 
std::to_string(logs.size() - 1 - i);
+    debug_info["minifi.log" + index_str + ".gz"] = std::move(logs[i]);
   }
   if (auto opt_flow_path = flow_configuration_->getConfigurationPath()) {
     debug_info["config.yml"] = 
std::make_unique<io::FileStream>(opt_flow_path.value(), 0, false);
diff --git a/libminifi/src/core/logging/internal/CompressionManager.cpp b/libminifi/src/core/logging/internal/CompressionManager.cpp
index 3a85100fb..83da24ee2 100644
--- a/libminifi/src/core/logging/internal/CompressionManager.cpp
+++ b/libminifi/src/core/logging/internal/CompressionManager.cpp
@@ -51,7 +51,8 @@ std::shared_ptr<LogCompressorSink> CompressionManager::initialize(
     return sink_;
   }
   // do not create new sink if all relevant parameters match
-  if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || 
sink_->getMaxCompressedSize() != compressed_log_max_size) {
+  if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || 
sink_->getMaxCompressedSize() != compressed_log_max_size ||
+      sink_->getMaxCacheSegmentSize() != cache_segment_size || 
sink_->getMaxCompressedSegmentSize() != compressed_segment_size) {
     sink_ = std::make_shared<internal::LogCompressorSink>(
         LogQueueSize{cached_log_max_size, cache_segment_size},
         LogQueueSize{compressed_log_max_size, compressed_segment_size},
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index 00b7c84e7..adf8ce71d 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 #include <vector>
 #include <ctime>
+#include <random>
 #include "../TestBase.h"
 #include "../Catch.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -189,7 +190,7 @@ TEST_CASE("Test ShortenNames", "[ttl8]") {
 
 using namespace minifi::io;
 
-std::string decompress(const std::shared_ptr<InputStream>& input) {
+std::string decompress(const std::unique_ptr<InputStream>& input) {
   auto output = std::make_unique<BufferStream>();
   auto decompressor = 
std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get()));
   minifi::internal::pipe(*input, *decompressor);
@@ -214,9 +215,9 @@ TEST_CASE("Test Compression", "[ttl9]") {
   log_config.initialize(properties);
   auto logger = log_config.getLogger(className);
   logger->log_error("Hi there");
-  std::shared_ptr<InputStream> 
compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
-  REQUIRE(compressed_log);
-  auto logs = decompress(compressed_log);
+  auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
+  REQUIRE(compressed_logs.size() == 1);
+  auto logs = decompress(compressed_logs[0]);
   REQUIRE(logs.find("Hi there") != std::string::npos);
 }
 
@@ -234,6 +235,9 @@ class LoggerTestAccessor {
   static size_t getCompressedSize(logging::LoggerConfiguration& log_config) {
     return log_config.compression_manager_.getSink()->compressed_logs_.size();
   }
+  static void runCompression(logging::LoggerConfiguration& log_config) {
+    while (logging::internal::LogCompressorSink::CompressionResult::Success == 
log_config.compression_manager_.getSink()->compress()){}
+  }
 };
 
 TEST_CASE("Test Compression cache overflow is discarded intermittently", 
"[ttl10]") {
@@ -258,17 +262,17 @@ TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl10
 TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", 
"[ttl11]") {
   auto& log_config = logging::LoggerConfiguration::getConfiguration();
   auto properties = std::make_shared<logging::LoggerProperties>();
-  bool is_nullptr = false;
+  bool is_empty = false;
   SECTION("Cached log size is set to 0") {
-    is_nullptr = true;
+    is_empty = true;
     
properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_,
 "0");
   }
   SECTION("Compressed log size is set to 0") {
-    is_nullptr = true;
+    is_empty = true;
     
properties->set(logging::internal::CompressionManager::compression_compressed_log_max_size_,
 "0");
   }
   SECTION("Sanity check") {
-    is_nullptr = false;
+    is_empty = false;
     // pass
   }
   // by default the root logger is OFF
@@ -276,7 +280,7 @@ TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "
   log_config.initialize(properties);
   auto logger = log_config.getLogger("DisableCompressionTestLogger");
   logger->log_error("Hi there");
-  REQUIRE((logging::LoggerConfiguration::getCompressedLog(true) == nullptr) == 
is_nullptr);
+  REQUIRE(logging::LoggerConfiguration::getCompressedLogs().empty() == 
is_empty);
 }
 
 TEST_CASE("Setting max log entry length property trims long log entries", 
"[ttl12]") {
@@ -288,9 +292,9 @@ TEST_CASE("Setting max log entry length property trims long log entries", "[ttl1
   auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
   logger->log_error("Hi there");
 
-  std::shared_ptr<InputStream> 
compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
-  REQUIRE(compressed_log);
-  auto logs = decompress(compressed_log);
+  auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
+  REQUIRE(compressed_logs.size() == 1);
+  auto logs = decompress(compressed_logs[0]);
   REQUIRE(logs.find("Hi ") == std::string::npos);
   REQUIRE(logs.find("Hi") != std::string::npos);
 }
@@ -304,9 +308,9 @@ TEST_CASE("Setting max log entry length property trims long formatted log entrie
   auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
   logger->log_error("Hi there %s", "John");
 
-  std::shared_ptr<InputStream> 
compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
-  REQUIRE(compressed_log);
-  auto logs = decompress(compressed_log);
+  auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
+  REQUIRE(compressed_logs.size() == 1);
+  auto logs = decompress(compressed_logs[0]);
   REQUIRE(logs.find("Hi ") == std::string::npos);
   REQUIRE(logs.find("Hi") != std::string::npos);
 }
@@ -322,9 +326,9 @@ TEST_CASE("Setting max log entry length to a size larger than the internal buffe
   std::string expected_log(1500, 'a');
   logger->log_error(log.c_str());
 
-  std::shared_ptr<InputStream> 
compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
-  REQUIRE(compressed_log);
-  auto logs = decompress(compressed_log);
+  auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
+  REQUIRE(compressed_logs.size() == 1);
+  auto logs = decompress(compressed_logs[0]);
   REQUIRE(logs.find(log) == std::string::npos);
   REQUIRE(logs.find(expected_log) != std::string::npos);
 }
@@ -344,8 +348,40 @@ TEST_CASE("Setting max log entry length to unlimited results in unlimited log en
   std::string log(5000, 'a');
   logger->log_error(log.c_str());
 
-  std::shared_ptr<InputStream> 
compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
-  REQUIRE(compressed_log);
-  auto logs = decompress(compressed_log);
+  auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
+  REQUIRE(compressed_logs.size() == 1);
+  auto logs = decompress(compressed_logs[0]);
   REQUIRE(logs.find(log) != std::string::npos);
 }
+
+TEST_CASE("Test sending multiple segments at once", "[ttl16]") {
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  LoggerTestAccessor::setCompressionCompressedSegmentSize(log_config, 100);
+  LoggerTestAccessor::setCompressionCacheSegmentSize(log_config, 100);
+  auto properties = std::make_shared<logging::LoggerProperties>();
+  // by default the root logger is OFF
+  properties->set("logger.root", "INFO");
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger("CompressionTestMultiSegment");
+
+  std::random_device rd;
+  std::mt19937 eng(rd());
+  constexpr const char * TEXT_CHARS = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
+  const int index_of_last_char = gsl::narrow<int>(strlen(TEXT_CHARS)) - 1;
+  std::uniform_int_distribution<> distr(0, index_of_last_char);
+  std::vector<char> data(100);
+  std::string log_str;
+  const size_t SEGMENT_COUNT = 5;
+  for (size_t idx = 0; idx < SEGMENT_COUNT; ++idx) {
+    std::generate_n(data.begin(), data.size(), [&] { return 
TEXT_CHARS[static_cast<uint8_t>(distr(eng))]; });
+    log_str = std::string{data.begin(), data.end()} + "." + 
std::to_string(idx);
+    logger->log_error(log_str.c_str());
+  }
+
+  LoggerTestAccessor::runCompression(log_config);
+
+  auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
+  REQUIRE(compressed_logs.size() == SEGMENT_COUNT);
+  auto logs = decompress(compressed_logs[SEGMENT_COUNT - 1]);
+  REQUIRE(logs.find(log_str) != std::string::npos);
+}

Reply via email to