This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a09d158f500e1e977d289c5b03fc0d11574f8340 Author: Adam Debreceni <[email protected]> AuthorDate: Wed Jun 23 17:29:32 2021 +0200 MINIFICPP-1414 Create in-memory compressed logs Closes #955 Signed-off-by: Marton Szasz <[email protected]> --- conf/minifi-log.properties | 9 ++ libminifi/CMakeLists.txt | 2 +- libminifi/include/core/TypedValues.h | 88 ++++------ .../include/core/logging/LoggerConfiguration.h | 53 +++--- libminifi/include/core/logging/LoggerProperties.h | 69 ++++++++ .../core/logging/internal/ActiveCompressor.h | 72 +++++++++ .../core/logging/internal/CompressionManager.h | 84 ++++++++++ .../core/logging/internal/LogBuffer.h} | 46 ++++-- .../core/logging/internal/LogCompressor.h} | 35 ++-- .../core/logging/internal/LogCompressorSink.h | 92 +++++++++++ libminifi/include/io/BufferStream.h | 8 + libminifi/include/io/ZlibStream.h | 13 +- libminifi/include/utils/Literals.h | 59 +++++++ libminifi/include/utils/StagingQueue.h | 178 +++++++++++++++++++++ libminifi/include/utils/ValueParser.h | 4 + libminifi/src/core/TypedValues.cpp | 9 ++ libminifi/src/core/logging/LoggerConfiguration.cpp | 55 +++++-- .../core/logging/internal/CompressionManager.cpp | 74 +++++++++ .../internal/LogCompressor.cpp} | 31 ++-- .../core/logging/internal/LogCompressorSink.cpp | 81 ++++++++++ libminifi/src/io/ZlibStream.cpp | 39 +++-- libminifi/test/unit/LoggerTests.cpp | 97 ++++++++++- libminifi/test/unit/StagingQueueTests.cpp | 121 ++++++++++++++ 23 files changed, 1176 insertions(+), 143 deletions(-) diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties index 250cb46..aeca6dd 100644 --- a/conf/minifi-log.properties +++ b/conf/minifi-log.properties @@ -45,3 +45,12 @@ logger.org::apache::nifi::minifi=INFO,rolling #Logging configurable by class fully qualified name #logger.org::apache::nifi::minifi::core::logging::LoggerConfiguration=DEBUG + +# Log compression # +## Enables the agent to keep a limited chunk of the application +## logs in memory in compressed format. 
Note that due to its +## compressed nature this could mean more logs than the contents +## of the log files. +## Setting any of these to 0 disables the in-memory log compression. +#compression.cached.log.max.size=8 MB +#compression.compressed.log.max.size=8 MB diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index a9cce79..6550e7f 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF) set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp") endif() -file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" "src/pro [...] +file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" [...] 
# manually add this as it might not yet be present when this executes list(APPEND SOURCES "src/agent/agent_version.cpp") diff --git a/libminifi/include/core/TypedValues.h b/libminifi/include/core/TypedValues.h index b3808ba..09050f1 100644 --- a/libminifi/include/core/TypedValues.h +++ b/libminifi/include/core/TypedValues.h @@ -21,12 +21,15 @@ #include <algorithm> #include <string> #include <typeindex> +#include <map> +#include <memory> #include "state/Value.h" #include "utils/StringUtils.h" #include "utils/ValueParser.h" #include "utils/PropertyErrors.h" #include "utils/OptionalUtils.h" +#include "utils/Literals.h" namespace org { namespace apache { @@ -114,6 +117,8 @@ class TimePeriodValue : public TransformableValue, public state::response::UInt6 * format <numeric> <byte size>. */ class DataSizeValue : public TransformableValue, public state::response::UInt64Value { + static std::shared_ptr<logging::Logger>& getLogger(); + public: static const std::type_index type_id; @@ -128,72 +133,41 @@ class DataSizeValue : public TransformableValue, public state::response::UInt64V } -// Convert String to Integer + // Convert String to Integer template<typename T, typename std::enable_if< std::is_integral<T>::value>::type* = nullptr> static bool StringToInt(const std::string &input, T &output) { - if (input.size() == 0) { + // TODO(adebreceni): this mapping is to preserve backwards compatibility, + // we should entertain the idea of moving to standardized units in + // the configuration (i.e. 
K = 1000, Ki = 1024) + static std::map<std::string, int64_t> unit_map{ + {"B", 1}, + {"K", 1_KB}, {"M", 1_MB}, {"G", 1_GB}, {"T", 1_TB}, {"P", 1_PB}, + {"KB", 1_KiB}, {"MB", 1_MiB}, {"GB", 1_GiB}, {"TB", 1_TiB}, {"PB", 1_PiB}, + }; + + int64_t value; + std::string unit_str; + try { + unit_str = utils::StringUtils::trim(utils::internal::ValueParser(input).parse(value).rest()); + } catch (const utils::internal::ParseException&) { return false; } - const char *cvalue = input.c_str(); - char *pEnd; - auto ival = std::strtoll(cvalue, &pEnd, 0); - - if (pEnd[0] == '\0') { - output = gsl::narrow<T>(ival); - return true; - } - - while (*pEnd == ' ') { - // Skip the space - pEnd++; - } - - char end0 = toupper(pEnd[0]); - if (end0 == 'B') { - output = gsl::narrow<T>(ival); - return true; - } else if ((end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 == 'T') || (end0 == 'P')) { - if (pEnd[1] == '\0') { - unsigned long int multiplier = 1000; // NOLINT - - if ((end0 != 'K')) { - multiplier *= 1000; - if (end0 != 'M') { - multiplier *= 1000; - if (end0 != 'G') { - multiplier *= 1000; - if (end0 != 'T') { - multiplier *= 1000; - } - } - } - } - output = gsl::narrow<T>(ival * multiplier); - return true; - - } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && (pEnd[2] == '\0')) { - unsigned long int multiplier = 1024; // NOLINT - - if ((end0 != 'K')) { - multiplier *= 1024; - if (end0 != 'M') { - multiplier *= 1024; - if (end0 != 'G') { - multiplier *= 1024; - if (end0 != 'T') { - multiplier *= 1024; - } - } - } - } - output = gsl::narrow<T>(ival * multiplier); - return true; + if (!unit_str.empty()) { + std::transform(unit_str.begin(), unit_str.end(), unit_str.begin(), ::toupper); + auto multiplierIt = unit_map.find(unit_str); + if (multiplierIt == unit_map.end()) { + getLogger()->log_warn("Unrecognized data unit: '%s', in the future this will constitute as an error", unit_str); + // backwards compatibility + // return false; + } else { + value *= multiplierIt->second; } } - 
return false; + output = gsl::narrow<T>(value); + return true; } }; diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h index 7e93ef0..80e2f1e 100644 --- a/libminifi/include/core/logging/LoggerConfiguration.h +++ b/libminifi/include/core/logging/LoggerConfiguration.h @@ -36,7 +36,10 @@ #include "core/Core.h" #include "core/logging/Logger.h" -#include "properties/Properties.h" +#include "LoggerProperties.h" +#include "internal/CompressionManager.h" + +class LoggerTestAccessor; namespace org { namespace apache { @@ -50,6 +53,8 @@ struct LoggerNamespace { spdlog::level::level_enum level; bool has_level; std::vector<std::shared_ptr<spdlog::sinks::sink>> sinks; + // sinks made available to all descendants + std::vector<std::shared_ptr<spdlog::sinks::sink>> exported_sinks; std::map<std::string, std::shared_ptr<LoggerNamespace>> children; LoggerNamespace() @@ -61,37 +66,9 @@ struct LoggerNamespace { }; } // namespace internal -class LoggerProperties : public Properties { - public: - LoggerProperties() - : Properties("Logger properties") { - } - /** - * Gets all keys that start with the given prefix and do not have a "." after the prefix and "." separator. - * - * Ex: with type argument "appender" - * you would get back a property of "appender.rolling" but not "appender.rolling.file_name" - */ - std::vector<std::string> get_keys_of_type(const std::string &type); - - /** - * Registers a sink witht the given name. This allows for programmatic definition of sinks. 
- */ - void add_sink(const std::string &name, std::shared_ptr<spdlog::sinks::sink> sink) { - sinks_[name] = sink; - } - std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> initial_sinks() { - return sinks_; - } - - static const char* appender_prefix; - static const char* logger_prefix; - - private: - std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sinks_; -}; - class LoggerConfiguration { + friend class ::LoggerTestAccessor; + public: /** * Gets the current log configuration @@ -121,6 +98,15 @@ class LoggerConfiguration { */ void initialize(const std::shared_ptr<LoggerProperties> &logger_properties); + static std::unique_ptr<io::InputStream> getCompressedLog(bool flush = false) { + return getCompressedLog(std::chrono::milliseconds{0}, flush); + } + + template<class Rep, class Period> + static std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) { + return getConfiguration().compression_manager_.getCompressedLog(time, flush); + } + /** * Can be used to get arbitrarily named Logger, LoggerFactory should be preferred within a class. 
*/ @@ -134,6 +120,10 @@ class LoggerConfiguration { std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false); private: + std::shared_ptr<Logger> getLogger(const std::string& name, const std::lock_guard<std::mutex>& lock); + + void initializeCompression(const std::lock_guard<std::mutex>& lock, const std::shared_ptr<LoggerProperties>& properties); + static spdlog::sink_ptr create_syslog_sink(); static spdlog::sink_ptr create_fallback_sink(); @@ -154,6 +144,7 @@ class LoggerConfiguration { }; LoggerConfiguration(); + internal::CompressionManager compression_manager_; std::shared_ptr<internal::LoggerNamespace> root_namespace_; std::vector<std::shared_ptr<LoggerImpl>> loggers; std::shared_ptr<spdlog::formatter> formatter_; diff --git a/libminifi/include/core/logging/LoggerProperties.h b/libminifi/include/core/logging/LoggerProperties.h new file mode 100644 index 0000000..1341a7f --- /dev/null +++ b/libminifi/include/core/logging/LoggerProperties.h @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <memory> +#include <string> +#include <map> +#include <vector> + +#include "spdlog/sinks/sink.h" + +#include "properties/Properties.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { + +class LoggerProperties : public Properties { + public: + LoggerProperties() + : Properties("Logger properties") { + } + /** + * Gets all keys that start with the given prefix and do not have a "." after the prefix and "." separator. + * + * Ex: with type argument "appender" + * you would get back a property of "appender.rolling" but not "appender.rolling.file_name" + */ + std::vector<std::string> get_keys_of_type(const std::string &type); + + /** + * Registers a sink with the given name. This allows for programmatic definition of sinks. + */ + void add_sink(const std::string &name, std::shared_ptr<spdlog::sinks::sink> sink) { + sinks_[name] = sink; + } + std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> initial_sinks() { + return sinks_; + } + + private: + std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sinks_; +}; + +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/include/core/logging/internal/ActiveCompressor.h b/libminifi/include/core/logging/internal/ActiveCompressor.h new file mode 100644 index 0000000..045d3e0 --- /dev/null +++ b/libminifi/include/core/logging/internal/ActiveCompressor.h @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <utility> +#include "LogBuffer.h" +#include "LogCompressor.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class ActiveCompressor { + public: + class Allocator { + public: + explicit Allocator(std::shared_ptr<logging::Logger> logger) : logger_{std::move(logger)} {} + + ActiveCompressor operator()(size_t max_size) const { + ActiveCompressor instance; + instance.output_.reset(new io::BufferStream()); + instance.output_->extend(max_size); + instance.compressor_.reset(new LogCompressor(gsl::make_not_null(instance.output_.get()), logger_)); + return instance; + } + + private: + std::shared_ptr<logging::Logger> logger_; + }; + + LogBuffer commit() { + compressor_->close(); + return LogBuffer{std::move(output_)}; + } + + size_t size() const { + return output_->size(); + } + + std::unique_ptr<io::BufferStream> output_; + std::unique_ptr<LogCompressor> compressor_; +}; + +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/include/core/logging/internal/CompressionManager.h b/libminifi/include/core/logging/internal/CompressionManager.h new file mode 100644 index 0000000..b1ea9fa --- /dev/null +++ b/libminifi/include/core/logging/internal/CompressionManager.h @@ -0,0 +1,84 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <mutex> +#include <atomic> +#include <functional> +#include <utility> +#include <string> + +#include "core/logging/Logger.h" +#include "LogCompressorSink.h" +#include "core/logging/LoggerProperties.h" +#include "io/InputStream.h" +#include "utils/Literals.h" + +class LoggerTestAccessor; + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +class CompressionManager { + friend class ::LoggerTestAccessor; + + using LoggerFactory = std::function<std::shared_ptr<Logger>(const std::string&)>; + + public: + std::shared_ptr<LogCompressorSink> initialize(const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory); + + template<class Rep, class Period> + std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) { + std::shared_ptr<internal::LogCompressorSink> sink = getSink(); + if (sink) { + return sink->getContent(time, flush); + } + return nullptr; + } + + static constexpr const char* compression_cached_log_max_size_ = "compression.cached.log.max.size"; + static constexpr const char* 
compression_compressed_log_max_size_ = "compression.compressed.log.max.size"; + + private: + std::shared_ptr<internal::LogCompressorSink> getSink() const { + // gcc4.8 bug => cannot use std::atomic_load + std::lock_guard<std::mutex> lock(mtx_); + return sink_; + } + + std::atomic<size_t> cache_segment_size{1_MiB}; + std::atomic<size_t> compressed_segment_size{1_MiB}; + + mutable std::mutex mtx_; + std::shared_ptr<LogCompressorSink> sink_; +}; + +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/include/core/logging/internal/LogBuffer.h similarity index 53% copy from libminifi/src/core/TypedValues.cpp copy to libminifi/include/core/logging/internal/LogBuffer.h index 93a43b8..968be80 100644 --- a/libminifi/src/core/TypedValues.cpp +++ b/libminifi/include/core/logging/internal/LogBuffer.h @@ -16,19 +16,47 @@ * limitations under the License. 
*/ -#include "core/Property.h" -#include "core/TypedValues.h" +#pragma once + +#include <memory> +#include <utility> + +#include "io/BufferStream.h" + namespace org { namespace apache { namespace nifi { namespace minifi { namespace core { +namespace logging { +namespace internal { + +class LogBuffer { + public: + LogBuffer() = default; + explicit LogBuffer(std::unique_ptr<io::BufferStream> buffer): buffer_{std::move(buffer)} {} + + static LogBuffer allocate(size_t max_size) { + LogBuffer instance{utils::make_unique<io::BufferStream>()}; + instance.buffer_->extend(max_size); + return instance; + } + + LogBuffer commit() { + return LogBuffer{std::move(buffer_)}; + } + + size_t size() const { + return buffer_->size(); + } -const std::type_index DataSizeValue::type_id = typeid(uint64_t); -const std::type_index TimePeriodValue::type_id = typeid(uint64_t); + std::unique_ptr<io::BufferStream> buffer_; +}; -} /* namespace core */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/include/core/logging/internal/LogCompressor.h similarity index 62% copy from libminifi/src/core/TypedValues.cpp copy to libminifi/include/core/logging/internal/LogCompressor.h index 93a43b8..9bc722e 100644 --- a/libminifi/src/core/TypedValues.cpp +++ b/libminifi/include/core/logging/internal/LogCompressor.h @@ -16,19 +16,36 @@ * limitations under the License. 
*/ -#include "core/Property.h" -#include "core/TypedValues.h" +#pragma once + +#include <memory> +#include "io/ZlibStream.h" +#include "io/OutputStream.h" + namespace org { namespace apache { namespace nifi { namespace minifi { namespace core { +namespace logging { +namespace internal { + +class LogCompressor : public io::ZlibCompressStream { + public: + LogCompressor(gsl::not_null<OutputStream *> output, std::shared_ptr<logging::Logger> logger); + + enum class FlushResult { + Success, + Error + }; -const std::type_index DataSizeValue::type_id = typeid(uint64_t); -const std::type_index TimePeriodValue::type_id = typeid(uint64_t); + FlushResult flush(); +}; -} /* namespace core */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h new file mode 100644 index 0000000..bbb7c24 --- /dev/null +++ b/libminifi/include/core/logging/internal/LogCompressorSink.h @@ -0,0 +1,92 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <atomic> +#include <utility> + +#include "spdlog/common.h" +#include "spdlog/details/log_msg.h" +#include "spdlog/details/null_mutex.h" +#include "spdlog/sinks/base_sink.h" +#include "ActiveCompressor.h" +#include "LogBuffer.h" +#include "utils/StagingQueue.h" + +class LoggerTestAccessor; + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +struct LogQueueSize { + size_t max_total_size; + size_t max_segment_size; +}; + +class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_mutex> { + friend class ::LoggerTestAccessor; + + private: + void sink_it_(const spdlog::details::log_msg& msg) override; + void flush_() override; + + public: + explicit LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger); + ~LogCompressorSink() override; + + template<class Rep, class Period> + std::unique_ptr<io::InputStream> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) { + if (flush) { + cached_logs_.commit(); + compress(true); + } + LogBuffer compressed; + compressed_logs_.tryDequeue(compressed, time); + return std::move(compressed.buffer_); + } + + private: + enum class CompressionResult { + Success, + NothingToCompress + }; + + CompressionResult compress(bool force_rotation = false); + void run(); + + std::atomic<bool> running_{true}; + std::thread compression_thread_; + + utils::StagingQueue<LogBuffer> cached_logs_; + utils::StagingQueue<ActiveCompressor, ActiveCompressor::Allocator> compressed_logs_; +}; + +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/include/io/BufferStream.h 
b/libminifi/include/io/BufferStream.h index 2290a3f..c16e15c 100644 --- a/libminifi/include/io/BufferStream.h +++ b/libminifi/include/io/BufferStream.h @@ -43,6 +43,14 @@ class BufferStream : public BaseStream { write(reinterpret_cast<const uint8_t*>(data.c_str()), data.length()); } + /* + * prepares the stream to accept an additional byte_count bytes + * @param byte_count number of bytes we expect to write + */ + void extend(size_t byte_count) { + buffer_.reserve(buffer_.size() + byte_count); + } + using BaseStream::read; using BaseStream::write; diff --git a/libminifi/include/io/ZlibStream.h b/libminifi/include/io/ZlibStream.h index b146413..f8efff8 100644 --- a/libminifi/include/io/ZlibStream.h +++ b/libminifi/include/io/ZlibStream.h @@ -26,7 +26,7 @@ #include <vector> #include "BaseStream.h" -#include "core/logging/LoggerConfiguration.h" +#include "core/logging/Logger.h" #include "utils/gsl.h" namespace org { @@ -80,8 +80,13 @@ class ZlibCompressStream : public ZlibBaseStream { void close() override; - private: - std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ZlibCompressStream>::getLogger()}; + protected: + ZlibCompressStream(gsl::not_null<OutputStream*> ouput, ZlibCompressionFormat format, int level, std::shared_ptr<core::logging::Logger> logger); + + using FlushMode = int; + size_t write(const uint8_t* value, size_t size, FlushMode mode); + + std::shared_ptr<core::logging::Logger> logger_; }; class ZlibDecompressStream : public ZlibBaseStream { @@ -98,7 +103,7 @@ class ZlibDecompressStream : public ZlibBaseStream { size_t write(const uint8_t *value, size_t size) override; private: - std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()}; + std::shared_ptr<core::logging::Logger> logger_; }; } // namespace io diff --git a/libminifi/include/utils/Literals.h b/libminifi/include/utils/Literals.h new file mode 100644 index 0000000..9b47b8a --- /dev/null +++ b/libminifi/include/utils/Literals.h @@ -0,0
+1,59 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +constexpr unsigned long long operator "" _KiB(unsigned long long n) { // NOLINT + return 1024 * n; +} + +constexpr unsigned long long operator "" _MiB(unsigned long long n) { // NOLINT + return 1024_KiB * n; +} + +constexpr unsigned long long operator "" _GiB(unsigned long long n) { // NOLINT + return 1024_MiB * n; +} + +constexpr unsigned long long operator "" _TiB(unsigned long long n) { // NOLINT + return 1024_GiB * n; +} + +constexpr unsigned long long operator "" _PiB(unsigned long long n) { // NOLINT + return 1024_TiB * n; +} + +constexpr unsigned long long operator "" _KB(unsigned long long n) { // NOLINT + return 1000 * n; +} + +constexpr unsigned long long operator "" _MB(unsigned long long n) { // NOLINT + return 1000_KB * n; +} + +constexpr unsigned long long operator "" _GB(unsigned long long n) { // NOLINT + return 1000_MB * n; +} + +constexpr unsigned long long operator "" _TB(unsigned long long n) { // NOLINT + return 1000_GB * n; +} + +constexpr unsigned long long operator "" _PB(unsigned long long n) { // NOLINT + return 1000_TB * n; +} diff --git a/libminifi/include/utils/StagingQueue.h 
b/libminifi/include/utils/StagingQueue.h new file mode 100644 index 0000000..2714a3a --- /dev/null +++ b/libminifi/include/utils/StagingQueue.h @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <mutex> +#include <atomic> +#include <utility> +#include "MinifiConcurrentQueue.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +namespace internal { +template<typename T> +struct default_allocator { + T operator()(size_t max_size) const { + return T::allocate(max_size); + } +}; +} // namespace internal + +/** + * Purpose: A FIFO container that allows chunked processing while trying to enforce + * soft limits like max chunk size and max total size. The "head" chunk might be + * modified in a thread-safe manner (usually appending to it) before committing it + * thus making it available for dequeuing. 
+ */ +template<typename ActiveItem, typename Allocator = internal::default_allocator<ActiveItem>> +class StagingQueue { + using Item = typename std::decay<decltype(std::declval<ActiveItem&>().commit())>::type; + + static_assert(std::is_same<decltype(std::declval<const Allocator&>()(std::declval<size_t>())), ActiveItem>::value, + "Allocator::operator(size_t) must return an ActiveItem"); + static_assert(std::is_same<decltype(std::declval<const Item&>().size()), size_t>::value, + "Item::size must return size_t"); + static_assert(std::is_same<decltype(std::declval<const ActiveItem&>().size()), size_t>::value, + "ActiveItem::size must return size_t"); + + template<typename Functor, typename Arg, typename = void> + struct FunctorCallHelper; + + template<typename Functor, typename Arg> + struct FunctorCallHelper<Functor, Arg, typename std::enable_if<std::is_same<decltype(std::declval<Functor>()(std::declval<Arg>())), bool>::value>::type> { + static bool call(Functor&& fn, Arg&& arg) { + return std::forward<Functor>(fn)(std::forward<Arg>(arg)); + } + }; + + template<typename Functor, typename Arg> + struct FunctorCallHelper<Functor, Arg, typename std::enable_if<std::is_same<decltype(std::declval<Functor>()(std::declval<Arg>())), void>::value>::type> { + static bool call(Functor&& fn, Arg&& arg) { + std::forward<Functor>(fn)(std::forward<Arg>(arg)); + return false; + } + }; + + static ActiveItem allocateActiveItem(const Allocator& allocator, size_t max_item_size) { + // max_size is a soft limit, i.e. 
reaching max_size is an indicator + // that that item should be committed, we cannot guarantee that only + // max_size content is in the item, since max_size is the "trigger limit", + // presumably each item would contain (at the trigger point) a little + // more than max_size content, that is the reasoning behind "* 3 / 2" + return allocator(max_item_size * 3 / 2); + } + + public: + StagingQueue(size_t max_size, size_t max_item_size, Allocator allocator = {}) + : max_size_(max_size), + max_item_size_(max_item_size), + active_item_(allocateActiveItem(allocator, max_item_size)), + allocator_(allocator) {} + + void commit() { + std::unique_lock<std::mutex> lock{active_item_mutex_}; + if (active_item_.size() == 0) { + // nothing to commit + return; + } + commit(lock); + } + + /** + * Allows thread-safe modification of the "live" instance. + * @tparam Functor + * @param fn callable which can modify the instance, should return true + * if it would like to force a commit + */ + template<typename Functor> + void modify(Functor&& fn) { + std::unique_lock<std::mutex> lock{active_item_mutex_}; + size_t original_size = active_item_.size(); + bool should_commit = FunctorCallHelper<Functor, ActiveItem&>::call(std::forward<Functor>(fn), active_item_); + size_t new_size = active_item_.size(); + if (new_size >= original_size) { + total_size_ += new_size - original_size; + } else { + total_size_ -= original_size - new_size; + } + if (should_commit || new_size > max_item_size_) { + commit(lock); + } + } + + template<class Rep, class Period> + bool tryDequeue(Item& out, const std::chrono::duration<Rep, Period>& time) { + if (time == std::chrono::duration<Rep, Period>{0}) { + return tryDequeue(out); + } + if (queue_.dequeueWaitFor(out, time)) { + total_size_ -= out.size(); + return true; + } + return false; + } + + bool tryDequeue(Item& out) { + if (queue_.tryDequeue(out)) { + total_size_ -= out.size(); + return true; + } + return false; + } + + void discardOverflow() { + while
(total_size_ > max_size_) { + Item item; + if (!queue_.tryDequeue(item)) { + break; + } + total_size_ -= item.size(); + } + } + + size_t size() const { + return total_size_; + } + + private: + void commit(std::unique_lock<std::mutex>& /*lock*/) { + queue_.enqueue(active_item_.commit()); + active_item_ = allocateActiveItem(allocator_, max_item_size_); + } + + const size_t max_size_; + const size_t max_item_size_; + std::atomic<size_t> total_size_{0}; + + std::mutex active_item_mutex_; + ActiveItem active_item_; + + const Allocator allocator_; + + ConditionConcurrentQueue<Item> queue_; +}; + +} // namespace utils +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/include/utils/ValueParser.h b/libminifi/include/utils/ValueParser.h index 6639165..8910096 100644 --- a/libminifi/include/utils/ValueParser.h +++ b/libminifi/include/utils/ValueParser.h @@ -137,6 +137,10 @@ class ValueParser { } } + std::string rest() const noexcept { + return str.substr(offset); + } + private: /** * diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/src/core/TypedValues.cpp index 93a43b8..8306dec 100644 --- a/libminifi/src/core/TypedValues.cpp +++ b/libminifi/src/core/TypedValues.cpp @@ -16,8 +16,12 @@ * limitations under the License. 
*/ +#include <memory> + #include "core/Property.h" #include "core/TypedValues.h" +#include "core/logging/LoggerConfiguration.h" + namespace org { namespace apache { namespace nifi { @@ -27,6 +31,11 @@ namespace core { const std::type_index DataSizeValue::type_id = typeid(uint64_t); const std::type_index TimePeriodValue::type_id = typeid(uint64_t); +std::shared_ptr<logging::Logger>& DataSizeValue::getLogger() { + static std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<DataSizeValue>::getLogger(); + return logger; +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index 2b89547..dfff263 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -27,12 +27,16 @@ #include <memory> #include <map> #include <string> +#include <atomic> #include "core/Core.h" #include "utils/StringUtils.h" #include "utils/ClassUtils.h" #include "utils/file/FileUtils.h" #include "utils/Environment.h" +#include "core/logging/internal/LogCompressorSink.h" +#include "utils/Literals.h" +#include "core/TypedValues.h" #include "spdlog/spdlog.h" #include "spdlog/sinks/stdout_sinks.h" @@ -60,6 +64,25 @@ namespace logging { const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v"; +utils::optional<spdlog::level::level_enum> parse_log_level(const std::string& level_name) { + if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) { + return spdlog::level::trace; + } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) { + return spdlog::level::debug; + } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) { + return spdlog::level::info; + } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) { + return spdlog::level::warn; + } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) 
{ + return spdlog::level::err; + } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) { + return spdlog::level::critical; + } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) { + return spdlog::level::off; + } + return {}; +} + std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) { std::vector<std::string> appenders; std::string prefix = type + "."; @@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration() void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) { std::lock_guard<std::mutex> lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); + initializeCompression(lock, logger_properties); std::string spdlog_pattern; if (!logger_properties->getString("spdlog.pattern", spdlog_pattern)) { spdlog_pattern = spdlog_default_pattern; @@ -116,6 +140,10 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) { std::lock_guard<std::mutex> lock(mutex); + return getLogger(name, lock); +} + +std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name, const std::lock_guard<std::mutex>& /*lock*/) { std::string adjusted_name = name; const std::string clazz = "class "; auto haz_clazz = name.find(clazz); @@ -207,19 +235,9 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names std::string level_name = utils::StringUtils::trim(segment); if (first) { first = false; - std::transform(level_name.begin(), level_name.end(), level_name.begin(), ::tolower); - if ("trace" == level_name) { - level = spdlog::level::trace; - } else if ("debug" == level_name) { - level = spdlog::level::debug; - } else if ("warn" == level_name) { - level = spdlog::level::warn; - } else if ("critical" == level_name) { - level = spdlog::level::critical; - } else if ("error" == level_name) { - level = spdlog::level::err; - } else 
if ("off" == level_name) { - level = spdlog::level::off; + auto opt_level = parse_log_level(level_name); + if (opt_level) { + level = *opt_level; } } else { sinks.push_back(sink_map[level_name]); @@ -258,6 +276,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr< } std::shared_ptr<internal::LoggerNamespace> current_namespace = root_namespace; std::vector<std::shared_ptr<spdlog::sinks::sink>> sinks = root_namespace->sinks; + std::vector<std::shared_ptr<spdlog::sinks::sink>> inherited_sinks; spdlog::level::level_enum level = root_namespace->level; std::string current_namespace_str = ""; std::string sink_namespace_str = "root"; @@ -268,6 +287,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr< if (child_pair == current_namespace->children.end()) { break; } + std::copy(current_namespace->exported_sinks.begin(), current_namespace->exported_sinks.end(), std::back_inserter(inherited_sinks)); current_namespace = child_pair->second; if (current_namespace->sinks.size() > 0) { sinks = current_namespace->sinks; @@ -283,6 +303,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr< const auto levelView(spdlog::level::to_string_view(level)); logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, std::string(levelView.begin(), levelView.end()), level_namespace_str); } + std::copy(inherited_sinks.begin(), inherited_sinks.end(), std::back_inserter(sinks)); spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks)); spdlogger->set_level(level); spdlogger->set_formatter(formatter -> clone()); @@ -318,6 +339,14 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::create_default_r return result; } +void LoggerConfiguration::initializeCompression(const std::lock_guard<std::mutex>& lock, const std::shared_ptr<LoggerProperties>& properties) { + auto compression_sink = 
compression_manager_.initialize(properties, logger_, [&] (const std::string& name) {return getLogger(name, lock);}); + if (compression_sink) { + root_namespace_->sinks.push_back(compression_sink); + root_namespace_->exported_sinks.push_back(compression_sink); + } +} + } /* namespace logging */ } /* namespace core */ } /* namespace minifi */ diff --git a/libminifi/src/core/logging/internal/CompressionManager.cpp b/libminifi/src/core/logging/internal/CompressionManager.cpp new file mode 100644 index 0000000..6347988 --- /dev/null +++ b/libminifi/src/core/logging/internal/CompressionManager.cpp @@ -0,0 +1,74 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <memory> +#include <mutex> + +#include "core/logging/internal/CompressionManager.h" +#include "core/logging/internal/LogCompressorSink.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerProperties.h" +#include "core/TypedValues.h" +#include "core/Core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +std::shared_ptr<LogCompressorSink> CompressionManager::initialize( + const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory) { + auto get_size = [&] (const char* const property_name) -> utils::optional<size_t> { + auto size_str = properties->getString(property_name); + if (!size_str) return {}; + size_t value; + if (DataSizeValue::StringToInt(*size_str, value)) { + return value; + } + if (error_logger) { + error_logger->log_error("Invalid format for %s", property_name); + } + return {}; + }; + auto cached_log_max_size = get_size(compression_cached_log_max_size_).value_or(8_MiB); + auto compressed_log_max_size = get_size(compression_compressed_log_max_size_).value_or(8_MiB); + std::shared_ptr<internal::LogCompressorSink> sink; + if (cached_log_max_size != 0 && compressed_log_max_size != 0) { + sink = std::make_shared<internal::LogCompressorSink>( + LogQueueSize{cached_log_max_size, cache_segment_size}, + LogQueueSize{compressed_log_max_size, compressed_segment_size}, + logger_factory(getClassName<LogCompressorSink>())); + } + { + // gcc4.8 bug => cannot use std::atomic_store + std::lock_guard<std::mutex> lock(mtx_); + sink_ = sink; + } + return sink; +} + +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/src/core/logging/internal/LogCompressor.cpp similarity index 56% copy from 
libminifi/src/core/TypedValues.cpp copy to libminifi/src/core/logging/internal/LogCompressor.cpp index 93a43b8..4dd23c0 100644 --- a/libminifi/src/core/TypedValues.cpp +++ b/libminifi/src/core/logging/internal/LogCompressor.cpp @@ -16,19 +16,32 @@ * limitations under the License. */ -#include "core/Property.h" -#include "core/TypedValues.h" +#include "core/logging/internal/LogCompressor.h" +#include "core/logging/LoggerConfiguration.h" + namespace org { namespace apache { namespace nifi { namespace minifi { namespace core { +namespace logging { +namespace internal { + +LogCompressor::LogCompressor(gsl::not_null<OutputStream *> output, std::shared_ptr<logging::Logger> logger) + : ZlibCompressStream(output, io::ZlibCompressionFormat::GZIP, Z_DEFAULT_COMPRESSION, std::move(logger)) {} + +LogCompressor::FlushResult LogCompressor::flush() { + if (write(nullptr, 0, Z_SYNC_FLUSH) == 0) { + return FlushResult::Success; + } + return FlushResult::Error; +} -const std::type_index DataSizeValue::type_id = typeid(uint64_t); -const std::type_index TimePeriodValue::type_id = typeid(uint64_t); +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org -} /* namespace core */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ diff --git a/libminifi/src/core/logging/internal/LogCompressorSink.cpp b/libminifi/src/core/logging/internal/LogCompressorSink.cpp new file mode 100644 index 0000000..112e6ae --- /dev/null +++ b/libminifi/src/core/logging/internal/LogCompressorSink.cpp @@ -0,0 +1,81 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/logging/internal/LogCompressorSink.h" +#include "spdlog/details/log_msg.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { +namespace internal { + +LogCompressorSink::LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger) + : cached_logs_(cache_size.max_total_size, cache_size.max_segment_size), + compressed_logs_(compressed_size.max_total_size, compressed_size.max_segment_size, ActiveCompressor::Allocator{std::move(logger)}) { + compression_thread_ = std::thread{&LogCompressorSink::run, this}; +} + +LogCompressorSink::~LogCompressorSink() { + running_ = false; + compression_thread_.join(); +} + +void LogCompressorSink::sink_it_(const spdlog::details::log_msg &msg) { + cached_logs_.modify([&] (LogBuffer& active) { + active.buffer_->write(reinterpret_cast<const uint8_t*>(msg.payload.data()), msg.payload.size()); + }); +} + +void LogCompressorSink::run() { + while (running_) { + cached_logs_.discardOverflow(); + compressed_logs_.discardOverflow(); + if (compress() == CompressionResult::NothingToCompress) { + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + } + } +} + +LogCompressorSink::CompressionResult LogCompressorSink::compress(bool force_rotation) { + LogBuffer log_cache; + if (!cached_logs_.tryDequeue(log_cache)) { + if 
(force_rotation) { + compressed_logs_.commit(); + } + return CompressionResult::NothingToCompress; + } + compressed_logs_.modify([&] (ActiveCompressor& compressor) { + compressor.compressor_->write(log_cache.buffer_->getBuffer(), log_cache.buffer_->size()); + compressor.compressor_->flush(); + return force_rotation; + }); + return CompressionResult::Success; +} + +void LogCompressorSink::flush_() {} + +} // namespace internal +} // namespace logging +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/src/io/ZlibStream.cpp b/libminifi/src/io/ZlibStream.cpp index 711a31d..9a9aa2f 100644 --- a/libminifi/src/io/ZlibStream.cpp +++ b/libminifi/src/io/ZlibStream.cpp @@ -19,6 +19,7 @@ #include "io/ZlibStream.h" #include "Exception.h" #include "utils/gsl.h" +#include "core/logging/LoggerConfiguration.h" namespace org { namespace apache { @@ -39,7 +40,11 @@ bool ZlibBaseStream::isFinished() const { } ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format, int level) - : ZlibBaseStream(output) { + : ZlibCompressStream(output, format, level, logging::LoggerFactory<ZlibCompressStream>::getLogger()) {} + +ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format, int level, std::shared_ptr<logging::Logger> logger) + : ZlibBaseStream(output), + logger_{std::move(logger)} { int ret = deflateInit2( &strm_, level, @@ -57,11 +62,22 @@ ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, Zlib ZlibCompressStream::~ZlibCompressStream() { if (state_ != ZlibStreamState::UNINITIALIZED) { - deflateEnd(&strm_); + int result = deflateEnd(&strm_); + if (result == Z_DATA_ERROR) { + logger_->log_debug("Stream was freed prematurely"); + } else if (result == Z_STREAM_ERROR) { + logger_->log_debug("Stream state was inconsistent"); + } else if (result != Z_OK) { + logger_->log_debug("Unknown error while 
finishing compression %d", result); + } } } -size_t ZlibCompressStream::write(const uint8_t* value, size_t size) { +size_t ZlibCompressStream::write(const uint8_t *value, size_t size) { + return write(value, size, Z_NO_FLUSH); +} + +size_t ZlibCompressStream::write(const uint8_t* value, size_t size, FlushMode mode) { if (state_ != ZlibStreamState::INITIALIZED) { logger_->log_error("writeData called in invalid ZlibCompressStream state, state is %hhu", state_); return STREAM_ERROR; @@ -84,10 +100,9 @@ size_t ZlibCompressStream::write(const uint8_t* value, size_t size) { strm_.next_out = outputBuffer_.data(); strm_.avail_out = gsl::narrow<uInt>(outputBuffer_.size()); - int flush = value == nullptr ? Z_FINISH : Z_NO_FLUSH; - logger_->log_trace("calling deflate with flush %d", flush); + logger_->log_trace("calling deflate with flush %d", mode); - int ret = deflate(&strm_, flush); + int ret = deflate(&strm_, mode); if (ret == Z_STREAM_ERROR) { logger_->log_error("deflate failed, error code: %d", ret); state_ = ZlibStreamState::ERRORED; @@ -107,14 +122,15 @@ size_t ZlibCompressStream::write(const uint8_t* value, size_t size) { void ZlibCompressStream::close() { if (state_ == ZlibStreamState::INITIALIZED) { - if (write(nullptr, 0U) == 0) { + if (write(nullptr, 0U, Z_FINISH) == 0) { state_ = ZlibStreamState::FINISHED; } } } ZlibDecompressStream::ZlibDecompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format) - : ZlibBaseStream(output) { + : ZlibBaseStream(output), + logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()} { int ret = inflateInit2(&strm_, 15 + (format == ZlibCompressionFormat::GZIP ? 
16 : 0) /* windowBits */); if (ret != Z_OK) { logger_->log_error("Failed to initialize z_stream with inflateInit2, error code: %d", ret); @@ -126,7 +142,12 @@ ZlibDecompressStream::ZlibDecompressStream(gsl::not_null<OutputStream*> output, ZlibDecompressStream::~ZlibDecompressStream() { if (state_ != ZlibStreamState::UNINITIALIZED) { - inflateEnd(&strm_); + int result = inflateEnd(&strm_); + if (result == Z_STREAM_ERROR) { + logger_->log_error("Stream state was inconsistent"); + } else if (result != Z_OK) { + logger_->log_error("Unknown error while finishing decompression %d", result); + } } } diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp index e9db9a7..8750df7 100644 --- a/libminifi/test/unit/LoggerTests.cpp +++ b/libminifi/test/unit/LoggerTests.cpp @@ -22,6 +22,9 @@ #include <ctime> #include "../TestBase.h" #include "core/logging/LoggerConfiguration.h" +#include "io/ZlibStream.h" +#include "StreamPipe.h" +#include "utils/IntegrationTestUtils.h" TEST_CASE("Test log Levels", "[ttl1]") { LogTestController::getInstance().setTrace<logging::Logger>(); @@ -76,7 +79,7 @@ TEST_CASE("Test log Levels change", "[ttl5]") { namespace single { class TestClass { }; -} +} // namespace single class TestClass2 { }; @@ -107,3 +110,95 @@ TEST_CASE("Test ShortenNames", "[ttl6]") { LogTestController::getInstance(props)->reset(); LogTestController::getInstance().reset(); } + +using namespace minifi::io; + +std::string decompress(const std::shared_ptr<InputStream>& input) { + auto output = utils::make_unique<BufferStream>(); + auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get())); + minifi::internal::pipe(input, decompressor); + decompressor->close(); + return std::string{reinterpret_cast<const char*>(output->getBuffer()), output->size()}; +} + +TEST_CASE("Test Compression", "[ttl7]") { + auto& log_config = logging::LoggerConfiguration::getConfiguration(); + auto properties = 
std::make_shared<logging::LoggerProperties>(); + std::string className; + SECTION("Using root logger") { + className = "CompressionTestClassUsingRoot"; + // by default the root logger is OFF + properties->set("logger.root", "INFO"); + } + SECTION("Inherit compression sink") { + className = "CompressionTestClassInheriting"; + properties->set("appender.null", "null"); + properties->set("logger." + className, "INFO,null"); + } + log_config.initialize(properties); + auto logger = log_config.getLogger(className); + logger->log_error("Hi there"); + std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)}; + REQUIRE(compressed_log); + auto logs = decompress(compressed_log); + REQUIRE(logs == "Hi there"); +} + +class LoggerTestAccessor { + public: + static void setCompressionCacheSegmentSize(logging::LoggerConfiguration& log_config, size_t value) { + log_config.compression_manager_.cache_segment_size = value; + } + static void setCompressionCompressedSegmentSize(logging::LoggerConfiguration& log_config, size_t value) { + log_config.compression_manager_.compressed_segment_size = value; + } + static size_t getUncompressedSize(logging::LoggerConfiguration& log_config) { + return log_config.compression_manager_.getSink()->cached_logs_.size(); + } + static size_t getCompressedSize(logging::LoggerConfiguration& log_config) { + return log_config.compression_manager_.getSink()->compressed_logs_.size(); + } +}; + +TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl8]") { + auto& log_config = logging::LoggerConfiguration::getConfiguration(); + auto properties = std::make_shared<logging::LoggerProperties>(); + properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "10 KB"); + LoggerTestAccessor::setCompressionCacheSegmentSize(log_config, 1_KiB); + std::string className = "CompressionTestCacheCleaned"; + // by default the root logger is OFF + properties->set("logger.root", "INFO"); + 
log_config.initialize(properties); + auto logger = log_config.getLogger(className); + for (size_t idx = 0; idx < 10000; ++idx) { + logger->log_error("Hi there"); + } + bool cache_shrunk = utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, [&] { + return LoggerTestAccessor::getUncompressedSize(log_config) <= 10_KiB; + }); + REQUIRE(cache_shrunk); +} + +TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "[ttl9]") { + auto& log_config = logging::LoggerConfiguration::getConfiguration(); + auto properties = std::make_shared<logging::LoggerProperties>(); + bool is_nullptr = false; + SECTION("Cached log size is set to 0") { + is_nullptr = true; + properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "0"); + } + SECTION("Compressed log size is set to 0") { + is_nullptr = true; + properties->set(logging::internal::CompressionManager::compression_compressed_log_max_size_, "0"); + } + SECTION("Sanity check") { + is_nullptr = false; + // pass + } + // by default the root logger is OFF + properties->set("logger.root", "INFO"); + log_config.initialize(properties); + auto logger = log_config.getLogger("DisableCompressionTestLogger"); + logger->log_error("Hi there"); + REQUIRE((logging::LoggerConfiguration::getCompressedLog(true) == nullptr) == is_nullptr); +} diff --git a/libminifi/test/unit/StagingQueueTests.cpp b/libminifi/test/unit/StagingQueueTests.cpp new file mode 100644 index 0000000..9391458 --- /dev/null +++ b/libminifi/test/unit/StagingQueueTests.cpp @@ -0,0 +1,121 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> + +#include "utils/StringUtils.h" +#include "../TestBase.h" +#include "utils/StagingQueue.h" + +using org::apache::nifi::minifi::utils::StagingQueue; + +class MockItem { + public: + static MockItem allocate(size_t max_size) { + MockItem instance; + instance.data_.reserve(max_size * 3 / 2); + return instance; + } + + MockItem commit() { + return std::move(*this); + } + + size_t size() const { + return data_.size(); + } + + std::string data_; +}; + +TEST_CASE("Construct queue", "[TestStagingQueue1]") { + StagingQueue<MockItem> queue(30, 10); + REQUIRE(queue.size() == 0); +} + +TEST_CASE("Modify no commit", "[TestStagingQueue2]") { + StagingQueue<MockItem> queue(30, 10); + queue.modify([] (MockItem& item) { + item.data_ += "12345"; + }); + REQUIRE(queue.size() == 5); + SECTION("Decrease size") { + queue.modify([] (MockItem& item) { + REQUIRE(item.data_ == "12345"); + item.data_ = ""; + }); + REQUIRE(queue.size() == 0); + } + MockItem out; + REQUIRE(!queue.tryDequeue(out)); +} + +TEST_CASE("Modify and commit", "[TestStagingQueue3]") { + StagingQueue<MockItem> queue(30, 10); + queue.modify([] (MockItem& item) { + item.data_ += "12345"; + }); + queue.commit(); + SECTION("Commit is idempotent if there is no modification between") { + queue.commit(); + } + REQUIRE(queue.size() == 5); + MockItem out; + REQUIRE(queue.tryDequeue(out)); + REQUIRE(out.data_ == "12345"); + REQUIRE(queue.size() == 0); +} + +TEST_CASE("Modify and overflow triggered automatic commit", "[TestStagingQueue4]") { + StagingQueue<MockItem> queue(30, 10); + 
queue.modify([] (MockItem& item) { + item.data_ += "123456789ab"; + }); + SECTION("Explicit commit makes no difference") { + queue.commit(); + } + queue.modify([] (MockItem& item) { + // a new item has been allocated + REQUIRE(item.data_ == ""); + }); + REQUIRE(queue.size() == 11); + MockItem out; + REQUIRE(queue.tryDequeue(out)); + REQUIRE(out.data_ == "123456789ab"); + REQUIRE(queue.size() == 0); +} + +TEST_CASE("Discard overflow", "[TestStagingQueue5]") { + StagingQueue<MockItem> queue(30, 10); + for (size_t idx = 0; idx < 5; ++idx) { + queue.modify([&] (MockItem& item) { + item.data_ = utils::StringUtils::repeat(std::to_string(idx), 10); + }); + queue.commit(); + } + REQUIRE(queue.size() == 50); + queue.discardOverflow(); + REQUIRE(queue.size() == 30); + MockItem out; + // idx 0 and 1 have been discarded + for (size_t idx = 2; idx < 5; ++idx) { + REQUIRE(queue.tryDequeue(out)); + REQUIRE(out.data_ == utils::StringUtils::repeat(std::to_string(idx), 10)); + } + REQUIRE(queue.size() == 0); +}
