This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 3737210b29addb1c60fde730d00a40633f879026 Author: Ferenc Gerlits <[email protected]> AuthorDate: Mon Mar 24 12:58:15 2025 +0100 MINIFICPP-2483 Unify buffer sizes Unify buffer sizes and make them configurable in most places. Signed-off-by: Gabor Gyimesi <[email protected]> This closes #1953 --- conf/minifi.properties | 3 + controller/Controller.cpp | 3 +- encrypt-config/tests/ConfigFileEncryptorTests.cpp | 6 +- encrypt-config/tests/ConfigFileTests.cpp | 6 +- encrypt-config/tests/resources/minifi.properties | 6 ++ .../include/utils/file/FileReaderCallback.h | 3 +- .../src/utils/file/FileReaderCallback.cpp | 15 ++--- extensions/aws/s3/S3Wrapper.cpp | 6 +- extensions/aws/s3/S3Wrapper.h | 3 +- extensions/execute-process/ExecuteProcess.cpp | 6 +- extensions/execute-process/ExecuteProcess.h | 1 + extensions/libarchive/FocusArchiveEntry.cpp | 9 ++- extensions/libarchive/ReadArchiveStream.h | 4 +- extensions/libarchive/UnfocusArchiveEntry.cpp | 7 ++- .../libarchive/tests/CompressContentTests.cpp | 70 +++++++++------------- .../rocksdb-repos/tests/ContentSessionTests.cpp | 4 +- extensions/sftp/client/SFTPClient.cpp | 10 ++-- extensions/sftp/client/SFTPClient.h | 9 +-- extensions/sftp/processors/FetchSFTP.cpp | 5 +- extensions/sftp/processors/ListSFTP.cpp | 5 +- extensions/sftp/processors/PutSFTP.cpp | 5 +- extensions/sftp/processors/SFTPProcessorBase.cpp | 6 +- extensions/sftp/processors/SFTPProcessorBase.h | 3 +- extensions/smb/FetchSmb.cpp | 4 +- extensions/smb/FetchSmb.h | 1 + .../standard-processors/processors/ExtractText.cpp | 37 +++++------- .../standard-processors/processors/ExtractText.h | 4 +- .../standard-processors/processors/FetchFile.cpp | 4 +- .../standard-processors/processors/FetchFile.h | 1 + .../standard-processors/processors/GetFile.cpp | 5 +- .../standard-processors/processors/GetFile.h | 1 + .../processors/SegmentContent.cpp | 9 +-- .../processors/SegmentContent.h | 1 + .../processors/SplitContent.cpp | 15 +++-- .../standard-processors/processors/SplitContent.h | 3 +- .../standard-processors/processors/SplitText.cpp | 43 +++++++------ .../standard-processors/processors/SplitText.h | 10 ++-- .../standard-processors/processors/TailFile.cpp | 39 ++++++------ .../standard-processors/processors/TailFile.h | 1 + .../tests/unit/SplitContentTests.cpp | 4 +- .../tests/unit/SplitTextTests.cpp | 53 ++++++++-------- libminifi/include/controllers/SSLContextService.h | 12 ++-- libminifi/include/core/logging/LoggerBase.h | 3 +- libminifi/src/Configuration.cpp | 1 + libminifi/src/Configure.cpp | 4 +- libminifi/src/c2/ControllerSocketProtocol.cpp | 3 +- libminifi/src/utils/tls/CertificateUtils.cpp | 10 ++-- libminifi/test/unit/ConfigurationUtilsTests.cpp | 38 ++++++++++++ .../include/minifi-cpp/core/ProcessContext.h | 2 +- .../include/minifi-cpp/properties/Configuration.h | 3 +- .../include/minifi-cpp/properties/Configure.h | 7 ++- utils/include/core/ProcessContext.h | 35 ++++------- utils/include/io/StreamPipe.h | 7 ++- utils/include/utils/ConfigurationUtils.h | 31 ++++++++++ utils/src/utils/ConfigurationUtils.cpp | 33 ++++++++++ utils/src/utils/file/FileUtils.cpp | 7 ++- 56 files changed, 389 insertions(+), 237 deletions(-) diff --git a/conf/minifi.properties b/conf/minifi.properties index f681e9a28..b327daaa3 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -131,6 +131,9 @@ nifi.c2.full.heartbeat=false ## specify the destination of c2 directed assets #nifi.asset.directory=${MINIFI_HOME}/asset +## You probably don't need to touch this, but you can if you want to +# nifi.default.internal.buffer.size=4096 + # Publish metrics to external consumers # nifi.metrics.publisher.agent.identifier= # nifi.metrics.publisher.class=PrometheusMetricsPublisher diff --git a/controller/Controller.cpp b/controller/Controller.cpp index 411c58ac2..ac2b87b55 100644 --- a/controller/Controller.cpp +++ b/controller/Controller.cpp @@ -26,6 +26,7 @@ #include "asio/ssl/stream.hpp" #include "asio/connect.hpp" #include "core/logging/Logger.h" +#include "utils/ConfigurationUtils.h" #include "utils/net/AsioSocketUtils.h" #include "utils/file/FileUtils.h" @@ -255,7 +256,7 @@ nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& } std::ofstream out_file(target_dir / "debug.tar.gz"); - const size_t BUFFER_SIZE = 4096; + static constexpr auto BUFFER_SIZE = utils::configuration::DEFAULT_BUFFER_SIZE; std::array<char, BUFFER_SIZE> out_buffer{}; while (bundle_size > 0) { const auto next_read_size = (std::min)(bundle_size, BUFFER_SIZE); diff --git a/encrypt-config/tests/ConfigFileEncryptorTests.cpp b/encrypt-config/tests/ConfigFileEncryptorTests.cpp index dd326c5b9..f0bd32e0b 100644 --- a/encrypt-config/tests/ConfigFileEncryptorTests.cpp +++ b/encrypt-config/tests/ConfigFileEncryptorTests.cpp @@ -74,10 +74,10 @@ TEST_CASE("ConfigFileEncryptor can encrypt the sensitive properties", "[encrypt- ConfigFile test_file{std::ifstream{"resources/minifi.properties"}}; std::string original_password = test_file.getValue(Configuration::nifi_rest_api_password).value(); - uint32_t num_properties_encrypted = encryptSensitivePropertiesInFile(test_file, KEY); + uint32_t initial_num_properties_encrypted = encryptSensitivePropertiesInFile(test_file, KEY); - REQUIRE(num_properties_encrypted == 1); - REQUIRE(test_file.size() == 104); + REQUIRE(initial_num_properties_encrypted == 1); + REQUIRE(test_file.size() == 110); REQUIRE(check_encryption(test_file, Configuration::nifi_rest_api_password, original_password.length())); SECTION("calling encryptSensitiveValuesInMinifiProperties a second time does nothing") { diff --git a/encrypt-config/tests/ConfigFileTests.cpp b/encrypt-config/tests/ConfigFileTests.cpp index cd998ff72..4f1c5f1f2 100644 --- a/encrypt-config/tests/ConfigFileTests.cpp +++ b/encrypt-config/tests/ConfigFileTests.cpp @@ -90,7 +90,7 @@ TEST_CASE("ConfigFile creates an empty object from a nonexistent file", "[encryp TEST_CASE("ConfigFile can parse a simple config file", "[encrypt-config][constructor]") { ConfigFile test_file{std::ifstream{"resources/minifi.properties"}}; - REQUIRE(test_file.size() == 103); + REQUIRE(test_file.size() == 109); } TEST_CASE("ConfigFile can test whether a key is present", "[encrypt-config][hasValue]") { @@ -143,7 +143,7 @@ TEST_CASE("ConfigFile can add a new setting after an existing setting", "[encryp SECTION("valid key") { test_file.insertAfter(Configuration::nifi_rest_api_password, "nifi.rest.api.password.protected", "my-cipher-name"); - REQUIRE(test_file.size() == 104); + REQUIRE(test_file.size() == 110); REQUIRE(test_file.getValue("nifi.rest.api.password.protected") == "my-cipher-name"); } @@ -158,7 +158,7 @@ TEST_CASE("ConfigFile can add a new setting at the end", "[encrypt-config][appen const std::string KEY = "nifi.bootstrap.sensitive.key"; const std::string VALUE = "aa411f289c91685ef9d5a9e5a4fad9393ff4c7a78ab978484323488caed7a9ab"; test_file.append(KEY, VALUE); - REQUIRE(test_file.size() == 104); + REQUIRE(test_file.size() == 110); REQUIRE(test_file.getValue(KEY) == std::make_optional(VALUE)); } diff --git a/encrypt-config/tests/resources/minifi.properties b/encrypt-config/tests/resources/minifi.properties index 6bb6566df..898f23435 100644 --- a/encrypt-config/tests/resources/minifi.properties +++ b/encrypt-config/tests/resources/minifi.properties @@ -83,6 +83,12 @@ nifi.c2.agent.identifier=EncryptConfigTester-001 #controller.socket.local.any.interface=false #controller.ssl.context.service=SSLContextService +## specify the destination of c2 directed assets +#nifi.asset.directory=${MINIFI_HOME}/asset + +## You probably don't need to touch this, but you can if you want to +# nifi.default.internal.buffer.size=4096 + # must be comma separated nifi.c2.flow.id= nifi.c2.flow.url= diff --git a/extension-utils/include/utils/file/FileReaderCallback.h b/extension-utils/include/utils/file/FileReaderCallback.h index e78607be5..1780cbb60 100644 --- a/extension-utils/include/utils/file/FileReaderCallback.h +++ b/extension-utils/include/utils/file/FileReaderCallback.h @@ -32,11 +32,12 @@ namespace org::apache::nifi::minifi::utils { */ class FileReaderCallback { public: - explicit FileReaderCallback(std::filesystem::path file_path); + explicit FileReaderCallback(std::filesystem::path file_path, size_t buffer_size); int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) const; private: std::filesystem::path file_path_; + size_t buffer_size_; std::shared_ptr<core::logging::Logger> logger_; }; diff --git a/extension-utils/src/utils/file/FileReaderCallback.cpp b/extension-utils/src/utils/file/FileReaderCallback.cpp index 906f09580..9386d6356 100644 --- a/extension-utils/src/utils/file/FileReaderCallback.cpp +++ b/extension-utils/src/utils/file/FileReaderCallback.cpp @@ -23,21 +23,16 @@ #include "core/logging/LoggerFactory.h" #include "utils/StringUtils.h" -namespace { - -constexpr std::size_t BUFFER_SIZE = 4096; - -} // namespace - namespace org::apache::nifi::minifi::utils { -FileReaderCallback::FileReaderCallback(std::filesystem::path file_path) - : file_path_{std::move(file_path)}, +FileReaderCallback::FileReaderCallback(std::filesystem::path file_path, const size_t buffer_size) + : file_path_{std::move(file_path)}, + buffer_size_{buffer_size}, logger_(core::logging::LoggerFactory<FileReaderCallback>::getLogger()) { } int64_t FileReaderCallback::operator()(const std::shared_ptr<io::OutputStream>& output_stream) const { - std::array<char, BUFFER_SIZE> buffer{}; + std::vector<char> buffer(buffer_size_); uint64_t num_bytes_written = 0; std::ifstream input_stream{file_path_, std::ifstream::in | std::ifstream::binary}; @@ -46,7 +41,7 @@ int64_t FileReaderCallback::operator()(const std::shared_ptr<io::OutputStream>& } logger_->log_debug("Opening {}", file_path_); while (input_stream.good()) { - input_stream.read(buffer.data(), buffer.size()); + input_stream.read(buffer.data(), gsl::narrow<std::streamsize>(buffer.size())); if (input_stream.bad()) { throw FileReaderCallbackIOError(string::join_pack("Error reading file: ", std::strerror(errno)), errno); } diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp index 08ae983f6..aff06b72f 100644 --- a/extensions/aws/s3/S3Wrapper.cpp +++ b/extensions/aws/s3/S3Wrapper.cpp @@ -74,7 +74,7 @@ std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shar auto data_stream = std::make_shared<Aws::StringStream>(); uint64_t read_size = 0; while (read_size < read_limit) { - const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE); + const auto next_read_size = (std::min)(read_limit - read_size, uint64_t{BUFFER_SIZE}); const auto read_ret = stream->read(std::span(buffer).subspan(0, next_read_size)); if (io::isError(read_ret)) { throw StreamReadException("Reading flow file inputstream failed!"); @@ -229,11 +229,11 @@ bool S3Wrapper::deleteObject(const DeleteObjectRequestParameters& params) { } int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::OutputStream& output) { - std::vector<uint8_t> buffer(4096); + std::vector<uint8_t> buffer(BUFFER_SIZE); size_t write_size = 0; if (data_size < 0) return 0; while (write_size < gsl::narrow<uint64_t>(data_size)) { - const auto next_write_size = (std::min)(gsl::narrow<size_t>(data_size) - write_size, size_t{4096}); + const auto next_write_size = (std::min)(gsl::narrow<size_t>(data_size) - write_size, BUFFER_SIZE); if (!source.read(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_write_size))) { return -1; } diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h index 12c47f3c0..d943076ba 100644 --- a/extensions/aws/s3/S3Wrapper.h +++ b/extensions/aws/s3/S3Wrapper.h @@ -37,6 +37,7 @@ #include "core/logging/Logger.h" #include "core/logging/LoggerFactory.h" #include "utils/AWSInitializer.h" +#include "utils/ConfigurationUtils.h" #include "utils/OptionalUtils.h" #include "utils/StringUtils.h" #include "utils/ListingStateManager.h" @@ -249,7 +250,7 @@ class StreamReadException : public Exception { class S3Wrapper { public: - static constexpr uint64_t BUFFER_SIZE = 4_KiB; + static constexpr auto BUFFER_SIZE = minifi::utils::configuration::DEFAULT_BUFFER_SIZE; S3Wrapper(); explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender); diff --git a/extensions/execute-process/ExecuteProcess.cpp b/extensions/execute-process/ExecuteProcess.cpp index 4963e2ec0..a4c2188d2 100644 --- a/extensions/execute-process/ExecuteProcess.cpp +++ b/extensions/execute-process/ExecuteProcess.cpp @@ -30,6 +30,7 @@ #include "core/ProcessSession.h" #include "core/Resource.h" #include "core/TypedValues.h" +#include "utils/ConfigurationUtils.h" #include "utils/Environment.h" #include "utils/StringUtils.h" #include "utils/gsl.h" @@ -45,6 +46,7 @@ void ExecuteProcess::initialize() { } void ExecuteProcess::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); if (auto command = context.getProperty(Command)) { command_ = *command; } @@ -112,9 +114,9 @@ void ExecuteProcess::executeChildProcess() const { } void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) const { + std::vector<char> buffer(buffer_size_); while (true) { std::this_thread::sleep_for(batch_duration_); - std::array<char, 4096> buffer; // NOLINT(cppcoreguidelines-pro-type-member-init) const auto num_read = read(pipefd_[0], buffer.data(), buffer.size()); if (num_read <= 0) { break; @@ -150,7 +152,7 @@ bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ } void ExecuteProcess::readOutput(core::ProcessSession& session) const { - std::array<char, 4096> buffer; // NOLINT(cppcoreguidelines-pro-type-member-init) + std::vector<char> buffer(buffer_size_); char *buf_ptr = buffer.data(); size_t read_to_buffer = 0; std::shared_ptr<core::FlowFile> flow_file; diff --git a/extensions/execute-process/ExecuteProcess.h b/extensions/execute-process/ExecuteProcess.h index 4e4f0e5cd..2abd0b529 100644 --- a/extensions/execute-process/ExecuteProcess.h +++ b/extensions/execute-process/ExecuteProcess.h @@ -122,6 +122,7 @@ class ExecuteProcess final : public core::ProcessorImpl { std::string full_command_; int pipefd_[2]{}; pid_t pid_{}; + size_t buffer_size_{}; }; } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp index 5def48d41..6ee446ade 100644 --- a/extensions/libarchive/FocusArchiveEntry.cpp +++ b/extensions/libarchive/FocusArchiveEntry.cpp @@ -34,8 +34,13 @@ #include "core/Resource.h" #include "Exception.h" #include "SmartArchivePtrs.h" +#include "utils/ConfigurationUtils.h" #include "utils/gsl.h" +namespace { +inline constexpr auto BUFFER_SIZE = org::apache::nifi::minifi::utils::configuration::DEFAULT_BUFFER_SIZE; +} // namespace + namespace org::apache::nifi::minifi::processors { std::shared_ptr<utils::IdGenerator> FocusArchiveEntry::id_generator_ = utils::IdGenerator::getIdGenerator(); @@ -127,7 +132,7 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext& context, core::ProcessSe struct FocusArchiveEntryReadData { std::shared_ptr<io::InputStream> stream; core::Processor *processor = nullptr; - std::array<std::byte, 8196> buf{}; + std::array<std::byte, BUFFER_SIZE> buf{}; }; // Read callback which reads from the flowfile stream @@ -140,7 +145,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, do { last_read = data->stream->read(data->buf); read += last_read; - } while (data->processor->isRunning() && last_read > 0 && !io::isError(last_read) && read < 8196); + } while (data->processor->isRunning() && last_read > 0 && !io::isError(last_read) && read < BUFFER_SIZE); if (!data->processor->isRunning()) { archive_set_error(a, EINTR, "Processor shut down during read"); diff --git a/extensions/libarchive/ReadArchiveStream.h b/extensions/libarchive/ReadArchiveStream.h index 910f808e4..e481fc69e 100644 --- a/extensions/libarchive/ReadArchiveStream.h +++ b/extensions/libarchive/ReadArchiveStream.h @@ -25,10 +25,10 @@ #include "io/ArchiveStream.h" #include "io/InputStream.h" #include "core/logging/LoggerFactory.h" - #include "archive_entry.h" #include "archive.h" #include "SmartArchivePtrs.h" +#include "utils/ConfigurationUtils.h" namespace org::apache::nifi::minifi::io { @@ -47,7 +47,7 @@ class ReadArchiveStreamImpl final : public InputStreamImpl, public ReadArchiveSt private: std::shared_ptr<InputStream> input_; - std::array<std::byte, 4096> buffer_{}; + std::array<std::byte, utils::configuration::DEFAULT_BUFFER_SIZE> buffer_{}; }; processors::archive_read_unique_ptr createReadArchive(); diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp index bacbd4b9a..912e89b05 100644 --- a/extensions/libarchive/UnfocusArchiveEntry.cpp +++ b/extensions/libarchive/UnfocusArchiveEntry.cpp @@ -30,8 +30,13 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/Resource.h" +#include "utils/ConfigurationUtils.h" #include "utils/gsl.h" +namespace { +inline constexpr auto BUFFER_SIZE = org::apache::nifi::minifi::utils::configuration::DEFAULT_BUFFER_SIZE; +} // namespace + namespace org::apache::nifi::minifi::processors { void UnfocusArchiveEntry::initialize() { @@ -157,7 +162,7 @@ int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io: archive_write_open(output_archive.get(), &data, ok_cb, write_cb, ok_cb); // Iterate entries & write from tmp file to archive - std::array<char, 8192> buf{}; + std::array<char, BUFFER_SIZE> buf{}; struct stat st{}; auto entry = archive_entry_unique_ptr{archive_entry_new()}; diff --git a/extensions/libarchive/tests/CompressContentTests.cpp b/extensions/libarchive/tests/CompressContentTests.cpp index a819a7706..a9b799cfa 100644 --- a/extensions/libarchive/tests/CompressContentTests.cpp +++ b/extensions/libarchive/tests/CompressContentTests.cpp @@ -29,6 +29,7 @@ #include "FlowController.h" #include "unit/TestBase.h" #include "unit/Catch.h" +#include "catch2/generators/catch_generators.hpp" #include "core/Core.h" #include "../../include/core/FlowFile.h" #include "unit/ProvenanceTestHelper.h" @@ -151,6 +152,7 @@ class CompressDecompressionTestController : public TestController { [[nodiscard]] std::shared_ptr<core::FlowFile> importFlowFile(const std::filesystem::path& content_path) const { std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(helper_session->create()); + REQUIRE(std::filesystem::exists(content_path)); helper_session->import(content_path.string(), flow, true, 0); helper_session->flushContent(); input->put(flow); @@ -293,6 +295,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1 // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -325,6 +328,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -340,6 +344,8 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet } TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3]") { + if (!archive_bzlib_version()) { return; } // minifi was compiled without BZip2 support + context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::compress)}); context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::BZIP2)}); context->setProperty(minifi::processors::CompressContent::CompressLevel.name, "9"); @@ -351,6 +357,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3 // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -369,6 +376,8 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3 TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfiletest4]") { + if (!archive_bzlib_version()) { return; } // minifi was compiled without BZip2 support + context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::decompress)}); context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::BZIP2)}); context->setProperty(minifi::processors::CompressContent::CompressLevel.name, "9"); @@ -381,6 +390,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -394,6 +404,8 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet } TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5]") { + if (!archive_liblzma_version()) { return; } // minifi was compiled without LZMA support + context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::compress)}); context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::LZMA)}); context->setProperty(minifi::processors::CompressContent::CompressLevel.name, "9"); @@ -402,15 +414,10 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5 auto flow = importFlowFile(rawContentPath()); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { - // platform not support LZMA - LogTestController::getInstance().reset(); - return; - } - // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -429,6 +436,8 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5 TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfiletest6]") { + if (!archive_liblzma_version()) { return; } // minifi was compiled without LZMA support + context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::decompress)}); context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::USE_MIME_TYPE)}); context->setProperty(minifi::processors::CompressContent::CompressLevel.name, "9"); @@ -438,15 +447,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-lzma"); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { - // platform not support LZMA - LogTestController::getInstance().reset(); - return; - } - // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -460,6 +464,8 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet } TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletest7]") { + if (!archive_liblzma_version()) { return; } // minifi was compiled without LZMA support + context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::compress)}); context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::XZ_LZMA2)}); context->setProperty(minifi::processors::CompressContent::CompressLevel.name, "9"); @@ -468,15 +474,10 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes auto flow = importFlowFile(rawContentPath()); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { - // platform not support LZMA - LogTestController::getInstance().reset(); - return; - } - // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -495,6 +496,8 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfiletest8]") { + if (!archive_liblzma_version()) { return; } // minifi was compiled without LZMA support + context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::decompress)}); context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::USE_MIME_TYPE)}); context->setProperty(minifi::processors::CompressContent::CompressLevel.name, "9"); @@ -504,15 +507,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-xz"); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { - // platform not support LZMA - LogTestController::getInstance().reset(); - return; - } - // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords); + REQUIRE(flow1); REQUIRE(flow1->getSize() > 0); { REQUIRE(flow1->getSize() != flow->getSize()); @@ -675,7 +673,7 @@ TEST_CASE_METHOD(CompressTestController, "Batch CompressFileGZip", "[compressFil REQUIRE(outFiles.size() == flowFileContents.size()); for (std::size_t idx = 0; idx < outFiles.size(); ++idx) { - auto file = outFiles[idx]; + const auto& file = outFiles[idx]; std::string mime; file->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime); REQUIRE(mime == "application/gzip"); @@ -688,29 +686,19 @@ TEST_CASE_METHOD(CompressTestController, "Batch CompressFileGZip", "[compressFil } TEST_CASE_METHOD(DecompressTestController, "Invalid archive decompression", "[compressfiletest9]") { - context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::decompress)}); - SECTION("GZIP") { - context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::GZIP)}); - } - SECTION("LZMA") { - context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::LZMA)}); - } - SECTION("XZ_LZMA2") { - context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::XZ_LZMA2)}); - } - SECTION("BZIP2") { - context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(CompressionFormat::BZIP2)}); + const auto compression_format = GENERATE(CompressionFormat::GZIP, CompressionFormat::LZMA, CompressionFormat::XZ_LZMA2, CompressionFormat::BZIP2); + if (((compression_format == CompressionFormat::LZMA || compression_format == CompressionFormat::XZ_LZMA2) && !archive_liblzma_version()) || + (compression_format == CompressionFormat::BZIP2 && !archive_bzlib_version())) { + return; } + context->setProperty(minifi::processors::CompressContent::CompressFormat.name, std::string{magic_enum::enum_name(compression_format)}); + context->setProperty(minifi::processors::CompressContent::CompressMode.name, std::string{magic_enum::enum_name(CompressionMode::decompress)}); context->setProperty(minifi::processors::CompressContent::CompressLevel.name, "9"); context->setProperty(minifi::processors::CompressContent::UpdateFileName.name, "true"); importFlowFileFrom(minifi::io::BufferStream(std::string{"banana bread"})); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { - return; - } - // validate the compress content std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords; REQUIRE_FALSE(output->poll(expiredFlowRecords)); diff --git a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp index 9890a971d..c1c6fdc3d 100644 --- a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp +++ b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp @@ -28,7 +28,7 @@ #include "FlowFileRecord.h" #include "unit/TestBase.h" #include "unit/Catch.h" -#include "utils/gsl.h" +#include "utils/ConfigurationUtils.h" template<typename ContentRepositoryClass> class ContentSessionController : public TestController { @@ -61,7 +61,7 @@ const std::shared_ptr<minifi::io::OutputStream>& operator<<(const std::shared_pt const std::shared_ptr<minifi::io::InputStream>& operator>>(const std::shared_ptr<minifi::io::InputStream>& stream, std::string& str) { str = ""; - std::array<std::byte, 4096> buffer{}; + std::array<std::byte, utils::configuration::DEFAULT_BUFFER_SIZE> buffer{}; while (true) { const auto ret = stream->read(buffer); REQUIRE_FALSE(minifi::io::isError(ret)); diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp index 8ad8ec024..152f5cdd5 100644 --- a/extensions/sftp/client/SFTPClient.cpp +++ b/extensions/sftp/client/SFTPClient.cpp @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -24,9 +23,9 @@ #include <sstream> #include <string> #include <tuple> -#include <utility> #include <vector> +#include "utils/ConfigurationUtils.h" #include "utils/StringUtils.h" #include "utils/gsl.h" #include "minifi-cpp/io/OutputStream.h" @@ -154,11 +153,12 @@ LastSFTPError::operator SFTPError() const { } -SFTPClient::SFTPClient(std::string hostname, uint16_t port, std::string username) +SFTPClient::SFTPClient(std::string hostname, uint16_t port, std::string username, const size_t buffer_size) : logger_(core::logging::LoggerFactory<SFTPClient>::getLogger()), hostname_(std::move(hostname)), port_(port), username_(std::move(username)), + buffer_size_(buffer_size), curl_errorbuffer_(CURL_ERROR_SIZE, '\0'), easy_(curl_easy_init()) { if (easy_ == nullptr) { @@ -719,8 +719,8 @@ bool SFTPClient::listDirectory(const std::string& path, bool follow_symlinks, }); LIBSSH2_SFTP_ATTRIBUTES attrs; - std::vector<char> filename(4096U); - std::vector<char> longentry(4096U); + std::vector<char> filename(buffer_size_); + std::vector<char> longentry(buffer_size_); do { int ret = libssh2_sftp_readdir_ex(dir_handle, filename.data(), diff --git a/extensions/sftp/client/SFTPClient.h b/extensions/sftp/client/SFTPClient.h index fc6b66f7a..1764f0042 100644 --- a/extensions/sftp/client/SFTPClient.h +++ b/extensions/sftp/client/SFTPClient.h @@ -111,7 +111,7 @@ class LastSFTPError { class SFTPClient { public: - SFTPClient(std::string hostname, uint16_t port, std::string username); + SFTPClient(std::string hostname, uint16_t port, std::string username, size_t buffer_size); ~SFTPClient(); @@ -196,9 +196,10 @@ class SFTPClient { std::shared_ptr<core::logging::Logger> logger_; - const std::string hostname_; - const uint16_t port_; - const std::string username_; + std::string hostname_; + uint16_t port_; + std::string username_; + size_t buffer_size_; LIBSSH2_KNOWNHOSTS *ssh_known_hosts_ = nullptr; bool strict_host_checking_ = false; diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp index 2a16c13be..6c6ea1a5b 100644 --- a/extensions/sftp/processors/FetchSFTP.cpp +++ b/extensions/sftp/processors/FetchSFTP.cpp @@ -27,6 +27,7 @@ #include "core/Relationship.h" #include "core/Resource.h" #include "io/BufferStream.h" +#include "utils/ConfigurationUtils.h" #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" #include "utils/ProcessorConfigUtils.h" @@ -89,11 +90,13 @@ void FetchSFTP::onTrigger(core::ProcessContext& context, core::ProcessSession& s common_properties.proxy_host, common_properties.proxy_port, common_properties.proxy_username}; + const auto buffer_size = utils::configuration::getBufferSize(*context.getConfiguration()); auto client = getOrCreateConnection(connection_cache_key, common_properties.password, common_properties.private_key_path, common_properties.private_key_passphrase, - common_properties.proxy_password); + common_properties.proxy_password, + buffer_size); if (client == nullptr) { context.yield(); return; diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp index a683a3210..894250f4b 100644 --- a/extensions/sftp/processors/ListSFTP.cpp +++ b/extensions/sftp/processors/ListSFTP.cpp @@ -36,6 +36,7 @@ #include "core/Resource.h" #include "io/BufferStream.h" #include "rapidjson/ostreamwrapper.h" +#include "utils/ConfigurationUtils.h" #include "utils/StringUtils.h" #include "utils/TimeUtil.h" #include "utils/file/FileUtils.h" @@ -821,11 +822,13 @@ void ListSFTP::onTrigger(core::ProcessContext& context, core::ProcessSession& se common_properties.proxy_host, common_properties.proxy_port, common_properties.proxy_username}; + const auto buffer_size = utils::configuration::getBufferSize(*context.getConfiguration()); auto client = getOrCreateConnection(connection_cache_key, common_properties.password, common_properties.private_key_path, common_properties.private_key_passphrase, - common_properties.proxy_password); + common_properties.proxy_password, + buffer_size); if (client == nullptr) { context.yield(); return; diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp index 9d356d178..a2ab3e157 100644 --- a/extensions/sftp/processors/PutSFTP.cpp +++ b/extensions/sftp/processors/PutSFTP.cpp @@ -28,6 +28,7 @@ #include "core/Resource.h" #include "core/logging/Logger.h" #include "io/BufferStream.h" +#include "utils/ConfigurationUtils.h" #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" #include "utils/ProcessorConfigUtils.h" @@ -120,11 +121,13 @@ bool PutSFTP::processOne(core::ProcessContext& context, core::ProcessSession& se common_properties.proxy_host, common_properties.proxy_port, common_properties.proxy_username}; + const auto buffer_size = utils::configuration::getBufferSize(*context.getConfiguration()); auto client = getOrCreateConnection(connection_cache_key, common_properties.password, common_properties.private_key_path, common_properties.private_key_passphrase, - common_properties.proxy_password); + common_properties.proxy_password, + buffer_size); if (client == nullptr) { context.yield(); return false; diff --git a/extensions/sftp/processors/SFTPProcessorBase.cpp b/extensions/sftp/processors/SFTPProcessorBase.cpp index c15db998b..14721a2bc 100644 --- a/extensions/sftp/processors/SFTPProcessorBase.cpp +++ b/extensions/sftp/processors/SFTPProcessorBase.cpp @@ -237,12 +237,14 @@ std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getOrCreateConnection( const std::string& password, const std::string& private_key_path, const std::string& private_key_passphrase, - const std::string& proxy_password) { + const std::string& proxy_password, + const size_t buffer_size) { auto client = getConnectionFromCache(connection_cache_key); if (client == nullptr) { client = std::make_unique<utils::SFTPClient>(connection_cache_key.hostname, connection_cache_key.port, - connection_cache_key.username); + connection_cache_key.username, + buffer_size); if (!IsNullOrEmpty(host_key_file_)) { if (!client->setHostKeyFile(host_key_file_, strict_host_checking_)) { logger_->log_error("Cannot set host key file"); diff --git a/extensions/sftp/processors/SFTPProcessorBase.h b/extensions/sftp/processors/SFTPProcessorBase.h index ee7fb6c83..d9e199b2e 100644 --- a/extensions/sftp/processors/SFTPProcessorBase.h +++ b/extensions/sftp/processors/SFTPProcessorBase.h @@ -217,7 +217,8 @@ class SFTPProcessorBase : public core::ProcessorImpl { const std::string& password, const std::string& private_key_path, const std::string& private_key_passphrase, - const std::string& proxy_password); + const std::string& proxy_password, + size_t buffer_size); enum class CreateDirectoryHierarchyError : uint8_t { CREATE_DIRECTORY_HIERARCHY_ERROR_OK = 0, diff --git a/extensions/smb/FetchSmb.cpp b/extensions/smb/FetchSmb.cpp index fa15f92c6..a6c24c59e 100644 --- a/extensions/smb/FetchSmb.cpp +++ b/extensions/smb/FetchSmb.cpp @@ -17,6 +17,7 @@ #include "FetchSmb.h" #include "core/Resource.h" +#include "utils/ConfigurationUtils.h" #include "utils/file/FileReaderCallback.h" namespace org::apache::nifi::minifi::extensions::smb { @@ -28,6 +29,7 @@ void FetchSmb::initialize() { void FetchSmb::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { smb_connection_controller_service_ = SmbConnectionControllerService::getFromProperty(context, FetchSmb::ConnectionControllerService); + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); } namespace { @@ -58,7 +60,7 @@ void FetchSmb::onTrigger(core::ProcessContext& context, core::ProcessSession& se } try { - session.write(flow_file, utils::FileReaderCallback{smb_connection_controller_service_->getPath() / getTargetRelativePath(context, *flow_file)}); + session.write(flow_file, utils::FileReaderCallback{smb_connection_controller_service_->getPath() / getTargetRelativePath(context, *flow_file), buffer_size_}); session.transfer(flow_file, Success); } catch (const utils::FileReaderCallbackIOError& io_error) { flow_file->addAttribute(ErrorCode.name, std::to_string(io_error.error_code)); diff --git a/extensions/smb/FetchSmb.h b/extensions/smb/FetchSmb.h index 54c794e8a..c69616366 100644 --- a/extensions/smb/FetchSmb.h +++ b/extensions/smb/FetchSmb.h @@ -83,6 +83,7 @@ class FetchSmb final : public core::ProcessorImpl { private: std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service_; + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FetchSmb>::getLogger(uuid_); }; diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp index 45693a8a7..238860d6c 100644 --- a/extensions/standard-processors/processors/ExtractText.cpp +++ b/extensions/standard-processors/processors/ExtractText.cpp @@ -20,7 +20,6 @@ #include "ExtractText.h" #include <algorithm> -#include <iterator> #include <map> #include <memory> #include <sstream> @@ -31,14 +30,13 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/Resource.h" +#include "utils/ConfigurationUtils.h" #include "utils/RegexUtils.h" #include "utils/gsl.h" #include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { -inline constexpr size_t MAX_BUFFER_SIZE = 4096; - void ExtractText::initialize() { setSupportedProperties(Properties); setSupportedRelationships(Relationships); @@ -51,31 +49,28 @@ void ExtractText::onTrigger(core::ProcessContext& context, core::ProcessSession& return; } - session.read(flowFile, ReadCallback{flowFile, &context, logger_}); + session.read(flowFile, ReadCallback{flowFile, context, logger_}); session.transfer(flowFile, Success); } int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) const { - size_t read_size = 0; - size_t size_limit = flowFile_->getSize(); - std::vector<std::byte> buffer; - buffer.resize(std::min(gsl::narrow<size_t>(flowFile_->getSize()), MAX_BUFFER_SIZE)); - - std::string attrKey = ctx_->getProperty(Attribute).value_or(""); - std::string sizeLimitStr = ctx_->getProperty(SizeLimit).value_or(""); - bool regex_mode = ctx_->getProperty(RegexMode) | utils::andThen(parsing::parseBool) | utils::orThrow("Missing ExtractText::RegexMode despite default value"); - - if (sizeLimitStr.empty()) - sizeLimitStr = DEFAULT_SIZE_LIMIT_STR; - - if (sizeLimitStr != "0") - size_limit = static_cast<size_t>(std::stoi(sizeLimitStr)); + const auto flow_file_size = gsl::narrow<size_t>(flowFile_->getSize()); + const auto default_buffer_size = utils::configuration::getBufferSize(*ctx_->getConfiguration()); + std::vector<std::byte> buffer((std::min)(flow_file_size, default_buffer_size)); + + std::string attrKey = utils::parseOptionalProperty(*ctx_, Attribute).value_or(""); + bool regex_mode = utils::parseBoolProperty(*ctx_, RegexMode); + auto size_limit = gsl::narrow<size_t>(utils::parseU64Property(*ctx_, SizeLimit)); + if (size_limit == 0) { + size_limit = flow_file_size; + } std::ostringstream contentStream; + size_t read_size = 0; while (read_size < size_limit) { // Don't read more than config limit or the size of the buffer - const auto length = std::min(size_limit - read_size, buffer.size()); + const auto length = (std::min)(size_limit - read_size, buffer.size()); const auto ret = stream->read(std::span(buffer).subspan(0, length)); if (io::isError(ret)) { @@ -145,9 +140,9 @@ int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::InputStr return gsl::narrow<int64_t>(read_size); } -ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx, std::shared_ptr<core::logging::Logger> lgr) +ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext& ctx, std::shared_ptr<core::logging::Logger> lgr) : flowFile_(std::move(flowFile)), - ctx_(ctx), + ctx_(gsl::make_not_null(&ctx)), logger_(std::move(lgr)) { } diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h index 948f965a7..975928578 100644 --- a/extensions/standard-processors/processors/ExtractText.h +++ b/extensions/standard-processors/processors/ExtractText.h @@ -110,12 +110,12 @@ class ExtractText : public core::ProcessorImpl { class ReadCallback { public: - ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct, std::shared_ptr<core::logging::Logger> lgr); + ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext& ctx, std::shared_ptr<core::logging::Logger> lgr); int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const; private: std::shared_ptr<core::FlowFile> flowFile_; - core::ProcessContext *ctx_; + gsl::not_null<core::ProcessContext*> ctx_; std::shared_ptr<core::logging::Logger> logger_; }; diff --git a/extensions/standard-processors/processors/FetchFile.cpp b/extensions/standard-processors/processors/FetchFile.cpp index bc17acee1..41fc564d3 100644 --- a/extensions/standard-processors/processors/FetchFile.cpp +++ b/extensions/standard-processors/processors/FetchFile.cpp @@ -20,6 +20,7 @@ #include <filesystem> #include <utility> +#include "utils/ConfigurationUtils.h" #include "utils/ProcessorConfigUtils.h" #include "utils/file/FileReaderCallback.h" #include "utils/file/FileUtils.h" @@ -41,6 +42,7 @@ void FetchFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFa move_conflict_strategy_ = utils::parseEnumProperty<fetch_file::MoveConflictStrategyOption>(context, MoveConflictStrategy); log_level_when_file_not_found_ = utils::parseEnumProperty<utils::LogUtils::LogLevelOption>(context, LogLevelWhenFileNotFound); log_level_when_permission_denied_ = utils::parseEnumProperty<utils::LogUtils::LogLevelOption>(context, LogLevelWhenPermissionDenied); + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); } std::filesystem::path FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) { @@ -139,7 +141,7 @@ void FetchFile::onTrigger(core::ProcessContext& context, core::ProcessSession& s } try { - utils::FileReaderCallback callback(file_to_fetch_path); + utils::FileReaderCallback callback(file_to_fetch_path, buffer_size_); session.write(flow_file, std::move(callback)); logger_->log_debug("Fetching file '{}' successful!", file_to_fetch_path); session.transfer(flow_file, Success); diff --git a/extensions/standard-processors/processors/FetchFile.h b/extensions/standard-processors/processors/FetchFile.h index 7103a4950..8c7081348 100644 --- a/extensions/standard-processors/processors/FetchFile.h +++ b/extensions/standard-processors/processors/FetchFile.h @@ -176,6 +176,7 @@ class FetchFile final : public core::ProcessorImpl { fetch_file::MoveConflictStrategyOption move_conflict_strategy_{}; utils::LogUtils::LogLevelOption log_level_when_file_not_found_{}; utils::LogUtils::LogLevelOption log_level_when_permission_denied_{}; + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FetchFile>::getLogger(uuid_); }; diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp index 41894fa14..ec9cbfc1c 100644 --- a/extensions/standard-processors/processors/GetFile.cpp +++ b/extensions/standard-processors/processors/GetFile.cpp @@ -30,6 +30,7 @@ #include "utils/RegexUtils.h" #include "utils/StringUtils.h" #include "utils/TimeUtil.h" +#include "utils/ConfigurationUtils.h" #include "utils/file/FileReaderCallback.h" #include "utils/file/FileUtils.h" #include "utils/ProcessorConfigUtils.h" @@ -44,6 +45,8 @@ void GetFile::initialize() { } void GetFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); + request_.batchSize = utils::parseU64Property(context, BatchSize); request_.ignoreHiddenFile = utils::parseBoolProperty(context, IgnoreHiddenFile); request_.keepSourceFile = utils::parseBoolProperty(context, KeepSourceFile); @@ -101,7 +104,7 @@ void GetFile::getSingleFile(core::ProcessSession& session, const std::filesystem flow_file->setAttribute(core::SpecialFlowAttribute::PATH, (relative_path / "").string()); try { - session.write(flow_file, utils::FileReaderCallback{file_path}); + session.write(flow_file, utils::FileReaderCallback{file_path, buffer_size_}); session.transfer(flow_file, Success); if (!request_.keepSourceFile) { std::error_code remove_error; diff --git a/extensions/standard-processors/processors/GetFile.h b/extensions/standard-processors/processors/GetFile.h index 345783f89..4ed09360b 100644 --- a/extensions/standard-processors/processors/GetFile.h +++ b/extensions/standard-processors/processors/GetFile.h @@ -195,6 +195,7 @@ class GetFile : public core::ProcessorImpl { std::queue<std::filesystem::path> directory_listing_; mutable std::mutex directory_listing_mutex_; std::atomic<std::chrono::time_point<std::chrono::system_clock>> last_listing_time_{}; + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<GetFile>::getLogger(uuid_); }; diff --git a/extensions/standard-processors/processors/SegmentContent.cpp b/extensions/standard-processors/processors/SegmentContent.cpp index bbe4cb9f6..8cc2421bc 100644 --- a/extensions/standard-processors/processors/SegmentContent.cpp +++ b/extensions/standard-processors/processors/SegmentContent.cpp @@ -22,18 +22,19 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/Resource.h" +#include "utils/ConfigurationUtils.h" #include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { -constexpr uint64_t BUFFER_TARGET_SIZE = 1024; - void SegmentContent::initialize() { setSupportedProperties(Properties); setSupportedRelationships(Relationships); } -void SegmentContent::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) {} +void SegmentContent::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); +} namespace { void updateSplitAttributesAndTransfer(core::ProcessSession& session, const std::vector<std::shared_ptr<core::FlowFile>>& splits, const core::FlowFile& original) { @@ -75,7 +76,7 @@ void SegmentContent::onTrigger(core::ProcessContext& context, core::ProcessSessi bool needs_new_segment = true; while (true) { const uint64_t segment_remaining_size = max_segment_size - current_segment_size; - const uint64_t buffer_size = std::min(BUFFER_TARGET_SIZE, segment_remaining_size); + const uint64_t buffer_size = (std::min)(uint64_t{buffer_size_}, segment_remaining_size); buffer.resize(buffer_size); num_bytes_read = ff_content_stream->read(buffer); if (io::isError(num_bytes_read)) { diff --git a/extensions/standard-processors/processors/SegmentContent.h b/extensions/standard-processors/processors/SegmentContent.h index 321bf29f3..b7c0e8027 100644 --- a/extensions/standard-processors/processors/SegmentContent.h +++ b/extensions/standard-processors/processors/SegmentContent.h @@ -73,6 +73,7 @@ class SegmentContent final : public core::ProcessorImpl { void initialize() override; private: + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SegmentContent>::getLogger(uuid_); }; diff --git a/extensions/standard-processors/processors/SplitContent.cpp b/extensions/standard-processors/processors/SplitContent.cpp index 0b3b4e0b7..ac1dd5d79 100644 --- a/extensions/standard-processors/processors/SplitContent.cpp +++ b/extensions/standard-processors/processors/SplitContent.cpp @@ -24,6 +24,7 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/Resource.h" +#include "utils/ConfigurationUtils.h" #include "utils/ProcessorConfigUtils.h" #include "utils/gsl.h" @@ -34,6 +35,7 @@ void SplitContent::initialize() { } void SplitContent::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); auto byte_sequence_str = utils::parseProperty(context, ByteSequence); const auto byte_sequence_format = utils::parseEnumProperty<ByteSequenceFormat>(context, ByteSequenceFormatProperty); std::vector<std::byte> byte_sequence{}; @@ -53,13 +55,14 @@ namespace { class Splitter { public: explicit Splitter(core::ProcessSession& session, std::optional<std::string> original_filename, SplitContent::ByteSequenceMatcher& byte_sequence_matcher, const bool keep_byte_sequence, - const SplitContent::ByteSequenceLocation byte_sequence_location) + const SplitContent::ByteSequenceLocation byte_sequence_location, const size_t buffer_size) : session_(session), original_filename_(std::move(original_filename)), byte_sequence_matcher_(byte_sequence_matcher), keep_trailing_byte_sequence_(keep_byte_sequence && byte_sequence_location == SplitContent::ByteSequenceLocation::Trailing), - keep_leading_byte_sequence_(keep_byte_sequence && byte_sequence_location == SplitContent::ByteSequenceLocation::Leading) { - data_before_byte_sequence_.reserve(SplitContent::BUFFER_TARGET_SIZE); + keep_leading_byte_sequence_(keep_byte_sequence && byte_sequence_location == SplitContent::ByteSequenceLocation::Leading), + buffer_size_(buffer_size) { + data_before_byte_sequence_.reserve(buffer_size_); } Splitter(const Splitter&) = delete; @@ -99,7 +102,7 @@ class Splitter { } void flushIfBufferTooLarge() { - if (data_before_byte_sequence_.size() >= SplitContent::BUFFER_TARGET_SIZE) { appendDataBeforeByteSequenceToSplit(); } + if (data_before_byte_sequence_.size() >= buffer_size_) { appendDataBeforeByteSequenceToSplit(); } } private: @@ -158,9 +161,9 @@ class Splitter { std::shared_ptr<core::FlowFile> current_split_ = nullptr; std::vector<std::shared_ptr<core::FlowFile>> completed_splits_; SplitContent::size_type matching_bytes_ = 0; - const bool keep_trailing_byte_sequence_ = false; const bool keep_leading_byte_sequence_ = false; + size_t buffer_size_{}; }; } // namespace @@ -209,7 +212,7 @@ void SplitContent::onTrigger(core::ProcessContext& context, core::ProcessSession const auto ff_content_stream = session.getFlowFileContentStream(*original); if (!ff_content_stream) { throw Exception(PROCESSOR_EXCEPTION, fmt::format("Couldn't access the ContentStream of {}", original->getUUID().to_string())); } - Splitter splitter{session, original->getAttribute(core::SpecialFlowAttribute::FILENAME), *byte_sequence_matcher_, keep_byte_sequence, byte_sequence_location_}; + Splitter splitter{session, original->getAttribute(core::SpecialFlowAttribute::FILENAME), *byte_sequence_matcher_, keep_byte_sequence, byte_sequence_location_, buffer_size_}; while (auto latest_byte = ff_content_stream->readByte()) { splitter.digest(*latest_byte); diff --git a/extensions/standard-processors/processors/SplitContent.h b/extensions/standard-processors/processors/SplitContent.h index 3019f40a2..ff626c626 100644 --- a/extensions/standard-processors/processors/SplitContent.h +++ b/extensions/standard-processors/processors/SplitContent.h @@ -96,8 +96,6 @@ class SplitContent final : public core::ProcessorImpl { EXTENSIONAPI static constexpr bool IsSingleThreaded = true; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - static constexpr size_type BUFFER_TARGET_SIZE = 4096; - void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; @@ -124,6 +122,7 @@ class SplitContent final : public core::ProcessorImpl { std::optional<ByteSequenceMatcher> byte_sequence_matcher_; bool keep_byte_sequence = false; ByteSequenceLocation byte_sequence_location_ = ByteSequenceLocation::Trailing; + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SplitContent>::getLogger(uuid_); }; diff --git a/extensions/standard-processors/processors/SplitText.cpp b/extensions/standard-processors/processors/SplitText.cpp index 5295be313..9a9823c5d 100644 --- a/extensions/standard-processors/processors/SplitText.cpp +++ b/extensions/standard-processors/processors/SplitText.cpp @@ -24,6 +24,7 @@ #include "core/ProcessSession.h" #include "core/Resource.h" #include "core/FlowFile.h" +#include "utils/ConfigurationUtils.h" #include "utils/gsl.h" #include "utils/ProcessorConfigUtils.h" #include "io/StreamPipe.h" @@ -32,8 +33,9 @@ namespace org::apache::nifi::minifi::processors { namespace detail { -LineReader::LineReader(const std::shared_ptr<io::InputStream>& stream) - : stream_(stream) { +LineReader::LineReader(const std::shared_ptr<io::InputStream>& stream, const size_t buffer_size) + : stream_(stream), + buffer_size_(buffer_size) { if (!stream_ || stream_->size() == 0) { state_ = StreamReadState::EndOfStream; } @@ -51,7 +53,7 @@ uint8_t LineReader::getEndLineSize(size_t newline_position) { } void LineReader::setLastLineInfoAttributes(uint8_t endline_size, const std::optional<std::string>& starts_with) { - const uint64_t size_from_beginning_of_stream = (current_buffer_count_ - 1) * SPLIT_TEXT_BUFFER_SIZE + buffer_offset_; + const uint64_t size_from_beginning_of_stream = (current_buffer_count_ - 1) * buffer_size_ + buffer_offset_; if (last_line_info_) { LineInfo previous_line_info = *last_line_info_; last_line_info_->offset = previous_line_info.offset + previous_line_info.size; @@ -63,14 +65,15 @@ void LineReader::setLastLineInfoAttributes(uint8_t endline_size, const std::opti } if (starts_with) { - last_line_info_->matches_starts_with = last_line_info_->size >= starts_with->size() && - std::equal(starts_with->begin(), starts_with->end(), buffer_.begin() + last_line_info_->offset, buffer_.begin() + last_line_info_->offset + starts_with->size()); + const auto last_line_info_begin = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(last_line_info_->offset); + const auto last_line_info_end = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(last_line_info_->offset + starts_with->size()); + last_line_info_->matches_starts_with = (last_line_info_->size >= starts_with->size() && std::equal(starts_with->begin(), starts_with->end(), last_line_info_begin, last_line_info_end)); } } bool LineReader::readNextBuffer() { buffer_offset_ = 0; - last_read_size_ = (std::min)(gsl::narrow<size_t>(stream_->size() - read_size_), SPLIT_TEXT_BUFFER_SIZE); + last_read_size_ = (std::min)(gsl::narrow<size_t>(stream_->size() - read_size_), buffer_size_); const auto read_ret = stream_->read(as_writable_bytes(std::span(buffer_).subspan(0, last_read_size_))); if (io::isError(read_ret)) { state_ = StreamReadState::StreamReadError; @@ -100,8 +103,10 @@ std::optional<LineReader::LineInfo> LineReader::readNextLine(const std::optional return std::nullopt; } - auto endline_pos = std::find_if(buffer_.begin() + buffer_offset_, buffer_.begin() + last_read_size_, [](const auto& buffer_element) { return buffer_element == '\n'; }); - if (endline_pos != buffer_.begin() + last_read_size_) { + const auto begin = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(buffer_offset_); + const auto end = buffer_.begin() + gsl::narrow<std::vector<char>::difference_type>(last_read_size_); + auto endline_pos = std::find_if(begin, end, [](const auto& buffer_element) { return buffer_element == '\n'; }); + if (endline_pos != end) { const auto line_length = std::distance(buffer_.begin(), endline_pos); buffer_offset_ = line_length + 1; return finalizeLineInfo(getEndLineSize(line_length), starts_with); @@ -128,7 +133,7 @@ class SplitTextFragmentGenerator { uint8_t endline_size = 0; }; - SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config); + SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config, size_t buffer_size); std::optional<Fragment> readNextFragment(); nonstd::expected<Fragment, const char*> readHeaderFragment(); [[nodiscard]] detail::StreamReadState getState() const { return line_reader_.getState(); } @@ -151,7 +156,7 @@ class SplitTextFragmentGenerator { class ReadCallback { public: ReadCallback(std::shared_ptr<core::FlowFile> flow_file, const SplitTextConfiguration& split_text_config, - core::ProcessSession& session, std::shared_ptr<core::logging::Logger> logger); + core::ProcessSession& session, size_t buffer_size, std::shared_ptr<core::logging::Logger> logger); int64_t operator()(const std::shared_ptr<io::InputStream>& stream); std::optional<const char*> error; std::vector<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> results; @@ -168,11 +173,12 @@ class ReadCallback { core::ProcessSession& session_; size_t emitted_fragment_index_ = 1; const std::string fragment_identifier_ = utils::IdGenerator::getIdGenerator()->generate().to_string(); + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_; }; -SplitTextFragmentGenerator::SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config) - : line_reader_(stream), +SplitTextFragmentGenerator::SplitTextFragmentGenerator(const std::shared_ptr<io::InputStream>& stream, const SplitTextConfiguration& split_text_config, const size_t buffer_size) + : line_reader_(stream, buffer_size), split_text_config_(split_text_config) { } @@ -277,10 +283,11 @@ std::optional<SplitTextFragmentGenerator::Fragment> SplitTextFragmentGenerator:: } ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flow_file, const SplitTextConfiguration& split_text_config, - core::ProcessSession& session, std::shared_ptr<core::logging::Logger> logger) + core::ProcessSession& session, const size_t buffer_size, std::shared_ptr<core::logging::Logger> logger) : flow_file_(std::move(flow_file)), split_text_config_(split_text_config), session_(session), + buffer_size_(buffer_size), logger_(std::move(logger)) { } @@ -348,7 +355,7 @@ void ReadCallback::createFragmentFlowWithoutHeader(const SplitTextFragmentGenera } int64_t ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) { - SplitTextFragmentGenerator fragment_generator(stream, split_text_config_); + SplitTextFragmentGenerator fragment_generator(stream, split_text_config_, buffer_size_); nonstd::expected<SplitTextFragmentGenerator::Fragment, const char*> header_fragment; std::shared_ptr<core::FlowFile> header_flow; // cache header flow file to avoid cloning it for each fragment if (split_text_config_.header_line_count > 0 || split_text_config_.header_line_marker_characters) { @@ -390,6 +397,7 @@ void SplitText::initialize() { } void SplitText::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*sessionFactory*/) { + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); split_text_config_.line_split_count = utils::parseU64Property(context, LineSplitCount); logger_->log_debug("SplitText line split count: {}", split_text_config_.line_split_count); if (const auto max_fragment_data_size_value = utils::parseOptionalDataSizeProperty(context, MaximumFragmentSize)) { @@ -405,9 +413,10 @@ void SplitText::onSchedule(core::ProcessContext& context, core::ProcessSessionFa split_text_config_.header_line_count = utils::parseU64Property(context, HeaderLineCount); logger_->log_debug("SplitText header line count: {}", split_text_config_.header_line_count); split_text_config_.header_line_marker_characters = context.getProperty(HeaderLineMarkerCharacters) | utils::toOptional(); - if (split_text_config_.header_line_marker_characters && split_text_config_.header_line_marker_characters->size() >= detail::SPLIT_TEXT_BUFFER_SIZE) { + if (split_text_config_.header_line_marker_characters && split_text_config_.header_line_marker_characters->size() >= buffer_size_) { + gsl_Expects(buffer_size_ >= 1); throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("SplitText header line marker characters length is larger than the maximum allowed: {} > {}", - split_text_config_.header_line_marker_characters->size(), detail::SPLIT_TEXT_BUFFER_SIZE - 1)); + split_text_config_.header_line_marker_characters->size(), buffer_size_ - 1)); } if (split_text_config_.header_line_marker_characters) { logger_->log_debug("SplitText header line marker characters were set: {}", *split_text_config_.header_line_marker_characters); @@ -423,7 +432,7 @@ void SplitText::onTrigger(core::ProcessContext& context, core::ProcessSession& s return; } - ReadCallback callback{flow_file, split_text_config_, session, logger_}; + ReadCallback callback{flow_file, split_text_config_, session, buffer_size_, logger_}; session.read(flow_file, std::ref(callback)); if (callback.error) { logger_->log_error("Splitting flow file failed with error: {}", *callback.error); diff --git a/extensions/standard-processors/processors/SplitText.h b/extensions/standard-processors/processors/SplitText.h index c4944214a..2bb37f20c 100644 --- a/extensions/standard-processors/processors/SplitText.h +++ b/extensions/standard-processors/processors/SplitText.h @@ -45,8 +45,6 @@ struct SplitTextConfiguration { namespace detail { -inline constexpr size_t SPLIT_TEXT_BUFFER_SIZE = 8192; - enum class StreamReadState { Ok, StreamReadError, @@ -64,9 +62,9 @@ class LineReader { bool operator==(const LineInfo& line_info) const = default; }; - explicit LineReader(const std::shared_ptr<io::InputStream>& stream); + explicit LineReader(const std::shared_ptr<io::InputStream>& stream, size_t buffer_size); std::optional<LineInfo> readNextLine(const std::optional<std::string>& starts_with = std::nullopt); - StreamReadState getState() const { return state_; } + [[nodiscard]] StreamReadState getState() const { return state_; } private: uint8_t getEndLineSize(size_t newline_position); @@ -78,8 +76,9 @@ class LineReader { uint64_t current_buffer_count_ = 0; size_t last_read_size_ = 0; uint64_t read_size_ = 0; - std::array<char, SPLIT_TEXT_BUFFER_SIZE> buffer_{}; std::shared_ptr<io::InputStream> stream_; + size_t buffer_size_; + std::vector<char> buffer_ = std::vector<char>(buffer_size_); std::optional<LineInfo> last_line_info_; StreamReadState state_ = StreamReadState::Ok; }; @@ -174,6 +173,7 @@ class SplitText : public core::ProcessorImpl { private: SplitTextConfiguration split_text_config_; + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SplitText>::getLogger(uuid_); }; diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index c1d714445..5082a8c57 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -19,7 +19,6 @@ */ #include <algorithm> -#include <array> #include <iostream> #include <map> #include <unordered_map> @@ -31,6 +30,7 @@ #include "range/v3/action/sort.hpp" #include "io/CRCStream.h" +#include "utils/ConfigurationUtils.h" #include "utils/file/FileUtils.h" #include "utils/file/PathUtils.h" #include "utils/StringUtils.h" @@ -98,16 +98,16 @@ void openFile(const std::filesystem::path& file_path, uint64_t offset, std::ifst } } -constexpr std::size_t BUFFER_SIZE = 4096; - class FileReaderCallback { public: FileReaderCallback(const std::filesystem::path& file_path, uint64_t offset, char input_delimiter, - uint64_t checksum) + uint64_t checksum, + size_t buffer_size) : input_delimiter_(input_delimiter), - checksum_(checksum) { + checksum_(checksum), + buffer_size_(buffer_size) { openFile(file_path, offset, input_stream_, logger_); } @@ -159,12 +159,13 @@ class FileReaderCallback { } private: - char input_delimiter_; - uint64_t checksum_; + char input_delimiter_{}; + uint64_t checksum_{}; std::ifstream input_stream_; + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger(); - std::array<char, BUFFER_SIZE> buffer_{}; + std::vector<char> buffer_ = std::vector<char>(buffer_size_); char *begin_ = buffer_.data(); char *end_ = buffer_.data(); @@ -175,8 +176,10 @@ class WholeFileReaderCallback { public: WholeFileReaderCallback(const std::filesystem::path& file_path, uint64_t offset, - uint64_t checksum) - : checksum_(checksum) { + uint64_t checksum, + size_t buffer_size) + : checksum_(checksum), + buffer_size_(buffer_size) { openFile(file_path, offset, input_stream_, logger_); } @@ -185,14 +188,14 @@ class WholeFileReaderCallback { } int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) { - std::array<char, BUFFER_SIZE> buffer{}; + std::vector<char> buffer(buffer_size_); io::CRCStream<io::OutputStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_}; uint64_t num_bytes_written = 0; while (input_stream_.good()) { - input_stream_.read(buffer.data(), buffer.size()); + input_stream_.read(buffer.data(), gsl::narrow<std::streamsize>(buffer.size())); const auto num_bytes_read = input_stream_.gcount(); logger_->log_trace("Read {} bytes of input", std::intmax_t{num_bytes_read}); @@ -211,6 +214,7 @@ class WholeFileReaderCallback { private: uint64_t checksum_; std::ifstream input_stream_; + size_t buffer_size_; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger(); }; @@ -234,6 +238,7 @@ void TailFile::initialize() { } void TailFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); tail_states_.clear(); state_manager_ = context.getStateManager(); @@ -464,9 +469,9 @@ bool TailFile::getStateFromLegacyStateFile(const core::ProcessContext& context, } std::map<std::filesystem::path, TailState> legacy_tail_states; - std::array<char, BUFFER_SIZE> buf{}; - for (file.getline(buf.data(), BUFFER_SIZE); file.good(); file.getline(buf.data(), BUFFER_SIZE)) { - parseStateFileLine(buf.data(), legacy_tail_states); + std::vector<char> buffer(buffer_size_); + for (file.getline(buffer.data(), gsl::narrow<std::streamsize>(buffer_size_)); file.good(); file.getline(buffer.data(), gsl::narrow<std::streamsize>(buffer_size_))) { + parseStateFileLine(buffer.data(), legacy_tail_states); } new_tail_states = update_keys_in_legacy_states(legacy_tail_states); @@ -676,7 +681,7 @@ void TailFile::processSingleFile(core::ProcessSession& session, logger_->log_trace("Looking for delimiter 0x{:X}", *delimiter_); std::size_t num_flow_files = 0; - FileReaderCallback file_reader{full_file_name, state.position_, *delimiter_, state.checksum_}; + FileReaderCallback file_reader{full_file_name, state.position_, *delimiter_, state.checksum_, buffer_size_}; TailState state_copy{state}; while (file_reader.hasMoreToRead() && (!batch_size_ || *batch_size_ > num_flow_files)) { @@ -699,7 +704,7 @@ void TailFile::processSingleFile(core::ProcessSession& session, logger_->log_info("{} flowfiles were received from TailFile input", num_flow_files); } else { - WholeFileReaderCallback file_reader{full_file_name, state.position_, state.checksum_}; + WholeFileReaderCallback file_reader{full_file_name, state.position_, state.checksum_, buffer_size_}; auto flow_file = session.create(); session.write(flow_file, std::ref(file_reader)); diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h index f39a8894a..d70b024d1 100644 --- a/extensions/standard-processors/processors/TailFile.h +++ b/extensions/standard-processors/processors/TailFile.h @@ -283,6 +283,7 @@ class TailFile : public core::ProcessorImpl { controllers::AttributeProviderService* attribute_provider_service_ = nullptr; std::unordered_map<std::string, controllers::AttributeProviderService::AttributeMap> extra_attributes_; std::optional<uint32_t> batch_size_; + size_t buffer_size_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger(uuid_); }; diff --git a/extensions/standard-processors/tests/unit/SplitContentTests.cpp b/extensions/standard-processors/tests/unit/SplitContentTests.cpp index 25fa95d27..1005bffd3 100644 --- a/extensions/standard-processors/tests/unit/SplitContentTests.cpp +++ b/extensions/standard-processors/tests/unit/SplitContentTests.cpp @@ -22,6 +22,7 @@ #include "unit/Catch.h" #include "unit/SingleProcessorTestController.h" #include "unit/TestBase.h" +#include "utils/ConfigurationUtils.h" namespace org::apache::nifi::minifi::processors::test { @@ -418,7 +419,8 @@ TEST_CASE("ByteSequenceAtBufferTargetSize") { minifi::test::SingleProcessorTestController controller{std::make_unique<SplitContent>("SplitContent")}; const auto split_content = controller.getProcessor(); - auto x = SplitContent::BUFFER_TARGET_SIZE-10; + static_assert(utils::configuration::DEFAULT_BUFFER_SIZE >= 10); + auto x = utils::configuration::DEFAULT_BUFFER_SIZE - 10; auto [pre_fix_size, separator_size, post_fix_size] = GENERATE_COPY( std::make_tuple(x, x, x), diff --git a/extensions/standard-processors/tests/unit/SplitTextTests.cpp b/extensions/standard-processors/tests/unit/SplitTextTests.cpp index 3d56232c5..b44d56d19 100644 --- a/extensions/standard-processors/tests/unit/SplitTextTests.cpp +++ b/extensions/standard-processors/tests/unit/SplitTextTests.cpp @@ -22,18 +22,21 @@ #include "processors/SplitText.h" #include "unit/SingleProcessorTestController.h" #include "io/BufferStream.h" +#include "utils/ConfigurationUtils.h" namespace org::apache::nifi::minifi::test { +inline constexpr auto BUFFER_SIZE = minifi::utils::configuration::DEFAULT_BUFFER_SIZE; + TEST_CASE("Test LineReader with nullptr") { - processors::detail::LineReader reader{nullptr}; + processors::detail::LineReader reader{nullptr, BUFFER_SIZE}; CHECK(reader.readNextLine() == std::nullopt); CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream); } TEST_CASE("Test LineReader with empty stream") { auto stream = std::make_shared<io::BufferStream>(); - processors::detail::LineReader reader{stream}; + processors::detail::LineReader reader{stream, BUFFER_SIZE}; CHECK(reader.readNextLine() == std::nullopt); CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream); } @@ -42,7 +45,7 @@ TEST_CASE("Test LineReader with trailing endline") { auto stream = std::make_shared<io::BufferStream>(); std::string input = "this is a new line\nand another line\r\nthirdline\n"; stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size()); - processors::detail::LineReader reader{stream}; + processors::detail::LineReader reader{stream, BUFFER_SIZE}; CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = 19, .endline_size = 1}); CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 19, .size = 18, .endline_size = 2}); CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 37, .size = 10, .endline_size = 1}); @@ -54,7 +57,7 @@ TEST_CASE("Test LineReader without trailing endlines") { auto stream = std::make_shared<io::BufferStream>(); std::string input = "this is a new line\nand another line\r\nthirdline"; stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size()); - processors::detail::LineReader reader{stream}; + processors::detail::LineReader reader{stream, BUFFER_SIZE}; CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = 19, .endline_size = 1}); CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 19, .size = 18, .endline_size = 2}); CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 37, .size = 9, .endline_size = 0}); @@ -64,11 +67,11 @@ TEST_CASE("Test LineReader without trailing endlines") { TEST_CASE("Test LineReader with input larger than buffer length") { auto stream = std::make_shared<io::BufferStream>(); - const auto first_line_size = static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5); - const auto second_line_size = static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.7); + const auto first_line_size = static_cast<size_t>(BUFFER_SIZE * 1.5); + const auto second_line_size = static_cast<size_t>(BUFFER_SIZE * 1.7); std::string input = std::string(first_line_size, 'a') + "\n" + std::string(second_line_size, 'b') + "\n"; stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size()); - processors::detail::LineReader reader{stream}; + processors::detail::LineReader reader{stream, BUFFER_SIZE}; CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = first_line_size + 1, .endline_size = 1}); CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = first_line_size +1 , .size = second_line_size + 1, .endline_size = 1}); CHECK(reader.readNextLine() == std::nullopt); @@ -77,23 +80,23 @@ TEST_CASE("Test LineReader with input larger than buffer length") { TEST_CASE("Test LineReader with input of same size as buffer length") { auto stream = std::make_shared<io::BufferStream>(); - std::string input = std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE - 1, 'a') + "\n" + std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2 - 1, 'b') + "\n"; + std::string input = std::string(BUFFER_SIZE - 1, 'a') + "\n" + std::string(BUFFER_SIZE * 2 - 1, 'b') + "\n"; stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size()); - processors::detail::LineReader reader{stream}; - CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = processors::detail::SPLIT_TEXT_BUFFER_SIZE, .endline_size = 1}); + processors::detail::LineReader reader{stream, BUFFER_SIZE}; + CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = BUFFER_SIZE, .endline_size = 1}); CHECK(reader.readNextLine() == - processors::detail::LineReader::LineInfo{.offset = processors::detail::SPLIT_TEXT_BUFFER_SIZE, .size = processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, .endline_size = 1}); + processors::detail::LineReader::LineInfo{.offset = BUFFER_SIZE, .size = BUFFER_SIZE * 2, .endline_size = 1}); CHECK(reader.readNextLine() == std::nullopt); CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream); } TEST_CASE("Test LineReader with input larger than buffer length without trailing endline") { auto stream = std::make_shared<io::BufferStream>(); - const auto first_line_size = static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5); - const auto second_line_size = static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.7); + const auto first_line_size = static_cast<size_t>(BUFFER_SIZE * 1.5); + const auto second_line_size = static_cast<size_t>(BUFFER_SIZE * 1.7); std::string input = std::string(first_line_size, 'a') + "\n" + std::string(second_line_size, 'b'); stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size()); - processors::detail::LineReader reader{stream}; + processors::detail::LineReader reader{stream, BUFFER_SIZE}; CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = first_line_size + 1, .endline_size = 1}); CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = first_line_size + 1, .size = second_line_size, .endline_size = 0}); CHECK(reader.readNextLine() == std::nullopt); @@ -102,12 +105,12 @@ TEST_CASE("Test LineReader with input larger than buffer length without trailing TEST_CASE("Test LineReader with input of same size as buffer length without trailing endline") { auto stream = std::make_shared<io::BufferStream>(); - std::string input = std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE - 1, 'a') + "\n" + std::string(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, 'b'); + std::string input = std::string(BUFFER_SIZE - 1, 'a') + "\n" + std::string(BUFFER_SIZE * 2, 'b'); stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size()); - processors::detail::LineReader reader{stream}; - CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = processors::detail::SPLIT_TEXT_BUFFER_SIZE, .endline_size = 1}); + processors::detail::LineReader reader{stream, BUFFER_SIZE}; + CHECK(reader.readNextLine() == processors::detail::LineReader::LineInfo{.offset = 0, .size = BUFFER_SIZE, .endline_size = 1}); CHECK(reader.readNextLine() == - processors::detail::LineReader::LineInfo{.offset = processors::detail::SPLIT_TEXT_BUFFER_SIZE, .size = processors::detail::SPLIT_TEXT_BUFFER_SIZE * 2, .endline_size = 0}); + processors::detail::LineReader::LineInfo{.offset = BUFFER_SIZE, .size = BUFFER_SIZE * 2, .endline_size = 0}); CHECK(reader.readNextLine() == std::nullopt); CHECK(reader.getState() == processors::detail::StreamReadState::EndOfStream); } @@ -116,7 +119,7 @@ TEST_CASE("Test LineReader with starts with filter") { auto stream = std::make_shared<io::BufferStream>(); std::string input = "header this is a new line\nheader and another line\r\nthirdline\nheader line\n"; stream->write(reinterpret_cast<const uint8_t*>(input.data()), input.size()); - processors::detail::LineReader reader{stream}; + processors::detail::LineReader reader{stream, BUFFER_SIZE}; CHECK(reader.readNextLine("header") == processors::detail::LineReader::LineInfo{.offset = 0, .size = 26, .endline_size = 1, .matches_starts_with = true}); CHECK(reader.readNextLine("header") == processors::detail::LineReader::LineInfo{.offset = 26, .size = 25, .endline_size = 2, .matches_starts_with = true}); CHECK(reader.readNextLine("header") == processors::detail::LineReader::LineInfo{.offset = 51, .size = 10, .endline_size = 1, .matches_starts_with = false}); @@ -211,7 +214,7 @@ TEST_CASE("Header Line Marker Characters size cannot be equal or larger than spl SingleProcessorTestController controller{std::make_unique<processors::SplitText>("SplitText")}; const auto split_text = controller.getProcessor(); split_text->setProperty(processors::SplitText::LineSplitCount.name, "1"); - std::string header_marker_character(processors::detail::SPLIT_TEXT_BUFFER_SIZE, 'A'); + std::string header_marker_character(BUFFER_SIZE, 'A'); split_text->setProperty(processors::SplitText::HeaderLineMarkerCharacters.name, header_marker_character); REQUIRE_THROWS_AS(controller.trigger("", {}), minifi::Exception); } @@ -249,7 +252,7 @@ TEST_CASE("SplitText creates new flow file for a single line") { expected_results[0].content = line; } SECTION("Line size larger than buffer size") { - line = std::string(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5), 'a') + "\n"; + line = std::string(static_cast<size_t>(BUFFER_SIZE * 1.5), 'a') + "\n"; expected_results[0].content = line; } SECTION("Content without endline is a single line") { @@ -294,12 +297,12 @@ TEST_CASE("SplitText creates new flow file with 2 lines") { expected_results[0].content = "this is a new line\r\nand another line"; } SECTION("Line size larger than buffer size") { - std::string str(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5), 'a'); + std::string str(static_cast<size_t>(BUFFER_SIZE * 1.5), 'a'); input = str + "\n" + str + "\n"; expected_results[0].content = input; } SECTION("Line size larger than buffer size without endline at the end") { - std::string str(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5), 'a'); + std::string str(static_cast<size_t>(BUFFER_SIZE * 1.5), 'a'); input = str + "\n" + str; expected_results[0].content = input; } @@ -337,13 +340,13 @@ TEST_CASE("SplitText creates separate flow files from 2 lines") { expected_results[1].content = "and another line\r\n"; } SECTION("Line size larger than buffer size") { - std::string str(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5), 'a'); + std::string str(static_cast<size_t>(BUFFER_SIZE * 1.5), 'a'); input = str + "\n" + str + "\n"; expected_results[0].content = str + "\n"; expected_results[1].content = str + "\n"; } SECTION("Line size larger than buffer size without endline at the end") { - std::string str(static_cast<size_t>(processors::detail::SPLIT_TEXT_BUFFER_SIZE * 1.5), 'a'); + std::string str(static_cast<size_t>(BUFFER_SIZE * 1.5), 'a'); input = str + "\n" + str; expected_results[0].content = str + "\n"; expected_results[1].content = str; diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h index 16f94c978..5009d54ee 100644 --- a/libminifi/include/controllers/SSLContextService.h +++ b/libminifi/include/controllers/SSLContextService.h @@ -43,6 +43,7 @@ #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" +#include "utils/ConfigurationUtils.h" #include "utils/Export.h" #include "utils/tls/CertificateUtils.h" #include "minifi-cpp/controllers/SSLContextService.h" @@ -254,13 +255,14 @@ class SSLContextServiceImpl : public core::controller::ControllerServiceImpl, pu #endif // WIN32 static std::string getLatestOpenSSLErrorString() { - unsigned long err = ERR_peek_last_error(); // NOLINT - if (err == 0U) { + const auto err = ERR_peek_last_error(); + if (err == 0) { return ""; } - char buf[4096]; - ERR_error_string_n(err, buf, sizeof(buf)); - return buf; + static_assert(utils::configuration::DEFAULT_BUFFER_SIZE >= 1); + std::array<char, utils::configuration::DEFAULT_BUFFER_SIZE> buffer{}; + ERR_error_string_n(err, buffer.data(), buffer.size() - 1); + return {buffer.data()}; } static bool isFileTypeP12(const std::filesystem::path& filename) { diff --git a/libminifi/include/core/logging/LoggerBase.h b/libminifi/include/core/logging/LoggerBase.h index 69142b473..ab2737ba2 100644 --- a/libminifi/include/core/logging/LoggerBase.h +++ b/libminifi/include/core/logging/LoggerBase.h @@ -30,6 +30,7 @@ #include "spdlog/common.h" #include "spdlog/logger.h" #include "utils/gsl.h" +#include "utils/ConfigurationUtils.h" #include "utils/Enum.h" #include "utils/GeneralUtils.h" #include "fmt/chrono.h" @@ -39,7 +40,7 @@ namespace org::apache::nifi::minifi::core::logging { -inline constexpr size_t LOG_BUFFER_SIZE = 4096; +inline constexpr size_t LOG_BUFFER_SIZE = utils::configuration::DEFAULT_BUFFER_SIZE; class LoggerControl { public: diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index e9bf0ba1c..faee19047 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -51,6 +51,7 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal {Configuration::nifi_provenance_repository_directory_default, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_flowfile_repository_directory_default, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_dbcontent_repository_directory_default, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, + {Configuration::nifi_default_internal_buffer_size, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)}, {Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)}, {Configuration::nifi_content_repository_rocksdb_use_synchronous_writes, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 56e44f75c..3e895735e 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -135,8 +135,8 @@ bool ConfigureImpl::commitChanges() { return success; } -std::shared_ptr<Configure> Configure::create() { - return std::make_shared<ConfigureImpl>(); +gsl::not_null<std::shared_ptr<Configure>> Configure::create() { + return gsl::make_not_null<std::shared_ptr<Configure>>(std::make_shared<ConfigureImpl>()); } } // namespace org::apache::nifi::minifi diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index 89c4dc963..ce070e30b 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -30,6 +30,7 @@ #include "io/AsioStream.h" #include "asio/ssl/stream.hpp" #include "asio/detached.hpp" +#include "utils/ConfigurationUtils.h" #include "utils/net/AsioSocketUtils.h" #include "c2/C2Utils.h" @@ -396,7 +397,7 @@ void ControllerSocketProtocol::writeDebugBundleResponse(io::BaseStream &stream) size_t bundle_size = bundle.value()->size(); resp.write(bundle_size); - const size_t BUFFER_SIZE = 4096; + static constexpr auto BUFFER_SIZE = utils::configuration::DEFAULT_BUFFER_SIZE; std::array<std::byte, BUFFER_SIZE> out_buffer{}; while (bundle_size > 0) { const auto next_write_size = (std::min)(bundle_size, BUFFER_SIZE); diff --git a/libminifi/src/utils/tls/CertificateUtils.cpp b/libminifi/src/utils/tls/CertificateUtils.cpp index 0e941ea2b..2bfcd0136 100644 --- a/libminifi/src/utils/tls/CertificateUtils.cpp +++ b/libminifi/src/utils/tls/CertificateUtils.cpp @@ -31,6 +31,7 @@ #include "openssl/ssl.h" #endif // WIN32 +#include "utils/ConfigurationUtils.h" #include "utils/StringUtils.h" #include "utils/tls/TLSUtils.h" #include "utils/TimeUtil.h" @@ -43,13 +44,14 @@ const ssl_error_category& ssl_error_category::get() { } std::string ssl_error_category::message(int value) const { - auto err = gsl::narrow<unsigned long>(value); // NOLINT + const auto err = gsl::narrow<unsigned long>(value); // NOLINT(runtime/int,google-runtime-int) long due to SSL lib API if (err == 0) { return ""; } - std::array<char, 4096> buf{}; - ERR_error_string_n(err, buf.data(), buf.size()); - return buf.data(); + static_assert(utils::configuration::DEFAULT_BUFFER_SIZE >= 1); + std::array<char, utils::configuration::DEFAULT_BUFFER_SIZE> buffer{}; + ERR_error_string_n(err, buffer.data(), buffer.size() - 1); + return {buffer.data()}; } std::error_code get_last_ssl_error_code() { diff --git a/libminifi/test/unit/ConfigurationUtilsTests.cpp b/libminifi/test/unit/ConfigurationUtilsTests.cpp new file mode 100644 index 000000000..9ceeee7cd --- /dev/null +++ b/libminifi/test/unit/ConfigurationUtilsTests.cpp @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "unit/Catch.h" +#include "unit/TestBase.h" +#include "properties/Configure.h" +#include "utils/ConfigurationUtils.h" + +TEST_CASE("getBufferSize() returns the default buffer size by default", "[utils::configuration]") { + minifi::ConfigureImpl configuration; + CHECK(minifi::utils::configuration::getBufferSize(configuration) == minifi::utils::configuration::DEFAULT_BUFFER_SIZE); +} + +TEST_CASE("getBufferSize() returns the configured buffer size if it exists", "[utils::configuration]") { + minifi::ConfigureImpl configuration; + configuration.set(minifi::Configure::nifi_default_internal_buffer_size, "12345"); + CHECK(minifi::utils::configuration::getBufferSize(configuration) == 12345); +} + +TEST_CASE("getBufferSize() throws if an invalid buffer size is set in the configuration", "[utils::configuration]") { + minifi::ConfigureImpl configuration; + configuration.set(minifi::Configure::nifi_default_internal_buffer_size, "as large as possible"); + CHECK_THROWS(minifi::utils::configuration::getBufferSize(configuration)); +} diff --git a/minifi-api/include/minifi-cpp/core/ProcessContext.h b/minifi-api/include/minifi-cpp/core/ProcessContext.h index 98f1814d6..50f1518f1 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessContext.h +++ b/minifi-api/include/minifi-cpp/core/ProcessContext.h @@ -67,7 +67,7 @@ class ProcessContext : public virtual core::VariableRegistry, public virtual uti virtual StateManager* getStateManager() = 0; virtual bool hasStateManager() const = 0; - virtual std::shared_ptr<Configure> getConfiguration() const = 0; + virtual gsl::not_null<Configure*> getConfiguration() const = 0; }; } // namespace org::apache::nifi::minifi::core diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index 2009e746f..af2cdcdbe 100644 --- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -41,8 +41,6 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_content_repository_rocksdb_options = "nifi.content.repository.rocksdb.options."; static constexpr const char *nifi_provenance_repository_rocksdb_options = "nifi.provenance.repository.rocksdb.options."; static constexpr const char *nifi_state_storage_rocksdb_options = "nifi.state.storage.rocksdb.options."; - - // nifi.flow.configuration.file static constexpr const char *nifi_default_directory = "nifi.default.directory"; static constexpr const char *nifi_flow_configuration_file = "nifi.flow.configuration.file"; static constexpr const char *nifi_flow_configuration_encrypt = "nifi.flow.configuration.encrypt"; @@ -72,6 +70,7 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default"; static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default"; + static constexpr const char *nifi_default_internal_buffer_size = "nifi.default.internal.buffer.size"; // these are internal properties related to the rocksdb backend static constexpr const char *nifi_flowfile_repository_rocksdb_compaction_period = "nifi.flowfile.repository.rocksdb.compaction.period"; diff --git a/minifi-api/include/minifi-cpp/properties/Configure.h b/minifi-api/include/minifi-cpp/properties/Configure.h index e72dec51b..393a91c70 100644 --- a/minifi-api/include/minifi-cpp/properties/Configure.h +++ b/minifi-api/include/minifi-cpp/properties/Configure.h @@ -23,6 +23,7 @@ #include "Configuration.h" #include "minifi-cpp/core/AgentIdentificationProvider.h" +#include "utils/gsl.h" struct ConfigTestAccessor; @@ -40,10 +41,10 @@ class Configure : public virtual Configuration, public virtual core::AgentIdenti virtual void setFallbackAgentIdentifier(const std::string& id) = 0; using Configuration::set; - virtual void set(const std::string& key, const std::string& value, PropertyChangeLifetime lifetime) override = 0; - virtual bool commitChanges() override = 0; + void set(const std::string& key, const std::string& value, PropertyChangeLifetime lifetime) override = 0; + bool commitChanges() override = 0; - static std::shared_ptr<Configure> create(); + static gsl::not_null<std::shared_ptr<Configure>> create(); }; } // namespace org::apache::nifi::minifi diff --git a/utils/include/core/ProcessContext.h b/utils/include/core/ProcessContext.h index c6e3b9465..c4c8c7eb8 100644 --- a/utils/include/core/ProcessContext.h +++ b/utils/include/core/ProcessContext.h @@ -48,41 +48,32 @@ namespace org::apache::nifi::minifi::core { class ProcessContextImpl : public core::VariableRegistryImpl, public virtual ProcessContext { public: - /*! - * Create a new process context associated with the processor/controller service/state manager - */ ProcessContextImpl(Processor& processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_repo, const std::shared_ptr<core::ContentRepository>& content_repo = repository::createFileSystemRepository()) - : VariableRegistryImpl(Configure::create()), + : VariableRegistryImpl(static_cast<std::shared_ptr<Configure>>(minifi::Configure::create())), + logger_(logging::LoggerFactory<ProcessContext>::getLogger()), controller_service_provider_(controller_service_provider), + state_storage_(getStateStorage(logger_, controller_service_provider_, nullptr)), + repo_(repo), flow_repo_(flow_repo), content_repo_(content_repo), processor_(processor), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()), configure_(minifi::Configure::create()), - initialized_(false) { - repo_ = repo; - state_storage_ = getStateStorage(logger_, controller_service_provider_, nullptr); - } + initialized_(false) {} - /*! - * Create a new process context associated with the processor/controller service/state manager - */ ProcessContextImpl(Processor& processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_repo, const std::shared_ptr<minifi::Configure>& configuration, const std::shared_ptr<core::ContentRepository>& content_repo = repository::createFileSystemRepository()) : VariableRegistryImpl(configuration), + logger_(logging::LoggerFactory<ProcessContext>::getLogger()), controller_service_provider_(controller_service_provider), + state_storage_(getStateStorage(logger_, controller_service_provider_, configuration)), + repo_(repo), flow_repo_(flow_repo), content_repo_(content_repo), processor_(processor), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()), - configure_(configuration), - initialized_(false) { - repo_ = repo; - state_storage_ = getStateStorage(logger_, controller_service_provider_, configuration); - if (!configure_) { configure_ = minifi::Configure::create(); } - } + configure_(configuration ? gsl::make_not_null(configuration) : minifi::Configure::create()), + initialized_(false) {} // Get Processor associated with the Process Context Processor& getProcessor() const override { return processor_; } @@ -216,9 +207,10 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro } } - std::shared_ptr<Configure> getConfiguration() const override { return configure_; } + gsl::not_null<Configure*> getConfiguration() const override { return gsl::make_not_null(configure_.get()); } private: + std::shared_ptr<logging::Logger> logger_; controller::ControllerServiceProvider* controller_service_provider_; std::shared_ptr<core::StateStorage> state_storage_; std::unique_ptr<StateManager> state_manager_; @@ -226,8 +218,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro std::shared_ptr<core::Repository> flow_repo_; std::shared_ptr<core::ContentRepository> content_repo_; Processor& processor_; - std::shared_ptr<logging::Logger> logger_; - std::shared_ptr<Configure> configure_; + gsl::not_null<std::shared_ptr<Configure>> configure_; bool initialized_; }; diff --git a/utils/include/io/StreamPipe.h b/utils/include/io/StreamPipe.h index 877630bcb..7502239b6 100644 --- a/utils/include/io/StreamPipe.h +++ b/utils/include/io/StreamPipe.h @@ -26,13 +26,14 @@ #include "InputStream.h" #include "OutputStream.h" #include "minifi-cpp/io/StreamCallback.h" +#include "utils/ConfigurationUtils.h" namespace org::apache::nifi::minifi { namespace internal { inline int64_t pipe(io::InputStream& src, io::OutputStream& dst) { - std::array<std::byte, 4096> buffer{}; - int64_t totalTransferred = 0; + std::array<std::byte, utils::configuration::DEFAULT_BUFFER_SIZE> buffer{}; + size_t totalTransferred = 0; while (true) { const auto readRet = src.read(buffer); if (io::isError(readRet)) return -1; @@ -54,7 +55,7 @@ inline int64_t pipe(io::InputStream& src, io::OutputStream& dst) { } totalTransferred += transferred; } - return totalTransferred; + return gsl::narrow<int64_t>(totalTransferred); } } // namespace internal diff --git a/utils/include/utils/ConfigurationUtils.h b/utils/include/utils/ConfigurationUtils.h new file mode 100644 index 000000000..bb309ab20 --- /dev/null +++ b/utils/include/utils/ConfigurationUtils.h @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <cstddef> + +namespace org::apache::nifi::minifi { + +class Configure; + +namespace utils::configuration { + +inline constexpr size_t DEFAULT_BUFFER_SIZE = 4096; +size_t getBufferSize(const Configure& configuration); + +} // namespace utils::configuration +} // namespace org::apache::nifi::minifi diff --git a/utils/src/utils/ConfigurationUtils.cpp b/utils/src/utils/ConfigurationUtils.cpp new file mode 100644 index 000000000..df44df66d --- /dev/null +++ b/utils/src/utils/ConfigurationUtils.cpp @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils/ConfigurationUtils.h" + +#include "minifi-cpp/properties/Configure.h" +#include "utils/ParsingUtils.h" + +namespace org::apache::nifi::minifi::utils::configuration { + +size_t getBufferSize(const Configure& configuration) { + if (const auto buffer_size = configuration.get(Configure::nifi_default_internal_buffer_size); buffer_size && !buffer_size->empty()) { + return parsing::parseIntegral<size_t>(*buffer_size) | utils::orThrow(fmt::format("Invalid value '{}' for {}", *buffer_size, Configure::nifi_default_internal_buffer_size)); + } else { + return DEFAULT_BUFFER_SIZE; + } +} + +} // namespace org::apache::nifi::minifi::utils::configuration diff --git a/utils/src/utils/file/FileUtils.cpp b/utils/src/utils/file/FileUtils.cpp index 55db62364..9c68f1469 100644 --- a/utils/src/utils/file/FileUtils.cpp +++ b/utils/src/utils/file/FileUtils.cpp @@ -22,6 +22,7 @@ #include <algorithm> #include <iostream> +#include "utils/ConfigurationUtils.h" #include "utils/Literals.h" #include "utils/Searcher.h" @@ -32,8 +33,8 @@ namespace org::apache::nifi::minifi::utils::file { uint64_t computeChecksum(const std::filesystem::path& file_name, uint64_t up_to_position) { - constexpr uint64_t BUFFER_SIZE = 4096U; - std::array<char, std::size_t{BUFFER_SIZE}> buffer{}; + static constexpr auto BUFFER_SIZE = utils::configuration::DEFAULT_BUFFER_SIZE; + std::array<char, BUFFER_SIZE> buffer{}; std::ifstream stream{file_name, std::ios::in | std::ios::binary}; @@ -41,7 +42,7 @@ uint64_t computeChecksum(const std::filesystem::path& file_name, uint64_t up_to_ uint64_t remaining_bytes_to_be_read = up_to_position; while (stream && remaining_bytes_to_be_read > 0) { - stream.read(buffer.data(), gsl::narrow<std::streamsize>(std::min(BUFFER_SIZE, remaining_bytes_to_be_read))); + stream.read(buffer.data(), gsl::narrow<std::streamsize>((std::min)(uint64_t{BUFFER_SIZE}, remaining_bytes_to_be_read))); uInt bytes_read = gsl::narrow<uInt>(stream.gcount()); checksum = crc32(checksum, reinterpret_cast<unsigned char*>(buffer.data()), bytes_read); remaining_bytes_to_be_read -= bytes_read;
