This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 54a643c729575e01e147ae83fa2c3a3684c6ee6a Author: Adam Debreceni <[email protected]> AuthorDate: Fri Apr 25 17:02:20 2025 +0200 MINIFICPP-2541 - Simplify VolatileContentRepository Closes #1945 Signed-off-by: Marton Szasz <[email protected]> --- .github/workflows/ci.yml | 15 +- LICENSE | 1 + NOTICE | 1 + README.md | 14 ++ cmake/FetchBenchmark.cmake | 29 +++ cmake/MiNiFiOptions.cmake | 1 + extensions/rocksdb-repos/tests/RepoTests.cpp | 4 +- ...ository.h => LegacyVolatileContentRepository.h} | 30 +-- .../core/repository/VolatileContentRepository.h | 107 ++------- ...ory.cpp => LegacyVolatileContentRepository.cpp} | 14 +- .../core/repository/VolatileContentRepository.cpp | 207 ++++++++--------- libminifi/test/unit/CMakeLists.txt | 2 + .../test/unit/{ => performance}/CMakeLists.txt | 27 +-- .../performance/VolatileRepositoryPerfTests.cpp | 258 +++++++++++++++++++++ win_build_vs.bat | 5 +- 15 files changed, 464 insertions(+), 251 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dc4ea07b9..2d1ce025d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ on: [push, pull_request, workflow_dispatch] env: DOCKER_CMAKE_FLAGS: -DDOCKER_VERIFY_THREAD=3 -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DCI_BUILD=ON -DENABLE_AWS=ON -DENABLE_KAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON \ -DENABLE_SPLUNK=ON -DENABLE_GCP=ON -DENABLE_OPC=ON -DENABLE_PYTHON_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DENABLE_PROMETHEUS=ON \ - -DENABLE_ELASTICSEARCH=ON -DENABLE_GRAFANA_LOKI=ON -DENABLE_COUCHBASE=ON -DDOCKER_BUILD_ONLY=ON + -DENABLE_ELASTICSEARCH=ON -DENABLE_GRAFANA_LOKI=ON -DENABLE_COUCHBASE=ON -DDOCKER_BUILD_ONLY=ON -DMINIFI_PERFORMANCE_TESTS=ON SCCACHE_GHA_ENABLE: true CCACHE_DIR: ${{ GITHUB.WORKSPACE }}/.ccache jobs: @@ -56,6 +56,7 @@ jobs: -DPORTABLE=ON -DSKIP_TESTS=OFF -DUSE_SHARED_LIBS=ON + -DMINIFI_PERFORMANCE_TESTS=ON steps: - id: checkout uses: 
actions/checkout@v4 @@ -92,7 +93,7 @@ jobs: run: | # Set core file size limit to unlimited ulimit -c unlimited - ctest --timeout 300 -j4 --output-on-failure --timeout 300 + ctest --timeout 300 -j4 --output-on-failure --timeout 300 -LE performance working-directory: build - name: check-cores if: ${{ failure() && steps.test.conclusion == 'failure' }} @@ -164,6 +165,7 @@ jobs: -DSTATIC_BUILD=ON -DMINIFI_USE_REAL_ODBC_TEST_DRIVER=ON -DUSE_SHARED_LIBS=OFF + -DMINIFI_PERFORMANCE_TESTS=ON steps: - name: Support longpaths run: git config --system core.longpaths true @@ -204,7 +206,7 @@ jobs: path: ~/AppData/Local/Mozilla/sccache/cache key: ${{ runner.os }}-2022-sccache-${{ github.ref }}-${{ github.sha }} - name: Run tests - run: ctest --timeout 300 --parallel %NUMBER_OF_PROCESSORS% -C Release --output-on-failure + run: ctest --timeout 300 --parallel %NUMBER_OF_PROCESSORS% -C Release --output-on-failure -LE performance shell: cmd working-directory: ./build - name: Upload artifact @@ -263,6 +265,7 @@ jobs: -DSKIP_TESTS=OFF -DMINIFI_USE_REAL_ODBC_TEST_DRIVER=OFF -DUSE_SHARED_LIBS=ON + -DMINIFI_PERFORMANCE_TESTS=ON steps: - id: checkout uses: actions/checkout@v4 @@ -316,7 +319,11 @@ jobs: run: | # Set core file size limit to unlimited ulimit -c unlimited - ctest --timeout 300 -j$(nproc) --output-on-failure + ctest --timeout 300 -j$(nproc) --output-on-failure -LE performance + working-directory: build + - name: performance-test + id: performance-test + run: ctest -j1 --verbose -L performance working-directory: build - id: files uses: Ana06/[email protected] diff --git a/LICENSE b/LICENSE index 5033bad4a..fa4c6a70d 100644 --- a/LICENSE +++ b/LICENSE @@ -215,6 +215,7 @@ This project bundles a configuration file from 'Kubernetes Metrics Server' (kube This project bundles 'OpenSSL' which is available under an ALv2 license This project bundles 'gRPC' which is available under an ALv2 license This project bundles 'couchbase-cxx-client' which is available under an ALv2 license +This 
project bundles 'benchmark' which is available under an ALv2 license The Apache NiFi - MiNiFi C++ project contains subcomponents with separate copyright notices and license terms. Your use of the source code for the these diff --git a/NOTICE b/NOTICE index 81a9644b2..cf9247053 100644 --- a/NOTICE +++ b/NOTICE @@ -76,6 +76,7 @@ This software includes third party software subject to the following copyrights: - couchbase-cxx-client - Copyright 2023-Present Couchbase, Inc. - snappy - Copyright 2011, Google Inc. - llhttp - Copyright Fedor Indutny, 2018. +- benchmark - Copyright 2015 Google Inc. The licenses for these third party components are included in LICENSE.txt diff --git a/README.md b/README.md index b40d40026..380eeeaff 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ MiNiFi is a child project effort of Apache NiFi. This repository is for a nativ - [Bootstrapping](#bootstrapping) - [Building For Other Distros](#building-for-other-distros) - [Installation](#installation) + - [Tests](#tests) - [Configuring](#configuring) - [Running](#running) - [Deploying](#deploying) @@ -481,6 +482,19 @@ $ sudo tar xvzf "$MINIFI_PACKAGE" $ cd nifi-minifi-cpp-* ``` +### Tests +If you have enabled tests to be built during the bootstrap process, you can run them +in the build directory. (See `ctest --help` for selecting tests or running them in parallel.) +``` +$ ctest --output-on-failure +``` + +Performance tests can be enabled similarly, via the `MINIFI_PERFORMANCE_TESTS` CMake option. To execute them and see their output, run: + +``` +$ ctest --verbose -L performance +``` + ### Configuring The 'conf' directory in the installation root contains a template config.yml document, minifi.properties, and minifi-log.properties. Please see our [Configuration document](CONFIGURE.md) for details on how to configure agents.
diff --git a/cmake/FetchBenchmark.cmake b/cmake/FetchBenchmark.cmake new file mode 100644 index 000000000..e65839d56 --- /dev/null +++ b/cmake/FetchBenchmark.cmake @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +include(FetchContent) + +set(BENCHMARK_ENABLE_TESTING "OFF" CACHE STRING "" FORCE) + +FetchContent_Declare( + benchmark + URL https://github.com/google/benchmark/archive/refs/tags/v1.9.1.tar.gz + URL_HASH SHA256=32131c08ee31eeff2c8968d7e874f3cb648034377dfc32a4c377fa8796d84981 + OVERRIDE_FIND_PACKAGE +) +FetchContent_MakeAvailable(benchmark) diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake index 60cf179a9..5dd4d0920 100644 --- a/cmake/MiNiFiOptions.cmake +++ b/cmake/MiNiFiOptions.cmake @@ -44,6 +44,7 @@ endfunction() add_minifi_option(CI_BUILD "Build is used for CI." OFF) add_minifi_option(SKIP_TESTS "Skips building all tests." OFF) +add_minifi_option(MINIFI_PERFORMANCE_TESTS "Build performance tests" OFF) add_minifi_option(DOCKER_BUILD_ONLY "Disables all targets except docker build scripts. Ideal for systems without an up-to-date compiler." OFF) add_minifi_option(DOCKER_SKIP_TESTS "Skip building tests in docker image targets." 
ON) add_minifi_option(DOCKER_PUSH "Push created images to the specified tags" OFF) diff --git a/extensions/rocksdb-repos/tests/RepoTests.cpp b/extensions/rocksdb-repos/tests/RepoTests.cpp index 35189956d..e153cc140 100644 --- a/extensions/rocksdb-repos/tests/RepoTests.cpp +++ b/extensions/rocksdb-repos/tests/RepoTests.cpp @@ -669,8 +669,8 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi SECTION("VolatileContentRepository") { content_repo = std::make_shared<core::repository::VolatileContentRepository>("content"); - expected_is_full = true; - expected_max_repo_size = content.size(); + expected_is_full = false; + expected_max_repo_size = std::numeric_limits<uint64_t>::max(); } SECTION("DatabaseContentRepository") { diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/LegacyVolatileContentRepository.h similarity index 77% copy from libminifi/include/core/repository/VolatileContentRepository.h copy to libminifi/include/core/repository/LegacyVolatileContentRepository.h index 1cc2f4d9c..bfbdc7c50 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/LegacyVolatileContentRepository.h @@ -37,18 +37,18 @@ namespace org::apache::nifi::minifi::core::repository { * Purpose: Stages content into a volatile area of memory. Note that when the maximum number * of entries is consumed we will rollback a session to wait for others to be freed. 
*/ -class VolatileContentRepository : public core::ContentRepositoryImpl { +class LegacyVolatileContentRepository : public core::ContentRepositoryImpl { public: static const char *minimal_locking; - explicit VolatileContentRepository(std::string_view name = className<VolatileContentRepository>()) + explicit LegacyVolatileContentRepository(std::string_view name = className<LegacyVolatileContentRepository>()) : core::ContentRepositoryImpl(name), repo_data_(15000, static_cast<size_t>(10_MiB * 0.75)), minimize_locking_(true), - logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) { + logger_(logging::LoggerFactory<LegacyVolatileContentRepository>::getLogger()) { } - ~VolatileContentRepository() override { + ~LegacyVolatileContentRepository() override { logger_->log_debug("Clearing repository"); if (!minimize_locking_) { std::lock_guard<std::mutex> lock(map_mutex_); @@ -75,32 +75,14 @@ class VolatileContentRepository : public core::ContentRepositoryImpl { return repo_data_.isFull(); } - /** - * Initialize the volatile content repo - * @param configure configuration - */ bool initialize(const std::shared_ptr<Configure> &configure) override; - /** - * Creates writable stream. - * @param claim resource claim - * @return BaseStream shared pointer that represents the stream the consumer will write to. - */ std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append) override; - /** - * Creates readable stream. - * @param claim resource claim - * @return BaseStream shared pointer that represents the stream from which the consumer will read.. - */ std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override; bool exists(const minifi::ResourceClaim &claim) override; - /** - * Closes the claim. - * @return whether or not the claim is associated with content stored in volatile memory. 
- */ bool close(const minifi::ResourceClaim &claim) override { return remove(claim); } @@ -110,10 +92,6 @@ class VolatileContentRepository : public core::ContentRepositoryImpl { } protected: - /** - * Closes the claim. - * @return whether or not the claim is associated with content stored in volatile memory. - */ bool removeKey(const std::string& content_path) override; private: diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h index 1cc2f4d9c..dd4a9ef2e 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/VolatileContentRepository.h @@ -17,113 +17,38 @@ #pragma once -#include <map> -#include <memory> +#include <unordered_map> #include <string> -#include <string_view> - -#include "AtomicRepoEntries.h" -#include "io/AtomicEntryStream.h" +#include <memory> +#include <mutex> #include "core/ContentRepository.h" -#include "properties/Configure.h" -#include "core/Connectable.h" -#include "core/logging/LoggerFactory.h" -#include "utils/GeneralUtils.h" -#include "VolatileRepositoryData.h" -#include "utils/Literals.h" namespace org::apache::nifi::minifi::core::repository { -/** - * Purpose: Stages content into a volatile area of memory. Note that when the maximum number - * of entries is consumed we will rollback a session to wait for others to be freed. 
- */ -class VolatileContentRepository : public core::ContentRepositoryImpl { - public: - static const char *minimal_locking; - - explicit VolatileContentRepository(std::string_view name = className<VolatileContentRepository>()) - : core::ContentRepositoryImpl(name), - repo_data_(15000, static_cast<size_t>(10_MiB * 0.75)), - minimize_locking_(true), - logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) { - } - - ~VolatileContentRepository() override { - logger_->log_debug("Clearing repository"); - if (!minimize_locking_) { - std::lock_guard<std::mutex> lock(map_mutex_); - for (const auto &item : master_list_) { - delete item.second; - } - master_list_.clear(); - } - } - - uint64_t getRepositorySize() const override { - return repo_data_.getRepositorySize(); - } - uint64_t getMaxRepositorySize() const override { - return repo_data_.getMaxRepositorySize(); - } - - uint64_t getRepositoryEntryCount() const override { - return master_list_.size(); - } - - bool isFull() const override { - return repo_data_.isFull(); - } +class VolatileContentRepository : public ContentRepositoryImpl { + public: + explicit VolatileContentRepository(std::string_view name = className<VolatileContentRepository>()); - /** - * Initialize the volatile content repo - * @param configure configuration - */ + uint64_t getRepositorySize() const override; + uint64_t getMaxRepositorySize() const override; + uint64_t getRepositoryEntryCount() const override; + bool isFull() const override; bool initialize(const std::shared_ptr<Configure> &configure) override; - - /** - * Creates writable stream. - * @param claim resource claim - * @return BaseStream shared pointer that represents the stream the consumer will write to. - */ std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append) override; - - /** - * Creates readable stream. 
- * @param claim resource claim - * @return BaseStream shared pointer that represents the stream from which the consumer will read.. - */ std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override; - bool exists(const minifi::ResourceClaim &claim) override; + bool close(const minifi::ResourceClaim &claim) override; + void clearOrphans() override; - /** - * Closes the claim. - * @return whether or not the claim is associated with content stored in volatile memory. - */ - bool close(const minifi::ResourceClaim &claim) override { - return remove(claim); - } - - void clearOrphans() override { - // there are no persisted orphans to delete - } + ~VolatileContentRepository() override = default; protected: - /** - * Closes the claim. - * @return whether or not the claim is associated with content stored in volatile memory. - */ bool removeKey(const std::string& content_path) override; private: - VolatileRepositoryData repo_data_; - bool minimize_locking_; - - // mutex and master list that represent a cache of Atomic entries. this exists so that we don't have to walk the atomic entry list. - // The idea is to reduce the computational complexity while keeping access as maximally lock free as we can. 
- std::mutex map_mutex_; - std::map<ResourceClaim::Path, AtomicEntry<ResourceClaim::Path>*> master_list_; + mutable std::mutex data_mtx_; + std::unordered_map<std::string, std::shared_ptr<std::string>> data_; + std::atomic<size_t> total_size_{0}; std::shared_ptr<logging::Logger> logger_; }; diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/LegacyVolatileContentRepository.cpp similarity index 89% copy from libminifi/src/core/repository/VolatileContentRepository.cpp copy to libminifi/src/core/repository/LegacyVolatileContentRepository.cpp index 459f41bc2..a7a469585 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/LegacyVolatileContentRepository.cpp @@ -16,7 +16,7 @@ * limitations under the License. */ -#include "core/repository/VolatileContentRepository.h" +#include "core/repository/LegacyVolatileContentRepository.h" #include <cstdio> #include <memory> @@ -31,9 +31,9 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core::repository { -const char *VolatileContentRepository::minimal_locking = "minimal.locking"; +const char *LegacyVolatileContentRepository::minimal_locking = "minimal.locking"; -bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) { +bool LegacyVolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) { repo_data_.initialize(configure, getName()); logger_->log_info("Resizing repo_data_.value_vector for {} count is {}", getName(), repo_data_.max_count); @@ -54,7 +54,7 @@ bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &con return true; } -std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::ResourceClaim &claim, bool /*append*/) { +std::shared_ptr<io::BaseStream> LegacyVolatileContentRepository::write(const minifi::ResourceClaim &claim, bool /*append*/) { logger_->log_info("enter write for 
{}", claim.getContentFullPath()); { std::lock_guard<std::mutex> lock(map_mutex_); @@ -97,7 +97,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::R return nullptr; } -bool VolatileContentRepository::exists(const minifi::ResourceClaim &claim) { +bool LegacyVolatileContentRepository::exists(const minifi::ResourceClaim &claim) { std::lock_guard<std::mutex> lock(map_mutex_); auto claim_check = master_list_.find(claim.getContentFullPath()); if (claim_check != master_list_.end()) { @@ -108,7 +108,7 @@ bool VolatileContentRepository::exists(const minifi::ResourceClaim &claim) { return false; } -std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::ResourceClaim &claim) { +std::shared_ptr<io::BaseStream> LegacyVolatileContentRepository::read(const minifi::ResourceClaim &claim) { std::lock_guard<std::mutex> lock(map_mutex_); auto claim_check = master_list_.find(claim.getContentFullPath()); if (claim_check != master_list_.end()) { @@ -122,7 +122,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::Re return nullptr; } -bool VolatileContentRepository::removeKey(const std::string& content_path) { +bool LegacyVolatileContentRepository::removeKey(const std::string& content_path) { if (LIKELY(minimize_locking_ == true)) { std::lock_guard<std::mutex> lock(map_mutex_); auto ent = master_list_.find(content_path); diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 459f41bc2..c3838e5cd 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -17,142 +16,136 @@ */ #include "core/repository/VolatileContentRepository.h" +#include "core/logging/LoggerFactory.h" -#include <cstdio> -#include <memory> -#include <string> -#include <thread> +namespace org::apache::nifi::minifi::core::repository { -#include "core/expect.h" -#include "io/FileStream.h" -#include "utils/StringUtils.h" +namespace { -using namespace std::literals::chrono_literals; +class StringRefStream : public io::BaseStream { + public: + StringRefStream(std::shared_ptr<std::string> data, std::mutex& data_store_mtx, std::shared_ptr<std::string>& data_store, std::atomic<size_t>& total_size) + : data_(std::move(data)), data_store_mtx_(data_store_mtx), data_store_(data_store), total_size_(total_size) {} -namespace org::apache::nifi::minifi::core::repository { + [[nodiscard]] size_t size() const override { + return data_->size(); + } -const char *VolatileContentRepository::minimal_locking = "minimal.locking"; + size_t read(std::span<std::byte> out_buffer) override { + auto read_size = std::min(data_->size() - read_offset_, out_buffer.size()); + const auto source_span = as_bytes(std::span{*data_}.subspan(read_offset_, read_size)); + std::copy_n(source_span.data(), source_span.size(), out_buffer.data()); + read_offset_ += read_size; + return read_size; + } -bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) { - repo_data_.initialize(configure, getName()); + size_t write(const uint8_t *value, size_t len) override { + data_ = std::make_shared<std::string>(*data_); + data_->append(reinterpret_cast<const char*>(value), len); + total_size_ += len; + { + std::lock_guard lock(data_store_mtx_); + data_store_ = data_; + } + return len; + } - logger_->log_info("Resizing repo_data_.value_vector for {} count is {}", getName(), repo_data_.max_count); - logger_->log_info("Using a maximum size for {} of {}", getName(), repo_data_.max_size); + void close() override {} - if (configure != nullptr) { - std::string value; - 
std::stringstream strstream; - strstream << Configure::nifi_volatile_repository_options << getName() << "." << minimal_locking; - if (configure->get(strstream.str(), value)) { - minimize_locking_ = utils::string::toBool(value).value_or(true); - } + void seek(size_t offset) override { + read_offset_ = std::min(offset, data_->size()); } - if (!minimize_locking_) { - repo_data_.clear(); + + [[nodiscard]] size_t tell() const override { + return read_offset_; + } + + int initialize() override { + return 1; } + [[nodiscard]] std::span<const std::byte> getBuffer() const override { + return as_bytes(std::span{*data_}); + } + + private: + size_t read_offset_{0}; + std::shared_ptr<std::string> data_; + std::mutex& data_store_mtx_; + std::shared_ptr<std::string>& data_store_; + std::atomic<size_t>& total_size_; +}; + +} // namespace + +VolatileContentRepository::VolatileContentRepository(std::string_view name) + : ContentRepositoryImpl(name), + logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {} + +uint64_t VolatileContentRepository::getRepositorySize() const { + return total_size_.load(); +} + +uint64_t VolatileContentRepository::getMaxRepositorySize() const { + return std::numeric_limits<uint64_t>::max(); +} + +uint64_t VolatileContentRepository::getRepositoryEntryCount() const { + std::lock_guard lock(data_mtx_); + return data_.size(); +} + +bool VolatileContentRepository::isFull() const { + return false; +} + +bool VolatileContentRepository::initialize(const std::shared_ptr<Configure>& /*configure*/) { return true; } -std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::ResourceClaim &claim, bool /*append*/) { - logger_->log_info("enter write for {}", claim.getContentFullPath()); - { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - logger_->log_info("Creating copy of atomic entry"); - auto ent = 
claim_check->second->takeOwnership(); - if (ent == nullptr) { - return nullptr; - } - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } +std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::ResourceClaim &claim, bool append) { + std::lock_guard lock(data_mtx_); + auto& value_ref = data_[claim.getContentFullPath()]; + if (!value_ref) { + value_ref = std::make_shared<std::string>(); + } else if (!append) { + total_size_ -= value_ref->size(); + value_ref = std::make_shared<std::string>(); } + return std::make_shared<StringRefStream>(value_ref, data_mtx_, value_ref, total_size_); +} - int size = 0; - if (LIKELY(minimize_locking_ == true)) { - for (auto ent : repo_data_.value_vector) { - if (ent->testAndSetKey(claim.getContentFullPath())) { - std::lock_guard<std::mutex> lock(map_mutex_); - master_list_[claim.getContentFullPath()] = ent; - logger_->log_info("Minimize locking, return stream for {}", claim.getContentFullPath()); - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } - size++; - } - } else { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), claim_check->second); - } else { - auto *ent = new AtomicEntry<ResourceClaim::Path>(&repo_data_.current_size, &repo_data_.max_size); // NOLINT(cppcoreguidelines-owning-memory) - if (ent->testAndSetKey(claim.getContentFullPath())) { - master_list_[claim.getContentFullPath()] = ent; - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } - } +std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::ResourceClaim &claim) { + std::lock_guard lock(data_mtx_); + if (auto it = data_.find(claim.getContentFullPath()); it 
!= data_.end()) { + return std::make_shared<StringRefStream>(it->second, data_mtx_, it->second, total_size_); } - logger_->log_info("Cannot write {} {}, returning nullptr to roll back session. Repo is either full or locked", claim.getContentFullPath(), size); return nullptr; } bool VolatileContentRepository::exists(const minifi::ResourceClaim &claim) { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - auto ent = claim_check->second->takeOwnership(); - return ent != nullptr; - } - - return false; + std::lock_guard lock(data_mtx_); + return data_.contains(claim.getContentFullPath()); } -std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::ResourceClaim &claim) { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim.getContentFullPath()); - if (claim_check != master_list_.end()) { - auto ent = claim_check->second->takeOwnership(); - if (ent == nullptr) { - return nullptr; - } - return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent); - } +bool VolatileContentRepository::close(const minifi::ResourceClaim &claim) { + return remove(claim); +} - return nullptr; +void VolatileContentRepository::clearOrphans() { + // there are no persisted orphans to delete } bool VolatileContentRepository::removeKey(const std::string& content_path) { - if (LIKELY(minimize_locking_ == true)) { - std::lock_guard<std::mutex> lock(map_mutex_); - auto ent = master_list_.find(content_path); - if (ent != master_list_.end()) { - auto ptr = ent->second; - // if we cannot remove the entry we will let the owner's destructor - // decrement the reference count and free it - master_list_.erase(content_path); - // because of the test and set we need to decrement ownership - ptr->decrementOwnership(); - if (ptr->freeValue(content_path)) { - logger_->log_info("Deleting resource {}", 
content_path); - } else { - logger_->log_info("free failed for {}", content_path); - } - } else { - logger_->log_info("Could not remove {}", content_path); - } + std::lock_guard lock(data_mtx_); + if (auto it = data_.find(content_path); it != data_.end()) { + total_size_ -= it->second->size(); + data_.erase(it); + logger_->log_info("Deleting resource {}", content_path); } else { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_item = master_list_.find(content_path); - if (claim_item != master_list_.end()) { - auto size = claim_item->second->getLength(); - delete claim_item->second; // NOLINT(cppcoreguidelines-owning-memory) - master_list_.erase(content_path); - repo_data_.current_size -= size; - } + logger_->log_error("Could not find key {}", content_path); } - logger_->log_info("Could not remove {}, may not exist", content_path); return true; } diff --git a/libminifi/test/unit/CMakeLists.txt b/libminifi/test/unit/CMakeLists.txt index 08e93d4ca..1dd43ce00 100644 --- a/libminifi/test/unit/CMakeLists.txt +++ b/libminifi/test/unit/CMakeLists.txt @@ -35,3 +35,5 @@ FOREACH(testfile ${UNIT_TESTS}) add_test(NAME "${testfilename}" COMMAND "${testfilename}") ENDFOREACH() message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...") + +add_subdirectory(performance) diff --git a/libminifi/test/unit/CMakeLists.txt b/libminifi/test/unit/performance/CMakeLists.txt similarity index 58% copy from libminifi/test/unit/CMakeLists.txt copy to libminifi/test/unit/performance/CMakeLists.txt index 08e93d4ca..49210c559 100644 --- a/libminifi/test/unit/CMakeLists.txt +++ b/libminifi/test/unit/performance/CMakeLists.txt @@ -17,21 +17,22 @@ # under the License. 
# -GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/") - -if (NOT WIN32) - list(REMOVE_ITEM UNIT_TESTS WindowsCertStoreLocationTests.cpp) +if (NOT MINIFI_PERFORMANCE_TESTS) + return() endif() -SET(UNIT_TEST_COUNT 0) -FOREACH(testfile ${UNIT_TESTS}) +include(FetchBenchmark) + +GETSOURCEFILES(PERF_TESTS "${TEST_DIR}/unit/performance") + +SET(PERF_TEST_COUNT 0) +FOREACH(testfile ${PERF_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) - add_minifi_executable("${testfilename}" "${TEST_DIR}/unit/${testfile}") - target_compile_definitions("${testfilename}" PRIVATE TZ_DATA_DIR="${CMAKE_BINARY_DIR}/tzdata") - target_compile_definitions("${testfilename}" PRIVATE TEST_RESOURCES="${TEST_RESOURCES}") - createTests("${testfilename}") - target_link_libraries(${testfilename} Catch2WithMain) - MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1") + add_minifi_executable("${testfilename}" "${TEST_DIR}/unit/performance/${testfile}") + target_link_libraries(${testfilename} benchmark::benchmark core-minifi) + target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include") + MATH(EXPR PERF_TEST_COUNT "${PERF_TEST_COUNT}+1") add_test(NAME "${testfilename}" COMMAND "${testfilename}") + set_tests_properties(${testfilename} PROPERTIES LABELS "performance") ENDFOREACH() -message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...") +message("-- Finished building ${PERF_TEST_COUNT} performance test file(s)...") diff --git a/libminifi/test/unit/performance/VolatileRepositoryPerfTests.cpp b/libminifi/test/unit/performance/VolatileRepositoryPerfTests.cpp new file mode 100644 index 000000000..44a65056d --- /dev/null +++ b/libminifi/test/unit/performance/VolatileRepositoryPerfTests.cpp @@ -0,0 +1,258 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <benchmark/benchmark.h> + +#include "core/repository/LegacyVolatileContentRepository.h" +#include "core/repository/VolatileContentRepository.h" +#include "core/logging/LoggerFactory.h" +#include "core/logging/LoggerConfiguration.h" + +// clang-tidy does not appreciate how the google benchmark macros are written +// NOLINTBEGIN + +static bool initializeLogger = [] { + auto log_props = std::make_shared<org::apache::nifi::minifi::core::logging::LoggerProperties>(); + log_props->set("logger.root", "OFF"); + org::apache::nifi::minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_props); + return true; +}(); + +static constexpr int N = 10000; + +static void BM_LegacyVolatileContentRepository_Write(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); + repo->initialize(nullptr); + for (auto _ : state) { + auto session = repo->createSession(); + auto claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } +} +BENCHMARK(BM_LegacyVolatileContentRepository_Write); + +static void BM_LegacyVolatileContentRepository_Write2(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); + repo->initialize(nullptr); + 
for (auto _ : state) { + auto session = repo->createSession(); + auto claim1 = session->create(); + session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + auto claim2 = session->create(); + session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } +} +BENCHMARK(BM_LegacyVolatileContentRepository_Write2); + +static void BM_LegacyVolatileContentRepository_Write3(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); + repo->initialize(nullptr); + { + auto session = repo->createSession(); + for (int i = 0; i < N; ++i) { + auto claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + } + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + auto claim1 = session->create(); + session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + auto claim2 = session->create(); + session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } +} +BENCHMARK(BM_LegacyVolatileContentRepository_Write3); + +static void BM_LegacyVolatileContentRepository_Read(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); + repo->initialize(nullptr); + std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; + { + auto session = repo->createSession(); + claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + std::string data; + session->read(claim)->read(data); + session->commit(); + } +} +BENCHMARK(BM_LegacyVolatileContentRepository_Read); + +static void 
BM_LegacyVolatileContentRepository_Read2(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); + repo->initialize(nullptr); + std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; + { + auto session = repo->createSession(); + claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + std::string data; + session->read(claim)->read(data); + session->read(claim)->read(data); + session->commit(); + } +} +BENCHMARK(BM_LegacyVolatileContentRepository_Read2); + +static void BM_LegacyVolatileContentRepository_Read3(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::LegacyVolatileContentRepository>(); + repo->initialize(nullptr); + std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; + { + auto session = repo->createSession(); + for (int i = 0; i < N; ++i) { + claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + } + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + std::string data; + session->read(claim)->read(data); + session->read(claim)->read(data); + session->commit(); + } +} +BENCHMARK(BM_LegacyVolatileContentRepository_Read3); + +static void BM_VolatileContentRepository_Write(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); + repo->initialize(nullptr); + for (auto _ : state) { + auto session = repo->createSession(); + auto claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } +} +BENCHMARK(BM_VolatileContentRepository_Write); + +static void 
BM_VolatileContentRepository_Write2(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); + repo->initialize(nullptr); + for (auto _ : state) { + auto session = repo->createSession(); + auto claim1 = session->create(); + session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + auto claim2 = session->create(); + session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } +} +BENCHMARK(BM_VolatileContentRepository_Write2); + +static void BM_VolatileContentRepository_Write3(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); + repo->initialize(nullptr); + { + auto session = repo->createSession(); + for (int i = 0; i < N; ++i) { + auto claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + } + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + auto claim1 = session->create(); + session->write(claim1)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + auto claim2 = session->create(); + session->write(claim2)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } +} +BENCHMARK(BM_VolatileContentRepository_Write3); + +static void BM_VolatileContentRepository_Read(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); + repo->initialize(nullptr); + std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; + { + auto session = repo->createSession(); + claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + std::string data; + 
session->read(claim)->read(data); + session->commit(); + } +} +BENCHMARK(BM_VolatileContentRepository_Read); + +static void BM_VolatileContentRepository_Read2(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); + repo->initialize(nullptr); + std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; + { + auto session = repo->createSession(); + claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + std::string data; + session->read(claim)->read(data); + session->read(claim)->read(data); + session->commit(); + } +} +BENCHMARK(BM_VolatileContentRepository_Read2); + +static void BM_VolatileContentRepository_Read3(benchmark::State& state) { + auto repo = std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(); + repo->initialize(nullptr); + std::shared_ptr<org::apache::nifi::minifi::ResourceClaim> claim; + { + auto session = repo->createSession(); + for (int i = 0; i < N; ++i) { + claim = session->create(); + session->write(claim)->write("Lorem ipsum dolor sit amet, consectetur adipiscing elit"); + } + session->commit(); + } + for (auto _ : state) { + auto session = repo->createSession(); + std::string data; + session->read(claim)->read(data); + session->read(claim)->read(data); + session->commit(); + } +} +BENCHMARK(BM_VolatileContentRepository_Read3); + +BENCHMARK_MAIN(); + +// NOLINTEND diff --git a/win_build_vs.bat b/win_build_vs.bat index b28bd3881..7d85ed9ed 100644 --- a/win_build_vs.bat +++ b/win_build_vs.bat @@ -55,6 +55,7 @@ set ucrt=OFF set real_odbc=OFF set sccache_arg= set vc_redist=OFF +set performance_tests=OFF set arg_counter=0 for %%x in (%*) do ( @@ -94,6 +95,7 @@ for %%x in (%*) do ( if [%%~x] EQU [/NINJA] set generator="Ninja" if [%%~x] EQU [/SCCACHE] set 
"sccache_arg=-DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache" if [%%~x] EQU [/VC_REDIST] set vc_redist=ON + if [%%~x] EQU [/PERF] set performance_tests=ON ) mkdir %builddir% @@ -115,6 +117,7 @@ cmake -G %generator% %build_platform_cmd% -DMINIFI_INCLUDE_VC_REDIST_MERGE_MODUL -DENABLE_MQTT=%enable_mqtt% -DENABLE_OPC=%enable_opc% -DENABLE_OPS=%enable_ops% ^ -DENABLE_PYTHON_SCRIPTING=%enable_python_scripting% -DENABLE_GRAFANA_LOKI=%enable_grafana_loki% -DENABLE_COUCHBASE=%enable_couchbase% ^ -DBUILD_ROCKSDB=ON -DUSE_SYSTEM_UUID=OFF -DENABLE_LIBARCHIVE=ON -DENABLE_WEL=ON -DMINIFI_FAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% -DMINIFI_INCLUDE_VC_REDIST_DLLS=%vc_redist% ^ + -DMINIFI_PERFORMANCE_TESTS=%performance_tests% ^ %strict_gsl_checks% -DMINIFI_INCLUDE_UCRT_DLLS=%ucrt% %sccache_arg% %EXTRA_CMAKE_ARGUMENTS% "%scriptdir%" && %buildcmd% IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL% if [%cpack%] EQU [ON] ( @@ -123,7 +126,7 @@ if [%cpack%] EQU [ON] ( ) if [%skiptests%] NEQ [ON] ( if [%skiptestrun%] NEQ [ON] ( - ctest --timeout 300 --parallel 8 -C %cmake_build_type% --output-on-failure + ctest --timeout 300 --parallel 8 -C %cmake_build_type% --output-on-failure -LE performance IF !ERRORLEVEL! NEQ 0 ( popd & exit /b !ERRORLEVEL! ) ) )
