This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7425983a118ef9d81e87e17b764f4ab57b44b2b7 Author: Adam Debreceni <[email protected]> AuthorDate: Sat Mar 4 23:33:02 2023 +0100 MINIFICPP-2054 Periodically run rocksdb compaction Some SSTs seem to get stuck in some use cases, so this change implements a periodically triggered manual compaction. Closes #1516 Signed-off-by: Marton Szasz <[email protected]> --- CONFIGURE.md | 9 ++++ conf/minifi.properties | 4 ++ .../rocksdb-repos/DatabaseContentRepository.cpp | 43 +++++++++++++++- .../rocksdb-repos/DatabaseContentRepository.h | 16 +++++- extensions/rocksdb-repos/FlowFileRepository.cpp | 35 +++++++++++++ extensions/rocksdb-repos/FlowFileRepository.h | 9 ++++ extensions/rocksdb-repos/database/OpenRocksDb.cpp | 20 ++++---- extensions/rocksdb-repos/database/OpenRocksDb.h | 14 ++--- libminifi/include/core/ContentRepository.h | 3 ++ libminifi/include/properties/Configuration.h | 5 ++ libminifi/include/utils/StoppableThread.h | 60 ++++++++++++++++++++++ libminifi/src/Configuration.cpp | 2 + libminifi/src/FlowController.cpp | 2 + libminifi/src/utils/StoppableThread.cpp | 39 ++++++++++++++ .../rocksdb-tests/DBContentRepositoryTests.cpp | 38 ++++++++------ 15 files changed, 261 insertions(+), 38 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index c9beff2c1..06e66962d 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -171,6 +171,15 @@ If content repository or flow file repository is set to use the rocksdb database nifi.flowfile.repository.rocksdb.compression=zlib nifi.content.repository.rocksdb.compression=auto + +### Configuring compaction for rocksdb database + +Rocksdb has an option to run compaction at specific intervals not just when needed. + + in minifi.properties + nifi.flowfile.repository.rocksdb.compaction.period=2 min + nifi.database.content.repository.rocksdb.compaction.period=2 min + #### Shared database It is also possible to use a single database to store multiple repositories with the `minifidb://` scheme. diff --git a/conf/minifi.properties b/conf/minifi.properties index 884a03f34..1a041f294 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -34,6 +34,10 @@ nifi.provenance.repository.class.name=NoOpRepository nifi.content.repository.class.name=DatabaseContentRepository # nifi.content.repository.rocksdb.compression=auto +## Relates to the internal workings of the rocksdb backend +# nifi.flowfile.repository.rocksdb.compaction.period=2 min +# nifi.database.content.repository.rocksdb.compaction.period=2 min + #nifi.remote.input.secure=true #nifi.security.need.ClientAuth= #nifi.security.client.certificate= diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 9beeb2498..020c60fca 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -21,6 +21,7 @@ #include <string> #include <utility> #include <vector> +#include <cinttypes> #include "encryption/RocksDbEncryptionProvider.h" #include "RocksDbStream.h" @@ -42,6 +43,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME}); logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext"); + setCompactionPeriod(configuration); + auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { db_opts.set(&rocksdb::DBOptions::create_if_missing, true); db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); @@ -72,14 +75,52 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu return is_valid_; } +void DatabaseContentRepository::setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration) { + compaction_period_ = DEFAULT_COMPACTION_PERIOD; + if (auto compaction_period_str = configuration->get(Configure::nifi_dbcontent_repository_rocksdb_compaction_period)) { + if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) { + compaction_period_ = compaction_period->getMilliseconds(); + if (compaction_period_.count() == 0) { + logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_dbcontent_repository_rocksdb_compaction_period); + } + } else { + logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_dbcontent_repository_rocksdb_compaction_period); + } + } else { + logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()}); + } +} + +void DatabaseContentRepository::runCompaction() { + do { + if (auto opendb = db_->open()) { + auto status = opendb->RunCompaction(); + logger_->log_trace("Compaction triggered: %s", status.ToString()); + } else { + logger_->log_error("Failed to open database for compaction"); + } + } while (!utils::StoppableThread::waitForStopRequest(compaction_period_)); +} + +void DatabaseContentRepository::start() { + if (!db_ || !is_valid_) { + return; + } + if (compaction_period_.count() != 0) { + compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () { + runCompaction(); + }); + } +} + void DatabaseContentRepository::stop() { if (db_) { auto opendb = db_->open(); if (opendb) { opendb->FlushWAL(true); } + compaction_thread_.reset(); } - db_.reset(); } DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : BufferedContentSession(std::move(repository)) {} diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index 98f3acb79..7e1bac61c 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -19,6 +19,7 @@ #include <memory> #include <string> #include <utility> +#include <thread> #include "core/ContentRepository.h" #include "core/BufferedContentSession.h" @@ -26,6 +27,7 @@ #include "core/Property.h" #include "database/RocksDatabase.h" #include "properties/Configure.h" +#include "utils/StoppableThread.h" namespace org::apache::nifi::minifi::core::repository { @@ -37,6 +39,8 @@ class DatabaseContentRepository : public core::ContentRepository { void commit() override; }; + static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2}; + public: static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.database.content.repository.encryption.key"; @@ -56,7 +60,6 @@ class DatabaseContentRepository : public core::ContentRepository { std::shared_ptr<ContentSession> createSession() override; bool initialize(const std::shared_ptr<minifi::Configure> &configuration) override; - void stop(); std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false) override; std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override; @@ -69,12 +72,21 @@ class DatabaseContentRepository : public core::ContentRepository { void clearOrphans() override; - private: + void start() override; + void stop() override; + + protected: std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch); + void runCompaction(); + void setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration); + bool is_valid_; std::unique_ptr<minifi::internal::RocksDatabase> db_; std::shared_ptr<logging::Logger> logger_; + + std::chrono::milliseconds compaction_period_{DEFAULT_COMPACTION_PERIOD}; + std::unique_ptr<utils::StoppableThread> compaction_thread_; }; } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 8637b4282..7568201e3 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -200,6 +200,8 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) } logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_); + setCompactionPeriod(configure); + const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME}); logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext"); @@ -239,6 +241,22 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) } } +void FlowFileRepository::setCompactionPeriod(const std::shared_ptr<Configure> &configure) { + compaction_period_ = DEFAULT_COMPACTION_PERIOD; + if (auto compaction_period_str = configure->get(Configure::nifi_flowfile_repository_rocksdb_compaction_period)) { + if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) { + compaction_period_ = compaction_period->getMilliseconds(); + if (compaction_period_.count() == 0) { + logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_flowfile_repository_rocksdb_compaction_period); + } + } else { + logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_flowfile_repository_rocksdb_compaction_period); + } + } else { + logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()}); + } +} + bool FlowFileRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) { // persistent to the DB auto opendb = db_->open(); @@ -281,15 +299,32 @@ bool FlowFileRepository::Get(const std::string &key, std::string &value) { return opendb->Get(rocksdb::ReadOptions(), key, &value).ok(); } +void FlowFileRepository::runCompaction() { + do { + if (auto opendb = db_->open()) { + auto status = opendb->RunCompaction(); + logger_->log_trace("Compaction triggered: %s", status.ToString()); + } else { + logger_->log_error("Failed to open database for compaction"); + } + } while (!utils::StoppableThread::waitForStopRequest(compaction_period_)); +} + bool FlowFileRepository::start() { const bool ret = ThreadedRepository::start(); if (swap_loader_) { swap_loader_->start(); } + if (compaction_period_.count() != 0) { + compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () { + runCompaction(); + }); + } return ret; } bool FlowFileRepository::stop() { + compaction_thread_.reset(); if (swap_loader_) { swap_loader_->stop(); } diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index 832fde642..e90081221 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -38,6 +38,7 @@ #include "FlowFileLoader.h" #include "range/v3/algorithm/all_of.hpp" #include "utils/Literals.h" +#include "utils/StoppableThread.h" namespace org::apache::nifi::minifi::core::repository { @@ -58,6 +59,8 @@ constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill * Design: Extends Repository and implements the run function, using rocksdb as the primary substrate. */ class FlowFileRepository : public ThreadedRepository, public SwapManager { + static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2}; + public: static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key"; @@ -106,6 +109,9 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { private: void run() override; + void runCompaction(); + void setCompactionPeriod(const std::shared_ptr<Configure> &configure); + bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation); void initialize_repository(); @@ -121,6 +127,9 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { std::shared_ptr<logging::Logger> logger_; std::shared_ptr<minifi::Configure> config_; std::thread thread_; + + std::chrono::milliseconds compaction_period_; + std::unique_ptr<utils::StoppableThread> compaction_thread_; }; } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index 9bf2746f7..be5f1960c 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -22,11 +22,7 @@ #include "ColumnHandle.h" #include "RocksDbInstance.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace internal { +namespace org::apache::nifi::minifi::internal { OpenRocksDb::OpenRocksDb(RocksDbInstance& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl, gsl::not_null<std::shared_ptr<ColumnHandle>> column) : db_(&db), impl_(std::move(impl)), column_(std::move(column)) {} @@ -95,6 +91,14 @@ rocksdb::Status OpenRocksDb::FlushWAL(bool sync) { return result; } +rocksdb::Status OpenRocksDb::RunCompaction() { + rocksdb::Status result = impl_->CompactRange(rocksdb::CompactRangeOptions{ + .bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce + }, nullptr, nullptr); + handleResult(result); + return result; +} + void OpenRocksDb::handleResult(const rocksdb::Status& result) { if (result == rocksdb::Status::NoSpace()) { db_->invalidate(); @@ -118,8 +122,4 @@ rocksdb::DB* OpenRocksDb::get() { return impl_.get(); } -} // namespace internal -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::internal diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h index 33d26d50a..ebe25aacc 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.h +++ b/extensions/rocksdb-repos/database/OpenRocksDb.h @@ -27,11 +27,7 @@ #include "rocksdb/utilities/checkpoint.h" #include "WriteBatch.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace internal { +namespace org::apache::nifi::minifi::internal { class RocksDbInstance; struct ColumnHandle; @@ -71,6 +67,8 @@ class OpenRocksDb { rocksdb::Status FlushWAL(bool sync); + rocksdb::Status RunCompaction(); + rocksdb::DB* get(); private: @@ -82,8 +80,4 @@ class OpenRocksDb { gsl::not_null<std::shared_ptr<ColumnHandle>> column_; }; -} // namespace internal -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::internal diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index 7d92634fb..ce5a0756c 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -53,6 +53,9 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut virtual void clearOrphans() = 0; + virtual void start() {} + virtual void stop() {} + protected: std::string directory_; std::mutex count_map_mutex_; diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index 978600bb3..c3b450325 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -69,6 +69,11 @@ class Configuration : public Properties { static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default"; static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default"; + + // these are internal properties related to the rocksdb backend + static constexpr const char *nifi_flowfile_repository_rocksdb_compaction_period = "nifi.flowfile.repository.rocksdb.compaction.period"; + static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period"; + static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; static constexpr const char *nifi_sensitive_props_additional_keys = "nifi.sensitive.props.additional.keys"; diff --git a/libminifi/include/utils/StoppableThread.h b/libminifi/include/utils/StoppableThread.h new file mode 100644 index 000000000..c21ca2633 --- /dev/null +++ b/libminifi/include/utils/StoppableThread.h @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <mutex> +#include <thread> +#include <condition_variable> +#include <atomic> +#include <functional> + +namespace org::apache::nifi::minifi::utils { + +// mimics some aspects of std::jthread +// unfortunately clang's jthread support is lacking +// TODO(adebreceni): replace this with std::jthread +class StoppableThread { + public: + explicit StoppableThread(std::function<void()> fn); + + void stopAndJoin() { + running_ = false; + { + std::unique_lock lock(mtx_); + cv_.notify_all(); + } + if (thread_.joinable()) { + thread_.join(); + } + } + + ~StoppableThread() { + stopAndJoin(); + } + + // return true if stop was requested + static bool waitForStopRequest(std::chrono::milliseconds time); + + private: + std::atomic_bool running_{true}; + std::mutex mtx_; + std::condition_variable cv_; + std::thread thread_; +}; + +} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index c584cb574..ab4bc2670 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -54,6 +54,8 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default}, core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default}, core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default}, + core::ConfigurationProperty{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, core::ConfigurationProperty{Configuration::nifi_sensitive_props_additional_keys}, diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 9d0d62e15..a31e6536b 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -210,6 +210,7 @@ int16_t FlowController::stop() { } this->flow_file_repo_->stop(); this->provenance_repo_->stop(); + this->content_repo_->stop(); // stop the ControllerServices disableAllControllerServices(); running_ = false; @@ -385,6 +386,7 @@ int16_t FlowController::start() { core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_); running_ = true; this->protocol_->start(); + this->content_repo_->start(); this->provenance_repo_->start(); this->flow_file_repo_->start(); logger_->log_info("Started Flow Controller"); diff --git a/libminifi/src/utils/StoppableThread.cpp b/libminifi/src/utils/StoppableThread.cpp new file mode 100644 index 000000000..8fea65a6d --- /dev/null +++ b/libminifi/src/utils/StoppableThread.cpp @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils/StoppableThread.h" +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::utils { + +static thread_local StoppableThread* current_thread; + +StoppableThread::StoppableThread(std::function<void()> fn) { + thread_ = std::thread{[fn = std::move(fn), this] { + current_thread = this; + fn(); + }}; +} + +bool StoppableThread::waitForStopRequest(std::chrono::milliseconds time) { + gsl_Expects(current_thread); + std::unique_lock lock(current_thread->mtx_); + // wait_for returns false if the predicate is still false, i.e. the thread is running + return current_thread->cv_.wait_for(lock, time, [&] {return !current_thread->running_;}); +} + +} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp index 942f1544e..ac84cd4b0 100644 --- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -29,10 +29,18 @@ #include "../unit/ProvenanceTestHelper.h" #include "../unit/ContentRepositoryDependentTests.h" +class TestDatabaseContentRepository : public core::repository::DatabaseContentRepository { + public: + void invalidate() { + stop(); + db_.reset(); + } +}; + TEST_CASE("Write Claim", "[TestDBCR1]") { TestController testController; auto dir = testController.createTempDirectory(); - auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + auto content_repo = std::make_shared<TestDatabaseContentRepository>(); auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -46,11 +54,11 @@ TEST_CASE("Write Claim", "[TestDBCR1]") { stream->close(); - content_repo->stop(); + content_repo->invalidate(); // reclaim the memory content_repo = nullptr; - content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + content_repo = std::make_shared<TestDatabaseContentRepository>(); configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -72,7 +80,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") { TEST_CASE("Delete Claim", "[TestDBCR2]") { TestController testController; auto dir = testController.createTempDirectory(); - auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + auto content_repo = std::make_shared<TestDatabaseContentRepository>(); auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -86,12 +94,12 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") { stream->close(); - content_repo->stop(); + content_repo->invalidate(); // reclaim the memory content_repo = nullptr; - content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + content_repo = std::make_shared<TestDatabaseContentRepository>(); configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -110,7 +118,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") { TEST_CASE("Test Empty Claim", "[TestDBCR3]") { TestController testController; auto dir = testController.createTempDirectory(); - auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + auto content_repo = std::make_shared<TestDatabaseContentRepository>(); auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -123,12 +131,12 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") { stream->close(); - content_repo->stop(); + content_repo->invalidate(); // reclaim the memory content_repo = nullptr; - content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + content_repo = std::make_shared<TestDatabaseContentRepository>(); configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -145,7 +153,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") { TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") { TestController testController; auto dir = testController.createTempDirectory(); - auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + auto content_repo = std::make_shared<TestDatabaseContentRepository>(); auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -159,12 +167,12 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") { stream->close(); - content_repo->stop(); + content_repo->invalidate(); // reclaim the memory content_repo = nullptr; - content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + content_repo = std::make_shared<TestDatabaseContentRepository>(); configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -185,7 +193,7 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") { TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") { TestController testController; auto dir = testController.createTempDirectory(); - auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + auto content_repo = std::make_shared<TestDatabaseContentRepository>(); auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); @@ -199,12 +207,12 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") { stream->close(); - content_repo->stop(); + content_repo->invalidate(); // reclaim the memory content_repo = nullptr; - content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + content_repo = std::make_shared<TestDatabaseContentRepository>(); configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
