This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 26efc3d7048f346003d4c4afc12408da509888da Author: Gabor Gyimesi <[email protected]> AuthorDate: Tue Nov 18 12:34:43 2025 +0100 MINIFICPP-2670 Clean up volatile repositories - Remove unused configuration options - Remove VolatileFlowFileRepository and make it an alias for NoOpThreadedRepository - Remove LegacyVolatileContentRepository - Update documentation Closes #2062 Signed-off-by: Marton Szasz <[email protected]> --- CONFIGURE.md | 23 +- encrypt-config/FlowConfigEncryptor.cpp | 1 - extensions/rocksdb-repos/tests/ProvenanceTests.cpp | 2 +- extensions/rocksdb-repos/tests/RepoTests.cpp | 20 +- .../repository/LegacyVolatileContentRepository.h | 107 --------- .../core/repository/NoOpThreadedRepository.h | 63 +++++ .../core/repository/VolatileFlowFileRepository.h | 96 -------- libminifi/src/Configuration.cpp | 5 - libminifi/src/core/RepositoryFactory.cpp | 42 +--- .../repository/LegacyVolatileContentRepository.cpp | 159 ------------- libminifi/test/flow-tests/SessionTests.cpp | 1 - libminifi/test/libtest/unit/ProvenanceTestHelper.h | 17 -- libminifi/test/libtest/unit/TestBase.cpp | 1 + .../test/persistence-tests/PersistenceTests.cpp | 1 - libminifi/test/unit/FlowFileQueueSwapTests.cpp | 1 + libminifi/test/unit/MetricsTests.cpp | 41 ---- libminifi/test/unit/ProcessSessionTests.cpp | 45 ---- libminifi/test/unit/SiteToSiteTests.cpp | 1 + .../performance/VolatileRepositoryPerfTests.cpp | 258 --------------------- .../include/minifi-cpp/properties/Configuration.h | 5 - minifi_main/MiNiFiMain.cpp | 4 +- 21 files changed, 80 insertions(+), 813 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 5773ee47d..a1bab7754 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -613,10 +613,10 @@ Apache MiNiFi C++ uses three repositories similarly to Apache NiFi: The underlying implementation to use for these repositories can be configured in the minifi.properties file. -The Flow File Repository can be configured with the `nifi.flowfile.repository.class.name` property. If not specified, it uses the `FlowFileRepository` class by default, which stores the flow file metadata in a RocksDB database. Alternatively it can be configured to use a `VolatileFlowFileRepository` that keeps the state in memory (so the state gets lost upon restart), or the `NoOpRepository` for not keeping any state. +The Flow File Repository can be configured with the `nifi.flowfile.repository.class.name` property. If not specified, it uses the `FlowFileRepository` class by default, which stores the flow file metadata in a RocksDB database. Alternatively it can be configured to use a `NoOpRepository` for not keeping any state, flow files are only stored in memory while being transferred between processors. # in minifi.properties - nifi.flowfile.repository.class.name=VolatileFlowFileRepository + nifi.flowfile.repository.class.name=NoOpRepository # VolatileFlowFileRepository can also be used which is an alias for NoOpRepository The Content Repository can be configured with the `nifi.content.repository.class.name` property. If not specified, it uses the `DatabaseContentRepository` class by default, which persists the content in a RocksDB database. `DatabaseContentRepository` is also the default value specified in the minifi.properties file. Alternatively it can be configured to use a `VolatileContentRepository` that keeps the state in memory (so the state gets lost upon restart), or the `FileSystemRepository` to [...] @@ -636,33 +636,20 @@ The Provenance Repository can be configured with the `nifi.provenance.repository ### Configuring Volatile Repositories -As stated before each of the repositories can be configured to be volatile (state kept in memory and flushed upon restart) or persistent. Volatile repositories have some additional options, that can be specified in the following ways: +As stated before each of the repositories can be configured to be volatile (state kept in memory and flushed upon restart) or persistent. Volatile provenance repository also has some additional options, that can be specified in the following ways: # in minifi.properties # For Volatile Repositories: - nifi.flowfile.repository.class.name=VolatileFlowFileRepository + nifi.flowfile.repository.class.name=VolatileFlowFileRepository # alias for NoOpRepository in case of flowfile repository nifi.provenance.repository.class.name=VolatileProvenanceRepository nifi.content.repository.class.name=VolatileContentRepository - # configuration options - # maximum number of entries to keep in memory - nifi.volatile.repository.options.flowfile.max.count=15000 - # maximum number of bytes to keep in memory, also limited by option above - nifi.volatile.repository.options.flowfile.max.bytes=7680 KB - # maximum number of entries to keep in memory nifi.volatile.repository.options.provenance.max.count=15000 # maximum number of bytes to keep in memory, also limited by option above nifi.volatile.repository.options.provenance.max.bytes=7680 KB - # maximum number of entries to keep in memory - nifi.volatile.repository.options.content.max.count=15000 - # maximum number of bytes to keep in memory, also limited by option above - nifi.volatile.repository.options.content.max.bytes=7680 KB - # limits locking for the content repository - nifi.volatile.repository.options.content.minimal.locking=true - -**NOTE:** If the volatile repository reaches the maximum number of entries, it will start to drop the oldest entries, and replace them with the new entries in round robin manner. Make sure to set the maximum number of entries to a reasonable value, so that the repository does not run out of memory. +**NOTE:** If the volatile provenance repository reaches the maximum number of entries, it will start to drop the oldest entries, and replace them with the new entries in round robin manner. Make sure to set the maximum number of entries to a reasonable value, so that the repository does not run out of memory. Volatile content and flowfile repositories do not have such limits, their size is only limited by the available system memory. ### Configuring Repository storage locations Persistent repositories, such as the Flow File repository, use configurable paths to store data. The application detects its installation type at runtime and uses the appropriate default locations. diff --git a/encrypt-config/FlowConfigEncryptor.cpp b/encrypt-config/FlowConfigEncryptor.cpp index fcb9a927a..dec950757 100644 --- a/encrypt-config/FlowConfigEncryptor.cpp +++ b/encrypt-config/FlowConfigEncryptor.cpp @@ -23,7 +23,6 @@ #include "core/RepositoryFactory.h" #include "core/extension/ExtensionManager.h" #include "core/flow/AdaptiveConfiguration.h" -#include "core/repository/VolatileContentRepository.h" #include "utils/Id.h" #include "utils/file/FileSystem.h" diff --git a/extensions/rocksdb-repos/tests/ProvenanceTests.cpp b/extensions/rocksdb-repos/tests/ProvenanceTests.cpp index ab70db15e..95f129883 100644 --- a/extensions/rocksdb-repos/tests/ProvenanceTests.cpp +++ b/extensions/rocksdb-repos/tests/ProvenanceTests.cpp @@ -25,7 +25,7 @@ #include "core/repository/AtomicRepoEntries.h" #include "core/repository/VolatileProvenanceRepository.h" #include "core/RepositoryFactory.h" -#include "minifi-cpp/FlowFileRecord.h" +#include "FlowFileRecord.h" #include "unit/ProvenanceTestHelper.h" #include "unit/TestBase.h" #include "unit/Catch.h" diff --git a/extensions/rocksdb-repos/tests/RepoTests.cpp b/extensions/rocksdb-repos/tests/RepoTests.cpp index 275e4ba27..d8c8dd869 100644 --- a/extensions/rocksdb-repos/tests/RepoTests.cpp +++ b/extensions/rocksdb-repos/tests/RepoTests.cpp @@ -26,7 +26,7 @@ #include "core/ProcessContextImpl.h" #include "core/repository/AtomicRepoEntries.h" #include "core/RepositoryFactory.h" -#include "minifi-cpp/FlowFileRecord.h" +#include "FlowFileRecord.h" #include "FlowFileRepository.h" #include "ProvenanceRepository.h" #include "properties/Configure.h" @@ -35,7 +35,6 @@ #include "unit/Catch.h" #include "minifi-cpp/utils/gsl.h" #include "unit/TestUtils.h" -#include "core/repository/VolatileFlowFileRepository.h" #include "core/repository/VolatileProvenanceRepository.h" #include "DatabaseContentRepository.h" #include "catch2/generators/catch_generators.hpp" @@ -537,7 +536,6 @@ TEST_CASE("FlowFileRepository synchronously pushes existing flow files") { TEST_CASE("Test getting flow file repository size properties", "[TestGettingRepositorySize]") { LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceRepository>(); - LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>(); LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>(); TestController testController; auto dir = testController.createTempDirectory(); @@ -556,12 +554,6 @@ TEST_CASE("Test getting flow file repository size properties", "[TestGettingRepo expected_rocksdb_stats = true; } - SECTION("VolatileFlowFileRepository") { - repository = std::make_shared<core::repository::VolatileFlowFileRepository>("ff", dir.string(), 0ms, 10, 1ms); - expected_is_full = true; - expected_max_repo_size = 7; - } - SECTION("VolatileProvenanceRepository") { repository = std::make_shared<core::repository::VolatileProvenanceRepository>("ff", dir.string(), 0ms, 10, 1ms); expected_is_full = true; @@ -643,8 +635,7 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi const auto content_repo_dir = testController.createTempDirectory(); const auto configuration = std::make_shared<minifi::ConfigureImpl>(); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_dir.string()); - std::string content = "content"; - configuration->set(minifi::Configure::nifi_volatile_repository_options_content_max_bytes, std::to_string(content.size())); + const std::string content = "content"; std::shared_ptr<core::ContentRepository> content_repo; auto expected_is_full = false; @@ -655,7 +646,7 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi } SECTION("VolatileContentRepository") { - content_repo = std::make_shared<core::repository::VolatileContentRepository>("content"); + content_repo = std::make_shared<core::repository::VolatileContentRepository>(content); expected_is_full = false; expected_max_repo_size = std::numeric_limits<uint64_t>::max(); } @@ -707,7 +698,6 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") { LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceRepository>(); - LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>(); LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>(); TestController testController; const auto dir = testController.createTempDirectory(); @@ -721,10 +711,6 @@ TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") { repository = std::make_shared<minifi::provenance::ProvenanceRepository>("ff", dir.string(), 0ms, 0, 1ms); } - SECTION("VolatileFlowFileRepository") { - repository = std::make_shared<core::repository::VolatileFlowFileRepository>("ff", dir.string(), 0ms, 10, 1ms); - } - SECTION("VolatileProvenanceRepository") { repository = std::make_shared<core::repository::VolatileProvenanceRepository>("ff", dir.string(), 0ms, 10, 1ms); } diff --git a/libminifi/include/core/repository/LegacyVolatileContentRepository.h b/libminifi/include/core/repository/LegacyVolatileContentRepository.h deleted file mode 100644 index a4780a491..000000000 --- a/libminifi/include/core/repository/LegacyVolatileContentRepository.h +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <map> -#include <memory> -#include <string> -#include <string_view> - -#include "AtomicRepoEntries.h" -#include "io/AtomicEntryStream.h" -#include "core/ContentRepository.h" -#include "properties/Configure.h" -#include "core/logging/LoggerFactory.h" -#include "utils/GeneralUtils.h" -#include "VolatileRepositoryData.h" -#include "minifi-cpp/utils/Literals.h" - -namespace org::apache::nifi::minifi::core::repository { -/** - * Purpose: Stages content into a volatile area of memory. Note that when the maximum number - * of entries is consumed we will rollback a session to wait for others to be freed. - */ -class LegacyVolatileContentRepository : public core::ContentRepositoryImpl { - public: - static const char *minimal_locking; - - explicit LegacyVolatileContentRepository(std::string_view name = className<LegacyVolatileContentRepository>()) - : core::ContentRepositoryImpl(name), - repo_data_(15000, static_cast<size_t>(10_MiB * 0.75)), - minimize_locking_(true), - logger_(logging::LoggerFactory<LegacyVolatileContentRepository>::getLogger()) { - } - - ~LegacyVolatileContentRepository() override { - logger_->log_debug("Clearing repository"); - if (!minimize_locking_) { - std::lock_guard<std::mutex> lock(map_mutex_); - for (const auto &item : master_list_) { - delete item.second; - } - master_list_.clear(); - } - } - - uint64_t getRepositorySize() const override { - return repo_data_.getRepositorySize(); - } - - uint64_t getMaxRepositorySize() const override { - return repo_data_.getMaxRepositorySize(); - } - - uint64_t getRepositoryEntryCount() const override { - return master_list_.size(); - } - - bool isFull() const override { - return repo_data_.isFull(); - } - - bool initialize(const std::shared_ptr<Configure> &configure) override; - - std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append) override; - - std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override; - - bool exists(const minifi::ResourceClaim &claim) override; - - bool close(const minifi::ResourceClaim &claim) override { - return remove(claim); - } - - void clearOrphans() override { - // there are no persisted orphans to delete - } - - protected: - bool removeKey(const std::string& content_path) override; - - private: - VolatileRepositoryData repo_data_; - bool minimize_locking_; - - // mutex and master list that represent a cache of Atomic entries. this exists so that we don't have to walk the atomic entry list. - // The idea is to reduce the computational complexity while keeping access as maximally lock free as we can. - std::mutex map_mutex_; - std::map<ResourceClaim::Path, AtomicEntry<ResourceClaim::Path>*> master_list_; - std::shared_ptr<logging::Logger> logger_; -}; - -} // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/include/core/repository/NoOpThreadedRepository.h b/libminifi/include/core/repository/NoOpThreadedRepository.h new file mode 100644 index 000000000..df23a1b32 --- /dev/null +++ b/libminifi/include/core/repository/NoOpThreadedRepository.h @@ -0,0 +1,63 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string_view> +#include <thread> + +#include "core/ThreadedRepository.h" + +namespace org::apache::nifi::minifi::core::repository { + +class NoOpThreadedRepository : public core::ThreadedRepositoryImpl { + public: + explicit NoOpThreadedRepository(std::string_view repo_name) + : ThreadedRepositoryImpl(repo_name) { + } + + NoOpThreadedRepository(NoOpThreadedRepository&&) = delete; + NoOpThreadedRepository(const NoOpThreadedRepository&) = delete; + NoOpThreadedRepository& operator=(NoOpThreadedRepository&&) = delete; + NoOpThreadedRepository& operator=(const NoOpThreadedRepository&) = delete; + + ~NoOpThreadedRepository() override { + stop(); + } + + uint64_t getRepositorySize() const override { + return 0; + } + + uint64_t getRepositoryEntryCount() const override { + return 0; + } + + private: + void run() override { + } + + std::thread& getThread() override { + return thread_; + } + + std::thread thread_; +}; + +using VolatileFlowFileRepository = NoOpThreadedRepository; + +} // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h deleted file mode 100644 index 81c1112de..000000000 --- a/libminifi/include/core/repository/VolatileFlowFileRepository.h +++ /dev/null @@ -1,96 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include <memory> -#include <string> -#include <string_view> - -#include "VolatileRepository.h" -#include "FlowFileRecord.h" -#include "minifi-cpp/utils/gsl.h" - -struct VolatileFlowFileRepositoryTestAccessor; - -namespace org::apache::nifi::minifi::core::repository { - -/** - * Volatile flow file repository. keeps a running counter of the current location, freeing - * those which we no longer hold. - */ -class VolatileFlowFileRepository : public VolatileRepository { - friend struct ::VolatileFlowFileRepositoryTestAccessor; - - public: - explicit VolatileFlowFileRepository(std::string_view repo_name = "", - const std::string& /*dir*/ = REPOSITORY_DIRECTORY, - std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, - int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, - std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD) - : VolatileRepository(repo_name.length() > 0 ? repo_name : core::className<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) { - } - - ~VolatileFlowFileRepository() override { - stop(); - } - - private: - void run() override { - while (isRunning()) { - std::this_thread::sleep_for(purge_period_); - flush(); - } - flush(); - } - - std::thread& getThread() override { - return thread_; - } - - void flush() override { - if (!content_repo_) { - return; - } - std::lock_guard<std::mutex> lock(purge_mutex_); - for (auto purgeItem : purge_list_) { - utils::Identifier containerId; - auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(purgeItem).as_span<const std::byte>(), content_repo_, containerId); - if (eventRead) { - auto claim = eventRead->getResourceClaim(); - if (claim) claim->decreaseFlowFileRecordOwnedCount(); - } - } - purge_list_.resize(0); - purge_list_.clear(); - } - - void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override { - content_repo_ = content_repo; - } - - void emplace(RepoValue<std::string> &old_value) override { - std::string buffer; - old_value.emplace(buffer); - std::lock_guard<std::mutex> lock(purge_mutex_); - purge_list_.push_back(buffer); - } - - std::shared_ptr<core::ContentRepository> content_repo_; - std::thread thread_; -}; -} // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index b8881c0b0..cab4ed8e5 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -39,13 +39,8 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal {Configuration::nifi_content_repository_class_name, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_content_repository_rocksdb_compression, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_provenance_repository_class_name, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, - {Configuration::nifi_volatile_repository_options_flowfile_max_count, gsl::make_not_null(&core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)}, - {Configuration::nifi_volatile_repository_options_flowfile_max_bytes, gsl::make_not_null(&core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)}, {Configuration::nifi_volatile_repository_options_provenance_max_count, gsl::make_not_null(&core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)}, {Configuration::nifi_volatile_repository_options_provenance_max_bytes, gsl::make_not_null(&core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)}, - {Configuration::nifi_volatile_repository_options_content_max_count, gsl::make_not_null(&core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)}, - {Configuration::nifi_volatile_repository_options_content_max_bytes, gsl::make_not_null(&core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)}, - {Configuration::nifi_volatile_repository_options_content_minimal_locking, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, {Configuration::nifi_provenance_repository_max_storage_size, gsl::make_not_null(&core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)}, {Configuration::nifi_provenance_repository_max_storage_time, gsl::make_not_null(&core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)}, {Configuration::nifi_provenance_repository_directory_default, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index db714c97a..07aa04883 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -21,8 +21,8 @@ #include "core/repository/VolatileContentRepository.h" #include "core/ClassLoader.h" #include "core/repository/FileSystemRepository.h" -#include "core/repository/VolatileFlowFileRepository.h" #include "core/repository/VolatileProvenanceRepository.h" +#include "core/repository/NoOpThreadedRepository.h" using namespace std::literals::chrono_literals; @@ -57,40 +57,6 @@ std::unique_ptr<core::ContentRepository> createContentRepository(const std::stri throw std::runtime_error("Support for the provided configuration class could not be found"); } -class NoOpThreadedRepository : public core::ThreadedRepositoryImpl { - public: - explicit NoOpThreadedRepository(std::string_view repo_name) - : ThreadedRepositoryImpl(repo_name) { - } - - NoOpThreadedRepository(NoOpThreadedRepository&&) = delete; - NoOpThreadedRepository(const NoOpThreadedRepository&) = delete; - NoOpThreadedRepository& operator=(NoOpThreadedRepository&&) = delete; - NoOpThreadedRepository& operator=(const NoOpThreadedRepository&) = delete; - - ~NoOpThreadedRepository() override { - stop(); - } - - uint64_t getRepositorySize() const override { - return 0; - } - - uint64_t getRepositoryEntryCount() const override { - return 0; - } - - private: - void run() override { - } - - std::thread& getThread() override { - return thread_; - } - - std::thread thread_; -}; - std::unique_ptr<core::Repository> createRepository(const std::string& configuration_class_name, const std::string& repo_name) { std::string class_name_lc = configuration_class_name; std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower); @@ -101,12 +67,10 @@ std::unique_ptr<core::Repository> createRepository(const std::string& configurat return return_obj; } // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories - if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") { - return instantiate<repository::VolatileFlowFileRepository>(repo_name); + if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository" || class_name_lc == "nooprepository") { + return std::make_unique<repository::NoOpThreadedRepository>(repo_name); } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancerepository") { return instantiate<repository::VolatileProvenanceRepository>(repo_name); - } else if (class_name_lc == "nooprepository") { - return std::make_unique<core::NoOpThreadedRepository>(repo_name); } return {}; } diff --git a/libminifi/src/core/repository/LegacyVolatileContentRepository.cpp b/libminifi/src/core/repository/LegacyVolatileContentRepository.cpp deleted file mode 100644 index a7a469585..000000000 --- a/libminifi/src/core/repository/LegacyVolatileContentRepository.cpp +++ /dev/null @@ -1,159 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "core/repository/LegacyVolatileContentRepository.h" - -#include <cstdio> -#include <memory> -#include <string> -#include <thread> - -#include "core/expect.h" -#include "io/FileStream.h" -#include "utils/StringUtils.h" - -using namespace std::literals::chrono_literals; - -namespace org::apache::nifi::minifi::core::repository { - -const char *LegacyVolatileContentRepository::minimal_locking = "minimal.locking"; - -bool LegacyVolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) { - repo_data_.initialize(configure, getName()); - - logger_->log_info("Resizing repo_data_.value_vector for {} count is {}", getName(), repo_data_.max_count); - logger_->log_info("Using a maximum size for {} of {}", getName(), repo_data_.max_size); - - if (configure != nullptr) { - std::string value; - std::stringstream strstream; - strstream << Configure::nifi_volatile_repository_options << getName() << "." << minimal_locking; - if (configure->get(strstream.str(), value)) { - minimize_locking_ = utils::string::toBool(value).value_or(true); - } - } - if (!minimize_locking_) { - repo_data_.clear(); - } - - return true; -} - -std::shared_ptr<io::BaseStream> LegacyVolatileContentRepository::write(const minifi::ResourceClaim &claim, bool /*append*/) { - logger_->log_info("enter write for {}", claim.getContentFullPath()); - { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - logger_->log_info("Creating copy of atomic entry"); - auto ent = claim_check->second->takeOwnership(); - if (ent == nullptr) { - return nullptr; - } - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } - } - - int size = 0; - if (LIKELY(minimize_locking_ == true)) { - for (auto ent : repo_data_.value_vector) { - if (ent->testAndSetKey(claim.getContentFullPath())) { - std::lock_guard<std::mutex> lock(map_mutex_); - master_list_[claim.getContentFullPath()] = ent; - logger_->log_info("Minimize locking, return stream for {}", claim.getContentFullPath()); - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } - size++; - } - } else { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), claim_check->second); - } else { - auto *ent = new AtomicEntry<ResourceClaim::Path>(&repo_data_.current_size, &repo_data_.max_size); // NOLINT(cppcoreguidelines-owning-memory) - if (ent->testAndSetKey(claim.getContentFullPath())) { - master_list_[claim.getContentFullPath()] = ent; - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } - } - } - logger_->log_info("Cannot write {} {}, returning nullptr to roll back session. Repo is either full or locked", claim.getContentFullPath(), size); - return nullptr; -} - -bool LegacyVolatileContentRepository::exists(const minifi::ResourceClaim &claim) { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - auto ent = claim_check->second->takeOwnership(); - return ent != nullptr; - } - - return false; -} - -std::shared_ptr<io::BaseStream> LegacyVolatileContentRepository::read(const minifi::ResourceClaim &claim) { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - auto ent = claim_check->second->takeOwnership(); - if (ent == nullptr) { - return nullptr; - } - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } - - return nullptr; -} - -bool LegacyVolatileContentRepository::removeKey(const std::string& content_path) { - if (LIKELY(minimize_locking_ == true)) { - std::lock_guard<std::mutex> lock(map_mutex_); - auto ent = master_list_.find(content_path); - if (ent != master_list_.end()) { - auto ptr = ent->second; - // if we cannot remove the entry we will let the owner's destructor - // decrement the reference count and free it - master_list_.erase(content_path); - // because of the test and set we need to decrement ownership - ptr->decrementOwnership(); - if (ptr->freeValue(content_path)) { - logger_->log_info("Deleting resource {}", content_path); - } else { - logger_->log_info("free failed for {}", content_path); - } - } else { - logger_->log_info("Could not remove {}", content_path); - } - } else { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_item = master_list_.find(content_path); - if (claim_item != master_list_.end()) { - auto size = claim_item->second->getLength(); - delete claim_item->second; // NOLINT(cppcoreguidelines-owning-memory) - master_list_.erase(content_path); - repo_data_.current_size -= size; - } - } - - logger_->log_info("Could not remove {}, may not exist", content_path); - return true; -} - -} // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp index f86540d52..91d1171b1 100644 --- a/libminifi/test/flow-tests/SessionTests.cpp +++ b/libminifi/test/flow-tests/SessionTests.cpp @@ -20,7 +20,6 @@ #include <string> #include "unit/TestBase.h" #include "unit/Catch.h" -#include "core/repository/VolatileFlowFileRepository.h" #include "../../extensions/rocksdb-repos/DatabaseContentRepository.h" #include "../../extensions/rocksdb-repos/FlowFileRepository.h" #include "repository/FileSystemRepository.h" diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h b/libminifi/test/libtest/unit/ProvenanceTestHelper.h index ed06cc250..2b5b70bca 100644 --- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h +++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h @@ -28,7 +28,6 @@ #include <utility> #include <vector> #include "core/repository/VolatileContentRepository.h" -#include "core/repository/VolatileFlowFileRepository.h" #include "core/Processor.h" #include "core/ThreadedRepository.h" #include "FlowController.h" @@ -131,22 +130,6 @@ class TestRepository : public TestRepositoryBase<org::apache::nifi::minifi::core } }; -class TestVolatileRepository : public TestRepositoryBase<org::apache::nifi::minifi::core::repository::VolatileFlowFileRepository> { - public: - bool start() override { - return true; - } - - bool stop() override { - return true; - } - - void setFull() { - repo_data_.current_size = repo_data_.max_size; - repo_data_.current_entry_count = repo_data_.max_count; - } -}; - class TestThreadedRepository : public TestRepositoryBase<org::apache::nifi::minifi::core::ThreadedRepositoryImpl> { public: ~TestThreadedRepository() override { diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index 1fbc35cc8..525ce13c1 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -42,6 +42,7 @@ #include "minifi-cpp/core/ProcessContext.h" #include "core/ProcessSessionFactory.h" #include "ResourceClaim.h" +#include "io/StreamPipe.h" #include "fmt/format.h" #include "spdlog/sinks/stdout_sinks.h" diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp index 43dd56d31..8eb5f6702 100644 --- a/libminifi/test/persistence-tests/PersistenceTests.cpp +++ b/libminifi/test/persistence-tests/PersistenceTests.cpp @@ -33,7 +33,6 @@ #include "unit/Catch.h" #include "catch2/matchers/catch_matchers_string.hpp" #include "../../extensions/libarchive/MergeContent.h" -#include "core/repository/VolatileFlowFileRepository.h" #include "../../extensions/rocksdb-repos/DatabaseContentRepository.h" #include "unit/TestUtils.h" #include "core/repository/FileSystemRepository.h" diff --git a/libminifi/test/unit/FlowFileQueueSwapTests.cpp b/libminifi/test/unit/FlowFileQueueSwapTests.cpp index 9c737d86f..65868e131 100644 --- a/libminifi/test/unit/FlowFileQueueSwapTests.cpp +++ b/libminifi/test/unit/FlowFileQueueSwapTests.cpp @@ -26,6 +26,7 @@ #include "unit/Catch.h" #include "unit/ProvenanceTestHelper.h" #include "minifi-cpp/utils/gsl.h" +#include "FlowFileRecord.h" namespace org::apache::nifi::minifi::test { diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index e118fd12f..25840d2e6 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -170,47 +170,6 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { } } -TEST_CASE("VolatileRepositorymetricsCanBeFull", "[c2m4]") { - minifi::state::response::RepositoryMetrics metrics; - - REQUIRE("RepositoryMetrics" == metrics.getName()); - - auto repo = std::make_shared<TestVolatileRepository>(); - - metrics.addRepository(repo); - { - REQUIRE(1 == metrics.serialize().size()); - - minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); - - REQUIRE("repo_name" == resp.name); - REQUIRE(5 == resp.children.size()); - - checkSerializedValue(resp.children, "running", "false"); - checkSerializedValue(resp.children, "full", "false"); - checkSerializedValue(resp.children, "size", "0"); - checkSerializedValue(resp.children, "maxSize", std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75))); - checkSerializedValue(resp.children, "entryCount", "0"); - } - - repo->setFull(); - - { - REQUIRE(1 == metrics.serialize().size()); - - minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); - - REQUIRE("repo_name" == resp.name); - REQUIRE(5 == resp.children.size()); - - checkSerializedValue(resp.children, "running", "false"); - checkSerializedValue(resp.children, "full", "true"); - checkSerializedValue(resp.children, "size", std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75))); - checkSerializedValue(resp.children, "maxSize", std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75))); - checkSerializedValue(resp.children, "entryCount", "10000"); - } -} - TEST_CASE("Test on trigger runtime processor metrics", "[ProcessorMetrics]") { auto dummy_processor = minifi::test::utils::make_processor<DummyProcessor>("dummy"); minifi::core::ProcessorMetrics metrics(*dummy_processor); diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp index 8585f6456..1987b00d6 100644 --- a/libminifi/test/unit/ProcessSessionTests.cpp +++ b/libminifi/test/unit/ProcessSessionTests.cpp @@ -25,7 +25,6 @@ #include "unit/Catch.h" #include "unit/ContentRepositoryDependentTests.h" #include "core/Processor.h" -#include "core/repository/VolatileFlowFileRepository.h" #include "unit/TestUtils.h" #include "core/repository/FileSystemRepository.h" @@ -128,47 +127,3 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", " ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>()); ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>()); } - -struct VolatileFlowFileRepositoryTestAccessor { - METHOD_ACCESSOR(flush); -}; - -class TestVolatileFlowFileRepository : public core::repository::VolatileFlowFileRepository { - public: - explicit TestVolatileFlowFileRepository(const std::string& name) : core::repository::VolatileFlowFileRepository(name) {} - - bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override { - auto flush_on_exit = gsl::finally([&] {VolatileFlowFileRepositoryTestAccessor::call_flush(*this);}); - return VolatileFlowFileRepository::MultiPut(data); - } -}; - -TEST_CASE("ProcessSession::commit avoids dangling ResourceClaims when using VolatileFlowFileRepository", "[incrementbefore]") { - TempDirectory tmp_dir; - auto configuration = std::make_shared<minifi::ConfigureImpl>(); - configuration->set(minifi::Configure::nifi_volatile_repository_options_flowfile_max_count, "2"); - auto ff_repo = std::make_shared<TestVolatileFlowFileRepository>("flowfile"); - Fixture fixture({ - .configuration = std::move(configuration), - .flow_file_repo = ff_repo - }); - auto& session = fixture.processSession(); - - const auto flow_file_1 = session.create(); - const auto flow_file_2 = session.create(); - const auto flow_file_3 = session.create(); - session.transfer(flow_file_1, Success); - session.transfer(flow_file_2, Success); - session.transfer(flow_file_3, Success); - session.commit(); - - // flow_files are owned by the shared_ptr on the stack and the ff_repo - // but the first one has been evicted from the ff_repo - REQUIRE(flow_file_1->getResourceClaim()->getFlowFileRecordOwnedCount() == 1); - REQUIRE(flow_file_2->getResourceClaim()->getFlowFileRecordOwnedCount() == 2); - REQUIRE(flow_file_3->getResourceClaim()->getFlowFileRecordOwnedCount() == 2); - - REQUIRE(flow_file_1->getResourceClaim()->exists()); - REQUIRE(flow_file_2->getResourceClaim()->exists()); - REQUIRE(flow_file_3->getResourceClaim()->exists()); -} diff --git a/libminifi/test/unit/SiteToSiteTests.cpp b/libminifi/test/unit/SiteToSiteTests.cpp index 2a0c36cd5..90c6b740d 100644 --- a/libminifi/test/unit/SiteToSiteTests.cpp +++ b/libminifi/test/unit/SiteToSiteTests.cpp @@ -33,6 +33,7 @@ #include "catch2/generators/catch_generators.hpp" #include "io/ZlibStream.h" #include "Connection.h" +#include "io/StreamPipe.h" namespace org::apache::nifi::minifi::test { diff --git a/libminifi/test/unit/performance/VolatileRepositoryPerfTests.cpp b/libminifi/test/unit/performance/VolatileRepositoryPerfTests.cpp deleted file mode 100644 index 2b25c281d..000000000 --- a/libminifi/test/unit/performance/VolatileRepositoryPerfTests.cpp +++ /dev/null @@ -1,258 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <benchmark/benchmark.h> - -#include "core/repository/LegacyVolatileContentRepository.h" -#include "core/repository/VolatileContentRepository.h" -#include "core/logging/LoggerFactory.h" -#include "core/logging/LoggerConfiguration.h" - -// clang-tidy does not appreciate how the google benchmark macros are written -// NOLINTBEGIN - -static bool initializeLogger = [] { - auto log_props = std::make_shared<org::apache::nifi::minifi::core::logging::LoggerProperties>(""); - log_props->set("logger.root", "OFF"); - org::apache::nifi::minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_props); - return true; -}(); - -static constexpr int N = 10000; - -static void BM_LegacyVolatileContentRepository_Write(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); - repo->initialize(nullptr); - for (auto _ : state) { - auto session = repo->createSession(); - auto claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } -} -BENCHMARK(BM_LegacyVolatileContentRepository_Write); - -static void BM_LegacyVolatileContentRepository_Write2(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); - repo->initialize(nullptr); - for (auto _ : state) { - auto session = repo->createSession(); - auto claim1 = session->create(); - session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - auto claim2 = session->create(); - session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } -} -BENCHMARK(BM_LegacyVolatileContentRepository_Write2); - -static void BM_LegacyVolatileContentRepository_Write3(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); - repo->initialize(nullptr); - { - auto session = repo->createSession(); - for (int i = 0; i < N; ++i) { - auto claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - } - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - auto claim1 = session->create(); - session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - auto claim2 = session->create(); - session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } -} -BENCHMARK(BM_LegacyVolatileContentRepository_Write3); - -static void BM_LegacyVolatileContentRepository_Read(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); - repo->initialize(nullptr); - std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; - { - auto session = repo->createSession(); - claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - std::string data; - session->read(claim)->read(data); - session->commit(); - } -} -BENCHMARK(BM_LegacyVolatileContentRepository_Read); - -static void BM_LegacyVolatileContentRepository_Read2(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); - repo->initialize(nullptr); - std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; - { - auto session = repo->createSession(); - claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - std::string data; - session->read(claim)->read(data); - session->read(claim)->read(data); - session->commit(); - } -} -BENCHMARK(BM_LegacyVolatileContentRepository_Read2); - -static void BM_LegacyVolatileContentRepository_Read3(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); - repo->initialize(nullptr); - std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; - { - auto session = repo->createSession(); - for (int i = 0; i < N; ++i) { - claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - } - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - std::string data; - session->read(claim)->read(data); - session->read(claim)->read(data); - session->commit(); - } -} -BENCHMARK(BM_LegacyVolatileContentRepository_Read3); - -static void BM_VolatileContentRepository_Write(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); - repo->initialize(nullptr); - for (auto _ : state) { - auto session = repo->createSession(); - auto claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } -} -BENCHMARK(BM_VolatileContentRepository_Write); - -static void BM_VolatileContentRepository_Write2(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); - repo->initialize(nullptr); - for (auto _ : state) { - auto session = repo->createSession(); - auto claim1 = session->create(); - session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - auto claim2 = session->create(); - session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } -} -BENCHMARK(BM_VolatileContentRepository_Write2); - -static void BM_VolatileContentRepository_Write3(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); - repo->initialize(nullptr); - { - auto session = repo->createSession(); - for (int i = 0; i < N; ++i) { - auto claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - } - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - auto claim1 = session->create(); - session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - auto claim2 = session->create(); - session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } -} -BENCHMARK(BM_VolatileContentRepository_Write3); - -static void BM_VolatileContentRepository_Read(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); - repo->initialize(nullptr); - std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; - { - auto session = repo->createSession(); - claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - std::string data; - session->read(claim)->read(data); - session->commit(); - } -} -BENCHMARK(BM_VolatileContentRepository_Read); - -static void BM_VolatileContentRepository_Read2(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); - repo->initialize(nullptr); - std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; - { - auto session = repo->createSession(); - claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - std::string data; - session->read(claim)->read(data); - session->read(claim)->read(data); - session->commit(); - } -} -BENCHMARK(BM_VolatileContentRepository_Read2); - -static void BM_VolatileContentRepository_Read3(benchmark::State& state) { - auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); - repo->initialize(nullptr); - std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; - { - auto session = repo->createSession(); - for (int i = 0; i < N; ++i) { - claim = session->create(); - session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); - } - session->commit(); - } - for (auto _ : state) { - auto session = repo->createSession(); - std::string data; - session->read(claim)->read(data); - session->read(claim)->read(data); - session->commit(); - } -} -BENCHMARK(BM_VolatileContentRepository_Read3); - -BENCHMARK_MAIN(); - -// NOLINTEND diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index a5872be6f..070bc570e 100644 --- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -58,13 +58,8 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_content_repository_class_name = "nifi.content.repository.class.name"; static constexpr const char *nifi_content_repository_rocksdb_compression = "nifi.content.repository.rocksdb.compression"; static constexpr const char *nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name"; - static constexpr const char *nifi_volatile_repository_options_flowfile_max_count = "nifi.volatile.repository.options.flowfile.max.count"; - static constexpr const char *nifi_volatile_repository_options_flowfile_max_bytes = "nifi.volatile.repository.options.flowfile.max.bytes"; static constexpr const char *nifi_volatile_repository_options_provenance_max_count = "nifi.volatile.repository.options.provenance.max.count"; static constexpr const char *nifi_volatile_repository_options_provenance_max_bytes = "nifi.volatile.repository.options.provenance.max.bytes"; - static constexpr const char *nifi_volatile_repository_options_content_max_count = "nifi.volatile.repository.options.content.max.count"; - static constexpr const char *nifi_volatile_repository_options_content_max_bytes = "nifi.volatile.repository.options.content.max.bytes"; - static constexpr const char *nifi_volatile_repository_options_content_minimal_locking = "nifi.volatile.repository.options.content.minimal.locking"; static constexpr const char *nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size"; static constexpr const char *nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time"; static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default"; diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 0cb4a4ac7..b34408576 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -62,7 +62,6 @@ #include "core/RepositoryFactory.h" #include "core/extension/ExtensionManager.h" #include "core/repository/VolatileContentRepository.h" -#include "core/repository/VolatileFlowFileRepository.h" #include "core/state/MetricsPublisherStore.h" #include "properties/Decryptor.h" #include "utils/Environment.h" @@ -71,6 +70,7 @@ #include "utils/file/FileUtils.h" #include "utils/file/PathUtils.h" #include "range/v3/algorithm/min_element.hpp" +#include "core/Repository.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -368,7 +368,7 @@ int main(int argc, char **argv) { logger->log_error("Content repository failed to initialize, exiting.."); std::exit(1); } - const bool is_flow_repo_non_persistent = flow_repo->isNoop() || std::dynamic_pointer_cast<core::repository::VolatileFlowFileRepository>(flow_repo) != nullptr; + const bool is_flow_repo_non_persistent = flow_repo->isNoop(); const bool is_content_repo_non_persistent = std::dynamic_pointer_cast<core::repository::VolatileContentRepository>(content_repo) != nullptr; if (is_flow_repo_non_persistent != is_content_repo_non_persistent) { logger->log_error("Both or neither of flowfile and content repositories must be persistent! Exiting..");
