This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1a11f205bf9a97a5454c206fc745204b0d08c8df
Author: Adam Debreceni <[email protected]>
AuthorDate: Tue Sep 12 15:58:36 2023 +0200

    MINIFICPP-1372 Allow async content deletion
    
    Closes #1499
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 CONFIGURE.md                                       | 11 ++++
 conf/minifi.properties                             |  3 +
 .../rocksdb-repos/DatabaseContentRepository.cpp    | 77 +++++++++++++++++++---
 .../rocksdb-repos/DatabaseContentRepository.h      | 10 +++
 libminifi/include/properties/Configuration.h       |  1 +
 libminifi/src/Configuration.cpp                    |  1 +
 .../test/persistence-tests/PersistenceTests.cpp    |  1 +
 .../rocksdb-tests/DBContentRepositoryTests.cpp     | 33 ++++++++--
 .../rocksdb-tests/DBProvenanceRepositoryTests.cpp  |  2 +-
 9 files changed, 123 insertions(+), 16 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index 478614e20..178821994 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -360,6 +360,17 @@ Finally, as the last line is commented out, it will make 
the state manager use p
 
 When multiple repositories use the same directory (as with `minifidb://` 
scheme) they should either be all plaintext or all encrypted with the same key.
 
+### Configuring Repository Cleanup
+
+The deletion strategy for flow file content that is no longer needed can be configured as follows.
+
+    # any non-zero value enables asynchronous garbage collection that runs
+    # with the specified period, while a value of 0 (or 0 sec) deletes the
+    # content synchronously, as soon as the resource is no longer needed
+    # (the default value is 1 sec)
+    nifi.database.content.repository.purge.period = 1 sec
+
+
 ### Configuring Volatile and NO-OP Repositories
 Each of the repositories can be configured to be volatile ( state kept in 
memory and flushed
  upon restart ) or persistent. Currently, the flow file and provenance 
repositories can persist
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 2bc0ff264..d2d679f35 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -38,6 +38,9 @@ nifi.content.repository.class.name=DatabaseContentRepository
 # nifi.flowfile.repository.rocksdb.compaction.period=2 min
 # nifi.database.content.repository.rocksdb.compaction.period=2 min
 
+# setting this value to "0" enables synchronous deletion
+# nifi.database.content.repository.purge.period = 1 sec
+
 #nifi.remote.input.secure=true
 #nifi.security.need.ClientAuth=
 #nifi.security.client.certificate=
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 6994a482a..e89243a90 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -30,6 +30,7 @@
 #include "database/RocksDbUtils.h"
 #include "database/StringAppender.h"
 #include "core/Resource.h"
+#include "core/TypedValues.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
@@ -40,6 +41,15 @@ bool DatabaseContentRepository::initialize(const 
std::shared_ptr<minifi::Configu
   } else {
     directory_ = (configuration->getHome() / "dbcontentrepository").string();
   }
+  auto purge_period_str = 
utils::StringUtils::trim(configuration->get(Configure::nifi_dbcontent_repository_purge_period).value_or("1
 s"));
+  if (purge_period_str == "0") {
+    purge_period_ = std::chrono::seconds{0};
+  } else if (auto purge_period_val = 
core::TimePeriodValue::fromString(purge_period_str)) {
+    purge_period_ = purge_period_val->getMilliseconds();
+  } else {
+    logger_->log_error("Malformed delete period value, expected time format: 
'%s'", purge_period_str);
+    purge_period_ = std::chrono::seconds{1};
+  }
   const auto encrypted_env = 
createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, 
DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
   logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? 
"encrypted" : "plaintext");
 
@@ -107,10 +117,15 @@ void DatabaseContentRepository::start() {
     return;
   }
   if (compaction_period_.count() != 0) {
-    compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
+    compaction_thread_ = std::make_unique<utils::StoppableThread>([this] {
       runCompaction();
     });
   }
+  if (purge_period_.count() != 0) {
+    gc_thread_ = std::make_unique<utils::StoppableThread>([this] {
+      runGc();
+    });
+  }
 }
 
 void DatabaseContentRepository::stop() {
@@ -120,6 +135,7 @@ void DatabaseContentRepository::stop() {
       opendb->FlushWAL(true);
     }
     compaction_thread_.reset();
+    gc_thread_.reset();
   }
 }
 
@@ -197,18 +213,15 @@ bool DatabaseContentRepository::exists(const 
minifi::ResourceClaim &streamId) {
   }
 }
 
