This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a328a01d288d021a19023facd79a76b2a7d2709f Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Aug 9 18:24:24 2023 +0200 MINIFICPP-2174 Send all cached compressed log files through C2 Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1631 --- .../include/core/logging/LoggerConfiguration.h | 8 +-- .../core/logging/internal/CompressionManager.h | 7 +- .../core/logging/internal/LogCompressorSink.h | 33 ++++++--- libminifi/include/utils/StagingQueue.h | 8 +++ libminifi/src/FlowController.cpp | 6 +- .../core/logging/internal/CompressionManager.cpp | 3 +- libminifi/test/unit/LoggerTests.cpp | 78 ++++++++++++++++------ 7 files changed, 104 insertions(+), 39 deletions(-) diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h index 6dfd7effb..c62ee74b4 100644 --- a/libminifi/include/core/logging/LoggerConfiguration.h +++ b/libminifi/include/core/logging/LoggerConfiguration.h @@ -98,15 +98,15 @@ class LoggerConfiguration { */ void initialize(const std::shared_ptr<LoggerProperties> &logger_properties); - static std::unique_ptr<io::InputStream> getCompressedLog(bool flush = false) { - return getCompressedLog(std::chrono::milliseconds{0}, flush); + static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs() { + return getCompressedLogs(std::chrono::milliseconds{0}); } void initializeAlertSinks(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id); template<class Rep, class Period> - static std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) { - return getConfiguration().compression_manager_.getCompressedLog(time, flush); + static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time) { + return getConfiguration().compression_manager_.getCompressedLogs(time); } /** diff --git a/libminifi/include/core/logging/internal/CompressionManager.h b/libminifi/include/core/logging/internal/CompressionManager.h index b1ea9fac9..b881a5a3f 100644 --- a/libminifi/include/core/logging/internal/CompressionManager.h +++ b/libminifi/include/core/logging/internal/CompressionManager.h @@ -24,6 +24,7 @@ #include <functional> #include <utility> #include <string> +#include <vector> #include "core/logging/Logger.h" #include "LogCompressorSink.h" @@ -50,12 +51,12 @@ class CompressionManager { std::shared_ptr<LogCompressorSink> initialize(const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory); template<class Rep, class Period> - std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) { + std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time) { std::shared_ptr<internal::LogCompressorSink> sink = getSink(); if (sink) { - return sink->getContent(time, flush); + return sink->getContent(time); } - return nullptr; + return {}; } static constexpr const char* compression_cached_log_max_size_ = "compression.cached.log.max.size"; diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h index fd02521ad..009fc1235 100644 --- a/libminifi/include/core/logging/internal/LogCompressorSink.h +++ b/libminifi/include/core/logging/internal/LogCompressorSink.h @@ -21,6 +21,7 @@ #include <memory> #include <atomic> #include <utility> +#include <vector> #include "spdlog/common.h" #include "spdlog/details/log_msg.h" @@ -57,26 +58,42 @@ class LogCompressorSink : public spdlog::sinks::base_sink<std::mutex> { ~LogCompressorSink() override; template<class Rep, class Period> - std::unique_ptr<io::InputStream> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) { - if (flush) { - cached_logs_.commit(); - compress(true); + std::vector<std::unique_ptr<io::InputStream>> getContent(const std::chrono::duration<Rep, Period>& time) { + cached_logs_.commit(); + compress(true); + + std::vector<std::unique_ptr<io::InputStream>> log_segments; + const auto segment_count = compressed_logs_.itemCount(); + for (size_t i = 0; i < segment_count; ++i) { + LogBuffer compressed; + if (!compressed_logs_.tryDequeue(compressed, time)) { + break; + } + log_segments.push_back(std::move(compressed.buffer_)); } - LogBuffer compressed; - if (!compressed_logs_.tryDequeue(compressed, time) && flush) { - return createEmptyArchive(); + + if (log_segments.empty()) { + log_segments.push_back(createEmptyArchive()); } - return std::move(compressed.buffer_); + return log_segments; } size_t getMaxCacheSize() const { return cached_logs_.getMaxSize(); } + size_t getMaxCacheSegmentSize() const { + return cached_logs_.getMaxItemSize(); + } + size_t getMaxCompressedSize() const { return compressed_logs_.getMaxSize(); } + size_t getMaxCompressedSegmentSize() const { + return compressed_logs_.getMaxItemSize(); + } + private: enum class CompressionResult { Success, diff --git a/libminifi/include/utils/StagingQueue.h b/libminifi/include/utils/StagingQueue.h index 854ea3430..6751c5301 100644 --- a/libminifi/include/utils/StagingQueue.h +++ b/libminifi/include/utils/StagingQueue.h @@ -143,6 +143,10 @@ class StagingQueue { return max_size_; } + size_t getMaxItemSize() const { + return max_item_size_; + } + void discardOverflow() { while (total_size_ > max_size_) { Item item; @@ -157,6 +161,10 @@ class StagingQueue { return total_size_; } + size_t itemCount() const { + return queue_.size(); + } + private: void commit(std::unique_lock<std::mutex>& /*lock*/) { queue_.enqueue(active_item_.commit()); diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 451ac2d18..59b3b9261 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -473,8 +473,10 @@ std::vector<BackTrace> FlowController::getTraces() { std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebugInfo() { std::map<std::string, std::unique_ptr<io::InputStream>> debug_info; - if (auto logs = core::logging::LoggerConfiguration::getCompressedLog(true)) { - debug_info["minifi.log.gz"] = std::move(logs); + auto logs = core::logging::LoggerConfiguration::getCompressedLogs(); + for (size_t i = 0; i < logs.size(); ++i) { + std::string index_str = i == logs.size() - 1 ? "" : "." + std::to_string(logs.size() - 1 - i); + debug_info["minifi.log" + index_str + ".gz"] = std::move(logs[i]); } if (auto opt_flow_path = flow_configuration_->getConfigurationPath()) { debug_info["config.yml"] = std::make_unique<io::FileStream>(opt_flow_path.value(), 0, false); diff --git a/libminifi/src/core/logging/internal/CompressionManager.cpp b/libminifi/src/core/logging/internal/CompressionManager.cpp index 3a85100fb..83da24ee2 100644 --- a/libminifi/src/core/logging/internal/CompressionManager.cpp +++ b/libminifi/src/core/logging/internal/CompressionManager.cpp @@ -51,7 +51,8 @@ std::shared_ptr<LogCompressorSink> CompressionManager::initialize( return sink_; } // do not create new sink if all relevant parameters match - if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || sink_->getMaxCompressedSize() != compressed_log_max_size) { + if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || sink_->getMaxCompressedSize() != compressed_log_max_size || + sink_->getMaxCacheSegmentSize() != cache_segment_size || sink_->getMaxCompressedSegmentSize() != compressed_segment_size) { sink_ = std::make_shared<internal::LogCompressorSink>( LogQueueSize{cached_log_max_size, cache_segment_size}, LogQueueSize{compressed_log_max_size, compressed_segment_size}, diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp index 00b7c84e7..adf8ce71d 100644 --- a/libminifi/test/unit/LoggerTests.cpp +++ b/libminifi/test/unit/LoggerTests.cpp @@ -20,6 +20,7 @@ #include <memory> #include <vector> #include <ctime> +#include <random> #include "../TestBase.h" #include "../Catch.h" #include "core/logging/LoggerConfiguration.h" @@ -189,7 +190,7 @@ TEST_CASE("Test ShortenNames", "[ttl8]") { using namespace minifi::io; -std::string decompress(const std::shared_ptr<InputStream>& input) { +std::string decompress(const std::unique_ptr<InputStream>& input) { auto output = std::make_unique<BufferStream>(); auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get())); minifi::internal::pipe(*input, *decompressor); @@ -214,9 +215,9 @@ TEST_CASE("Test Compression", "[ttl9]") { log_config.initialize(properties); auto logger = log_config.getLogger(className); logger->log_error("Hi there"); - std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; - REQUIRE(compressed_log); - auto logs = decompress(compressed_log); + auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(); + REQUIRE(compressed_logs.size() == 1); + auto logs = decompress(compressed_logs[0]); REQUIRE(logs.find("Hi there") != std::string::npos); } @@ -234,6 +235,9 @@ class LoggerTestAccessor { static size_t getCompressedSize(logging::LoggerConfiguration& log_config) { return log_config.compression_manager_.getSink()->compressed_logs_.size(); } + static void runCompression(logging::LoggerConfiguration& log_config) { + while (logging::internal::LogCompressorSink::CompressionResult::Success == log_config.compression_manager_.getSink()->compress()){} + } }; TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl10]") { @@ -258,17 +262,17 @@ TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl10 TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "[ttl11]") { auto& log_config = logging::LoggerConfiguration::getConfiguration(); auto properties = std::make_shared<logging::LoggerProperties>(); - bool is_nullptr = false; + bool is_empty = false; SECTION("Cached log size is set to 0") { - is_nullptr = true; + is_empty = true; properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "0"); } SECTION("Compressed log size is set to 0") { - is_nullptr = true; + is_empty = true; properties->set(logging::internal::CompressionManager::compression_compressed_log_max_size_, "0"); } SECTION("Sanity check") { - is_nullptr = false; + is_empty = false; // pass } // by default the root logger is OFF @@ -276,7 +280,7 @@ TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", " log_config.initialize(properties); auto logger = log_config.getLogger("DisableCompressionTestLogger"); logger->log_error("Hi there"); - REQUIRE((logging::LoggerConfiguration::getCompressedLog(true) == nullptr) == is_nullptr); + REQUIRE(logging::LoggerConfiguration::getCompressedLogs().empty() == is_empty); } TEST_CASE("Setting max log entry length property trims long log entries", "[ttl12]") { @@ -288,9 +292,9 @@ TEST_CASE("Setting max log entry length property trims long log entries", "[ttl1 auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger"); logger->log_error("Hi there"); - std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; - REQUIRE(compressed_log); - auto logs = decompress(compressed_log); + auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(); + REQUIRE(compressed_logs.size() == 1); + auto logs = decompress(compressed_logs[0]); REQUIRE(logs.find("Hi ") == std::string::npos); REQUIRE(logs.find("Hi") != std::string::npos); } @@ -304,9 +308,9 @@ TEST_CASE("Setting max log entry length property trims long formatted log entrie auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger"); logger->log_error("Hi there %s", "John"); - std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; - REQUIRE(compressed_log); - auto logs = decompress(compressed_log); + auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(); + REQUIRE(compressed_logs.size() == 1); + auto logs = decompress(compressed_logs[0]); REQUIRE(logs.find("Hi ") == std::string::npos); REQUIRE(logs.find("Hi") != std::string::npos); } @@ -322,9 +326,9 @@ TEST_CASE("Setting max log entry length to a size larger than the internal buffe std::string expected_log(1500, 'a'); logger->log_error(log.c_str()); - std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; - REQUIRE(compressed_log); - auto logs = decompress(compressed_log); + auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(); + REQUIRE(compressed_logs.size() == 1); + auto logs = decompress(compressed_logs[0]); REQUIRE(logs.find(log) == std::string::npos); REQUIRE(logs.find(expected_log) != std::string::npos); } @@ -344,8 +348,40 @@ TEST_CASE("Setting max log entry length to unlimited results in unlimited log en std::string log(5000, 'a'); logger->log_error(log.c_str()); - std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; - REQUIRE(compressed_log); - auto logs = decompress(compressed_log); + auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(); + REQUIRE(compressed_logs.size() == 1); + auto logs = decompress(compressed_logs[0]); REQUIRE(logs.find(log) != std::string::npos); } + +TEST_CASE("Test sending multiple segments at once", "[ttl16]") { + auto& log_config = logging::LoggerConfiguration::getConfiguration(); + LoggerTestAccessor::setCompressionCompressedSegmentSize(log_config, 100); + LoggerTestAccessor::setCompressionCacheSegmentSize(log_config, 100); + auto properties = std::make_shared<logging::LoggerProperties>(); + // by default the root logger is OFF + properties->set("logger.root", "INFO"); + log_config.initialize(properties); + auto logger = log_config.getLogger("CompressionTestMultiSegment"); + + std::random_device rd; + std::mt19937 eng(rd()); + constexpr const char * TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; + const int index_of_last_char = gsl::narrow<int>(strlen(TEXT_CHARS)) - 1; + std::uniform_int_distribution<> distr(0, index_of_last_char); + std::vector<char> data(100); + std::string log_str; + const size_t SEGMENT_COUNT = 5; + for (size_t idx = 0; idx < SEGMENT_COUNT; ++idx) { + std::generate_n(data.begin(), data.size(), [&] { return TEXT_CHARS[static_cast<uint8_t>(distr(eng))]; }); + log_str = std::string{data.begin(), data.end()} + "." + std::to_string(idx); + logger->log_error(log_str.c_str()); + } + + LoggerTestAccessor::runCompression(log_config); + + auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(); + REQUIRE(compressed_logs.size() == SEGMENT_COUNT); + auto logs = decompress(compressed_logs[SEGMENT_COUNT - 1]); + REQUIRE(logs.find(log_str) != std::string::npos); +}
