This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 8bda98d4a959cdf18459cc57f156e2d6dc557e75 Author: Adam Debreceni <[email protected]> AuthorDate: Fri Oct 14 14:15:40 2022 +0200 MINIFICPP-1959 - Ensure that VolatileFlowFileRepository does not delete referenced resource Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1435 --- extensions/sftp/tests/ListSFTPTests.cpp | 2 +- .../tests/unit/TailFileTests.cpp | 4 +- .../core/repository/VolatileFlowFileRepository.h | 4 + .../include/core/repository/VolatileRepository.h | 11 +-- libminifi/src/core/ProcessSession.cpp | 94 +++++++++++++--------- libminifi/test/TestBase.cpp | 32 ++++++-- libminifi/test/TestBase.h | 13 ++- .../test/unit/ContentRepositoryDependentTests.h | 2 +- libminifi/test/unit/ProcessSessionTests.cpp | 51 +++++++++++- 9 files changed, 156 insertions(+), 57 deletions(-) diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp index f98a51d40..0ddabf900 100644 --- a/extensions/sftp/tests/ListSFTPTests.cpp +++ b/extensions/sftp/tests/ListSFTPTests.cpp @@ -94,7 +94,7 @@ class ListSFTPTestsFixture { list_sftp.reset(); plan.reset(); - plan = testController.createPlan(configuration, state_dir.c_str()); + plan = testController.createPlan(configuration, state_dir); if (list_sftp_uuid == nullptr) { list_sftp = plan->addProcessor( "ListSFTP", diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp index 808dbb190..ccfba30ec 100644 --- a/extensions/standard-processors/tests/unit/TailFileTests.cpp +++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp @@ -1117,7 +1117,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n // use persistent state storage that defaults to rocksDB, not volatile const auto configuration = std::make_shared<minifi::Configure>(); { - auto test_plan = testController.createPlan(configuration, state_dir.c_str()); + auto test_plan = testController.createPlan(configuration, state_dir); auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship}); test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), test_file); auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true); @@ -1138,7 +1138,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n createTempFile(log_dir, "test.log", "line eight is the last line\n"); { - auto test_plan = testController.createPlan(configuration, state_dir.c_str()); + auto test_plan = testController.createPlan(configuration, state_dir); auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship}); test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), test_file); auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true); diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h index 79fbf7671..23b00be97 100644 --- a/libminifi/include/core/repository/VolatileFlowFileRepository.h +++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h @@ -26,6 +26,8 @@ #include "core/ThreadedRepository.h" #include "utils/gsl.h" +struct VolatileFlowFileRepositoryTestAccessor; + namespace org { namespace apache { namespace nifi { @@ -38,6 +40,8 @@ namespace repository { * those which we no longer hold. */ class VolatileFlowFileRepository : public VolatileRepository<std::string, core::ThreadedRepository> { + friend struct ::VolatileFlowFileRepositoryTestAccessor; + public: explicit VolatileFlowFileRepository(const std::string& repo_name = "", const std::string& /*dir*/ = REPOSITORY_DIRECTORY, diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index d3771e612..097f551c5 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -25,6 +25,7 @@ #include <string> #include <utility> #include <vector> +#include <cinttypes> #include "AtomicRepoEntries.h" #include "Connection.h" @@ -134,7 +135,7 @@ class VolatileRepository : public RepositoryType { // current size of the volatile repo. std::atomic<size_t> current_size_; // current index. - std::atomic<uint16_t> current_index_; + std::atomic<uint32_t> current_index_; // value vector that exists for non blocking iteration over // objects that store data for this repo instance. std::vector<AtomicEntry<KeyType>*> value_vector_; @@ -226,11 +227,11 @@ bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, const size_t reclaimed_size = 0; RepoValue<KeyType> old_value; do { - uint16_t private_index = current_index_.fetch_add(1); + uint32_t private_index = current_index_.fetch_add(1); // round robin through the beginning if (private_index >= max_count_) { - uint16_t new_index = 0; - if (current_index_.compare_exchange_weak(new_index, 0)) { + uint32_t new_index = private_index + 1; + if (current_index_.compare_exchange_weak(new_index, 1)) { private_index = 0; } else { continue; @@ -256,7 +257,7 @@ bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, const } while (!updated); current_size_ += size; - logger_->log_debug("VolatileRepository -- put %u %u", current_size_.load(), current_index_.load()); + logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, current_size_.load(), current_index_.load()); return true; } diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index b9ca57272..d472fff0e 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -976,54 +976,72 @@ void ProcessSession::persistFlowFilesBeforeTransfer( auto flowFileRepo = process_context_->getFlowFileRepository(); auto contentRepo = process_context_->getContentRepository(); - for (auto& [target, flows] : transactionMap) { - const auto connection = dynamic_cast<Connection*>(target); - const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles(); - for (auto &ff : flows) { - if (shouldDropEmptyFiles && ff->getSize() == 0) { - // the receiver will drop this FF - continue; + enum class Type { + Dropped, Transferred + }; + + auto forEachFlowFile = [&] (Type type, auto fn) { + for (auto& [target, flows] : transactionMap) { + const auto connection = dynamic_cast<Connection*>(target); + const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles(); + for (auto &ff : flows) { + auto snapshotIt = modifiedFlowFiles.find(ff->getUUID()); + auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr; + if (shouldDropEmptyFiles && ff->getSize() == 0) { + // the receiver will drop this FF + if (type == Type::Dropped) { + fn(ff, original); + } + } else { + if (type == Type::Transferred) { + fn(ff, original); + } + } } + } + }; - std::unique_ptr<io::BufferStream> stream(new io::BufferStream()); - std::static_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream); + // collect serialized flowfiles + forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) { + auto stream = std::make_unique<io::BufferStream>(); + std::static_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream); - flowData.emplace_back(ff->getUUIDStr(), std::move(stream)); - } - } + flowData.emplace_back(ff->getUUIDStr(), std::move(stream)); + }); + + // increment on behalf of the to be persisted instance + forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) { + if (auto claim = ff->getResourceClaim()) + claim->increaseFlowFileRecordOwnedCount(); + }); if (!flowFileRepo->MultiPut(flowData)) { logger_->log_error("Failed execute multiput on FF repo!"); + // decrement on behalf of the failed persisted instance + forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) { + if (auto claim = ff->getResourceClaim()) + claim->decreaseFlowFileRecordOwnedCount(); + }); throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository"); } - for (auto& [target, flows] : transactionMap) { - const auto connection = dynamic_cast<Connection*>(target); - const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles(); - for (auto &ff : flows) { - utils::Identifier uuid = ff->getUUID(); - auto snapshotIt = modifiedFlowFiles.find(uuid); - auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr; - if (shouldDropEmptyFiles && ff->getSize() == 0) { - // the receiver promised to drop this FF, no need for it anymore - if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) { - // original must be non-null since this flowFile is already stored in the repos -> - // must have come from a session->get() - assert(original); - ff->setStoredToRepository(false); - } - continue; - } - auto claim = ff->getResourceClaim(); - // increment on behalf of the persisted instance - if (claim) claim->increaseFlowFileRecordOwnedCount(); - auto originalClaim = original ? original->getResourceClaim() : nullptr; - // decrement on behalf of the overridden instance if any - if (originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount(); - - ff->setStoredToRepository(true); + // decrement on behalf of the overridden instance if any + forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& original) { + if (auto original_claim = original ? original->getResourceClaim() : nullptr) { + original_claim->decreaseFlowFileRecordOwnedCount(); } - } + ff->setStoredToRepository(true); + }); + + forEachFlowFile(Type::Dropped, [&] (auto& ff, auto& original) { + // the receiver promised to drop this FF, no need for it anymore + if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) { + // original must be non-null since this flowFile is already stored in the repos -> + // must have come from a session->get() + gsl_Assert(original); + ff->setStoredToRepository(false); + } + }); } void ProcessSession::ensureNonNullResourceClaim( diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index b8599f9cc..f09983677 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -619,16 +619,34 @@ TestController::TestController() flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test"); } -std::shared_ptr<TestPlan> TestController::createPlan(std::shared_ptr<minifi::Configure> configuration, const char* state_dir, std::shared_ptr<minifi::core::ContentRepository> content_repo) { - if (configuration == nullptr) { - configuration = std::make_shared<minifi::Configure>(); - configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService"); - configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory()); +std::shared_ptr<TestPlan> TestController::createPlan(PlanConfig config) { + if (!config.configuration) { + config.configuration = std::make_shared<minifi::Configure>(); + config.configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService"); + config.configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory()); } - content_repo->initialize(configuration); + if (!config.flow_file_repo) + config.flow_file_repo = std::make_shared<TestRepository>(); - return std::make_shared<TestPlan>(std::move(content_repo), std::make_shared<TestRepository>(), std::make_shared<TestRepository>(), flow_version_, configuration, state_dir); + if (!config.content_repo) + config.content_repo = std::make_shared<minifi::core::repository::VolatileContentRepository>(); + + config.content_repo->initialize(config.configuration); + config.flow_file_repo->initialize(config.configuration); + config.flow_file_repo->loadComponent(config.content_repo); + + return std::make_shared<TestPlan>( + std::move(config.content_repo), std::move(config.flow_file_repo), std::make_shared<TestRepository>(), + flow_version_, config.configuration, config.state_dir ? config.state_dir->string().c_str() : nullptr); +} + +std::shared_ptr<TestPlan> TestController::createPlan(std::shared_ptr<minifi::Configure> configuration, std::optional<std::filesystem::path> state_dir, std::shared_ptr<minifi::core::ContentRepository> content_repo) { + return createPlan(PlanConfig{ + .configuration = std::move(configuration), + .state_dir = std::move(state_dir), + .content_repo = std::move(content_repo) + }); } std::string TestController::createTempDirectory() { diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index fee34300d..df401203c 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -331,10 +331,19 @@ class TestPlan { class TestController { public: + struct PlanConfig { + std::shared_ptr<minifi::Configure> configuration = {}; + std::optional<std::filesystem::path> state_dir = {}; + std::shared_ptr<minifi::core::ContentRepository> content_repo = {}; + std::shared_ptr<minifi::core::Repository> flow_file_repo = {}; + }; + TestController(); - std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, const char* state_dir = nullptr, - std::shared_ptr<minifi::core::ContentRepository> content_repo = std::make_shared<minifi::core::repository::VolatileContentRepository>()); + std::shared_ptr<TestPlan> createPlan(PlanConfig config); + + std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, std::optional<std::filesystem::path> state_dir = {}, + std::shared_ptr<minifi::core::ContentRepository> content_repo = nullptr); static void runSession(const std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h index 6674729e8..f19533ba3 100644 --- a/libminifi/test/unit/ContentRepositoryDependentTests.h +++ b/libminifi/test/unit/ContentRepositoryDependentTests.h @@ -60,7 +60,7 @@ class Fixture { const core::Relationship Failure{"failure", "something has gone awry"}; explicit Fixture(std::shared_ptr<core::ContentRepository> content_repo) { - test_plan_ = test_controller_.createPlan(nullptr, nullptr, content_repo); + test_plan_ = test_controller_.createPlan(nullptr, std::nullopt, content_repo); dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); context_ = [this] { test_plan_->runNextProcessor(); diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp index 55a39b272..ff4a78645 100644 --- a/libminifi/test/unit/ProcessSessionTests.cpp +++ b/libminifi/test/unit/ProcessSessionTests.cpp @@ -25,16 +25,22 @@ #include "../Catch.h" #include "ContentRepositoryDependentTests.h" #include "Processor.h" +#include "core/repository/VolatileFlowFileRepository.h" +#include "IntegrationTestUtils.h" +#include "../Utils.h" namespace { class Fixture { public: + explicit Fixture(TestController::PlanConfig config = {}): plan_config_(std::move(config)) {} + minifi::core::ProcessSession &processSession() { return *process_session_; } private: TestController test_controller_; - std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan(); + TestController::PlanConfig plan_config_; + std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan(plan_config_); std::shared_ptr<minifi::core::Processor> dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); std::shared_ptr<minifi::core::ProcessContext> context_ = [this] { test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }(); std::unique_ptr<minifi::core::ProcessSession> process_session_ = std::make_unique<core::ProcessSession>(context_); @@ -122,3 +128,46 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", " ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>()); ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>()); } + +struct VolatileFlowFileRepositoryTestAccessor { + METHOD_ACCESSOR(flush); +}; + +class TestVolatileFlowFileRepository : public core::repository::VolatileFlowFileRepository { + public: + explicit TestVolatileFlowFileRepository(const std::string& name) : core::SerializableComponent(name) {} + + bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override { + auto flush_on_exit = gsl::finally([&] {VolatileFlowFileRepositoryTestAccessor::call_flush(*this);}); + return VolatileFlowFileRepository::MultiPut(data); + } +}; + +TEST_CASE("ProcessSession::commit avoids dangling ResourceClaims when using VolatileFlowFileRepository", "[incrementbefore]") { + auto configuration = std::make_shared<minifi::Configure>(); + configuration->set(minifi::Configure::nifi_volatile_repository_options_flowfile_max_count, "2"); + auto ff_repo = std::make_shared<TestVolatileFlowFileRepository>("flowfile"); + Fixture fixture({ + .configuration = std::move(configuration), + .flow_file_repo = ff_repo + }); + auto& session = fixture.processSession(); + + const auto flow_file_1 = session.create(); + const auto flow_file_2 = session.create(); + const auto flow_file_3 = session.create(); + session.transfer(flow_file_1, Success); + session.transfer(flow_file_2, Success); + session.transfer(flow_file_3, Success); + session.commit(); + + // flow_files are owned by the shared_ptr on the stack and the ff_repo + // but the first one has been evicted from the ff_repo + REQUIRE(flow_file_1->getResourceClaim()->getFlowFileRecordOwnedCount() == 1); + REQUIRE(flow_file_2->getResourceClaim()->getFlowFileRecordOwnedCount() == 2); + REQUIRE(flow_file_3->getResourceClaim()->getFlowFileRecordOwnedCount() == 2); + + REQUIRE(flow_file_1->getResourceClaim()->exists()); + REQUIRE(flow_file_2->getResourceClaim()->exists()); + REQUIRE(flow_file_3->getResourceClaim()->exists()); +}
