This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7e8c5c49c0987bf3e2dcf1e0e16599c5cd7e69ec Author: Adam Debreceni <[email protected]> AuthorDate: Fri Feb 17 01:14:07 2023 +0100 MINIFICPP-2045 Synchronous flow file reloading ... and orphan content cleanup at startup Closes #1509 Signed-off-by: Marton Szasz <[email protected]> --- CONFIGURE.md | 1 - conf/minifi.properties | 1 - .../rocksdb-repos/DatabaseContentRepository.cpp | 33 ++++++ .../rocksdb-repos/DatabaseContentRepository.h | 2 + extensions/rocksdb-repos/FlowFileRepository.cpp | 116 ++++---------------- extensions/rocksdb-repos/FlowFileRepository.h | 12 --- libminifi/include/core/ContentRepository.h | 2 + .../include/core/repository/FileSystemRepository.h | 2 + .../core/repository/VolatileContentRepository.h | 4 + libminifi/include/properties/Configuration.h | 1 - libminifi/src/Configuration.cpp | 1 - .../src/core/repository/FileSystemRepository.cpp | 13 +++ libminifi/test/flow-tests/SessionTests.cpp | 8 +- .../test/persistence-tests/PersistenceTests.cpp | 10 +- .../rocksdb-tests/DBContentRepositoryTests.cpp | 39 +++++++ libminifi/test/rocksdb-tests/EncryptionTests.cpp | 9 +- libminifi/test/rocksdb-tests/RepoTests.cpp | 120 ++++++++++++++++----- libminifi/test/unit/FileSystemRepositoryTests.cpp | 24 +++++ 18 files changed, 242 insertions(+), 156 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index a56504f96..304407ecb 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -160,7 +160,6 @@ folder. You may specify your own path in place of these defaults. in minifi.properties nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository - nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository #### Shared database diff --git a/conf/minifi.properties b/conf/minifi.properties index dca8e39fb..491a94016 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -28,7 +28,6 @@ nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repositor nifi.provenance.repository.max.storage.time=1 MIN nifi.provenance.repository.max.storage.size=1 MB nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository -nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository nifi.provenance.repository.class.name=NoOpRepository nifi.content.repository.class.name=DatabaseContentRepository diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 0924534f0..881de1085 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -20,6 +20,7 @@ #include <memory> #include <string> #include <utility> +#include <vector> #include "encryption/RocksDbEncryptionProvider.h" #include "RocksDbStream.h" @@ -179,6 +180,38 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::R return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch); } +void DatabaseContentRepository::clearOrphans() { + if (!is_valid_ || !db_) { + logger_->log_error("Cannot delete orphan content entries, repository is invalid"); + return; + } + auto opendb = db_->open(); + if (!opendb) { + logger_->log_error("Cannot delete orphan content entries, could not open repository"); + return; + } + std::vector<std::string> keys_to_be_deleted; + auto it = opendb->NewIterator(rocksdb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + auto key = it->key().ToString(); + auto claim_it = count_map_.find(key); + if (claim_it == count_map_.end() || claim_it->second == 0) { + logger_->log_error("Deleting orphan resource %s", key); + keys_to_be_deleted.push_back(key); + } + } + auto batch = opendb->createWriteBatch(); + for (auto& key : keys_to_be_deleted) { + batch.Delete(key); + } + + rocksdb::Status status = opendb->Write(rocksdb::WriteOptions(), &batch); + + if (!status.ok()) { + logger_->log_error("Could not delete orphan contents from rocksdb database: %s", status.ToString()); + } +} + REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository")); } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index 9961d2607..98f3acb79 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -67,6 +67,8 @@ class DatabaseContentRepository : public core::ContentRepository { bool remove(const minifi::ResourceClaim &claim) override; bool exists(const minifi::ResourceClaim &streamId) override; + void clearOrphans() override; + private: std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch); diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 690a797a8..ecb9a1bcc 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -111,9 +111,6 @@ void FlowFileRepository::printStats() { void FlowFileRepository::run() { auto last = std::chrono::steady_clock::now(); - if (isRunning()) { - prune_stored_flowfiles(); - } while (isRunning()) { std::this_thread::sleep_for(purge_period_); flush(); @@ -126,38 +123,29 @@ void FlowFileRepository::run() { flush(); } -void FlowFileRepository::prune_stored_flowfiles() { - const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{config_->getHome()}, DbEncryptionOptions{checkpoint_dir_.string(), ENCRYPTION_KEY_NAME}); - logger_->log_info("Using %s FlowFileRepository checkpoint", encrypted_env ? "encrypted" : "plaintext"); - - auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { - db_opts.set(&rocksdb::DBOptions::create_if_missing, true); - db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); - db_opts.set(&rocksdb::DBOptions::use_direct_reads, true); - if (encrypted_env) { - db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{}); - } else { - db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default()); - } - }; - auto checkpointDB = minifi::internal::RocksDatabase::create(set_db_opts, {}, checkpoint_dir_.string(), minifi::internal::RocksDbMode::ReadOnly); - std::optional<minifi::internal::OpenRocksDb> opendb; - if (nullptr != checkpoint_) { - opendb = checkpointDB->open(); - if (opendb) { - logger_->log_trace("Successfully opened checkpoint database at '%s'", checkpoint_dir_.string()); - } else { - logger_->log_error("Couldn't open checkpoint database at '%s' using live database", checkpoint_dir_.string()); - opendb = db_->open(); - } - if (!opendb) { - logger_->log_trace("Could not open neither the checkpoint nor the live database."); - return; +bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) { + constexpr int RETRY_COUNT = 3; + std::chrono::milliseconds wait_time = 0ms; + for (int i=0; i < RETRY_COUNT; ++i) { + auto status = operation(); + if (status.ok()) { + logger_->log_trace("Rocksdb operation executed successfully"); + return true; } - } else { - logger_->log_trace("Could not open checkpoint as object doesn't exist. Likely not needed or file system error."); + logger_->log_error("Rocksdb operation failed: %s", status.ToString()); + wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS; + std::this_thread::sleep_for(wait_time); + } + return false; +} + +void FlowFileRepository::initialize_repository() { + auto opendb = db_->open(); + if (!opendb) { + logger_->log_trace("Couldn't open database to load existing flow files"); return; } + logger_->log_info("Reading existing flow files from database"); auto it = opendb->NewIterator(rocksdb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { @@ -191,62 +179,8 @@ void FlowFileRepository::prune_stored_flowfiles() { keys_to_delete.enqueue(key); } } -} - -bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) { - std::chrono::milliseconds waitTime = 0ms; - for (int i=0; i < 3; ++i) { - auto status = operation(); - if (status.ok()) { - logger_->log_trace("Rocksdb operation executed successfully"); - return true; - } - logger_->log_error("Rocksdb operation failed: %s", status.ToString()); - waitTime += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS; - std::this_thread::sleep_for(waitTime); - } - return false; -} - -/** - * Returns True if there is data to interrogate. - * @return true if our db has data stored. - */ -bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDb& opendb) { - auto it = opendb.NewIterator(rocksdb::ReadOptions()); - it->SeekToFirst(); - return it->Valid(); -} -void FlowFileRepository::initialize_repository() { - checkpoint_.reset(); - auto opendb = db_->open(); - if (!opendb) { - logger_->log_trace("Couldn't open database, no way to checkpoint"); - return; - } - // first we need to establish a checkpoint iff it is needed. - if (!need_checkpoint(*opendb)) { - logger_->log_trace("Do not need checkpoint"); - return; - } - // delete any previous copy - if (utils::file::delete_dir(checkpoint_dir_) < 0) { - logger_->log_error("Could not delete existing checkpoint directory '%s'", checkpoint_dir_.string()); - return; - } - std::unique_ptr<rocksdb::Checkpoint> checkpoint; - rocksdb::Status checkpoint_status = opendb->NewCheckpoint(checkpoint); - if (!checkpoint_status.ok()) { - logger_->log_error("Could not create checkpoint object: %s", checkpoint_status.ToString()); - return; - } - checkpoint_status = checkpoint->CreateCheckpoint(checkpoint_dir_.string()); - if (!checkpoint_status.ok()) { - logger_->log_error("Could not initialize checkpoint: %s", checkpoint_status.ToString()); - return; - } - checkpoint_ = std::move(checkpoint); - logger_->log_trace("Created checkpoint in directory '%s'", checkpoint_dir_.string()); + flush(); + content_repo_->clearOrphans(); } void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { @@ -266,12 +200,6 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) } logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_); - value.clear(); - if (configure->get(Configure::nifi_flowfile_checkpoint_directory_default, value) && !value.empty()) { - checkpoint_dir_ = value; - } - logger_->log_debug("NiFi FlowFile Checkpoint Directory %s", checkpoint_dir_.string()); - const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME}); logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext"); diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index aeda5af9c..832fde642 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -66,13 +66,11 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { } explicit FlowFileRepository(const std::string& repo_name = "", - std::filesystem::path checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY, std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD) : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod), - checkpoint_dir_(std::move(checkpoint_dir)), logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) { } @@ -112,23 +110,13 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { void initialize_repository(); - /** - * Returns true if a checkpoint is needed at startup - * @return true if a checkpoint is needed. - */ - static bool need_checkpoint(minifi::internal::OpenRocksDb& opendb); - - void prune_stored_flowfiles(); - std::thread& getThread() override { return thread_; } - std::filesystem::path checkpoint_dir_; moodycamel::ConcurrentQueue<std::string> keys_to_delete; std::shared_ptr<core::ContentRepository> content_repo_; std::unique_ptr<minifi::internal::RocksDatabase> db_; - std::unique_ptr<rocksdb::Checkpoint> checkpoint_; std::unique_ptr<FlowFileLoader> swap_loader_; std::shared_ptr<logging::Logger> logger_; std::shared_ptr<minifi::Configure> config_; diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index 84ac4eded..7d92634fb 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -51,6 +51,8 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut void incrementStreamCount(const minifi::ResourceClaim &streamId) override; StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) override; + virtual void clearOrphans() = 0; + protected: std::string directory_; std::mutex count_map_mutex_; diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h index 66cec39ef..01926f5bd 100644 --- a/libminifi/include/core/repository/FileSystemRepository.h +++ b/libminifi/include/core/repository/FileSystemRepository.h @@ -49,6 +49,8 @@ class FileSystemRepository : public core::ContentRepository { bool remove(const minifi::ResourceClaim& claim) override; std::shared_ptr<ContentSession> createSession() override; + void clearOrphans() override; + private: std::shared_ptr<logging::Logger> logger_; }; diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h index 54c1dcc6f..62e9fd80b 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/VolatileContentRepository.h @@ -94,6 +94,10 @@ class VolatileContentRepository : public core::ContentRepository { */ bool remove(const minifi::ResourceClaim &claim) override; + void clearOrphans() override { + // there are no persisted orphans to delete + } + private: VolatileRepositoryData repo_data_; bool minimize_locking_; diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index eb2f598dc..25e6b5ebc 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -66,7 +66,6 @@ class Configuration : public Properties { static constexpr const char *nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time"; static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default"; static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; - static constexpr const char *nifi_flowfile_checkpoint_directory_default = "nifi.flowfile.checkpoint.directory.default"; static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default"; static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 69b24e9e8..78ae790f7 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -51,7 +51,6 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP core::ConfigurationProperty{Configuration::nifi_provenance_repository_max_storage_time, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default}, core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default}, - core::ConfigurationProperty{Configuration::nifi_flowfile_checkpoint_directory_default}, core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default}, core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp index b41cbcfaf..034e9c36a 100644 --- a/libminifi/src/core/repository/FileSystemRepository.cpp +++ b/libminifi/src/core/repository/FileSystemRepository.cpp @@ -59,4 +59,17 @@ std::shared_ptr<ContentSession> FileSystemRepository::createSession() { return std::make_shared<ForwardingContentSession>(sharedFromThis()); } +void FileSystemRepository::clearOrphans() { + std::lock_guard<std::mutex> lock(count_map_mutex_); + utils::file::list_dir(directory_, [&] (auto& /*dir*/, auto& filename) { + auto path = directory_ + "/" + filename.string(); + auto it = count_map_.find(path); + if (it == count_map_.end() || it->second == 0) { + logger_->log_debug("Deleting orphan resource %s", path); + std::remove(path.c_str()); + } + return true; + }, logger_, false); +} + } // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp index 59cd8e26b..b722e93b1 100644 --- a/libminifi/test/flow-tests/SessionTests.cpp +++ b/libminifi/test/flow-tests/SessionTests.cpp @@ -46,12 +46,6 @@ class TestProcessor : public minifi::core::Processor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS }; -#ifdef WIN32 -const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = ".\\sessiontest_flowfile_checkpoint"; -#else -const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = "./sessiontest_flowfile_checkpoint"; -#endif - TEST_CASE("Import null data") { TestController testController; LogTestController::getInstance().setDebug<core::ContentRepository>(); @@ -69,7 +63,7 @@ TEST_CASE("Import null data") { config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string()); std::shared_ptr<core::Repository> prov_repo = core::createRepository("nooprepository"); - std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", SESSIONTEST_FLOWFILE_CHECKPOINT_DIR); + std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository"); std::shared_ptr<core::ContentRepository> content_repo; SECTION("VolatileContentRepository") { testController.getLogger()->log_info("Using VolatileContentRepository"); diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp index 530aa06c6..e417ae417 100644 --- a/libminifi/test/persistence-tests/PersistenceTests.cpp +++ b/libminifi/test/persistence-tests/PersistenceTests.cpp @@ -53,12 +53,6 @@ class TestProcessor : public minifi::core::Processor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS }; -#ifdef WIN32 -const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = ".\\persistencetest_flowfile_checkpoint"; -#else -const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = "./persistencetest_flowfile_checkpoint"; -#endif - struct TestFlow{ TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo, const std::function<std::unique_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput) @@ -179,7 +173,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") { config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string()); std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>(); - auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR); + auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository"); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); ff_repository->initialize(config); content_repo->initialize(config); @@ -286,7 +280,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") { config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string()); std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>(); - std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR); + std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository"); std::shared_ptr<core::ContentRepository> content_repo; SECTION("VolatileContentRepository") { testController.getLogger()->log_info("Using VolatileContentRepository"); diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp index 0a50cd129..942f1544e 100644 --- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -242,3 +242,42 @@ TEST_CASE("ProcessSession::append should append to the flowfile and set its size TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (RocksDB)", "[zerolengthread]") { ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::DatabaseContentRepository>()); } + +size_t getDbSize(const std::filesystem::path& dir) { + auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string()); + auto opendb = db->open(); + REQUIRE(opendb); + + size_t count = 0; + auto it = opendb->NewIterator({}); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + ++count; + } + return count; +} + +TEST_CASE("DBContentRepository can clear orphan entries") { + TestController testController; + auto dir = testController.createTempDirectory(); + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); + { + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + REQUIRE(content_repo->initialize(configuration)); + + minifi::ResourceClaim claim(content_repo); + content_repo->write(claim)->write("hi"); + // ensure that the content is not deleted during resource claim destruction + content_repo->incrementStreamCount(claim); + } + + REQUIRE(getDbSize(dir) == 1); + + { + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + REQUIRE(content_repo->initialize(configuration)); + content_repo->clearOrphans(); + } + + REQUIRE(getDbSize(dir) == 0); +} diff --git a/libminifi/test/rocksdb-tests/EncryptionTests.cpp b/libminifi/test/rocksdb-tests/EncryptionTests.cpp index 2fff68980..b544d7016 100644 --- a/libminifi/test/rocksdb-tests/EncryptionTests.cpp +++ b/libminifi/test/rocksdb-tests/EncryptionTests.cpp @@ -33,7 +33,6 @@ class FFRepoFixture : public TestController { LogTestController::getInstance().setTrace<FlowFileRepository>(); home_ = createTempDirectory(); repo_dir_ = home_ / "flowfile_repo"; - checkpoint_dir_ = home_ / "checkpoint_dir"; config_ = std::make_shared<minifi::Configure>(); config_->setHome(home_); container_ = std::make_unique<minifi::Connection>(nullptr, nullptr, "container"); @@ -50,7 +49,7 @@ class FFRepoFixture : public TestController { template<typename Fn> void runWithNewRepository(Fn&& fn) { - auto repository = std::make_shared<FlowFileRepository>("ff", checkpoint_dir_, repo_dir_.string()); + auto repository = std::make_shared<FlowFileRepository>("ff", repo_dir_.string()); repository->initialize(config_); std::map<std::string, core::Connectable*> container_map; container_map[container_->getUUIDStr()] = container_.get(); @@ -65,7 +64,6 @@ class FFRepoFixture : public TestController { std::unique_ptr<minifi::Connection> container_; std::filesystem::path home_; std::filesystem::path repo_dir_; - std::filesystem::path checkpoint_dir_; std::shared_ptr<minifi::Configure> config_; std::shared_ptr<core::repository::VolatileContentRepository> content_repo_; }; @@ -93,14 +91,11 @@ TEST_CASE_METHOD(FFRepoFixture, "FlowFileRepository creates checkpoint and loads REQUIRE(container_->isEmpty()); runWithNewRepository([&] (const std::shared_ptr<core::repository::FlowFileRepository>& /*repo*/) { - // wait for the flowfiles to be loaded from the checkpoint + // wait for the flowfiles to be loaded bool success = utils::verifyEventHappenedInPollTime(std::chrono::seconds{5}, [&] { return !container_->isEmpty(); }); REQUIRE(success); - REQUIRE(utils::verifyLogLinePresenceInPollTime( - std::chrono::seconds{5}, - "Successfully opened checkpoint database at '" + checkpoint_dir_.string() + "'")); std::set<std::shared_ptr<core::FlowFile>> expired; auto flowfile = container_->poll(expired); REQUIRE(expired.empty()); diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp index e280c0432..ef4d073bf 100644 --- a/libminifi/test/rocksdb-tests/RepoTests.cpp +++ b/libminifi/test/rocksdb-tests/RepoTests.cpp @@ -39,12 +39,6 @@ using namespace std::literals::chrono_literals; namespace { -#ifdef WIN32 -const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = ".\\repotest_flowfile_checkpoint"; -#else -const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = "./repotest_flowfile_checkpoint"; -#endif - namespace { class TestProcessor : public minifi::core::Processor { public: @@ -72,7 +66,7 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); TestController testController; auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); repository->initialize(std::make_shared<minifi::Configure>()); @@ -83,8 +77,6 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { REQUIRE(true == file->Persist(repository)); - utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true); - repository->stop(); } @@ -94,7 +86,7 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); TestController testController; auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); repository->initialize(std::make_shared<minifi::Configure>()); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); @@ -106,8 +98,6 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { REQUIRE(true == file->Persist(repository)); - utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true); - repository->stop(); } @@ -117,7 +107,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); TestController testController; auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); @@ -154,8 +144,6 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { REQUIRE(record2->getAttribute("keyB", value)); REQUIRE(value.empty()); - - utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true); } TEST_CASE("Test Delete Content ", "[TestFFR4]") { @@ -167,7 +155,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") { auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); std::fstream file; file.open(dir / "tstFile.ext", std::ios::out); @@ -203,14 +191,11 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") { std::ifstream fileopen(dir / "tstFile.ext", std::ios::in); REQUIRE(!fileopen.good()); - utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true); - LogTestController::getInstance().reset(); } TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") { TestController testController; - utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true); LogTestController::getInstance().setDebug<core::ContentRepository>(); LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>(); @@ -219,7 +204,7 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") { auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); std::fstream file; file.open(dir / "tstFile.ext", std::ios::out); @@ -264,8 +249,6 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") { std::ifstream fileopen(dir / "tstFile.ext", std::ios::in); REQUIRE(fileopen.fail()); - utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true); - LogTestController::getInstance().reset(); } @@ -284,7 +267,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string()); std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>(); - auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", REPOTEST_FLOWFILE_CHECKPOINT_DIR); + auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository"); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); ff_repository->initialize(config); content_repo->initialize(config); @@ -361,7 +344,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { class TestFlowFileRepository: public core::repository::FlowFileRepository{ public: explicit TestFlowFileRepository(const std::string& name) - : FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, core::repository::FLOWFILE_REPOSITORY_DIRECTORY, + : FlowFileRepository(name, core::repository::FLOWFILE_REPOSITORY_DIRECTORY, 10min, core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {} void flush() override { @@ -438,4 +421,93 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { } } +TEST_CASE("FlowFileRepository triggers content repo orphan clear") { + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + TestController testController; + auto ff_dir = testController.createTempDirectory(); + auto content_dir = testController.createTempDirectory(); + + auto config = std::make_shared<minifi::Configure>(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + { + auto content_repo = std::make_shared<core::repository::FileSystemRepository>(); + REQUIRE(content_repo->initialize(config)); + minifi::ResourceClaim claim(content_repo); + content_repo->write(claim)->write("hi"); + // ensure that the content is not deleted during resource claim destruction + content_repo->incrementStreamCount(claim); + } + + REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).size() == 1); + + auto ff_repo = std::make_shared<core::repository::FlowFileRepository>(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared<core::repository::FileSystemRepository>(); + REQUIRE(content_repo->initialize(config)); + + ff_repo->loadComponent(content_repo); + + REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).empty()); +} + +TEST_CASE("FlowFileRepository synchronously pushes existing flow files") { + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + TestController testController; + auto ff_dir = testController.createTempDirectory(); + auto content_dir = testController.createTempDirectory(); + + auto config = std::make_shared<minifi::Configure>(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + + utils::Identifier ff_id; + auto connection_id = utils::IdGenerator::getIdGenerator()->generate(); + + { + auto ff_repo = std::make_shared<core::repository::FlowFileRepository>(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared<core::repository::FileSystemRepository>(); + REQUIRE(content_repo->initialize(config)); + auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id); + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo); + + std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>> flow_data; + auto ff = std::make_shared<minifi::FlowFileRecord>(); + ff_id = ff->getUUID(); + ff->setConnection(conn.get()); + content_repo->write(*claim)->write("hello"); + ff->setResourceClaim(claim); + auto stream = std::make_unique<minifi::io::BufferStream>(); + ff->Serialize(*stream); + flow_data.emplace_back(ff->getUUIDStr(), std::move(stream)); + + REQUIRE(ff_repo->MultiPut(flow_data)); + } + + { + auto ff_repo = std::make_shared<core::repository::FlowFileRepository>(); + REQUIRE(ff_repo->initialize(config)); + auto content_repo = std::make_shared<core::repository::FileSystemRepository>(); + REQUIRE(content_repo->initialize(config)); + auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id); + + ff_repo->setConnectionMap({{connection_id.to_string(), conn.get()}}); + ff_repo->loadComponent(content_repo); + + std::set<std::shared_ptr<core::FlowFile>> expired; + std::shared_ptr<core::FlowFile> ff = conn->poll(expired); + REQUIRE(expired.empty()); + REQUIRE(ff); + REQUIRE(ff->getUUID() == ff_id); + } +} + } // namespace diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp index b49fe60c5..c61579dd9 100644 --- a/libminifi/test/unit/FileSystemRepositoryTests.cpp +++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp @@ -58,3 +58,27 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") { return end_memory < start_memory + int64_t{5_MB}; }, 100ms)); } + +TEST_CASE("FileSystemRepository can clear orphan entries") { + TestController testController; + auto dir = testController.createTempDirectory(); + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); + { + auto content_repo = std::make_shared<core::repository::FileSystemRepository>(); + REQUIRE(content_repo->initialize(configuration)); + + minifi::ResourceClaim claim(content_repo); + content_repo->write(claim)->write("hi"); + // ensure that the content is not deleted during resource claim destruction + content_repo->incrementStreamCount(claim); + } + + REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).size() == 1); + + auto content_repo = std::make_shared<core::repository::FileSystemRepository>(); + REQUIRE(content_repo->initialize(configuration)); + content_repo->clearOrphans(); + + REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).empty()); +}
