This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 344c08cb03f87e57cb5806b02dc7a420ba5efacc Author: Adam Debreceni <[email protected]> AuthorDate: Thu Mar 18 17:11:02 2021 +0100 MINIFICPP-1525 - Support flow file swapping in Connection Signed-off-by: Gabor Gyimesi <[email protected]> This closes #1038 --- extensions/rocksdb-repos/FlowFileLoader.cpp | 115 ++++++++++ extensions/rocksdb-repos/FlowFileLoader.h | 65 ++++++ extensions/rocksdb-repos/FlowFileRepository.cpp | 1 + extensions/rocksdb-repos/FlowFileRepository.h | 25 ++- .../tests/unit/YamlConnectionParserTest.cpp | 9 + libminifi/include/Connection.h | 10 + .../{utils/FlowFileQueue.h => SwapManager.h} | 43 ++-- libminifi/include/core/FlowFile.h | 4 + libminifi/include/core/Repository.h | 3 +- libminifi/include/core/yaml/YamlConnectionParser.h | 1 + libminifi/include/utils/FlowFileQueue.h | 72 ++++++- libminifi/include/utils/MinMaxHeap.h | 76 +++++++ libminifi/include/utils/MinifiConcurrentQueue.h | 11 +- libminifi/include/utils/TestUtils.h | 18 ++ libminifi/include/utils/TimeUtil.h | 4 + libminifi/src/Connection.cpp | 31 ++- libminifi/src/core/FlowConfiguration.cpp | 9 + libminifi/src/core/yaml/YamlConfiguration.cpp | 1 + libminifi/src/core/yaml/YamlConnectionParser.cpp | 18 +- libminifi/src/utils/FlowFileQueue.cpp | 235 +++++++++++++++++--- libminifi/src/utils/ThreadPool.cpp | 8 +- libminifi/test/Utils.h | 17 ++ libminifi/test/rocksdb-tests/SwapTests.cpp | 137 ++++++++++++ libminifi/test/unit/FlowFileQueueSwapTests.cpp | 183 ++++++++++++++++ libminifi/test/unit/FlowFileQueueTests.cpp | 19 +- libminifi/test/unit/ProvenanceTestHelper.h | 5 +- libminifi/test/unit/SwapTestController.h | 240 +++++++++++++++++++++ 27 files changed, 1279 insertions(+), 81 deletions(-) diff --git a/extensions/rocksdb-repos/FlowFileLoader.cpp b/extensions/rocksdb-repos/FlowFileLoader.cpp new file mode 100644 index 000000000..f2a13f7bc --- /dev/null +++ b/extensions/rocksdb-repos/FlowFileLoader.cpp @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FlowFileLoader.h" + +#include <span> +#include <memory> +#include <string> +#include <vector> +#include <utility> + +#include "logging/LoggerConfiguration.h" +#include "FlowFileRecord.h" + +namespace org::apache::nifi::minifi { + +FlowFileLoader::FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo) + : db_(db), + content_repo_(std::move(content_repo)), + logger_(core::logging::LoggerFactory<FlowFileLoader>::getLogger()) {} + +FlowFileLoader::~FlowFileLoader() { + stop(); +} + +std::future<FlowFileLoader::FlowFilePtrVec> FlowFileLoader::load(std::vector<SwappedFlowFile> flow_files) { + auto promise = std::make_shared<std::promise<FlowFilePtrVec>>(); + std::future<FlowFilePtrVec> future = promise->get_future(); + utils::Worker<utils::TaskRescheduleInfo> task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] { + return loadImpl(flow_files, promise); + }, + "", // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task + std::make_unique<utils::ComplexMonitor>()}; + // the dummy_future is for the return value of the Worker's lambda, rerunning this lambda + // depends on run_determinant + result + // we could create a custom run_determinant to instead determine if/when it should be rerun + // based on the lambda's return value (e.g. it could return a nonstd::expected<FlowFilePtrVec, TaskRescheduleInfo>) + // but then the std::future would also bear this type + std::future<utils::TaskRescheduleInfo> dummy_future; + thread_pool_.execute(std::move(task), dummy_future); + return future; +} + +void FlowFileLoader::start() { + thread_pool_.start(); +} + +void FlowFileLoader::stop() { + thread_pool_.shutdown(); +} + +utils::TaskRescheduleInfo FlowFileLoader::loadImpl(const std::vector<SwappedFlowFile>& flow_files, const std::shared_ptr<std::promise<FlowFilePtrVec>>& output) { + auto opendb = db_->open(); + if (!opendb) { + logger_->log_error("Couldn't open database to swap-in flow files"); + return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{30}); + } + try { + FlowFilePtrVec result; + result.reserve(flow_files.size()); + rocksdb::ReadOptions read_options; + std::vector<utils::SmallString<36>> serialized_keys; + serialized_keys.reserve(flow_files.size()); + for (const auto& item : flow_files) { + serialized_keys.push_back(item.id.to_string()); + } + std::vector<rocksdb::Slice> keys; + keys.reserve(flow_files.size()); + for (size_t idx = 0; idx < flow_files.size(); ++idx) { + keys.emplace_back(serialized_keys[idx].data(), serialized_keys[idx].length()); + } + std::vector<std::string> serialized_items; + serialized_items.reserve(flow_files.size()); + std::vector<rocksdb::Status> statuses = opendb->MultiGet(read_options, keys, &serialized_items); + for (size_t idx = 0; idx < statuses.size(); ++idx) { + if (!statuses[idx].ok()) { + logger_->log_error("Failed to fetch flow file \"%s\"", serialized_keys[idx]); + return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{30}); + } + utils::Identifier container_id; + auto flow_file = FlowFileRecord::DeSerialize( + std::as_bytes(std::span(serialized_items[idx])), content_repo_, container_id); + if (!flow_file) { + // corrupted flow file + logger_->log_error("Failed to deserialize flow file \"%s\"", serialized_keys[idx]); + } else { + flow_file->setStoredToRepository(true); + flow_file->setPenaltyExpiration(flow_files[idx].to_be_processed_after); + result.push_back(std::move(flow_file)); + logger_->log_debug("Deserialized flow file \"%s\"", serialized_keys[idx]); + } + } + output->set_value(result); + return utils::TaskRescheduleInfo::Done(); + } catch (const std::exception& err) { + logger_->log_error("Error while swapping flow files in: %s", err.what()); + return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{60}); + } +} + +} // namespace org::apache::nifi::minifi diff --git a/extensions/rocksdb-repos/FlowFileLoader.h b/extensions/rocksdb-repos/FlowFileLoader.h new file mode 100644 index 000000000..9c7ee6d51 --- /dev/null +++ b/extensions/rocksdb-repos/FlowFileLoader.h @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <future> +#include <list> +#include <vector> +#include <memory> + +#include "database/RocksDatabase.h" +#include "FlowFile.h" +#include "utils/gsl.h" +#include "core/ContentRepository.h" +#include "SwapManager.h" +#include "utils/ThreadPool.h" +#include "core/logging/Logger.h" + +namespace org::apache::nifi::minifi { + +class FlowFileLoader { + using FlowFilePtr = std::shared_ptr<core::FlowFile>; + using FlowFilePtrVec = std::vector<FlowFilePtr>; + + static constexpr size_t thread_count_ = 1; + + public: + FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo); + + ~FlowFileLoader(); + + void start(); + + void stop(); + + std::future<FlowFilePtrVec> load(std::vector<SwappedFlowFile> flow_files); + + private: + utils::TaskRescheduleInfo loadImpl(const std::vector<SwappedFlowFile>& flow_files, const std::shared_ptr<std::promise<FlowFilePtrVec>>& output); + + utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_{thread_count_, false, nullptr, "FlowFileLoaderThreadPool"}; + + gsl::not_null<minifi::internal::RocksDatabase*> db_; + + // TODO(adebreceni): shared_ptr is needed to call FlowFileRecord::Deserialize + // this ownership could be removed if that changes + std::shared_ptr<core::ContentRepository> content_repo_; + std::shared_ptr<core::logging::Logger> logger_; +}; + +} // namespace org::apache::nifi::minifi diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 6bd2f4982..39355dc80 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -251,6 +251,7 @@ void FlowFileRepository::initialize_repository() { void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { content_repo_ = content_repo; repo_size_ = 0; + swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_); initialize_repository(); } diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index 6906979d2..487363cd1 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -34,6 +34,9 @@ #include "database/RocksDatabase.h" #include "encryption/RocksDbEncryptionProvider.h" #include "utils/crypto/EncryptionProvider.h" +#include "SwapManager.h" +#include "FlowFileLoader.h" +#include "range/v3/algorithm/all_of.hpp" namespace org::apache::nifi::minifi::core::repository { @@ -53,7 +56,7 @@ constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill * Flow File repository * Design: Extends Repository and implements the run function, using rocksdb as the primary substrate. */ -class FlowFileRepository : public core::Repository { +class FlowFileRepository : public core::Repository, public SwapManager, public std::enable_shared_from_this<FlowFileRepository> { public: static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key"; // Constructor @@ -203,6 +206,25 @@ class FlowFileRepository : public core::Repository { running_ = true; thread_ = std::thread(&FlowFileRepository::run, this); logger_->log_debug("%s Repository Monitor Thread Start", getName()); + if (swap_loader_) { + swap_loader_->start(); + } + } + + void stop() override { + if (swap_loader_) { + swap_loader_->stop(); + } + core::Repository::stop(); + } + + void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override { + gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored)); + // pass, flowfiles are already persisted in the repository + } + + std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<SwappedFlowFile> flow_files) override { + return swap_loader_->load(std::move(flow_files)); } private: @@ -229,6 +251,7 @@ class FlowFileRepository : public core::Repository { std::shared_ptr<core::ContentRepository> content_repo_; std::unique_ptr<minifi::internal::RocksDatabase> db_; std::unique_ptr<rocksdb::Checkpoint> checkpoint_; + std::unique_ptr<FlowFileLoader> swap_loader_; std::shared_ptr<logging::Logger> logger_; std::shared_ptr<minifi::Configure> config_; }; diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp index 434134ca2..0276ddd90 100644 --- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp +++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp @@ -67,6 +67,12 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml()); REQUIRE(12582912 == yaml_connection_parser.getWorkQueueDataSizeFromYaml()); // 12 * 1024 * 1024 B } + SECTION("Queue swap threshold is read") { + YAML::Node connection_node = YAML::Load(std::string { + "swap threshold: 231\n" }); + YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger); + REQUIRE(231 == yaml_connection_parser.getSwapThresholdFromYaml()); + } SECTION("Source and destination names and uuids are read") { const utils::Identifier expected_source_id = utils::generateUUID(); const utils::Identifier expected_destination_id = utils::generateUUID(); @@ -121,6 +127,7 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection)); CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml()); CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml()); + CHECK_NOTHROW(yaml_connection_parser.getSwapThresholdFromYaml()); CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml()); CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml()); CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml()); @@ -163,11 +170,13 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") YAML::Node connection_node = YAML::Load(std::string { "max work queue size: \n" "max work queue data size: \n" + "swap threshold: \n" "flowfile expiration: \n" "drop empty: \n"}); YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger); CHECK(0 == yaml_connection_parser.getWorkQueueSizeFromYaml()); CHECK(0 == yaml_connection_parser.getWorkQueueDataSizeFromYaml()); + CHECK(0 == yaml_connection_parser.getSwapThresholdFromYaml()); CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml()); CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml()); } diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index e1196c92e..dcf4af558 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -35,9 +35,12 @@ #include "core/Repository.h" #include "utils/FlowFileQueue.h" +struct ConnectionTestAccessor; + namespace org::apache::nifi::minifi { class Connection : public core::Connectable { + friend struct ::ConnectionTestAccessor; public: explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, const std::string &name); explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, const std::string &name, const utils::Identifier &uuid); @@ -45,6 +48,8 @@ class Connection : public core::Connectable { const utils::Identifier &srcUUID); explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, const std::string &name, const utils::Identifier &uuid, const utils::Identifier &srcUUID, const utils::Identifier &destUUID); + explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<SwapManager> swap_manager, + const std::string& name, const utils::Identifier& uuid); // Destructor ~Connection() override = default; @@ -110,6 +115,11 @@ class Connection : public core::Connectable { void setMaxQueueDataSize(uint64_t size) { max_data_queue_size_ = size; } + void setSwapThreshold(uint64_t size) { + queue_.setTargetSize(size); + queue_.setMinSize(size / 2); + queue_.setMaxSize(size * 3 / 2); + } // Get Max Queue Data Size uint64_t getMaxQueueDataSize() { return max_data_queue_size_; diff --git a/libminifi/include/utils/FlowFileQueue.h b/libminifi/include/SwapManager.h similarity index 56% copy from libminifi/include/utils/FlowFileQueue.h copy to libminifi/include/SwapManager.h index a6d343fbf..ed3535dbc 100644 --- a/libminifi/include/utils/FlowFileQueue.h +++ b/libminifi/include/SwapManager.h @@ -14,41 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #pragma once -#include <memory> -#include <queue> +#include <future> #include <vector> +#include <memory> #include "core/FlowFile.h" +#include "utils/Id.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -class FlowFileQueue { - public: - using value_type = std::shared_ptr<core::FlowFile>; - - value_type pop(); - void push(const value_type& element); - void push(value_type&& element); - bool isWorkAvailable() const; - bool empty() const; - size_t size() const; +namespace org::apache::nifi::minifi { - private: - struct FlowFilePenaltyExpirationComparator { - bool operator()(const value_type& left, const value_type& right); - }; +struct SwappedFlowFile { + utils::Identifier id; + std::chrono::steady_clock::time_point to_be_processed_after; +}; - std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> queue_; +class SwapManager { + public: + virtual void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) = 0; + virtual std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<SwappedFlowFile> flow_files) = 0; + virtual ~SwapManager() = default; }; -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h index cc55913a7..d2c5710c2 100644 --- a/libminifi/include/core/FlowFile.h +++ b/libminifi/include/core/FlowFile.h @@ -219,6 +219,10 @@ class FlowFile : public CoreComponent, public ReferenceContainer { return to_be_processed_after_; } + void setPenaltyExpiration(std::chrono::time_point<std::chrono::steady_clock> to_be_processed_after) { + to_be_processed_after_ = to_be_processed_after; + } + /** * Gets the offset within the flow file * @return size as a uint64_t diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 33804517f..afe3e5750 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -43,6 +43,7 @@ #include "core/Connectable.h" #include "core/TraceableResource.h" #include "utils/BackTrace.h" +#include "SwapManager.h" #ifndef WIN32 #include <sys/stat.h> @@ -141,7 +142,7 @@ class Repository : public virtual core::SerializableComponent, public core::Trac // Start the repository monitor thread virtual void start(); // Stop the repository monitor thread - void stop(); + virtual void stop(); // whether the repo is full virtual bool isFull() { return repo_full_; diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h b/libminifi/include/core/yaml/YamlConnectionParser.h index f15224fd8..fe6f58ed8 100644 --- a/libminifi/include/core/yaml/YamlConnectionParser.h +++ b/libminifi/include/core/yaml/YamlConnectionParser.h @@ -48,6 +48,7 @@ class YamlConnectionParser { [[nodiscard]] uint64_t getWorkQueueSizeFromYaml() const; [[nodiscard]] uint64_t getWorkQueueDataSizeFromYaml() const; [[nodiscard]] utils::Identifier getSourceUUIDFromYaml() const; + [[nodiscard]] uint64_t getSwapThresholdFromYaml() const; [[nodiscard]] utils::Identifier getDestinationUUIDFromYaml() const; [[nodiscard]] std::chrono::milliseconds getFlowFileExpirationFromYaml() const; [[nodiscard]] bool getDropEmptyFromYaml() const; diff --git a/libminifi/include/utils/FlowFileQueue.h b/libminifi/include/utils/FlowFileQueue.h index a6d343fbf..952d3fc12 100644 --- a/libminifi/include/utils/FlowFileQueue.h +++ b/libminifi/include/utils/FlowFileQueue.h @@ -17,10 +17,16 @@ #pragma once #include <memory> -#include <queue> #include <vector> +#include <algorithm> +#include <utility> #include "core/FlowFile.h" +#include "MinMaxHeap.h" +#include "SwapManager.h" +#include "TimeUtil.h" + +struct FlowFileQueueTestAccessor; namespace org { namespace apache { @@ -29,22 +35,78 @@ namespace minifi { namespace utils { class FlowFileQueue { + friend struct ::FlowFileQueueTestAccessor; + using TimePoint = std::chrono::steady_clock::time_point; + public: using value_type = std::shared_ptr<core::FlowFile>; + explicit FlowFileQueue(std::shared_ptr<SwapManager> swap_manager = {}); + value_type pop(); - void push(const value_type& element); - void push(value_type&& element); + std::optional<value_type> tryPop(); + std::optional<value_type> tryPop(std::chrono::milliseconds timeout); + void push(value_type element); bool isWorkAvailable() const; bool empty() const; size_t size() const; + void setMinSize(size_t min_size); + void setTargetSize(size_t target_size); + void setMaxSize(size_t max_size); + void clear(); private: + std::optional<value_type> tryPopImpl(std::optional<std::chrono::milliseconds> timeout); + + void initiateLoadIfNeeded(); + + struct LoadTask { + TimePoint min; + TimePoint max; + std::future<std::vector<std::shared_ptr<core::FlowFile>>> items; + size_t count; + // flow files that have been pushed into the queue while a + // load was pending + std::vector<value_type> intermediate_items; + + LoadTask(TimePoint min, TimePoint max, std::future<std::vector<std::shared_ptr<core::FlowFile>>> items, size_t count) + : min(min), max(max), items(std::move(items)), count(count) {} + + size_t size() const { + return count + intermediate_items.size(); + } + }; + + bool processLoadTaskWait(std::optional<std::chrono::milliseconds> timeout); + struct FlowFilePenaltyExpirationComparator { - bool operator()(const value_type& left, const value_type& right); + bool operator()(const value_type& left, const value_type& right) const; + }; + + struct SwappedFlowFileComparator { + bool operator()(const SwappedFlowFile& left, const SwappedFlowFile& right) const; }; - std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> queue_; + size_t shouldSwapOutCount() const; + + size_t shouldSwapInCount() const; + + std::shared_ptr<SwapManager> swap_manager_; + // a load is initiated if the queue_ shrinks below this threshold + std::atomic<size_t> min_size_{0}; + // a given operation (load/store) will try to approach this size + std::atomic<size_t> target_size_{0}; + // a store is initiated if the queue_ grows beyond this threshold + std::atomic<size_t> max_size_{0}; + + MinMaxHeap<SwappedFlowFile, SwappedFlowFileComparator> swapped_flow_files_; + // the pending swap-in operation (if any) + std::optional<LoadTask> load_task_; + MinMaxHeap<value_type, FlowFilePenaltyExpirationComparator> queue_; + + std::unique_ptr<timeutils::SteadyClock> clock_{std::make_unique<timeutils::SteadyClock>()}; + + std::shared_ptr<core::logging::Logger> logger_; }; } // namespace utils diff --git a/libminifi/include/utils/MinMaxHeap.h b/libminifi/include/utils/MinMaxHeap.h new file mode 100644 index 000000000..edb6365bb --- /dev/null +++ b/libminifi/include/utils/MinMaxHeap.h @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <functional> +#include <set> +#include <cmath> +#include <utility> + +#include "utils/gsl.h" + +struct MinMaxHeapTestAccessor; + +namespace org::apache::nifi::minifi::utils { + +template<typename T, typename Comparator = std::less<T>> +class MinMaxHeap { + public: + void clear() { + data_.clear(); + } + + const T& min() const { + return *data_.begin(); + } + + const T& max() const { + return *data_.rbegin(); + } + + size_t size() const { + return data_.size(); + } + + bool empty() const { + return data_.empty(); + } + + void push(T item) { + data_.insert(std::move(item)); + } + + T popMin() { + auto it = data_.begin(); + T min = std::move(*it); + data_.erase(it); + return min; + } + + T popMax() { + auto it = std::prev(data_.end()); + T max = std::move(*it); + data_.erase(it); + return max; + } + + private: + std::multiset<T, Comparator> data_; +}; + +} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h b/libminifi/include/utils/MinifiConcurrentQueue.h index 5c229fac8..8abc0ec93 100644 --- a/libminifi/include/utils/MinifiConcurrentQueue.h +++ b/libminifi/include/utils/MinifiConcurrentQueue.h @@ -25,6 +25,7 @@ #include <condition_variable> #include <utility> #include <stdexcept> +#include <atomic> #include "utils/TryMoveCall.h" @@ -215,25 +216,29 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> { } void stop() { + // this lock ensures that other threads did not yet + // check the running_ condition (as they all acquire + // the lock before the check) or already unlocked and + // are waiting, thus receiving the notify_all + // TODO(adebreceni): investigate a waiting_ counter + // approach that would render the locking here unnecessary std::lock_guard<std::mutex> guard(this->mtx_); running_ = false; cv_.notify_all(); } void start() { - std::unique_lock<std::mutex> lck(this->mtx_); running_ = true; } bool isRunning() const { - std::lock_guard<std::mutex> guard(this->mtx_); return running_; // In case it's not running no notifications are generated, dequeueing fails instead of blocking to avoid hanging threads } using ConcurrentQueue<T>::remove; private: - bool running_; + std::atomic<bool> running_; std::condition_variable cv_; }; diff --git a/libminifi/include/utils/TestUtils.h b/libminifi/include/utils/TestUtils.h index c125ad72c..37e1543f3 100644 --- a/libminifi/include/utils/TestUtils.h +++ b/libminifi/include/utils/TestUtils.h @@ -69,6 +69,24 @@ class ManualClock : public timeutils::Clock { std::chrono::milliseconds time_{0}; }; +class ManualSteadyClock : public timeutils::SteadyClock { + public: + std::chrono::milliseconds timeSinceEpoch() const override { return time_; } + void advance(std::chrono::milliseconds elapsed_time) { + if (elapsed_time.count() < 0) { + throw std::logic_error("A steady clock can only be advanced forward"); + } + time_ += elapsed_time; + } + + std::chrono::steady_clock::time_point now() const override { + return std::chrono::steady_clock::time_point{time_}; + } + + private: + std::chrono::milliseconds time_{0}; +}; + #ifdef WIN32 // The tzdata location is set as a global variable in date-tz library diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h index dbcf31014..906d851fe 100644 --- a/libminifi/include/utils/TimeUtil.h +++ b/libminifi/include/utils/TimeUtil.h @@ -84,6 +84,10 @@ class SteadyClock : public Clock { std::chrono::milliseconds timeSinceEpoch() const override { return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()); } + + virtual std::chrono::time_point<std::chrono::steady_clock> now() const { + return std::chrono::steady_clock::now(); + } }; inline std::string getTimeStr(std::chrono::system_clock::time_point tp) { diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 1de108b73..440fddbb7 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -70,6 +70,15 @@ Connection::Connection(std::shared_ptr<core::Repository> flow_repository, std::s logger_->log_debug("Connection %s created", name_); } +Connection::Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<SwapManager> swap_manager, + const std::string& name, const utils::Identifier& uuid) + : core::Connectable(name, uuid), + flow_repository_(std::move(flow_repository)), + content_repo_(std::move(content_repo)), + queue_(std::move(swap_manager)) { + logger_->log_debug("Connection %s created", name_); +} + bool Connection::isEmpty() const { std::lock_guard<std::mutex> lock(mutex_); @@ -141,7 +150,11 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core:: std::lock_guard<std::mutex> lock(mutex_); while (queue_.isWorkAvailable()) { - std::shared_ptr<core::FlowFile> item = queue_.pop(); + std::optional<std::shared_ptr<core::FlowFile>> opt_item = queue_.tryPop(); + if (!opt_item) { + return nullptr; + } + std::shared_ptr<core::FlowFile> item = std::move(opt_item.value()); queued_data_size_ -= item->getSize(); if (expired_duration_.load() > 0ms) { @@ -167,11 +180,16 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core:: void Connection::drain(bool delete_permanently) { std::lock_guard<std::mutex> lock(mutex_); - - while (!queue_.empty()) { - std::shared_ptr<core::FlowFile> item = queue_.pop(); - logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_); - if (delete_permanently) { + if (!delete_permanently) { + // simply discard in-memory flow files + queue_.clear(); + } else { + while (!queue_.empty()) { + auto opt_item = queue_.tryPop(std::chrono::milliseconds{100}); + if (!opt_item) { + continue; + } + auto& item = opt_item.value(); if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) { item->setStoredToRepository(false); auto claim = item->getResourceClaim(); @@ -179,6 +197,7 @@ void Connection::drain(bool delete_permanently) { } } } + queued_data_size_ = 0; logger_->log_debug("Drain connection %s", name_); } diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 6df2ed7f4..b35cda810 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -158,6 +158,15 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup( } std::unique_ptr<minifi::Connection> FlowConfiguration::createConnection(const std::string& name, const utils::Identifier& uuid) const { + // An alternative approach would be to thread the swap manager through all the classes + // but it kind of makes sense that swapping the flow files is the responsibility of the + // flow_file_repo_. If we introduce other swappers then we will have no other choice. + if (flow_file_repo_) { + auto swap_manager = std::dynamic_pointer_cast<SwapManager>(flow_file_repo_); + if (swap_manager) { + return std::make_unique<minifi::Connection>(flow_file_repo_, content_repo_, std::move(swap_manager), name, uuid); + } + } return std::make_unique<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid); } diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 5cd7ed91c..9f9be9be6 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -569,6 +569,7 @@ void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, c connectionParser.configureConnectionSourceRelationshipsFromYaml(*connection); connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml()); connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml()); + connection->setSwapThreshold(connectionParser.getSwapThresholdFromYaml()); connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml()); connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml()); connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml()); diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/yaml/YamlConnectionParser.cpp index b29487275..b19e90eb0 100644 --- a/libminifi/src/core/yaml/YamlConnectionParser.cpp +++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp @@ -75,7 +75,7 @@ uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const { logger_->log_debug("Setting %" PRIu64 " as the max queue size.", max_work_queue_size); return max_work_queue_size; } - logger_->log_info("Invalid max queue size value: %s.", max_work_queue_str); + logger_->log_error("Invalid max queue size value: %s.", max_work_queue_str); } return 0; } @@ -89,7 +89,21 @@ uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const { logger_->log_debug("Setting %" PRIu64 "as the max as the max queue data size.", max_work_queue_data_size); return max_work_queue_data_size; } - logger_->log_info("Invalid max queue data size value: %s.", max_work_queue_str); + logger_->log_error("Invalid max queue data size value: %s.", max_work_queue_str); + } + return 0; +} + +uint64_t YamlConnectionParser::getSwapThresholdFromYaml() const { + const YAML::Node swap_threshold_node = connectionNode_["swap threshold"]; + if (swap_threshold_node) { + auto swap_threshold_str = swap_threshold_node.as<std::string>(); + uint64_t swap_threshold; + if (core::Property::StringToInt(swap_threshold_str, swap_threshold)) { + logger_->log_debug("Setting %" PRIu64 " as the swap threshold.", swap_threshold); + return swap_threshold; + } + logger_->log_error("Invalid swap threshold value: %s.", swap_threshold_str); } return 0; } diff --git a/libminifi/src/utils/FlowFileQueue.cpp b/libminifi/src/utils/FlowFileQueue.cpp index 84b682d12..94414377f 100644 --- a/libminifi/src/utils/FlowFileQueue.cpp +++ b/libminifi/src/utils/FlowFileQueue.cpp @@ -16,59 +16,232 @@ */ #include "utils/FlowFileQueue.h" +#include "core/logging/LoggerConfiguration.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { +namespace org::apache::nifi::minifi::utils { -bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_type& left, const value_type& right) { - // this is operator< implemented using > so that top() is the element with the smallest key (earliest expiration) - // rather than the element with the largest key, which is the default for std::priority_queue - return left->getPenaltyExpiration() > right->getPenaltyExpiration(); +bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_type& left, const value_type& right) const { + // a flow file with earlier expiration compares less + return left->getPenaltyExpiration() < right->getPenaltyExpiration(); } +bool FlowFileQueue::SwappedFlowFileComparator::operator()(const SwappedFlowFile& left, const SwappedFlowFile& right) const { + // a swapped flow file with earlier expiration compares less + return left.to_be_processed_after < right.to_be_processed_after; +} + +FlowFileQueue::FlowFileQueue(std::shared_ptr<SwapManager> swap_manager) + : swap_manager_(std::move(swap_manager)), + logger_(core::logging::LoggerFactory<FlowFileQueue>::getLogger()) {} + FlowFileQueue::value_type FlowFileQueue::pop() { - if (empty()) { - throw std::logic_error{"pop() called on an empty FlowFileQueue"}; - } + return tryPopImpl({}).value(); +} - value_type next_flow_file = queue_.top(); - queue_.pop(); - return next_flow_file; +std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPop() { + return tryPopImpl(std::chrono::milliseconds{0}); +} + +std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPop(std::chrono::milliseconds timeout) { + return tryPopImpl(timeout); +} + +std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPopImpl(std::optional<std::chrono::milliseconds> timeout) { + std::optional<std::shared_ptr<core::FlowFile>> result; + if (!queue_.empty()) { + result = queue_.popMin(); + if (processLoadTaskWait(std::chrono::milliseconds{0})) { + initiateLoadIfNeeded(); + } + return result; + } + if (load_task_) { + logger_->log_debug("Head is empty checking already running load task"); + if (!processLoadTaskWait(timeout)) { + return std::nullopt; + } + if (!queue_.empty()) { + // load provided items + result = queue_.popMin(); + initiateLoadIfNeeded(); + return result; + } + } + // no pending load_task_ and no items in the queue_ + initiateLoadIfNeeded(); + return std::nullopt; } -void FlowFileQueue::push(const value_type& element) { - if (!element->isPenalized()) { - element->penalize(std::chrono::milliseconds{0}); +bool FlowFileQueue::processLoadTaskWait(std::optional<std::chrono::milliseconds> timeout) { + if (!load_task_) { + return true; } + std::future_status status = std::future_status::ready; + if (timeout) { + status = load_task_.value().items.wait_for(timeout.value()); + } + if (status == std::future_status::timeout) { + logger_->log_debug("Load task is not yet completed"); + return false; + } + gsl_Assert(status == std::future_status::ready); - queue_.push(element); + logger_->log_debug("Getting loaded flow files"); + size_t swapped_in_count = 0; + size_t intermediate_count = 0; + for (auto&& item : load_task_->items.get()) { + ++swapped_in_count; + queue_.push(std::move(item)); + } + for (auto&& intermediate_item : load_task_->intermediate_items) { + ++intermediate_count; + queue_.push(std::move(intermediate_item)); + } + load_task_.reset(); + logger_->log_debug("Swapped in '%zu' flow files and committed '%zu' pending files", swapped_in_count, intermediate_count); + return true; } -void FlowFileQueue::push(value_type&& element) { - if (!element->isPenalized()) { - element->penalize(std::chrono::milliseconds{0}); +void FlowFileQueue::push(value_type element) { + // do not allow pushing elements in the past + element->setPenaltyExpiration(std::max(element->getPenaltyExpiration(), clock_->now())); + + std::vector<value_type> flow_files_to_be_swapped_out; + + if (load_task_) { + if (element->getPenaltyExpiration() <= load_task_->min) { + // flow file goes before load_task_ + queue_.push(std::move(element)); + } else if (load_task_->max <= element->getPenaltyExpiration()) { + // flow file goes after load_task_, i.e. immediately swapped out + flow_files_to_be_swapped_out.push_back(std::move(element)); + } else { + // flow file belongs to the same range that is being swapped in + load_task_->intermediate_items.push_back(std::move(element)); + } + } else if (!swapped_flow_files_.empty() && swapped_flow_files_.min().to_be_processed_after < element->getPenaltyExpiration()) { + // flow file goes into the swapped_flow_files_ set, i.e. immediately swapped out + flow_files_to_be_swapped_out.push_back(std::move(element)); + } else { + queue_.push(std::move(element)); } - queue_.push(std::move(element)); + size_t flow_file_count = shouldSwapOutCount(); + if (flow_file_count != 0) { + if (!load_task_) { + // we cannot initiate a queue_ swap while a load_task_ is pending + flow_files_to_be_swapped_out.reserve(flow_files_to_be_swapped_out.size() + flow_file_count); + for (size_t i = 0; i < flow_file_count; ++i) { + flow_files_to_be_swapped_out.push_back(queue_.popMax()); + } + } + } + if (!flow_files_to_be_swapped_out.empty()) { + for (const auto& flow_file : flow_files_to_be_swapped_out) { + swapped_flow_files_.push(SwappedFlowFile{flow_file->getUUID(), flow_file->getPenaltyExpiration()}); + } + logger_->log_debug("Initiating store of %zu flow files", flow_files_to_be_swapped_out.size()); + swap_manager_->store(std::move(flow_files_to_be_swapped_out)); + } } bool FlowFileQueue::isWorkAvailable() const { - return !queue_.empty() && !queue_.top()->isPenalized(); + auto now = clock_->now(); + if (!queue_.empty()) { + return queue_.min()->getPenaltyExpiration() <= now; + } + if (load_task_) { + if (load_task_->min > now) { + return false; + } + auto status = load_task_->items.wait_for(std::chrono::milliseconds{0}); + return status == std::future_status::ready; + } + return !swapped_flow_files_.empty() && swapped_flow_files_.min().to_be_processed_after <= now; } bool FlowFileQueue::empty() const { - return queue_.empty(); + return size() == 0; } size_t FlowFileQueue::size() const { - return queue_.size(); + return queue_.size() + (load_task_ ? load_task_->size() : 0) + swapped_flow_files_.size(); +} + +void FlowFileQueue::clear() { + queue_.clear(); + load_task_.reset(); + swapped_flow_files_.clear(); +} + +void FlowFileQueue::initiateLoadIfNeeded() { + if (load_task_) { + throw std::logic_error("There is already an active load task running"); + } + size_t flow_files_count = shouldSwapInCount(); + if (flow_files_count == 0) { + return; + } + logger_->log_debug("Initiating load of %zu flow files", flow_files_count); + TimePoint min = TimePoint::max(); + TimePoint max = TimePoint::min(); + std::vector<SwappedFlowFile> flow_files; + flow_files.reserve(flow_files_count); + for (size_t i = 0; i < flow_files_count; ++i) { + SwappedFlowFile flow_file = swapped_flow_files_.popMin(); + // TODO(adebreceni): since we are popping in order, we could elide these std::min and std::max comparisons + min = std::min(min, flow_file.to_be_processed_after); + max = std::max(max, flow_file.to_be_processed_after); + flow_files.push_back(flow_file); + } + load_task_ = {min, max, swap_manager_->load(std::move(flow_files)), flow_files_count}; +} + +void FlowFileQueue::setMinSize(size_t min_size) { + min_size_ = min_size; +} + +void FlowFileQueue::setTargetSize(size_t target_size) { + target_size_ = target_size; +} + +void FlowFileQueue::setMaxSize(size_t max_size) { + max_size_ = max_size; +} + +size_t FlowFileQueue::shouldSwapOutCount() const { + if (!swap_manager_) { + return 0; + } + // read once for consistent view of a single atomic variable + size_t max_size = max_size_; + size_t target_size = target_size_; + if (max_size != 0 && target_size != 0 + && max_size < queue_.size() && target_size < queue_.size()) { + return queue_.size() - target_size; + } + return 0; } -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +size_t FlowFileQueue::shouldSwapInCount() const { + if (!swap_manager_) { + return 0; + } + // read once for consistent view of a single atomic variable + size_t min_size = min_size_; + size_t target_size = target_size_; + if (min_size == 0 || target_size == 0) { + if (!swapped_flow_files_.empty()) { + logger_->log_info("Swapping in all the flow files"); + return swapped_flow_files_.size(); + } + return 0; + } + if (queue_.size() < min_size && queue_.size() < target_size) { + return std::min(target_size - queue_.size(), swapped_flow_files_.size()); + } + return 0; +} + + +} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp index 640c2c5f4..8612f0dbf 100644 --- a/libminifi/src/utils/ThreadPool.cpp +++ b/libminifi/src/utils/ThreadPool.cpp @@ -253,7 +253,13 @@ void ThreadPool<T>::shutdown() { manager_thread_.join(); } - delayed_task_available_.notify_all(); + { + // this lock ensures that the delayed_scheduler_thread_ + // is not between checking the running_ and before the cv_.wait* + // as then, it would survive the notify_all call + std::lock_guard<std::mutex> worker_lock(worker_queue_mutex_); + delayed_task_available_.notify_all(); + } if (delayed_scheduler_thread_.joinable()) { delayed_scheduler_thread_.join(); } diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h index d11757793..09f86663e 100644 --- a/libminifi/test/Utils.h +++ b/libminifi/test/Utils.h @@ -26,6 +26,9 @@ using namespace std::chrono_literals; #undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson +#include "Connection.h" +#include "FlowFileQueue.h" +#include "Catch.h" #define FIELD_ACCESSOR(field) \ template<typename T> \ @@ -118,3 +121,17 @@ void sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t REQUIRE(!err); socket.close(); } + +struct ConnectionTestAccessor { + FIELD_ACCESSOR(queue_); +}; + +struct FlowFileQueueTestAccessor { + FIELD_ACCESSOR(min_size_); + FIELD_ACCESSOR(max_size_); + FIELD_ACCESSOR(target_size_); + FIELD_ACCESSOR(clock_); + FIELD_ACCESSOR(swapped_flow_files_); + FIELD_ACCESSOR(load_task_); + FIELD_ACCESSOR(queue_); +}; diff --git a/libminifi/test/rocksdb-tests/SwapTests.cpp b/libminifi/test/rocksdb-tests/SwapTests.cpp new file mode 100644 index 000000000..3739376bb --- /dev/null +++ b/libminifi/test/rocksdb-tests/SwapTests.cpp @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../Catch.h" +#include "core/RepositoryFactory.h" +#include "core/repository/VolatileContentRepository.h" +#include "FlowFileRepository.h" +#include "../TestBase.h" +#include "../Utils.h" +#include "StreamPipe.h" +#include "IntegrationTestUtils.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "../unit/ProvenanceTestHelper.h" + +class OutputProcessor : public core::Processor { + public: + using core::Processor::Processor; + + static const core::Relationship Success; + + using core::Processor::onTrigger; + + static constexpr const char* Description = "Processor used for testing cycles"; + static auto properties() { return std::array<core::Property, 0>{}; } + static auto relationships() { return std::array{Success}; } + static constexpr bool SupportsDynamicProperties = false; + static constexpr bool SupportsDynamicRelationships = false; + static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; + static constexpr bool IsSingleThreaded = false; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + void initialize() override { + setSupportedProperties(properties()); + setSupportedRelationships(relationships()); + } + + void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* session) override { + auto id = std::to_string(next_id_++); + auto ff = session->create(); + ff->addAttribute("index", id); + session->write(ff, [&] (const std::shared_ptr<minifi::io::BaseStream>& output) -> int64_t { + auto ret = output->write(gsl::span<const char>(id.data(), id.size()).as_span<const std::byte>()); + if (minifi::io::isError(ret)) { + return -1; + } + return gsl::narrow<int64_t>(ret); + }); + session->transfer(ff, Success); + flow_files_.push_back(ff); + } + + std::vector<std::shared_ptr<core::FlowFile>> flow_files_; + int next_id_{0}; +}; + +const core::Relationship OutputProcessor::Success{"success", ""}; + +TEST_CASE("Connection will on-demand swap flow files") { + TestController testController; + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setTrace<minifi::ResourceClaim>(); + LogTestController::getInstance().setTrace<minifi::FlowFileRecord>(); + LogTestController::getInstance().setTrace<minifi::utils::FlowFileQueue>(); + LogTestController::getInstance().setTrace<minifi::FlowFileLoader>(); + LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>(); + LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>(); + + auto dir = testController.createTempDirectory(); + + auto config = std::make_shared<minifi::Configure>(); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository")); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository")); + + auto prov_repo = std::make_shared<TestRepository>(); + auto ff_repo = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository"); + auto swap_manager = std::dynamic_pointer_cast<minifi::SwapManager>(ff_repo); + auto content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + ff_repo->initialize(config); + content_repo->initialize(config); + + ff_repo->loadComponent(content_repo); + ff_repo->start(); + + auto processor = std::make_shared<OutputProcessor>("proc"); + + auto connection = std::make_shared<minifi::Connection>(ff_repo, content_repo, swap_manager, "conn", utils::IdGenerator::getIdGenerator()->generate()); + connection->setSwapThreshold(50); + connection->addRelationship(OutputProcessor::Success); + connection->setSourceUUID(processor->getUUID()); + processor->addConnection(connection.get()); + + auto processor_node = std::make_shared<core::ProcessorNode>(processor.get()); + auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, prov_repo, ff_repo, content_repo); + auto session_factory = std::make_shared<core::ProcessSessionFactory>(context); + + for (size_t i = 0; i < 200; ++i) { + processor->onTrigger(context, session_factory); + } + + REQUIRE(connection->getQueueSize() == processor->flow_files_.size()); + utils::FlowFileQueue& queue = ConnectionTestAccessor::get_queue_(*connection); + // below max threshold live flow files + REQUIRE(FlowFileQueueTestAccessor::get_queue_(queue).size() <= 75); + REQUIRE(queue.size() == 200); + + std::set<std::shared_ptr<core::FlowFile>> expired; + for (size_t i = 0; i < 200; ++i) { + std::shared_ptr<core::FlowFile> ff; + minifi::utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, [&] { + ff = connection->poll(expired); + return static_cast<bool>(ff); + }); + REQUIRE(ff->getAttribute("index") == std::to_string(i)); + REQUIRE(ff->getResourceClaim()->getContentFullPath() == processor->flow_files_[i]->getResourceClaim()->getContentFullPath()); + } + + REQUIRE(queue.empty()); +} diff --git a/libminifi/test/unit/FlowFileQueueSwapTests.cpp b/libminifi/test/unit/FlowFileQueueSwapTests.cpp new file mode 100644 index 000000000..a2e52cbc4 --- /dev/null +++ b/libminifi/test/unit/FlowFileQueueSwapTests.cpp @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <random> + +#include "Connection.h" + +#include "../TestBase.h" +#include "SwapTestController.h" + +TEST_CASE("Setting swap threshold sets underlying queue limits", "[SwapTest1]") { + const size_t target_size = 4; + const size_t min_size = target_size / 2; + const size_t max_size = target_size * 3 / 2; + + minifi::Connection conn(nullptr, nullptr, ""); + conn.setSwapThreshold(target_size); + REQUIRE(FlowFileQueueTestAccessor::get_min_size_(ConnectionTestAccessor::get_queue_(conn)) == min_size); + REQUIRE(FlowFileQueueTestAccessor::get_target_size_(ConnectionTestAccessor::get_queue_(conn)) == target_size); + REQUIRE(FlowFileQueueTestAccessor::get_max_size_(ConnectionTestAccessor::get_queue_(conn)) == max_size); +} + +TEST_CASE_METHOD(SwapTestController, "Default constructed FlowFileQueue won't swap", "[SwapTest2]") { + for (unsigned i = 0; i < 100; ++i) { + pushAll({i}); + } + + REQUIRE(queue_->impl.size() == 100); + + clock_->advance(std::chrono::seconds{200}); + for (size_t i = 0; i < 100; ++i) { + queue_->poll(); + } + + REQUIRE(queue_->impl.empty()); + verifySwapEvents({}); +} + +TEST_CASE_METHOD(SwapTestController, "Up to max no swap-out is triggered", "[SwapTest3]") { + setLimits(2, 4, 6); + pushAll({50, 20, 30, 60, 10, 40}); + + REQUIRE_FALSE(queue_->isWorkAvailable()); + verifySwapEvents({}); +} + +TEST_CASE_METHOD(SwapTestController, "Pushing beyond max triggers a swap-out", "[SwapTest4]") { + setLimits(2, 4, 6); + pushAll({50, 20, 30, 60, 10, 40}); + + pushAll({28}); + // size goes from 7 to 4, 3 largest must have been swapped out + verifySwapEvents({{Store, {60, 50, 40}}}); + verifyQueue({10, 20, 28, 30}, {}, {40, 50, 60}); +} + +TEST_CASE_METHOD(SwapTestController, "Popping until min size does not trigger swap-in", "[SwapTest5]") { + setLimits(2, 4, 6); + pushAll({50, 20, 30, 60, 10, 40, 28}); + clearSwapEvents(); + clock_->advance(std::chrono::seconds{35}); + REQUIRE(queue_->isWorkAvailable()); + popAll({10, 20}); + verifyQueue({28, 30}, {}, {40, 50, 60}); + verifySwapEvents({}); +} + +TEST_CASE_METHOD(SwapTestController, "Popping beyond min size triggers swap-in", "[SwapTest6]") { + setLimits(2, 4, 6); + pushAll({50, 20, 30, 60, 10, 40, 28}); + clearSwapEvents(); + clock_->advance(std::chrono::seconds{35}); + popAll({10, 20, 28}); + + // trying to swap-in all three swapped flow files + verifyQueue({30}, {{}}, {}); + verifySwapEvents({{Load, {40, 50, 60}}}); +} + +TEST_CASE_METHOD(SwapTestController, "Pushing while a swap-in is pending", "[SwapTest7]") { + setLimits(2, 4, 6); + pushAll({50, 20, 30, 60, 10, 40, 28}); + clock_->advance(std::chrono::seconds{35}); + popAll({10, 20, 28}); + verifyQueue({30}, {{}}, {}); + clearSwapEvents(); + + SECTION("Pushing into the pending swap-in range") { + pushAll({45}); + verifyQueue({30}, {{45}}, {}); + verifySwapEvents({}); + } + + SECTION("Pushing before the pending swap-in range") { + pushAll({35}); + verifyQueue({30, 35}, {{}}, {}); + verifySwapEvents({}); + } + + SECTION("Pushing after the pending swap-in range") { + pushAll({65}); + verifyQueue({30}, {{}}, {65}); + verifySwapEvents({{Store, {65}}}); + } +} + +TEST_CASE_METHOD(SwapTestController, "isWorkAvailable depends on the swap-in task", "[SwapTest8]") { + setLimits(2, 4, 6); + pushAll({50, 20, 30, 60, 10, 40, 28}); + popAll({10, 20, 28}); + verifyQueue({30}, {{}}, {}); + + REQUIRE_FALSE(queue_->isWorkAvailable()); + + clock_->advance(std::chrono::seconds{35}); + + REQUIRE(queue_->isWorkAvailable()); + popAll({30}); + REQUIRE_FALSE(queue_->isWorkAvailable()); + + SECTION("Load is completed but the minimum of those files is still not viable") { + flow_repo_->load_tasks_[0].complete(); + REQUIRE_FALSE(queue_->isWorkAvailable()); + } + + SECTION("The minimum of the load task is viable but not yet completed") { + clock_->advance(std::chrono::seconds{35}); + REQUIRE_FALSE(queue_->isWorkAvailable()); + // completing the task renders it viable + flow_repo_->load_tasks_[0].complete(); + REQUIRE(queue_->isWorkAvailable()); + } +} + +TEST_CASE_METHOD(SwapTestController, "Polling from load task", "[SwapTest8]") { + setLimits(2, 4, 6); + pushAll({50, 20, 30, 60, 10, 40, 28}); + popAll({10, 20, 28}); + pushAll({45}); + verifyQueue({30}, {{45}}, {}); + + flow_repo_->load_tasks_[0].complete(); + + clock_->advance(std::chrono::seconds{100}); + + popAll({30, 40, 45, 50, 60}, true); + verifyQueue({}, {}, {}); +} + +TEST_CASE_METHOD(SwapTestController, "Popping below min checks if the pending load is finished", "[SwapTest8]") { + setLimits(6, 8, 10); + pushAll({10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120}); + verifyQueue({10, 20, 30, 40, 50, 60, 70, 80}, {}, {90, 100, 110, 120}); + clock_->advance(std::chrono::seconds{200}); + clearSwapEvents(); + popAll({10, 20, 30}); + verifySwapEvents({{Load, {90, 100, 110}}}); + verifyQueue({40, 50, 60, 70, 80}, {{}}, {120}); + clearSwapEvents(); + + popAll({40, 50}); + verifyQueue({60, 70, 80}, {{}}, {120}); + flow_repo_->load_tasks_[0].complete(); + popAll({60}); + // even though the live queue is not empty we check if + // the load_task is finished and initiate a load if need be + verifySwapEvents({{Load, {120}}}); + verifyQueue({70, 80, 90, 100, 110}, {{}}, {}); +} diff --git a/libminifi/test/unit/FlowFileQueueTests.cpp b/libminifi/test/unit/FlowFileQueueTests.cpp index 51b287a62..0d0c1b2b9 100644 --- a/libminifi/test/unit/FlowFileQueueTests.cpp +++ b/libminifi/test/unit/FlowFileQueueTests.cpp @@ -28,7 +28,7 @@ TEST_CASE("After construction, a FlowFileQueue is empty", "[FlowFileQueue]") { utils::FlowFileQueue queue; REQUIRE(queue.empty()); - REQUIRE(queue.size() == 0); + REQUIRE(queue.size() == 0); // NOLINT(readability-container-size-empty) REQUIRE_FALSE(queue.isWorkAvailable()); REQUIRE_THROWS(queue.pop()); } @@ -75,11 +75,26 @@ TEST_CASE("If three flow files are added to the FlowFileQueue, we can pop them i REQUIRE_FALSE(queue.isWorkAvailable()); } +TEST_CASE("Cannot add flow files in the past preempting others", "[FlowFileQueue][pop]") { + utils::FlowFileQueue queue; + const auto flow_file_1 = std::make_shared<core::FlowFile>(); + queue.push(flow_file_1); + const auto flow_file_2 = std::make_shared<core::FlowFile>(); + flow_file_2->penalize(std::chrono::seconds{-10}); + queue.push(flow_file_2); + + REQUIRE(queue.isWorkAvailable()); + REQUIRE(queue.pop() == flow_file_1); + REQUIRE(queue.isWorkAvailable()); + REQUIRE(queue.pop() == flow_file_2); + REQUIRE_FALSE(queue.isWorkAvailable()); +} + namespace { class PenaltyHasExpired { public: - explicit PenaltyHasExpired(const std::shared_ptr<core::FlowFile>& flow_file) : flow_file_(flow_file) {} + explicit PenaltyHasExpired(std::shared_ptr<core::FlowFile> flow_file) : flow_file_(std::move(flow_file)) {} bool operator()() { return !flow_file_->isPenalized(); } private: diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 65c91bf1f..dd724061a 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -33,6 +33,7 @@ #include "FlowController.h" #include "properties/Configure.h" #include "provenance/Provenance.h" +#include "SwapManager.h" #if defined(__clang__) #pragma clang diagnostic push @@ -227,7 +228,8 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::Repository { } } - void loadComponent(const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& /*content_repo*/) override { + void loadComponent(const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& content_repo) override { + content_repo_ = content_repo; } void run() override { @@ -237,6 +239,7 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::Repository { protected: mutable std::mutex repository_results_mutex_; std::map<std::string, std::string> repository_results_; + std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository> content_repo_; }; class TestFlowController : public org::apache::nifi::minifi::FlowController { diff --git a/libminifi/test/unit/SwapTestController.h b/libminifi/test/unit/SwapTestController.h new file mode 100644 index 000000000..0a921a754 --- /dev/null +++ b/libminifi/test/unit/SwapTestController.h @@ -0,0 +1,240 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <span> +#include <string> +#include <vector> +#include <utility> +#include <memory> + +#include "../TestBase.h" +#include "TestUtils.h" +#include "../Utils.h" +#include "../Catch.h" +#include "../unit/ProvenanceTestHelper.h" + + +using Timepoint = std::chrono::time_point<std::chrono::steady_clock>; + +enum EventKind { + Store, Load +}; + +struct SwapEvent { + EventKind kind; + std::vector<minifi::SwappedFlowFile> flow_files; + + void verifyTimes(std::initializer_list<unsigned> seconds) { + REQUIRE(flow_files.size() == seconds.size()); + size_t idx = 0; + for (auto& second : seconds) { + REQUIRE(flow_files[idx].to_be_processed_after == Timepoint{std::chrono::seconds{second}}); + ++idx; + } + } +}; + +class SwappingFlowFileTestRepo : public TestFlowRepository, public minifi::SwapManager { + public: + SwappingFlowFileTestRepo() + : core::SerializableComponent("ff") {} + + void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override { + std::vector<minifi::SwappedFlowFile> ids; + for (const auto& ff : flow_files) { + ids.push_back(minifi::SwappedFlowFile{ff->getUUID(), ff->getPenaltyExpiration()}); + minifi::io::BufferStream output; + std::static_pointer_cast<minifi::FlowFileRecord>(ff)->Serialize(output); + Put(ff->getUUIDStr().c_str(), reinterpret_cast<const uint8_t*>(output.getBuffer().data()), output.size()); + } + swap_events_.push_back({Store, ids}); + } + + std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<minifi::SwappedFlowFile> flow_files) override { + swap_events_.push_back({Load, flow_files}); + LoadTask load_task; + auto future = load_task.promise.get_future(); + load_task.result.reserve(flow_files.size()); + for (const auto& ff_id : flow_files) { + std::string value; + Get(ff_id.id.to_string().c_str(), value); + utils::Identifier container_id; + auto ff = minifi::FlowFileRecord::DeSerialize(std::as_bytes(std::span(value)), content_repo_, container_id); + ff->setPenaltyExpiration(ff_id.to_be_processed_after); + load_task.result.push_back(std::move(ff)); + } + load_tasks_.push_back(std::move(load_task)); + return future; + } + + struct LoadTask { + std::promise<std::vector<std::shared_ptr<core::FlowFile>>> promise; + std::vector<std::shared_ptr<core::FlowFile>> result; + + void complete() { + promise.set_value(result); + } + }; + + std::vector<LoadTask> load_tasks_; + std::vector<SwapEvent> swap_events_; +}; + +using FlowFilePtr = std::shared_ptr<core::FlowFile>; +using FlowFilePtrVec = std::vector<FlowFilePtr>; + +struct FlowFileComparator { + bool operator()(const FlowFilePtr& left, const FlowFilePtr& right) const { + return left->getPenaltyExpiration() < right->getPenaltyExpiration(); + } +}; + +struct VerifiedQueue { + void push(FlowFilePtr ff) { + size(); + impl.push(ff); + ref_.insert(std::lower_bound(ref_.begin(), ref_.end(), ff, FlowFileComparator{}), ff); + size(); + } + + FlowFilePtr poll() { + size(); + FlowFilePtr ff = impl.pop(); + REQUIRE(!ref_.empty()); + // the order when flow files have the same penalty is not fixed + REQUIRE(ff->getPenaltyExpiration() == ref_.front()->getPenaltyExpiration()); + ref_.erase(ref_.begin()); + size(); + return ff; + } + + void verify(std::initializer_list<unsigned> live, std::optional<std::initializer_list<unsigned>> inter, std::initializer_list<unsigned> swapped) const { + // check live ffs + auto live_copy = FlowFileQueueTestAccessor::get_queue_(impl); + REQUIRE(live_copy.size() == live.size()); + for (auto sec : live) { + auto min = live_copy.popMin(); + REQUIRE(min->getPenaltyExpiration() == Timepoint{std::chrono::seconds{sec}}); + } + + // check inter ffs + if (!inter) { + REQUIRE_FALSE(FlowFileQueueTestAccessor::get_load_task_(impl).has_value()); + } else { + auto& intermediate = FlowFileQueueTestAccessor::get_load_task_(impl)->intermediate_items; + REQUIRE(intermediate.size() == inter->size()); + size_t idx = 0; + for (auto sec : inter.value()) { + REQUIRE(intermediate[idx]->getPenaltyExpiration() == Timepoint{std::chrono::seconds{sec}}); + ++idx; + } + } + + // check swapped ffs + auto swapped_copy = FlowFileQueueTestAccessor::get_swapped_flow_files_(impl); + REQUIRE(swapped_copy.size() == swapped.size()); + for (auto sec : swapped) { + auto min = swapped_copy.popMin(); + REQUIRE(min.to_be_processed_after == Timepoint{std::chrono::seconds{sec}}); + } + } + + bool isWorkAvailable() const { + return impl.isWorkAvailable(); + } + + size_t size() const { + size_t result = impl.size(); + REQUIRE(result == ref_.size()); + return result; + } + + VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager, std::unique_ptr<utils::timeutils::SteadyClock> clock) + : impl(std::move(swap_manager)) { + FlowFileQueueTestAccessor::get_clock_(impl) = std::move(clock); + } + + utils::FlowFileQueue impl; + FlowFilePtrVec ref_; +}; + +class SwapTestController : public TestController { + public: + SwapTestController() { + content_repo_ = std::make_shared<core::repository::VolatileContentRepository>(); + flow_repo_ = std::make_shared<SwappingFlowFileTestRepo>(); + flow_repo_->loadComponent(content_repo_); + auto clock = std::make_unique<utils::ManualSteadyClock>(); + clock_ = clock.get(); + queue_ = std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_), std::move(clock)); + } + + void setLimits(size_t min_size, size_t target_size, size_t max_size) { + queue_->impl.setMinSize(min_size); + queue_->impl.setTargetSize(target_size); + queue_->impl.setMaxSize(max_size); + } + + struct SwapEventPattern { + EventKind kind; + std::initializer_list<unsigned > seconds; + }; + + void verifySwapEvents(std::vector<SwapEventPattern> events) { + REQUIRE(flow_repo_->swap_events_.size() == events.size()); + size_t idx = 0; + for (auto& pattern : events) { + REQUIRE(pattern.kind == flow_repo_->swap_events_[idx].kind); + flow_repo_->swap_events_[idx].verifyTimes(pattern.seconds); + } + } + + void clearSwapEvents() { + flow_repo_->swap_events_.clear(); + } + + void verifyQueue(std::initializer_list<unsigned> live, std::optional<std::initializer_list<unsigned>> inter, std::initializer_list<unsigned> swapped) { + queue_->verify(live, inter, swapped); + } + + void pushAll(std::initializer_list<unsigned> seconds) { + for (auto sec : seconds) { + auto ff = std::static_pointer_cast<core::FlowFile>(std::make_shared<minifi::FlowFileRecord>()); + ff->setPenaltyExpiration(Timepoint{std::chrono::seconds{sec}}); + queue_->push(std::move(ff)); + } + } + + void popAll(std::initializer_list<unsigned> seconds, bool check_is_work_available = false) { + for (auto sec : seconds) { + if (check_is_work_available) { + REQUIRE(queue_->isWorkAvailable()); + } + auto ff = queue_->poll(); + REQUIRE(ff->getPenaltyExpiration() == Timepoint{std::chrono::seconds{sec}}); + } + } + + std::shared_ptr<SwappingFlowFileTestRepo> flow_repo_; + std::shared_ptr<core::repository::VolatileContentRepository> content_repo_; + std::shared_ptr<VerifiedQueue> queue_; + // owned by the queue_ + utils::ManualSteadyClock* clock_; +};
