This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1e5e3ae8e006e64dff81096c4cf558b452fd3d1b
Author: Adam Debreceni <[email protected]>
AuthorDate: Mon Jun 29 14:51:08 2020 +0200

    MINIFICPP-1274 - Commit delete operation before shutdown
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #826
---
 extensions/rocksdb-repos/FlowFileRepository.cpp |  1 +
 libminifi/include/core/Repository.h             |  2 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp      | 81 +++++++++++++++++++++++++
 3 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 428db97..127ce9b 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -119,6 +119,7 @@ void FlowFileRepository::run() {
       last = now;
     }
   }
+  flush();
 }
 
 void FlowFileRepository::prune_stored_flowfiles() {
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 3de1a54..e44c479 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -242,7 +242,7 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   // thread
   std::thread thread_;
   // whether the monitoring thread is running for the repo while it was enabled
-  bool running_;
+  std::atomic<bool> running_;
   // whether stop accepting provenace event
   std::atomic<bool> repo_full_;
   // repoSize
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index e970c40..7521d6f 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -326,3 +326,84 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/var/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds{10});
+      }
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        REQUIRE(file->Serialize());
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          REQUIRE(ff_repository->Delete(file->getUUIDStr()));
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    REQUIRE(ff_repository->initialize(config));
+    ff_repository->loadComponent(content_repo);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    REQUIRE(ff_repository->initialize(config));
+    ff_repository->loadComponent(content_repo);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Reply via email to