-bool DatabaseContentRepository::removeKey(const std::string& content_path) {
-  if (!is_valid_ || !db_) {
-    logger_->log_error("DB is not valid, could not delete %s", content_path);
+bool DatabaseContentRepository::removeKeySync(const std::string &content_path) 
{
+  if (!is_valid_ || !db_)
     return false;
-  }
+  // synchronous deletion
   auto opendb = db_->open();
   if (!opendb) {
-    logger_->log_error("Could not open DB, did not delete %s", content_path);
     return false;
   }
-  rocksdb::Status status;
-  status = opendb->Delete(rocksdb::WriteOptions(), content_path);
+  rocksdb::Status status = opendb->Delete(rocksdb::WriteOptions(), 
content_path);
   if (status.ok()) {
     logger_->log_debug("Deleting resource %s", content_path);
     return true;
@@ -216,11 +229,57 @@ bool DatabaseContentRepository::removeKey(const 
std::string& content_path) {
     logger_->log_debug("Resource %s was not found", content_path);
     return true;
   } else {
-    logger_->log_error("Attempted, but could not delete %s", content_path);
+    logger_->log_debug("Attempted, but could not delete %s", content_path);
     return false;
   }
 }
 
+bool DatabaseContentRepository::removeKey(const std::string& content_path) {
+  if (purge_period_.count() == 0) return removeKeySync(content_path);  // zero purge period: delete synchronously
+  // otherwise only stage the key; staged keys are deleted in batches by runGc()
+  {
+    std::lock_guard staging_guard(keys_mtx_);
+    logger_->log_debug("Staging resource for deletion %s", content_path);
+    keys_to_delete_.push_back(content_path);
+  }
+  return true;
+}
+
+// Background loop: every purge_period_ it deletes all keys staged by removeKey() in one write batch;
+// keys of a failed batch are re-staged so the next period retries them.
+void DatabaseContentRepository::runGc() {
+  while (!utils::StoppableThread::waitForStopRequest(purge_period_)) {
+    auto opendb = db_->open();
+    if (!opendb) {
+      continue;  // staged keys stay in keys_to_delete_ and are retried next period
+    }
+    // keys_to_delete_ is in memory only and lost on restart; clearOrphans at agent startup prevents a content leak
+    std::vector<std::string> keys;
+    {
+      std::lock_guard guard(keys_mtx_);
+      keys = std::exchange(keys_to_delete_, std::vector<std::string>{});
+    }
+    if (keys.empty()) continue;  // nothing staged: skip the empty write batch
+    auto batch = opendb->createWriteBatch();
+    for (const auto& key : keys) {
+      batch.Delete(key);
+    }
+    const rocksdb::Status status = opendb->Write(rocksdb::WriteOptions(), &batch);
+    if (status.ok()) {
+      for (const auto& key : keys) {
+        logger_->log_debug("Deleted resource async %s", key);
+      }
+    } else {
+      logger_->log_error("Failed to delete %zu staged resource(s): %s", keys.size(), status.ToString());
+      for (const auto& key : keys) {
+        logger_->log_debug("Failed to delete resource async %s", key);
+      }
+      std::lock_guard guard(keys_mtx_);  // re-stage the keys so the next period retries them
+      keys_to_delete_.insert(keys_to_delete_.end(), keys.begin(), keys.end());
+    }
+  }
+}
+
 std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const 
minifi::ResourceClaim& claim, bool /*append*/, minifi::internal::WriteBatch* 
batch) {
   // the traditional approach with these has been to return -1 from the 
stream; however, since we have the ability here
   // we can simply return a nullptr, which is also valid from the API when 
this stream is not valid.
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h 
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index ec3c24199..46641e7f1 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -21,6 +21,7 @@
 #include <string_view>
 #include <utility>
 #include <thread>
+#include <vector>
 
 #include "core/ContentRepository.h"
 #include "core/BufferedContentSession.h"
@@ -79,7 +80,11 @@ class DatabaseContentRepository : public 
core::ContentRepository {
   uint64_t getRepositoryEntryCount() const override;
   std::optional<RepositoryMetricsSource::RocksDbStats> getRocksDbStats() const 
override;
 
+ private:
+  void runGc();
+
  protected:
+  bool removeKeySync(const std::string& content_path);
   bool removeKey(const std::string& content_path) override;
 
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, 
bool append, minifi::internal::WriteBatch* batch);
@@ -93,6 +98,11 @@ class DatabaseContentRepository : public 
core::ContentRepository {
 
   std::chrono::milliseconds compaction_period_{DEFAULT_COMPACTION_PERIOD};
   std::unique_ptr<utils::StoppableThread> compaction_thread_;
+
+  std::chrono::milliseconds purge_period_{std::chrono::seconds{1}};
+  std::mutex keys_mtx_;
+  std::vector<std::string> keys_to_delete_;
+  std::unique_ptr<utils::StoppableThread> gc_thread_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/include/properties/Configuration.h 
b/libminifi/include/properties/Configuration.h
index a87e0ee78..579adca34 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -71,6 +71,7 @@ class Configuration : public Properties {
   // these are internal properties related to the rocksdb backend
   static constexpr const char 
*nifi_flowfile_repository_rocksdb_compaction_period = 
"nifi.flowfile.repository.rocksdb.compaction.period";
   static constexpr const char 
*nifi_dbcontent_repository_rocksdb_compaction_period = 
"nifi.database.content.repository.rocksdb.compaction.period";
+  static constexpr const char *nifi_dbcontent_repository_purge_period = 
"nifi.database.content.repository.purge.period";
 
   static constexpr const char *nifi_remote_input_secure = 
"nifi.remote.input.secure";
   static constexpr const char *nifi_security_need_ClientAuth = 
"nifi.security.need.ClientAuth";
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 62d4e49d5..84a77a9ef 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -53,6 +53,7 @@ const std::unordered_map<std::string_view, 
gsl::not_null<const core::PropertyVal
   {Configuration::nifi_dbcontent_repository_directory_default, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
   {Configuration::nifi_flowfile_repository_rocksdb_compaction_period, 
gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
   {Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, 
gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
+  {Configuration::nifi_dbcontent_repository_purge_period, 
gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
   {Configuration::nifi_remote_input_secure, 
gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
   {Configuration::nifi_security_need_ClientAuth, 
gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
   {Configuration::nifi_sensitive_props_additional_keys, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp 
b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 5eb5a3da3..c62a77915 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -278,6 +278,7 @@ TEST_CASE("Persisted flowFiles are updated on 
modification", "[TestP1]") {
   auto config = std::make_shared<minifi::Configure>();
   config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
(dir / "content_repository").string());
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
(dir / "flowfile_repository").string());
+  config->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "0 
s");
 
   std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::Repository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp 
b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index ac84cd4b0..592418d7e 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -28,6 +28,7 @@
 #include "../Catch.h"
 #include "../unit/ProvenanceTestHelper.h"
 #include "../unit/ContentRepositoryDependentTests.h"
+#include "IntegrationTestUtils.h"
 
 class TestDatabaseContentRepository : public 
core::repository::DatabaseContentRepository {
  public:
@@ -79,6 +80,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") {
 
 TEST_CASE("Delete Claim", "[TestDBCR2]") {
   TestController testController;
+  
LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>();
   auto dir = testController.createTempDirectory();
   auto content_repo = std::make_shared<TestDatabaseContentRepository>();
 
@@ -103,16 +105,35 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
 
   configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
 dir.string());
-  REQUIRE(content_repo->initialize(configuration));
 
-  content_repo->remove(*claim);
+  std::string readstr;
 
-  auto read_stream = content_repo->read(*claim);
+  SECTION("Sync") {
+    
configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period, 
"0");
+    REQUIRE(content_repo->initialize(configuration));
 
-  std::string readstr;
+    content_repo->remove(*claim);
 
-  // error tells us we have an invalid stream
-  REQUIRE(minifi::io::isError(read_stream->read(readstr)));
+    auto read_stream = content_repo->read(*claim);
+
+    // error tells us we have an invalid stream
+    REQUIRE(minifi::io::isError(read_stream->read(readstr)));
+  }
+
+  SECTION("Async") {
+    
configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period, 
"100 ms");
+    REQUIRE(content_repo->initialize(configuration));
+    content_repo->start();
+
+    content_repo->remove(*claim);
+
+    // an immediate read will still be able to access the content
+    
REQUIRE_FALSE(minifi::io::isError(content_repo->read(*claim)->read(readstr)));
+
+    REQUIRE(minifi::utils::verifyEventHappenedInPollTime(1s, [&] {
+      return minifi::io::isError(content_repo->read(*claim)->read(readstr));
+    }));
+  }
 }
 
 TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
diff --git a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp 
b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
index ef061d290..8f2e23138 100644
--- a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
@@ -48,7 +48,7 @@ void provisionRepo(minifi::provenance::ProvenanceRepository& 
repo, size_t number
 }
 
 void verifyMaxKeyCount(const minifi::provenance::ProvenanceRepository& repo, 
uint64_t keyCount) {
-  uint64_t k = keyCount;
+  uint64_t k = std::numeric_limits<uint64_t>::max();
 
   for (int i = 0; i < 50; ++i) {
     std::this_thread::sleep_for(100ms);

Reply via email to