This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1a11f205bf9a97a5454c206fc745204b0d08c8df Author: Adam Debreceni <[email protected]> AuthorDate: Tue Sep 12 15:58:36 2023 +0200 MINIFICPP-1372 Allow async content deletion Closes #1499 Signed-off-by: Marton Szasz <[email protected]> --- CONFIGURE.md | 11 ++++ conf/minifi.properties | 3 + .../rocksdb-repos/DatabaseContentRepository.cpp | 77 +++++++++++++++++++--- .../rocksdb-repos/DatabaseContentRepository.h | 10 +++ libminifi/include/properties/Configuration.h | 1 + libminifi/src/Configuration.cpp | 1 + .../test/persistence-tests/PersistenceTests.cpp | 1 + .../rocksdb-tests/DBContentRepositoryTests.cpp | 33 ++++++++-- .../rocksdb-tests/DBProvenanceRepositoryTests.cpp | 2 +- 9 files changed, 123 insertions(+), 16 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 478614e20..178821994 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -360,6 +360,17 @@ Finally, as the last line is commented out, it will make the state manager use p When multiple repositories use the same directory (as with `minifidb://` scheme) they should either be all plaintext or all encrypted with the same key. +### Configuring Repository Cleanup + +When the content of a flow file is no longer needed, you can choose how it is deleted. + + # any value other than 0 enables periodic garbage collection with the specified period + # while a value of 0 sec triggers an immediate deletion as soon as the resource + # is no longer needed + # (the default value is 1 sec) + nifi.database.content.repository.purge.period = 1 sec + + ### Configuring Volatile and NO-OP Repositories Each of the repositories can be configured to be volatile ( state kept in memory and flushed upon restart ) or persistent. 
Currently, the flow file and provenance repositories can persist diff --git a/conf/minifi.properties b/conf/minifi.properties index 2bc0ff264..d2d679f35 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -38,6 +38,9 @@ nifi.content.repository.class.name=DatabaseContentRepository # nifi.flowfile.repository.rocksdb.compaction.period=2 min # nifi.database.content.repository.rocksdb.compaction.period=2 min +# setting this value to "0" enables synchronous deletion +# nifi.database.content.repository.purge.period = 1 sec + #nifi.remote.input.secure=true #nifi.security.need.ClientAuth= #nifi.security.client.certificate= diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 6994a482a..e89243a90 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -30,6 +30,7 @@ #include "database/RocksDbUtils.h" #include "database/StringAppender.h" #include "core/Resource.h" +#include "core/TypedValues.h" namespace org::apache::nifi::minifi::core::repository { @@ -40,6 +41,15 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu } else { directory_ = (configuration->getHome() / "dbcontentrepository").string(); } + auto purge_period_str = utils::StringUtils::trim(configuration->get(Configure::nifi_dbcontent_repository_purge_period).value_or("1 s")); + if (purge_period_str == "0") { + purge_period_ = std::chrono::seconds{0}; + } else if (auto purge_period_val = core::TimePeriodValue::fromString(purge_period_str)) { + purge_period_ = purge_period_val->getMilliseconds(); + } else { + logger_->log_error("Malformed delete period value, expected time format: '%s'", purge_period_str); + purge_period_ = std::chrono::seconds{1}; + } const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME}); 
logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext"); @@ -107,10 +117,15 @@ void DatabaseContentRepository::start() { return; } if (compaction_period_.count() != 0) { - compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () { + compaction_thread_ = std::make_unique<utils::StoppableThread>([this] { runCompaction(); }); } + if (purge_period_.count() != 0) { + gc_thread_ = std::make_unique<utils::StoppableThread>([this] { + runGc(); + }); + } } void DatabaseContentRepository::stop() { @@ -120,6 +135,7 @@ void DatabaseContentRepository::stop() { opendb->FlushWAL(true); } compaction_thread_.reset(); + gc_thread_.reset(); } } @@ -197,18 +213,15 @@ bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) { } } -bool DatabaseContentRepository::removeKey(const std::string& content_path) { - if (!is_valid_ || !db_) { - logger_->log_error("DB is not valid, could not delete %s", content_path); +bool DatabaseContentRepository::removeKeySync(const std::string &content_path) { + if (!is_valid_ || !db_) return false; - } + // synchronous deletion auto opendb = db_->open(); if (!opendb) { - logger_->log_error("Could not open DB, did not delete %s", content_path); return false; } - rocksdb::Status status; - status = opendb->Delete(rocksdb::WriteOptions(), content_path); + rocksdb::Status status = opendb->Delete(rocksdb::WriteOptions(), content_path); if (status.ok()) { logger_->log_debug("Deleting resource %s", content_path); return true; @@ -216,11 +229,57 @@ bool DatabaseContentRepository::removeKey(const std::string& content_path) { logger_->log_debug("Resource %s was not found", content_path); return true; } else { - logger_->log_error("Attempted, but could not delete %s", content_path); + logger_->log_debug("Attempted, but could not delete %s", content_path); return false; } } +bool DatabaseContentRepository::removeKey(const std::string& content_path) { + if (purge_period_ == 
std::chrono::seconds(0)) { + return removeKeySync(content_path); + } + // asynchronous deletion + std::lock_guard guard(keys_mtx_); + logger_->log_debug("Staging resource for deletion %s", content_path); + keys_to_delete_.push_back(content_path); + return true; +} + +void DatabaseContentRepository::runGc() { + while (!utils::StoppableThread::waitForStopRequest(purge_period_)) { + auto opendb = db_->open(); + if (!opendb) { + continue; + } + // keys_to_delete_ is not persisted, in memory only, and is lost on restart + // the clearOrphans method is executed during agent startup making sure that this + // does not cause a content leak + std::vector<std::string> keys; + { + std::lock_guard guard(keys_mtx_); + keys = std::exchange(keys_to_delete_, std::vector<std::string>{}); + } + auto batch = opendb->createWriteBatch(); + for (auto& key : keys) { + batch.Delete(key); + } + rocksdb::Status status; + status = opendb->Write(rocksdb::WriteOptions(), &batch); + if (status.ok()) { + for (auto& key : keys) { + logger_->log_debug("Deleted resource async %s", key); + } + } else { + for (auto& key : keys) { + logger_->log_debug("Failed to delete resource async %s", key); + } + // move keys we could not delete back to the list for a retry + std::lock_guard guard(keys_mtx_); + keys_to_delete_.insert(keys_to_delete_.end(), keys.begin(), keys.end()); + } + } +} + std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim& claim, bool /*append*/, minifi::internal::WriteBatch* batch) { // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here // we can simply return a nullptr, which is also valid from the API when this stream is not valid. 
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index ec3c24199..46641e7f1 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -21,6 +21,7 @@ #include <string_view> #include <utility> #include <thread> +#include <vector> #include "core/ContentRepository.h" #include "core/BufferedContentSession.h" @@ -79,7 +80,11 @@ class DatabaseContentRepository : public core::ContentRepository { uint64_t getRepositoryEntryCount() const override; std::optional<RepositoryMetricsSource::RocksDbStats> getRocksDbStats() const override; + private: + void runGc(); + protected: + bool removeKeySync(const std::string& content_path); bool removeKey(const std::string& content_path) override; std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch); @@ -93,6 +98,11 @@ class DatabaseContentRepository : public core::ContentRepository { std::chrono::milliseconds compaction_period_{DEFAULT_COMPACTION_PERIOD}; std::unique_ptr<utils::StoppableThread> compaction_thread_; + + std::chrono::milliseconds purge_period_{std::chrono::seconds{1}}; + std::mutex keys_mtx_; + std::vector<std::string> keys_to_delete_; + std::unique_ptr<utils::StoppableThread> gc_thread_; }; } // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index a87e0ee78..579adca34 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -71,6 +71,7 @@ class Configuration : public Properties { // these are internal properties related to the rocksdb backend static constexpr const char *nifi_flowfile_repository_rocksdb_compaction_period = "nifi.flowfile.repository.rocksdb.compaction.period"; static constexpr const char 
*nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period"; + static constexpr const char *nifi_dbcontent_repository_purge_period = "nifi.database.content.repository.purge.period"; static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 62d4e49d5..84a77a9ef 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -53,6 +53,7 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal {Configuration::nifi_dbcontent_repository_directory_default, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, {Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)}, {Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)}, + {Configuration::nifi_dbcontent_repository_purge_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)}, {Configuration::nifi_remote_input_secure, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)}, {Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)}, {Configuration::nifi_sensitive_props_additional_keys, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp index 5eb5a3da3..c62a77915 100644 --- a/libminifi/test/persistence-tests/PersistenceTests.cpp +++ b/libminifi/test/persistence-tests/PersistenceTests.cpp @@ -278,6 +278,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") { auto config = std::make_shared<minifi::Configure>(); 
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, (dir / "content_repository").string()); config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string()); + config->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "0 s"); std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>(); std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository"); diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp index ac84cd4b0..592418d7e 100644 --- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -28,6 +28,7 @@ #include "../Catch.h" #include "../unit/ProvenanceTestHelper.h" #include "../unit/ContentRepositoryDependentTests.h" +#include "IntegrationTestUtils.h" class TestDatabaseContentRepository : public core::repository::DatabaseContentRepository { public: @@ -79,6 +80,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") { TEST_CASE("Delete Claim", "[TestDBCR2]") { TestController testController; + LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>(); auto dir = testController.createTempDirectory(); auto content_repo = std::make_shared<TestDatabaseContentRepository>(); @@ -103,16 +105,35 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") { configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); - REQUIRE(content_repo->initialize(configuration)); - content_repo->remove(*claim); + std::string readstr; - auto read_stream = content_repo->read(*claim); + SECTION("Sync") { + configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "0"); + REQUIRE(content_repo->initialize(configuration)); - 
std::string readstr; + content_repo->remove(*claim); - // error tells us we have an invalid stream - REQUIRE(minifi::io::isError(read_stream->read(readstr))); + auto read_stream = content_repo->read(*claim); + + // error tells us we have an invalid stream + REQUIRE(minifi::io::isError(read_stream->read(readstr))); + } + + SECTION("Async") { + configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "100 ms"); + REQUIRE(content_repo->initialize(configuration)); + content_repo->start(); + + content_repo->remove(*claim); + + // an immediate read will still be able to access the content + REQUIRE_FALSE(minifi::io::isError(content_repo->read(*claim)->read(readstr))); + + REQUIRE(minifi::utils::verifyEventHappenedInPollTime(1s, [&] { + return minifi::io::isError(content_repo->read(*claim)->read(readstr)); + })); + } } TEST_CASE("Test Empty Claim", "[TestDBCR3]") { diff --git a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp index ef061d290..8f2e23138 100644 --- a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp @@ -48,7 +48,7 @@ void provisionRepo(minifi::provenance::ProvenanceRepository& repo, size_t number } void verifyMaxKeyCount(const minifi::provenance::ProvenanceRepository& repo, uint64_t keyCount) { - uint64_t k = keyCount; + uint64_t k = std::numeric_limits<uint64_t>::max(); for (int i = 0; i < 50; ++i) { std::this_thread::sleep_for(100ms);
