This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 102a219e4334a4a5b61d040723e6d91c7931758c Author: Martin Zink <[email protected]> AuthorDate: Mon Oct 28 16:43:44 2024 +0100 MINIFICPP-2474 Fix memory leaks in extensions/libarchive Closes #1880 Signed-off-by: Marton Szasz <[email protected]> --- extensions/libarchive/FocusArchiveEntry.cpp | 29 ++++++----- extensions/libarchive/MergeContent.h | 57 ++++++++++------------ extensions/libarchive/ReadArchiveStream.cpp | 10 ++-- extensions/libarchive/ReadArchiveStream.h | 20 +++----- extensions/libarchive/SmartArchivePtrs.h | 42 ++++++++++++++++ extensions/libarchive/UnfocusArchiveEntry.cpp | 39 +++++++-------- extensions/libarchive/WriteArchiveStream.cpp | 6 +-- extensions/libarchive/WriteArchiveStream.h | 37 ++++++-------- extensions/libarchive/tests/CMakeLists.txt | 1 + .../libarchive/tests/CompressContentTests.cpp | 36 +++++++------- extensions/libarchive/tests/FocusArchiveTests.cpp | 16 +++--- .../libarchive/tests/ManipulateArchiveTests.cpp | 2 +- extensions/libarchive/tests/MergeFileTests.cpp | 56 +++++++++++---------- extensions/libarchive/tests/util/ArchiveTests.cpp | 57 ++++++++++------------ .../libarchive/{ => tests/util}/ArchiveTests.h | 15 +++--- 15 files changed, 214 insertions(+), 209 deletions(-) diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp index 078c9afa9..fa69ba4fd 100644 --- a/extensions/libarchive/FocusArchiveEntry.cpp +++ b/extensions/libarchive/FocusArchiveEntry.cpp @@ -33,6 +33,7 @@ #include "core/ProcessSession.h" #include "core/Resource.h" #include "Exception.h" +#include "SmartArchivePtrs.h" #include "utils/gsl.h" namespace org::apache::nifi::minifi::processors { @@ -150,7 +151,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, } int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) const { - auto inputArchive = archive_read_new(); + auto input_archive = processors::archive_read_unique_ptr{archive_read_new()}; struct archive_entry *entry = nullptr; int64_t nlen = 0; @@ -158,35 +159,35 @@ int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::In data.stream = stream; data.processor = proc_; - archive_read_support_format_all(inputArchive); - archive_read_support_filter_all(inputArchive); + archive_read_support_format_all(input_archive.get()); + archive_read_support_filter_all(input_archive.get()); // Read each item in the archive - if (archive_read_open(inputArchive, &data, ok_cb, read_cb, ok_cb)) { - logger_->log_error("FocusArchiveEntry can't open due to archive error: {}", archive_error_string(inputArchive)); + if (archive_read_open(input_archive.get(), &data, ok_cb, read_cb, ok_cb)) { + logger_->log_error("FocusArchiveEntry can't open due to archive error: {}", archive_error_string(input_archive.get())); return nlen; } while (proc_->isRunning()) { - auto res = archive_read_next_header(inputArchive, &entry); + auto res = archive_read_next_header(input_archive.get(), &entry); if (res == ARCHIVE_EOF) { break; } if (res < ARCHIVE_OK) { - logger_->log_error("FocusArchiveEntry can't read header due to archive error: {}", archive_error_string(inputArchive)); + logger_->log_error("FocusArchiveEntry can't read header due to archive error: {}", archive_error_string(input_archive.get())); return nlen; } if (res < ARCHIVE_WARN) { - logger_->log_warn("FocusArchiveEntry got archive warning while reading header: {}", archive_error_string(inputArchive)); + logger_->log_warn("FocusArchiveEntry got archive warning while reading header: {}", archive_error_string(input_archive.get())); return nlen; } auto entryName = archive_entry_pathname(entry); - (*_archiveMetadata).archiveFormatName.assign(archive_format_name(inputArchive)); - (*_archiveMetadata).archiveFormat = archive_format(inputArchive); + _archiveMetadata->archiveFormatName.assign(archive_format_name(input_archive.get())); + _archiveMetadata->archiveFormat = archive_format(input_archive.get()); // Record entry metadata auto entryType = archive_entry_filetype(entry); @@ -215,20 +216,18 @@ int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::In if (archive_entry_size(entry) > 0) { #ifdef WIN32 - nlen += archive_read_data_into_fd(inputArchive, _fileno(fd)); + nlen += archive_read_data_into_fd(input_archive.get(), _fileno(fd)); #else - nlen += archive_read_data_into_fd(inputArchive, fileno(fd)); + nlen += archive_read_data_into_fd(input_archive.get(), fileno(fd)); #endif } (void)fclose(fd); } - (*_archiveMetadata).entryMetadata.push_back(metadata); + _archiveMetadata->entryMetadata.push_back(metadata); } - archive_read_close(inputArchive); - archive_read_free(inputArchive); return nlen; } diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h index e1e7c6273..a0248eec2 100644 --- a/extensions/libarchive/MergeContent.h +++ b/extensions/libarchive/MergeContent.h @@ -27,6 +27,7 @@ #include "BinFiles.h" #include "archive_entry.h" #include "archive.h" +#include "SmartArchivePtrs.h" #include "core/logging/LoggerConfiguration.h" #include "core/PropertyDefinitionBuilder.h" #include "core/PropertyType.h" @@ -123,10 +124,10 @@ class ArchiveMerge { public: class ArchiveWriter : public io::OutputStream { public: - ArchiveWriter(struct archive *arch, struct archive_entry *entry) : arch_(arch), entry_(entry) {} - size_t write(const uint8_t* data, size_t size) override { + ArchiveWriter(archive& arch, archive_entry& entry) : arch_(arch), entry_(entry) {} + size_t write(const uint8_t* data, const size_t size) override { if (!header_emitted_) { - if (archive_write_header(arch_, entry_) != ARCHIVE_OK) { + if (archive_write_header(&arch_, &entry_) != ARCHIVE_OK) { return io::STREAM_ERROR; } header_emitted_ = true; @@ -134,7 +135,7 @@ class ArchiveMerge { size_t totalWrote = 0; size_t remaining = size; while (remaining > 0) { - const auto ret = archive_write_data(arch_, data + totalWrote, remaining); + const auto ret = archive_write_data(&arch_, data + totalWrote, remaining); if (ret < 0) { return io::STREAM_ERROR; } @@ -149,8 +150,8 @@ class ArchiveMerge { } private: - struct archive *arch_; - struct archive_entry *entry_; + archive& arch_; + archive_entry& entry_; bool header_emitted_{false}; }; // Nest Callback Class for write stream @@ -160,20 +161,18 @@ class ArchiveMerge { : merge_type_(merge_type), flows_(flows), serializer_(serializer) { - size_ = 0; - stream_ = nullptr; } std::string merge_type_; std::deque<std::shared_ptr<core::FlowFile>> &flows_; - std::shared_ptr<io::OutputStream> stream_; - size_t size_; + std::shared_ptr<io::OutputStream> stream_ = nullptr; + size_t size_ = 0; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ArchiveMerge>::getLogger(); FlowFileSerializer& serializer_; static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) { - auto* callback = reinterpret_cast<WriteCallback *>(context); - auto* data = reinterpret_cast<uint8_t*>(const_cast<void*>(buff)); + auto* callback = static_cast<WriteCallback *>(context); + const auto* data = static_cast<uint8_t*>(const_cast<void*>(buff)); la_ssize_t totalWrote = 0; size_t remaining = size; while (remaining > 0) { @@ -193,48 +192,42 @@ class ArchiveMerge { } int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) { - struct archive *arch; + const auto arch = archive_write_unique_ptr{archive_write_new()}; - arch = archive_write_new(); if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) { - archive_write_set_format_pax_restricted(arch); // tar format + archive_write_set_format_pax_restricted(arch.get()); // tar format } if (merge_type_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) { - archive_write_set_format_zip(arch); // zip format + archive_write_set_format_zip(arch.get()); // zip format } - archive_write_set_bytes_per_block(arch, 0); - archive_write_add_filter_none(arch); + archive_write_set_bytes_per_block(arch.get(), 0); + archive_write_add_filter_none(arch.get()); stream_ = stream; - archive_write_open(arch, this, nullptr, archive_write, nullptr); + archive_write_open(arch.get(), this, nullptr, archive_write, nullptr); for (const auto& flow : flows_) { - struct archive_entry *entry = archive_entry_new(); + auto entry = archive_entry_unique_ptr{archive_entry_new()}; std::string fileName; flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); - archive_entry_set_pathname(entry, fileName.c_str()); - archive_entry_set_size(entry, gsl::narrow<la_int64_t>(flow->getSize())); - archive_entry_set_mode(entry, S_IFREG | 0755); + archive_entry_set_pathname(entry.get(), fileName.c_str()); + archive_entry_set_size(entry.get(), gsl::narrow<la_int64_t>(flow->getSize())); + archive_entry_set_mode(entry.get(), S_IFREG | 0755); if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) { - std::string perm; - int permInt; - if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { + if (std::string perm; flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { try { - permInt = std::stoi(perm); + const int perm_int = std::stoi(perm); logger_->log_debug("Merge Tar File {} permission {}", fileName, perm); - archive_entry_set_perm(entry, (mode_t) permInt); + archive_entry_set_perm(entry.get(), static_cast<mode_t>(perm_int)); } catch (...) { } } } - const auto ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(arch, entry)); + const auto ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(*arch, *entry)); if (ret < 0) { return ret; } - archive_entry_free(entry); } - archive_write_close(arch); - archive_write_free(arch); return gsl::narrow<int64_t>(size_); } }; diff --git a/extensions/libarchive/ReadArchiveStream.cpp b/extensions/libarchive/ReadArchiveStream.cpp index 6268875f0..c95a7b8f3 100644 --- a/extensions/libarchive/ReadArchiveStream.cpp +++ b/extensions/libarchive/ReadArchiveStream.cpp @@ -20,16 +20,14 @@ namespace org::apache::nifi::minifi::io { -ReadArchiveStreamImpl::archive_ptr ReadArchiveStreamImpl::createReadArchive() { - archive_ptr arch{archive_read_new()}; +processors::archive_read_unique_ptr ReadArchiveStreamImpl::createReadArchive() { + auto arch = processors::archive_read_unique_ptr{archive_read_new()}; if (!arch) { logger_->log_error("Failed to create read archive"); return nullptr; } - int result; - - result = archive_read_support_format_all(arch.get()); + int result = archive_read_support_format_all(arch.get()); if (result != ARCHIVE_OK) { logger_->log_error("Archive read support format all error {}", archive_error_string(arch.get())); return nullptr; @@ -52,7 +50,7 @@ std::optional<EntryInfo> ReadArchiveStreamImpl::nextEntry() { return std::nullopt; } entry_size_.reset(); - struct archive_entry *entry; + struct archive_entry *entry = nullptr; int result = archive_read_next_header(arch_.get(), &entry); if (result != ARCHIVE_OK) { if (result != ARCHIVE_EOF) { diff --git a/extensions/libarchive/ReadArchiveStream.h b/extensions/libarchive/ReadArchiveStream.h index 155463c44..0d1b87abd 100644 --- a/extensions/libarchive/ReadArchiveStream.h +++ b/extensions/libarchive/ReadArchiveStream.h @@ -27,23 +27,17 @@ #include "archive_entry.h" #include "archive.h" +#include "SmartArchivePtrs.h" namespace org::apache::nifi::minifi::io { -class ReadArchiveStreamImpl : public ReadArchiveStream { - struct archive_read_deleter { - int operator()(struct archive* ptr) const { - return archive_read_free(ptr); - } - }; - using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>; - +class ReadArchiveStreamImpl final : public ReadArchiveStream { class BufferedReader { public: explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {} std::optional<std::span<const std::byte>> readChunk() { - size_t result = input_->read(buffer_); + const size_t result = input_->read(buffer_); if (io::isError(result)) { return std::nullopt; } @@ -55,7 +49,7 @@ class ReadArchiveStreamImpl : public ReadArchiveStream { std::array<std::byte, 4096> buffer_{}; }; - archive_ptr createReadArchive(); + processors::archive_read_unique_ptr createReadArchive(); public: explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) { @@ -70,8 +64,8 @@ class ReadArchiveStreamImpl : public ReadArchiveStream { private: static la_ssize_t archive_read(struct archive* archive, void *context, const void **buff) { - auto* const input = reinterpret_cast<BufferedReader*>(context); - auto opt_buffer = input->readChunk(); + auto* const input = static_cast<BufferedReader*>(context); + const auto opt_buffer = input->readChunk(); if (!opt_buffer) { archive_set_error(archive, EIO, "Error reading archive"); return -1; @@ -82,7 +76,7 @@ class ReadArchiveStreamImpl : public ReadArchiveStream { std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadArchiveStream>::getLogger(); BufferedReader reader_; - archive_ptr arch_; + processors::archive_read_unique_ptr arch_; std::optional<size_t> entry_size_; }; diff --git a/extensions/libarchive/SmartArchivePtrs.h b/extensions/libarchive/SmartArchivePtrs.h new file mode 100644 index 000000000..f4c0e0ef2 --- /dev/null +++ b/extensions/libarchive/SmartArchivePtrs.h @@ -0,0 +1,42 @@ +/** +* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include "archive.h" +#include "archive_entry.h" + +namespace org::apache::nifi::minifi::processors { + +struct archive_write_unique_ptr_deleter { + void operator()(archive* arch) const noexcept { archive_write_free(arch); } +}; + +struct archive_read_unique_ptr_deleter { + void operator()(archive* arch) const noexcept { archive_read_free(arch); } +}; + +struct archive_entry_unique_ptr_deleter { + void operator()(archive_entry* arch_entry) const noexcept { archive_entry_free(arch_entry); } +}; + +using archive_write_unique_ptr = std::unique_ptr<archive, archive_write_unique_ptr_deleter>; +using archive_read_unique_ptr = std::unique_ptr<archive, archive_read_unique_ptr_deleter>; +using archive_entry_unique_ptr = std::unique_ptr<archive_entry, archive_entry_unique_ptr_deleter>; +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp index 96fc54251..bacbd4b9a 100644 --- a/extensions/libarchive/UnfocusArchiveEntry.cpp +++ b/extensions/libarchive/UnfocusArchiveEntry.cpp @@ -25,8 +25,7 @@ #include <string> #include <system_error> -#include "archive.h" -#include "archive_entry.h" +#include "SmartArchivePtrs.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -147,23 +146,22 @@ la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void * } int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io::OutputStream>& stream) const { - auto outputArchive = archive_write_new(); + auto output_archive = archive_write_unique_ptr{archive_write_new()}; int64_t nlen = 0; - archive_write_set_format(outputArchive, _archiveMetadata->archiveFormat); + archive_write_set_format(output_archive.get(), _archiveMetadata->archiveFormat); UnfocusArchiveEntryWriteData data; data.stream = stream; - archive_write_open(outputArchive, &data, ok_cb, write_cb, ok_cb); + archive_write_open(output_archive.get(), &data, ok_cb, write_cb, ok_cb); // Iterate entries & write from tmp file to archive std::array<char, 8192> buf{}; struct stat st{}; - struct archive_entry* entry = nullptr; + auto entry = archive_entry_unique_ptr{archive_entry_new()}; for (const auto &entryMetadata : _archiveMetadata->entryMetadata) { - entry = archive_entry_new(); logger_->log_info("UnfocusArchiveEntry writing entry {}", entryMetadata.entryName); if (entryMetadata.entryType == AE_IFREG && entryMetadata.entrySize > 0) { @@ -171,21 +169,21 @@ int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io: if (stat_ok != 0) { logger_->log_error("Error statting {}: {}", entryMetadata.tmpFileName, std::system_category().default_error_condition(errno).message()); } - archive_entry_copy_stat(entry, &st); + archive_entry_copy_stat(entry.get(), &st); } - archive_entry_set_filetype(entry, entryMetadata.entryType); - archive_entry_set_pathname(entry, entryMetadata.entryName.c_str()); - archive_entry_set_perm(entry, entryMetadata.entryPerm); - archive_entry_set_size(entry, gsl::narrow<la_int64_t>(entryMetadata.entrySize)); - archive_entry_set_uid(entry, entryMetadata.entryUID); - archive_entry_set_gid(entry, entryMetadata.entryGID); - archive_entry_set_mtime(entry, entryMetadata.entryMTime, gsl::narrow<long>(entryMetadata.entryMTimeNsec)); // NOLINT long comes from libarchive API + archive_entry_set_filetype(entry.get(), entryMetadata.entryType); + archive_entry_set_pathname(entry.get(), entryMetadata.entryName.c_str()); + archive_entry_set_perm(entry.get(), entryMetadata.entryPerm); + archive_entry_set_size(entry.get(), gsl::narrow<la_int64_t>(entryMetadata.entrySize)); + archive_entry_set_uid(entry.get(), entryMetadata.entryUID); + archive_entry_set_gid(entry.get(), entryMetadata.entryGID); + archive_entry_set_mtime(entry.get(), entryMetadata.entryMTime, gsl::narrow<long>(entryMetadata.entryMTimeNsec)); // NOLINT long comes from libarchive API logger_->log_info("Writing {} with type {}, perms {}, size {}, uid {}, gid {}, mtime {},{}", entryMetadata.entryName, entryMetadata.entryType, entryMetadata.entryPerm, entryMetadata.entrySize, entryMetadata.entryUID, entryMetadata.entryGID, entryMetadata.entryMTime, entryMetadata.entryMTimeNsec); - archive_write_header(outputArchive, entry); + archive_write_header(output_archive.get(), entry.get()); // If entry is regular file, copy entry contents if (entryMetadata.entryType == AE_IFREG && entryMetadata.entrySize > 0) { @@ -197,11 +195,11 @@ int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io: while (ifs.good()) { ifs.read(buf.data(), buf.size()); auto len = gsl::narrow<size_t>(ifs.gcount()); - int64_t written = archive_write_data(outputArchive, buf.data(), len); + int64_t written = archive_write_data(output_archive.get(), buf.data(), len); if (written < 0) { logger_->log_error("UnfocusArchiveEntry failed to write data to " "archive entry %s due to error: %s", - entryMetadata.entryName, archive_error_string(outputArchive)); + entryMetadata.entryName, archive_error_string(output_archive.get())); } else { nlen += written; } @@ -213,12 +211,9 @@ int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io: std::filesystem::remove(entryMetadata.tmpFileName); } - archive_entry_clear(entry); + archive_entry_clear(entry.get()); } - archive_write_close(outputArchive); - archive_entry_free(entry); - archive_write_free(outputArchive); return nlen; } diff --git a/extensions/libarchive/WriteArchiveStream.cpp b/extensions/libarchive/WriteArchiveStream.cpp index c2a6853d8..a706b7d5a 100644 --- a/extensions/libarchive/WriteArchiveStream.cpp +++ b/extensions/libarchive/WriteArchiveStream.cpp @@ -23,8 +23,8 @@ namespace org::apache::nifi::minifi::io { -WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() { - archive_ptr arch{archive_write_new()}; +processors::archive_write_unique_ptr WriteArchiveStreamImpl::createWriteArchive() const { + processors::archive_write_unique_ptr arch{archive_write_new()}; if (!arch) { logger_->log_error("Failed to create write archive"); return nullptr; @@ -141,7 +141,7 @@ bool WriteArchiveStreamImpl::finish() { } WriteArchiveStreamImpl::~WriteArchiveStreamImpl() { - finish(); + WriteArchiveStreamImpl::finish(); } } // namespace org::apache::nifi::minifi::io diff --git a/extensions/libarchive/WriteArchiveStream.h b/extensions/libarchive/WriteArchiveStream.h index 714aacc05..7d86c6db5 100644 --- a/extensions/libarchive/WriteArchiveStream.h +++ b/extensions/libarchive/WriteArchiveStream.h @@ -25,8 +25,8 @@ #include "io/ArchiveStream.h" #include "archive_entry.h" #include "archive.h" +#include "SmartArchivePtrs.h" #include "utils/Enum.h" -#include "core/Core.h" #include "logging/LoggerConfiguration.h" namespace org::apache::nifi::minifi::io { @@ -61,30 +61,23 @@ constexpr customize_t enum_name<CompressionFormat>(CompressionFormat value) noex namespace org::apache::nifi::minifi::io { -class WriteArchiveStreamImpl: public WriteArchiveStream { - struct archive_write_deleter { - int operator()(struct archive* ptr) const { - return archive_write_free(ptr); - } - }; - using archive_ptr = std::unique_ptr<struct archive, archive_write_deleter>; - struct archive_entry_deleter { - void operator()(struct archive_entry* ptr) const { - archive_entry_free(ptr); - } - }; - using archive_entry_ptr = std::unique_ptr<struct archive_entry, archive_entry_deleter>; - - archive_ptr createWriteArchive(); +class WriteArchiveStreamImpl final: public WriteArchiveStream { + [[nodiscard]] processors::archive_write_unique_ptr createWriteArchive() const; public: - WriteArchiveStreamImpl(int compress_level, CompressionFormat compress_format, std::shared_ptr<OutputStream> sink) + WriteArchiveStreamImpl(const int compress_level, CompressionFormat compress_format, std::shared_ptr<OutputStream> sink) : compress_level_(compress_level), compress_format_(compress_format), - sink_(std::move(sink)) { - arch_ = createWriteArchive(); + sink_(std::move(sink)), + arch_(createWriteArchive()) { } + WriteArchiveStreamImpl() = delete; + WriteArchiveStreamImpl(const WriteArchiveStreamImpl&) = delete; + WriteArchiveStreamImpl(WriteArchiveStreamImpl&&) = delete; + WriteArchiveStreamImpl& operator=(const WriteArchiveStreamImpl&) = delete; + WriteArchiveStreamImpl& operator=(WriteArchiveStreamImpl&&) = delete; + using OutputStream::write; bool newEntry(const EntryInfo& info) override; @@ -101,15 +94,15 @@ class WriteArchiveStreamImpl: public WriteArchiveStream { private: static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) { auto* const output = static_cast<OutputStream*>(context); - const auto ret = output->write(reinterpret_cast<const uint8_t*>(buff), size); + const auto ret = output->write(static_cast<const uint8_t*>(buff), size); return io::isError(ret) ? -1 : gsl::narrow<la_ssize_t>(ret); } int compress_level_; CompressionFormat compress_format_; std::shared_ptr<io::OutputStream> sink_; - archive_ptr arch_; - archive_entry_ptr arch_entry_; + processors::archive_write_unique_ptr arch_{}; + processors::archive_entry_unique_ptr arch_entry_{}; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<WriteArchiveStreamImpl>::getLogger(); }; diff --git a/extensions/libarchive/tests/CMakeLists.txt b/extensions/libarchive/tests/CMakeLists.txt index 5c1ca342f..c10bca023 100644 --- a/extensions/libarchive/tests/CMakeLists.txt +++ b/extensions/libarchive/tests/CMakeLists.txt @@ -31,5 +31,6 @@ FOREACH(testfile ${ARCHIVE_INTEGRATION_TESTS}) target_link_libraries(${testfilename} Catch2WithMain) MATH(EXPR EXTENSIONS_TEST_COUNT "${EXTENSIONS_TEST_COUNT}+1") add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) + set_tests_properties("${testfilename}" PROPERTIES LABELS "libarchive") ENDFOREACH() message("-- Finished building ${ARCHIVE-EXTENSIONS_TEST_COUNT} Lib Archive related test file(s)...") diff --git a/extensions/libarchive/tests/CompressContentTests.cpp b/extensions/libarchive/tests/CompressContentTests.cpp index 7594151b4..31d266b2b 100644 --- a/extensions/libarchive/tests/CompressContentTests.cpp +++ b/extensions/libarchive/tests/CompressContentTests.cpp @@ -38,7 +38,6 @@ #include "core/ProcessorNode.h" #include "CompressContent.h" #include "io/FileStream.h" -#include "FlowFileRecord.h" #include "processors/LogAttribute.h" #include "processors/GetFile.h" #include "processors/PutFile.h" @@ -69,21 +68,20 @@ class ReadCallback { return total_read; } void archive_read() { - struct archive_read_deleter { void operator()(struct archive* p) const noexcept { archive_read_free(p); } }; - std::unique_ptr<struct archive, archive_read_deleter> a{archive_read_new()}; - archive_read_support_format_all(a.get()); - archive_read_support_filter_all(a.get()); - archive_read_open_memory(a.get(), buffer_.data(), read_size_); - struct archive_entry *ae = nullptr; - - REQUIRE(archive_read_next_header(a.get(), &ae) == ARCHIVE_OK); + const auto archive = minifi::processors::archive_read_unique_ptr{archive_read_new()}; + archive_read_support_format_all(archive.get()); + archive_read_support_filter_all(archive.get()); + archive_read_open_memory(archive.get(), buffer_.data(), read_size_); + struct archive_entry* archive_entry = nullptr; + + REQUIRE(archive_read_next_header(archive.get(), &archive_entry) == ARCHIVE_OK); const auto size = [&] { - const auto size = archive_entry_size(ae); - REQUIRE(size >= 0); - return gsl::narrow<size_t>(size); + const auto entry_size = archive_entry_size(archive_entry); + REQUIRE(entry_size >= 0); + return gsl::narrow<size_t>(entry_size); }(); archive_buffer_.resize(size); - archive_read_data(a.get(), archive_buffer_.data(), size); + archive_read_data(archive.get(), archive_buffer_.data(), size); } std::vector<std::byte> buffer_; @@ -405,7 +403,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5 auto flow = importFlowFile(rawContentPath()); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform")) { + if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { // platform not support LZMA LogTestController::getInstance().reset(); return; @@ -441,7 +439,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-lzma"); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform")) { + if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { // platform not support LZMA LogTestController::getInstance().reset(); return; @@ -471,7 +469,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes auto flow = importFlowFile(rawContentPath()); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform")) { + if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { // platform not support LZMA LogTestController::getInstance().reset(); return; @@ -507,7 +505,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-xz"); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform")) { + if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { // platform not support LZMA LogTestController::getInstance().reset(); return; @@ -634,7 +632,7 @@ TEST_CASE_METHOD(CompressTestController, "Batch CompressFileGZip", "[compressFil utils::string::repeat("0", 1000), utils::string::repeat("1", 1000), utils::string::repeat("2", 1000), utils::string::repeat("3", 1000), }; - const std::size_t batchSize = 3; + constexpr std::size_t batchSize = 3; context->setProperty(minifi::processors::CompressContent::CompressMode, magic_enum::enum_name(CompressionMode::compress)); context->setProperty(minifi::processors::CompressContent::CompressFormat, magic_enum::enum_name(CompressionFormat::GZIP)); @@ -710,7 +708,7 @@ TEST_CASE_METHOD(DecompressTestController, "Invalid archive decompression", "[co importFlowFileFrom(minifi::io::BufferStream(std::string{"banana bread"})); trigger(); - if (LogTestController::getInstance().contains("compression not supported on this platform")) { + if (LogTestController::getInstance().contains("compression not supported on this platform", 20ms, 5ms)) { return; } diff --git a/extensions/libarchive/tests/FocusArchiveTests.cpp b/extensions/libarchive/tests/FocusArchiveTests.cpp index 56c7c83df..4dfbec491 100644 --- a/extensions/libarchive/tests/FocusArchiveTests.cpp +++ b/extensions/libarchive/tests/FocusArchiveTests.cpp @@ -24,9 +24,9 @@ #include <string> #include <utility> -#include <archive.h> // NOLINT -#include <archive_entry.h> // NOLINT -#include "ArchiveTests.h" +#include "archive.h" +#include "archive_entry.h" +#include "util/ArchiveTests.h" #include "FocusArchiveEntry.h" #include "processors/GetFile.h" #include "processors/LogAttribute.h" @@ -40,7 +40,7 @@ namespace org::apache::nifi::minifi::processors::test { const std::string TEST_ARCHIVE_NAME = "focus_test_archive.tar"; -const int NUM_FILES = 2; +constexpr int NUM_FILES = 2; const char* FILE_NAMES[NUM_FILES] = {"file1", "file2"}; // NOLINT(cppcoreguidelines-avoid-c-arrays) const char* FILE_CONTENT[NUM_FILES] = {"Test file 1\n", "Test file 2\n"}; // NOLINT(cppcoreguidelines-avoid-c-arrays) @@ -115,19 +115,17 @@ TEST_CASE("FocusArchive", "[testFocusArchive]") { auto size = gsl::narrow<size_t>(ifs.tellg()); ifs.seekg(0, std::ios::beg); - gsl::owner<char*> content = nullptr; - content = new char[size]; - ifs.read(content, gsl::narrow<std::streamsize>(size)); + auto content = std::vector<char>(size); + ifs.read(content.data(), gsl::narrow<std::streamsize>(size)); REQUIRE(size == strlen(FOCUSED_CONTENT)); - REQUIRE(memcmp(content, FOCUSED_CONTENT, size) == 0); + REQUIRE(memcmp(content.data(), FOCUSED_CONTENT, size) == 0); plan->runNextProcessor(); // UnfocusArchive plan->runNextProcessor(); // PutFile 2 (unfocused) auto archive_path_2 = dir3 / TEST_ARCHIVE_NAME; REQUIRE(check_archive_contents(archive_path_2, test_archive_map)); - delete[] content; } } // namespace org::apache::nifi::minifi::processors::test diff --git a/extensions/libarchive/tests/ManipulateArchiveTests.cpp b/extensions/libarchive/tests/ManipulateArchiveTests.cpp index 67ae36b18..96be45d96 100644 --- a/extensions/libarchive/tests/ManipulateArchiveTests.cpp +++ b/extensions/libarchive/tests/ManipulateArchiveTests.cpp @@ -23,7 +23,7 @@ #include "unit/TestBase.h" #include "unit/Catch.h" -#include "ArchiveTests.h" +#include "util/ArchiveTests.h" #include "core/PropertyDefinition.h" #include "processors/GetFile.h" #include "processors/LogAttribute.h" diff --git a/extensions/libarchive/tests/MergeFileTests.cpp b/extensions/libarchive/tests/MergeFileTests.cpp index 5a6cb5ca4..ec3bd4a40 100644 --- a/extensions/libarchive/tests/MergeFileTests.cpp +++ b/extensions/libarchive/tests/MergeFileTests.cpp @@ -56,22 +56,22 @@ void init_file_paths() { struct Initializer { Initializer() { static TestController global_controller; - auto tempDir = global_controller.createTempDirectory(); - FLOW_FILE = (tempDir / "minifi-mergecontent").string(); - EXPECT_MERGE_CONTENT_FIRST = (tempDir / "minifi-expect-mergecontent1.txt").string(); - EXPECT_MERGE_CONTENT_SECOND = (tempDir / "minifi-expect-mergecontent2.txt").string(); - HEADER_FILE = (tempDir / "minifi-mergecontent.header").string(); - FOOTER_FILE = (tempDir / "minifi-mergecontent.footer").string(); - DEMARCATOR_FILE = (tempDir / "minifi-mergecontent.demarcator").string(); + const auto temp_dir = global_controller.createTempDirectory(); + FLOW_FILE = (temp_dir / "minifi-mergecontent").string(); + EXPECT_MERGE_CONTENT_FIRST = (temp_dir / "minifi-expect-mergecontent1.txt").string(); + EXPECT_MERGE_CONTENT_SECOND = (temp_dir / "minifi-expect-mergecontent2.txt").string(); + HEADER_FILE = (temp_dir / "minifi-mergecontent.header").string(); + FOOTER_FILE = (temp_dir / "minifi-mergecontent.footer").string(); + DEMARCATOR_FILE = (temp_dir / "minifi-mergecontent.demarcator").string(); } }; - static Initializer initializer; + [[maybe_unused]] static Initializer initializer; } class FixedBuffer { public: - explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) { - buf_.reset(new uint8_t[capacity_]); // NOLINT(cppcoreguidelines-owning-memory) + explicit FixedBuffer(const std::size_t capacity) : capacity_(capacity) { + buf_.resize(capacity_); } FixedBuffer(FixedBuffer&& other) noexcept : buf_(std::move(other.buf_)), size_(other.size_), capacity_(other.capacity_) { other.size_ = 0; @@ -83,8 +83,10 @@ class FixedBuffer { ~FixedBuffer() = default; [[nodiscard]] std::size_t size() const { return size_; } [[nodiscard]] std::size_t capacity() const { return capacity_; } - [[nodiscard]] uint8_t* begin() const { return buf_.get(); } - [[nodiscard]] uint8_t* end() const { return buf_.get() + size_; } + [[nodiscard]] uint8_t* begin() { return buf_.data(); } + [[nodiscard]] uint8_t* end() { return buf_.data() + size_; } + [[nodiscard]] const uint8_t* begin() const { return buf_.data(); } + [[nodiscard]] const uint8_t* end() const { return buf_.data() + size_; } [[nodiscard]] std::string to_string() const { return {begin(), end()}; @@ -110,7 +112,7 @@ class FixedBuffer { } private: - std::unique_ptr<uint8_t[]> buf_; // NOLINT(cppcoreguidelines-avoid-c-arrays) + std::vector<uint8_t> buf_; std::size_t size_ = 0; std::size_t capacity_ = 0; }; @@ -118,26 +120,26 @@ class FixedBuffer { std::vector<FixedBuffer> read_archives(const FixedBuffer& input) { class ArchiveEntryReader { public: - explicit ArchiveEntryReader(archive* arch) : arch(arch) {} - size_t read(std::span<std::byte> out_buffer) { - const auto ret = archive_read_data(arch, out_buffer.data(), out_buffer.size()); + explicit ArchiveEntryReader(archive& arch) : arch(arch) {} + [[nodiscard]] size_t read(std::span<std::byte> out_buffer) const { + const auto ret = archive_read_data(&arch, out_buffer.data(), out_buffer.size()); return ret < 0 ? minifi::io::STREAM_ERROR : gsl::narrow<size_t>(ret); } private: - archive* arch; + archive& arch; }; std::vector<FixedBuffer> archive_contents; - struct archive *a = archive_read_new(); - archive_read_support_format_all(a); - archive_read_support_filter_all(a); - archive_read_open_memory(a, input.begin(), input.size()); - struct archive_entry *ae = nullptr; - - while (archive_read_next_header(a, &ae) == ARCHIVE_OK) { - int64_t size{archive_entry_size(ae)}; + const auto archive = minifi::processors::archive_read_unique_ptr{archive_read_new()}; + archive_read_support_format_all(archive.get()); + archive_read_support_filter_all(archive.get()); + archive_read_open_memory(archive.get(), input.begin(), input.size()); + struct archive_entry *archive_entry = nullptr; + + while (archive_read_next_header(archive.get(), &archive_entry) == ARCHIVE_OK) { + const int64_t size{archive_entry_size(archive_entry)}; FixedBuffer buf(size); - ArchiveEntryReader reader(a); - auto ret = buf.write(reader, buf.capacity()); + ArchiveEntryReader reader(*archive); + const auto ret = buf.write(reader, buf.capacity()); REQUIRE(ret == size); archive_contents.emplace_back(std::move(buf)); } diff --git a/extensions/libarchive/tests/util/ArchiveTests.cpp b/extensions/libarchive/tests/util/ArchiveTests.cpp index c5d09d83b..fb18e9ecf 100644 --- a/extensions/libarchive/tests/util/ArchiveTests.cpp +++ b/extensions/libarchive/tests/util/ArchiveTests.cpp @@ -25,11 +25,12 @@ #include <algorithm> #include <set> #include <string> -#include <utility> #include "unit/TestBase.h" #include "unit/Catch.h" #include "utils/gsl.h" +#include "SmartArchivePtrs.h" + TAE_MAP_T build_test_archive_map(int NUM_FILES, const char* const* FILE_NAMES, const char* const* FILE_CONTENT) { TAE_MAP_T test_entries; @@ -70,11 +71,11 @@ OrderedTestArchive build_ordered_test_archive(int NUM_FILES, const char* const* void build_test_archive(const std::filesystem::path& path, const TAE_MAP_T& entries, FN_VEC_T order) { std::cout << "Creating " << path << std::endl; - archive * test_archive = archive_write_new(); + const auto test_archive = minifi::processors::archive_write_unique_ptr{archive_write_new()}; - archive_write_set_format_ustar(test_archive); - archive_write_open_filename(test_archive, path.string().c_str()); - struct archive_entry* entry = archive_entry_new(); + archive_write_set_format_ustar(test_archive.get()); + archive_write_open_filename(test_archive.get(), path.string().c_str()); + const auto entry = minifi::processors::archive_entry_unique_ptr{archive_entry_new()}; if (order.empty()) { // Use map sort order for (auto &kvp : entries) @@ -86,25 +87,22 @@ void build_test_archive(const std::filesystem::path& path, const TAE_MAP_T& entr std::cout << "Adding entry: " << name << std::endl; - archive_entry_set_filetype(entry, test_entry.type); - archive_entry_set_pathname(entry, test_entry.name.c_str()); - archive_entry_set_size(entry, gsl::narrow<la_int64_t>(test_entry.size)); - archive_entry_set_perm(entry, test_entry.perms); - archive_entry_set_uid(entry, test_entry.uid); - archive_entry_set_gid(entry, test_entry.gid); - archive_entry_set_mtime(entry, test_entry.mtime, test_entry.mtime_nsec); + archive_entry_set_filetype(entry.get(), test_entry.type); + archive_entry_set_pathname(entry.get(), test_entry.name.c_str()); + archive_entry_set_size(entry.get(), gsl::narrow<la_int64_t>(test_entry.size)); + archive_entry_set_perm(entry.get(), test_entry.perms); + archive_entry_set_uid(entry.get(), test_entry.uid); + archive_entry_set_gid(entry.get(), test_entry.gid); + archive_entry_set_mtime(entry.get(), test_entry.mtime, test_entry.mtime_nsec); - archive_write_header(test_archive, entry); - archive_write_data(test_archive, test_entry.content, test_entry.size); + archive_write_header(test_archive.get(), entry.get()); + archive_write_data(test_archive.get(), test_entry.content, test_entry.size); - archive_entry_clear(entry); + archive_entry_clear(entry.get()); } - - archive_entry_free(entry); - archive_write_close(test_archive); } -void build_test_archive(const std::filesystem::path& path, OrderedTestArchive& ordered_archive) { +void build_test_archive(const std::filesystem::path& path, const OrderedTestArchive& ordered_archive) { build_test_archive(path, ordered_archive.map, ordered_archive.order); } @@ -112,20 +110,18 @@ bool check_archive_contents(const std::filesystem::path& path, const TAE_MAP_T& FN_VEC_T read_names; FN_VEC_T extra_names; bool ok = true; - struct archive *a = archive_read_new(); + const auto archive = minifi::processors::archive_read_unique_ptr{archive_read_new()}; struct archive_entry *entry = nullptr; - archive_read_support_format_all(a); - archive_read_support_filter_all(a); + archive_read_support_format_all(archive.get()); + archive_read_support_filter_all(archive.get()); - int r = archive_read_open_filename(a, path.string().c_str(), 16384); - - if (r != ARCHIVE_OK) { + if (archive_read_open_filename(archive.get(), path.string().c_str(), 16384) != ARCHIVE_OK) { std::cout << "Unable to open archive " << path << " for checking!" << std::endl; return false; } - while (archive_read_next_header(a, &entry) == ARCHIVE_OK) { + while (archive_read_next_header(archive.get(), &entry) == ARCHIVE_OK) { std::string name { archive_entry_pathname(entry) }; auto it = entries.find(name); if (it == entries.end()) { @@ -145,7 +141,7 @@ bool check_archive_contents(const std::filesystem::path& path, const TAE_MAP_T& bool read_ok = true; for (;;) { - const auto rlen = archive_read_data(a, buf.data(), size); + const auto rlen = archive_read_data(archive.get(), buf.data(), size); if (rlen == 0) break; if (rlen < 0) { @@ -173,9 +169,6 @@ bool check_archive_contents(const std::filesystem::path& path, const TAE_MAP_T& } } - archive_read_close(a); - archive_read_free(a); - if (!extra_names.empty()) { ok = false; std::cout << "Extra files found: "; @@ -195,7 +188,7 @@ bool check_archive_contents(const std::filesystem::path& path, const TAE_MAP_T& } else { std::set<std::string> read_names_set(read_names.begin(), read_names.end()); std::set<std::string> test_file_entries_set; - std::transform(entries.begin(), entries.end(), std::inserter(test_file_entries_set, test_file_entries_set.end()), [](const std::pair<std::string, TestArchiveEntry>& p) {return p.first;}); + ranges::transform(entries, std::inserter(test_file_entries_set, test_file_entries_set.end()), [](const std::pair<std::string, TestArchiveEntry>& p) {return p.first;}); REQUIRE(read_names_set == test_file_entries_set); } @@ -203,6 +196,6 @@ bool check_archive_contents(const std::filesystem::path& path, const TAE_MAP_T& return ok; } -bool check_archive_contents(const std::filesystem::path& path, const OrderedTestArchive& archive, bool check_attributes) { +bool check_archive_contents(const std::filesystem::path& path, const OrderedTestArchive& archive, const bool check_attributes) { return check_archive_contents(path, archive.map, check_attributes, archive.order); } diff --git a/extensions/libarchive/ArchiveTests.h b/extensions/libarchive/tests/util/ArchiveTests.h similarity index 87% rename from extensions/libarchive/ArchiveTests.h rename to extensions/libarchive/tests/util/ArchiveTests.h index a2bc076f9..1edd052fa 100644 --- a/extensions/libarchive/ArchiveTests.h +++ b/extensions/libarchive/tests/util/ArchiveTests.h @@ -29,7 +29,7 @@ #include "ArchiveCommon.h" -typedef struct { +struct TestArchiveEntry { const char* content; std::string name; mode_t type; @@ -39,16 +39,16 @@ typedef struct { time_t mtime; uint32_t mtime_nsec; size_t size; -} TestArchiveEntry; +}; -typedef std::map<std::string, TestArchiveEntry> TAE_MAP_T; -typedef std::vector<std::string> FN_VEC_T; +using TAE_MAP_T = std::map<std::string, TestArchiveEntry>; +using FN_VEC_T = std::vector<std::string>; -typedef struct { +struct OrderedTestArchive { TAE_MAP_T map; FN_VEC_T order; -} OrderedTestArchive; +}; TAE_MAP_T build_test_archive_map(int, const char* const*, const char* const*); @@ -57,8 +57,7 @@ FN_VEC_T build_test_archive_order(int, const char* const*); OrderedTestArchive build_ordered_test_archive(int, const char* const*, const char* const*); void build_test_archive(const std::filesystem::path&, const TAE_MAP_T& entries, FN_VEC_T order = FN_VEC_T()); -void build_test_archive(const std::filesystem::path&, OrderedTestArchive&); +void build_test_archive(const std::filesystem::path&, const OrderedTestArchive&); bool check_archive_contents(const std::filesystem::path&, const TAE_MAP_T& entries, bool check_attributes = true, const FN_VEC_T& order = FN_VEC_T()); bool check_archive_contents(const std::filesystem::path&, const OrderedTestArchive&, bool check_attributes = true); -
