This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1e5e3ae8e006e64dff81096c4cf558b452fd3d1b
Author: Adam Debreceni <[email protected]>
AuthorDate: Mon Jun 29 14:51:08 2020 +0200

    MINIFICPP-1274 - Commit delete operation before shutdown

    Signed-off-by: Arpad Boda <[email protected]>

    This closes #826
---
 extensions/rocksdb-repos/FlowFileRepository.cpp |  1 +
 libminifi/include/core/Repository.h             |  2 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp      | 81 +++++++++++++++++++++++++
 3 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 428db97..127ce9b 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -119,6 +119,7 @@ void FlowFileRepository::run() {
       last = now;
     }
   }
+  flush();
 }
 
 void FlowFileRepository::prune_stored_flowfiles() {
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 3de1a54..e44c479 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -242,7 +242,7 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   // thread
   std::thread thread_;
   // whether the monitoring thread is running for the repo while it was enabled
-  bool running_;
+  std::atomic<bool> running_;
   // whether stop accepting provenace event
   std::atomic<bool> repo_full_;
   // repoSize
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index e970c40..7521d6f 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -326,3 +326,84 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+      : core::SerializableComponent(name),
+        FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                           MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/var/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds{10});
+      }
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        REQUIRE(file->Serialize());
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          REQUIRE(ff_repository->Delete(file->getUUIDStr()));
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    REQUIRE(ff_repository->initialize(config));
+    ff_repository->loadComponent(content_repo);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    REQUIRE(ff_repository->initialize(config));
+    ff_repository->loadComponent(content_repo);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}
