This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 344c08cb03f87e57cb5806b02dc7a420ba5efacc
Author: Adam Debreceni <[email protected]>
AuthorDate: Thu Mar 18 17:11:02 2021 +0100

    MINIFICPP-1525 - Support flow file swapping in Connection
    
    Signed-off-by: Gabor Gyimesi <[email protected]>
    
    This closes #1038
---
 extensions/rocksdb-repos/FlowFileLoader.cpp        | 115 ++++++++++
 extensions/rocksdb-repos/FlowFileLoader.h          |  65 ++++++
 extensions/rocksdb-repos/FlowFileRepository.cpp    |   1 +
 extensions/rocksdb-repos/FlowFileRepository.h      |  25 ++-
 .../tests/unit/YamlConnectionParserTest.cpp        |   9 +
 libminifi/include/Connection.h                     |  10 +
 .../{utils/FlowFileQueue.h => SwapManager.h}       |  43 ++--
 libminifi/include/core/FlowFile.h                  |   4 +
 libminifi/include/core/Repository.h                |   3 +-
 libminifi/include/core/yaml/YamlConnectionParser.h |   1 +
 libminifi/include/utils/FlowFileQueue.h            |  72 ++++++-
 libminifi/include/utils/MinMaxHeap.h               |  76 +++++++
 libminifi/include/utils/MinifiConcurrentQueue.h    |  11 +-
 libminifi/include/utils/TestUtils.h                |  18 ++
 libminifi/include/utils/TimeUtil.h                 |   4 +
 libminifi/src/Connection.cpp                       |  31 ++-
 libminifi/src/core/FlowConfiguration.cpp           |   9 +
 libminifi/src/core/yaml/YamlConfiguration.cpp      |   1 +
 libminifi/src/core/yaml/YamlConnectionParser.cpp   |  18 +-
 libminifi/src/utils/FlowFileQueue.cpp              | 235 +++++++++++++++++---
 libminifi/src/utils/ThreadPool.cpp                 |   8 +-
 libminifi/test/Utils.h                             |  17 ++
 libminifi/test/rocksdb-tests/SwapTests.cpp         | 137 ++++++++++++
 libminifi/test/unit/FlowFileQueueSwapTests.cpp     | 183 ++++++++++++++++
 libminifi/test/unit/FlowFileQueueTests.cpp         |  19 +-
 libminifi/test/unit/ProvenanceTestHelper.h         |   5 +-
 libminifi/test/unit/SwapTestController.h           | 240 +++++++++++++++++++++
 27 files changed, 1279 insertions(+), 81 deletions(-)

diff --git a/extensions/rocksdb-repos/FlowFileLoader.cpp 
b/extensions/rocksdb-repos/FlowFileLoader.cpp
new file mode 100644
index 000000000..f2a13f7bc
--- /dev/null
+++ b/extensions/rocksdb-repos/FlowFileLoader.cpp
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileLoader.h"
+
+#include <span>
+#include <memory>
+#include <string>
+#include <vector>
+#include <utility>
+
+#include "logging/LoggerConfiguration.h"
+#include "FlowFileRecord.h"
+
+namespace org::apache::nifi::minifi {
+
+FlowFileLoader::FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> 
db, std::shared_ptr<core::ContentRepository> content_repo)
+  : db_(db),
+    content_repo_(std::move(content_repo)),
+    logger_(core::logging::LoggerFactory<FlowFileLoader>::getLogger()) {}
+
+FlowFileLoader::~FlowFileLoader() {
+  stop();
+}
+
+std::future<FlowFileLoader::FlowFilePtrVec> 
FlowFileLoader::load(std::vector<SwappedFlowFile> flow_files) {
+  auto promise = std::make_shared<std::promise<FlowFilePtrVec>>();
+  std::future<FlowFilePtrVec> future = promise->get_future();
+  utils::Worker<utils::TaskRescheduleInfo> task{[this, flow_files = 
std::move(flow_files), promise = std::move(promise)] {
+      return loadImpl(flow_files, promise);
+    },
+    "",  // doesn't matter that tasks alias by name, as we never actually 
query their status or stop a single task
+    std::make_unique<utils::ComplexMonitor>()};
+  // the dummy_future is for the return value of the Worker's lambda, 
rerunning this lambda
+  // depends on run_determinant + result
+  // we could create a custom run_determinant to instead determine if/when it 
should be rerun
+  // based on the lambda's return value (e.g. it could return a 
nonstd::expected<FlowFilePtrVec, TaskRescheduleInfo>)
+  // but then the std::future would also bear this type
+  std::future<utils::TaskRescheduleInfo> dummy_future;
+  thread_pool_.execute(std::move(task), dummy_future);
+  return future;
+}
+
+void FlowFileLoader::start() {
+  thread_pool_.start();
+}
+
+void FlowFileLoader::stop() {
+  thread_pool_.shutdown();
+}
+
+utils::TaskRescheduleInfo FlowFileLoader::loadImpl(const 
std::vector<SwappedFlowFile>& flow_files, const 
std::shared_ptr<std::promise<FlowFilePtrVec>>& output) {
+  auto opendb = db_->open();
+  if (!opendb) {
+    logger_->log_error("Couldn't open database to swap-in flow files");
+    return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{30});
+  }
+  try {
+    FlowFilePtrVec result;
+    result.reserve(flow_files.size());
+    rocksdb::ReadOptions read_options;
+    std::vector<utils::SmallString<36>> serialized_keys;
+    serialized_keys.reserve(flow_files.size());
+    for (const auto& item : flow_files) {
+      serialized_keys.push_back(item.id.to_string());
+    }
+    std::vector<rocksdb::Slice> keys;
+    keys.reserve(flow_files.size());
+    for (size_t idx = 0; idx < flow_files.size(); ++idx) {
+      keys.emplace_back(serialized_keys[idx].data(), 
serialized_keys[idx].length());
+    }
+    std::vector<std::string> serialized_items;
+    serialized_items.reserve(flow_files.size());
+    std::vector<rocksdb::Status> statuses = opendb->MultiGet(read_options, 
keys, &serialized_items);
+    for (size_t idx = 0; idx < statuses.size(); ++idx) {
+      if (!statuses[idx].ok()) {
+        logger_->log_error("Failed to fetch flow file \"%s\"", 
serialized_keys[idx]);
+        return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{30});
+      }
+      utils::Identifier container_id;
+      auto flow_file = FlowFileRecord::DeSerialize(
+          std::as_bytes(std::span(serialized_items[idx])), content_repo_, 
container_id);
+      if (!flow_file) {
+        // corrupted flow file
+        logger_->log_error("Failed to deserialize flow file \"%s\"", 
serialized_keys[idx]);
+      } else {
+        flow_file->setStoredToRepository(true);
+        flow_file->setPenaltyExpiration(flow_files[idx].to_be_processed_after);
+        result.push_back(std::move(flow_file));
+        logger_->log_debug("Deserialized flow file \"%s\"", 
serialized_keys[idx]);
+      }
+    }
+    output->set_value(result);
+    return utils::TaskRescheduleInfo::Done();
+  } catch (const std::exception& err) {
+    logger_->log_error("Error while swapping flow files in: %s", err.what());
+    return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{60});
+  }
+}
+
+}  // namespace org::apache::nifi::minifi
diff --git a/extensions/rocksdb-repos/FlowFileLoader.h 
b/extensions/rocksdb-repos/FlowFileLoader.h
new file mode 100644
index 000000000..9c7ee6d51
--- /dev/null
+++ b/extensions/rocksdb-repos/FlowFileLoader.h
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <future>
+#include <list>
+#include <vector>
+#include <memory>
+
+#include "database/RocksDatabase.h"
+#include "FlowFile.h"
+#include "utils/gsl.h"
+#include "core/ContentRepository.h"
+#include "SwapManager.h"
+#include "utils/ThreadPool.h"
+#include "core/logging/Logger.h"
+
+namespace org::apache::nifi::minifi {
+
+class FlowFileLoader {
+  using FlowFilePtr = std::shared_ptr<core::FlowFile>;
+  using FlowFilePtrVec = std::vector<FlowFilePtr>;
+
+  static constexpr size_t thread_count_ = 1;
+
+ public:
+  FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, 
std::shared_ptr<core::ContentRepository> content_repo);
+
+  ~FlowFileLoader();
+
+  void start();
+
+  void stop();
+
+  std::future<FlowFilePtrVec> load(std::vector<SwappedFlowFile> flow_files);
+
+ private:
+  utils::TaskRescheduleInfo loadImpl(const std::vector<SwappedFlowFile>& 
flow_files, const std::shared_ptr<std::promise<FlowFilePtrVec>>& output);
+
+  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_{thread_count_, 
false, nullptr, "FlowFileLoaderThreadPool"};
+
+  gsl::not_null<minifi::internal::RocksDatabase*> db_;
+
+  // TODO(adebreceni): shared_ptr is needed to call FlowFileRecord::Deserialize
+  //    this ownership could be removed if that changes
+  std::shared_ptr<core::ContentRepository> content_repo_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp 
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 6bd2f4982..39355dc80 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -251,6 +251,7 @@ void FlowFileRepository::initialize_repository() {
 void FlowFileRepository::loadComponent(const 
std::shared_ptr<core::ContentRepository> &content_repo) {
   content_repo_ = content_repo;
   repo_size_ = 0;
+  swap_loader_ = 
std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_);
 
   initialize_repository();
 }
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h 
b/extensions/rocksdb-repos/FlowFileRepository.h
index 6906979d2..487363cd1 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -34,6 +34,9 @@
 #include "database/RocksDatabase.h"
 #include "encryption/RocksDbEncryptionProvider.h"
 #include "utils/crypto/EncryptionProvider.h"
+#include "SwapManager.h"
+#include "FlowFileLoader.h"
+#include "range/v3/algorithm/all_of.hpp"
 
 namespace org::apache::nifi::minifi::core::repository {
 
@@ -53,7 +56,7 @@ constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS 
= std::chrono::mill
  * Flow File repository
  * Design: Extends Repository and implements the run function, using rocksdb 
as the primary substrate.
  */
-class FlowFileRepository : public core::Repository {
+class FlowFileRepository : public core::Repository, public SwapManager, public 
std::enable_shared_from_this<FlowFileRepository> {
  public:
   static constexpr const char* ENCRYPTION_KEY_NAME = 
"nifi.flowfile.repository.encryption.key";
   // Constructor
@@ -203,6 +206,25 @@ class FlowFileRepository : public core::Repository {
     running_ = true;
     thread_ = std::thread(&FlowFileRepository::run, this);
     logger_->log_debug("%s Repository Monitor Thread Start", getName());
+    if (swap_loader_) {
+      swap_loader_->start();
+    }
+  }
+
+  void stop() override {
+    if (swap_loader_) {
+      swap_loader_->stop();
+    }
+    core::Repository::stop();
+  }
+
+  void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override 
{
+    gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
+    // pass, flowfiles are already persisted in the repository
+  }
+
+  std::future<std::vector<std::shared_ptr<core::FlowFile>>> 
load(std::vector<SwappedFlowFile> flow_files) override {
+    return swap_loader_->load(std::move(flow_files));
   }
 
  private:
@@ -229,6 +251,7 @@ class FlowFileRepository : public core::Repository {
   std::shared_ptr<core::ContentRepository> content_repo_;
   std::unique_ptr<minifi::internal::RocksDatabase> db_;
   std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
+  std::unique_ptr<FlowFileLoader> swap_loader_;
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<minifi::Configure> config_;
 };
diff --git 
a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp 
b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
index 434134ca2..0276ddd90 100644
--- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -67,6 +67,12 @@ TEST_CASE("Connections components are parsed from yaml", 
"[YamlConfiguration]")
     REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
     REQUIRE(12582912 == 
yaml_connection_parser.getWorkQueueDataSizeFromYaml());  // 12 * 1024 * 1024 B
   }
+  SECTION("Queue swap threshold is read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "swap threshold: 231\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", 
parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getSwapThresholdFromYaml());
+  }
   SECTION("Source and destination names and uuids are read") {
     const utils::Identifier expected_source_id = utils::generateUUID();
     const utils::Identifier expected_destination_id = utils::generateUUID();
@@ -121,6 +127,7 @@ TEST_CASE("Connections components are parsed from yaml", 
"[YamlConfiguration]")
       
CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection));
       CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
       CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getSwapThresholdFromYaml());
       CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
       CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
       CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
@@ -163,11 +170,13 @@ TEST_CASE("Connections components are parsed from yaml", 
"[YamlConfiguration]")
         YAML::Node connection_node = YAML::Load(std::string {
             "max work queue size: \n"
             "max work queue data size: \n"
+            "swap threshold: \n"
             "flowfile expiration: \n"
             "drop empty: \n"});
         YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
         CHECK(0 == yaml_connection_parser.getWorkQueueSizeFromYaml());
         CHECK(0 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+        CHECK(0 == yaml_connection_parser.getSwapThresholdFromYaml());
         CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml());
         CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml());
       }
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index e1196c92e..dcf4af558 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -35,9 +35,12 @@
 #include "core/Repository.h"
 #include "utils/FlowFileQueue.h"
 
+struct ConnectionTestAccessor;
+
 namespace org::apache::nifi::minifi {
 
 class Connection : public core::Connectable {
+  friend struct ::ConnectionTestAccessor;
  public:
   explicit Connection(std::shared_ptr<core::Repository> flow_repository, 
std::shared_ptr<core::ContentRepository> content_repo, const std::string &name);
   explicit Connection(std::shared_ptr<core::Repository> flow_repository, 
std::shared_ptr<core::ContentRepository> content_repo, const std::string &name, 
const utils::Identifier &uuid);
@@ -45,6 +48,8 @@ class Connection : public core::Connectable {
                       const utils::Identifier &srcUUID);
   explicit Connection(std::shared_ptr<core::Repository> flow_repository, 
std::shared_ptr<core::ContentRepository> content_repo, const std::string &name, 
const utils::Identifier &uuid,
                       const utils::Identifier &srcUUID, const 
utils::Identifier &destUUID);
+  explicit Connection(std::shared_ptr<core::Repository> flow_repository, 
std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<SwapManager> swap_manager,
+                      const std::string& name, const utils::Identifier& uuid);
   // Destructor
   ~Connection() override = default;
 
@@ -110,6 +115,11 @@ class Connection : public core::Connectable {
   void setMaxQueueDataSize(uint64_t size) {
     max_data_queue_size_ = size;
   }
+  void setSwapThreshold(uint64_t size) {
+    queue_.setTargetSize(size);
+    queue_.setMinSize(size / 2);
+    queue_.setMaxSize(size * 3 / 2);
+  }
   // Get Max Queue Data Size
   uint64_t getMaxQueueDataSize() {
     return max_data_queue_size_;
diff --git a/libminifi/include/utils/FlowFileQueue.h 
b/libminifi/include/SwapManager.h
similarity index 56%
copy from libminifi/include/utils/FlowFileQueue.h
copy to libminifi/include/SwapManager.h
index a6d343fbf..ed3535dbc 100644
--- a/libminifi/include/utils/FlowFileQueue.h
+++ b/libminifi/include/SwapManager.h
@@ -14,41 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #pragma once
 
-#include <memory>
-#include <queue>
+#include <future>
 #include <vector>
+#include <memory>
 
 #include "core/FlowFile.h"
+#include "utils/Id.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-
-class FlowFileQueue {
- public:
-  using value_type = std::shared_ptr<core::FlowFile>;
-
-  value_type pop();
-  void push(const value_type& element);
-  void push(value_type&& element);
-  bool isWorkAvailable() const;
-  bool empty() const;
-  size_t size() const;
+namespace org::apache::nifi::minifi {
 
- private:
-  struct FlowFilePenaltyExpirationComparator {
-    bool operator()(const value_type& left, const value_type& right);
-  };
+struct SwappedFlowFile {
+  utils::Identifier id;
+  std::chrono::steady_clock::time_point to_be_processed_after;
+};
 
-  std::priority_queue<value_type, std::vector<value_type>, 
FlowFilePenaltyExpirationComparator> queue_;
+class SwapManager {
+ public:
+  virtual void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) 
= 0;
+  virtual std::future<std::vector<std::shared_ptr<core::FlowFile>>> 
load(std::vector<SwappedFlowFile> flow_files) = 0;
+  virtual ~SwapManager() = default;
 };
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/FlowFile.h 
b/libminifi/include/core/FlowFile.h
index cc55913a7..d2c5710c2 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -219,6 +219,10 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
     return to_be_processed_after_;
   }
 
+  void setPenaltyExpiration(std::chrono::time_point<std::chrono::steady_clock> 
to_be_processed_after) {
+    to_be_processed_after_ = to_be_processed_after;
+  }
+
   /**
    * Gets the offset within the flow file
    * @return size as a uint64_t
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
index 33804517f..afe3e5750 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -43,6 +43,7 @@
 #include "core/Connectable.h"
 #include "core/TraceableResource.h"
 #include "utils/BackTrace.h"
+#include "SwapManager.h"
 
 #ifndef WIN32
 #include <sys/stat.h>
@@ -141,7 +142,7 @@ class Repository : public virtual 
core::SerializableComponent, public core::Trac
   // Start the repository monitor thread
   virtual void start();
   // Stop the repository monitor thread
-  void stop();
+  virtual void stop();
   // whether the repo is full
   virtual bool isFull() {
     return repo_full_;
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h 
b/libminifi/include/core/yaml/YamlConnectionParser.h
index f15224fd8..fe6f58ed8 100644
--- a/libminifi/include/core/yaml/YamlConnectionParser.h
+++ b/libminifi/include/core/yaml/YamlConnectionParser.h
@@ -48,6 +48,7 @@ class YamlConnectionParser {
   [[nodiscard]] uint64_t getWorkQueueSizeFromYaml() const;
   [[nodiscard]] uint64_t getWorkQueueDataSizeFromYaml() const;
   [[nodiscard]] utils::Identifier getSourceUUIDFromYaml() const;
+  [[nodiscard]] uint64_t getSwapThresholdFromYaml() const;
   [[nodiscard]] utils::Identifier getDestinationUUIDFromYaml() const;
   [[nodiscard]] std::chrono::milliseconds getFlowFileExpirationFromYaml() 
const;
   [[nodiscard]] bool getDropEmptyFromYaml() const;
diff --git a/libminifi/include/utils/FlowFileQueue.h 
b/libminifi/include/utils/FlowFileQueue.h
index a6d343fbf..952d3fc12 100644
--- a/libminifi/include/utils/FlowFileQueue.h
+++ b/libminifi/include/utils/FlowFileQueue.h
@@ -17,10 +17,16 @@
 #pragma once
 
 #include <memory>
-#include <queue>
 #include <vector>
+#include <algorithm>
+#include <utility>
 
 #include "core/FlowFile.h"
+#include "MinMaxHeap.h"
+#include "SwapManager.h"
+#include "TimeUtil.h"
+
+struct FlowFileQueueTestAccessor;
 
 namespace org {
 namespace apache {
@@ -29,22 +35,78 @@ namespace minifi {
 namespace utils {
 
 class FlowFileQueue {
+  friend struct ::FlowFileQueueTestAccessor;
+  using TimePoint = std::chrono::steady_clock::time_point;
+
  public:
   using value_type = std::shared_ptr<core::FlowFile>;
 
+  explicit FlowFileQueue(std::shared_ptr<SwapManager> swap_manager = {});
+
   value_type pop();
-  void push(const value_type& element);
-  void push(value_type&& element);
+  std::optional<value_type> tryPop();
+  std::optional<value_type> tryPop(std::chrono::milliseconds timeout);
+  void push(value_type element);
   bool isWorkAvailable() const;
   bool empty() const;
   size_t size() const;
+  void setMinSize(size_t min_size);
+  void setTargetSize(size_t target_size);
+  void setMaxSize(size_t max_size);
+  void clear();
 
  private:
+  std::optional<value_type> 
tryPopImpl(std::optional<std::chrono::milliseconds> timeout);
+
+  void initiateLoadIfNeeded();
+
+  struct LoadTask {
+    TimePoint min;
+    TimePoint max;
+    std::future<std::vector<std::shared_ptr<core::FlowFile>>> items;
+    size_t count;
+    // flow files that have been pushed into the queue while a
+    // load was pending
+    std::vector<value_type> intermediate_items;
+
+    LoadTask(TimePoint min, TimePoint max, 
std::future<std::vector<std::shared_ptr<core::FlowFile>>> items, size_t count)
+      : min(min), max(max), items(std::move(items)), count(count) {}
+
+    size_t size() const {
+      return count + intermediate_items.size();
+    }
+  };
+
+  bool processLoadTaskWait(std::optional<std::chrono::milliseconds> timeout);
+
   struct FlowFilePenaltyExpirationComparator {
-    bool operator()(const value_type& left, const value_type& right);
+    bool operator()(const value_type& left, const value_type& right) const;
+  };
+
+  struct SwappedFlowFileComparator {
+    bool operator()(const SwappedFlowFile& left, const SwappedFlowFile& right) 
const;
   };
 
-  std::priority_queue<value_type, std::vector<value_type>, 
FlowFilePenaltyExpirationComparator> queue_;
+  size_t shouldSwapOutCount() const;
+
+  size_t shouldSwapInCount() const;
+
+  std::shared_ptr<SwapManager> swap_manager_;
+  // a load is initiated if the queue_ shrinks below this threshold
+  std::atomic<size_t> min_size_{0};
+  // a given operation (load/store) will try to approach this size
+  std::atomic<size_t> target_size_{0};
+  // a store is initiated if the queue_ grows beyond this threshold
+  std::atomic<size_t> max_size_{0};
+
+  MinMaxHeap<SwappedFlowFile, SwappedFlowFileComparator> swapped_flow_files_;
+  // the pending swap-in operation (if any)
+  std::optional<LoadTask> load_task_;
+  MinMaxHeap<value_type, FlowFilePenaltyExpirationComparator> queue_;
+
+  std::unique_ptr<timeutils::SteadyClock> 
clock_{std::make_unique<timeutils::SteadyClock>()};
+
+  std::shared_ptr<core::logging::Logger> logger_;
 };
 
 }  // namespace utils
diff --git a/libminifi/include/utils/MinMaxHeap.h 
b/libminifi/include/utils/MinMaxHeap.h
new file mode 100644
index 000000000..edb6365bb
--- /dev/null
+++ b/libminifi/include/utils/MinMaxHeap.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <set>
+#include <cmath>
+#include <utility>
+
+#include "utils/gsl.h"
+
+struct MinMaxHeapTestAccessor;
+
+namespace org::apache::nifi::minifi::utils {
+
/**
 * Double-ended priority queue: supports inspecting and removing both the smallest and
 * the largest element according to Comparator. Built on std::multiset, so elements that
 * compare equivalent are all retained.
 *
 * min(), max(), popMin() and popMax() require a non-empty heap; calling them on an empty
 * heap is undefined behavior.
 */
template<typename T, typename Comparator = std::less<T>>
class MinMaxHeap {
 public:
  void clear() {
    data_.clear();
  }

  // Smallest element; precondition: !empty().
  const T& min() const {
    return *data_.begin();
  }

  // Largest element; precondition: !empty().
  const T& max() const {
    return *data_.rbegin();
  }

  size_t size() const {
    return data_.size();
  }

  bool empty() const {
    return data_.empty();
  }

  void push(T item) {
    data_.insert(std::move(item));
  }

  // Removes and returns the smallest element; precondition: !empty().
  T popMin() {
    return take(data_.begin());
  }

  // Removes and returns the largest element; precondition: !empty().
  T popMax() {
    auto last = data_.end();
    --last;
    return take(last);
  }

 private:
  using Container = std::multiset<T, Comparator>;

  // Removes the element at `it` and returns it. Multiset elements are const, so
  // `std::move(*it)` would silently copy; extracting the node grants mutable access
  // and allows a genuine move (which also makes move-only T usable).
  T take(typename Container::const_iterator it) {
    auto node = data_.extract(it);
    return std::move(node.value());
  }

  Container data_;
};
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h 
b/libminifi/include/utils/MinifiConcurrentQueue.h
index 5c229fac8..8abc0ec93 100644
--- a/libminifi/include/utils/MinifiConcurrentQueue.h
+++ b/libminifi/include/utils/MinifiConcurrentQueue.h
@@ -25,6 +25,7 @@
 #include <condition_variable>
 #include <utility>
 #include <stdexcept>
+#include <atomic>
 
 #include "utils/TryMoveCall.h"
 
@@ -215,25 +216,29 @@ class ConditionConcurrentQueue : private 
ConcurrentQueue<T> {
   }
 
   void stop() {
+    // this lock ensures that other threads did not yet
+    // check the running_ condition (as they all acquire
+    // the lock before the check) or already unlocked and
+    // are waiting, thus receiving the notify_all
+    // TODO(adebreceni): investigate a waiting_ counter
+    //   approach that would render the locking here unnecessary
     std::lock_guard<std::mutex> guard(this->mtx_);
     running_ = false;
     cv_.notify_all();
   }
 
   void start() {
-    std::unique_lock<std::mutex> lck(this->mtx_);
     running_ = true;
   }
 
   bool isRunning() const {
-    std::lock_guard<std::mutex> guard(this->mtx_);
     return running_;  // In case it's not running no notifications are 
generated, dequeueing fails instead of blocking to avoid hanging threads
   }
 
   using ConcurrentQueue<T>::remove;
 
  private:
-  bool running_;
+  std::atomic<bool> running_;
   std::condition_variable cv_;
 };
 
diff --git a/libminifi/include/utils/TestUtils.h 
b/libminifi/include/utils/TestUtils.h
index c125ad72c..37e1543f3 100644
--- a/libminifi/include/utils/TestUtils.h
+++ b/libminifi/include/utils/TestUtils.h
@@ -69,6 +69,24 @@ class ManualClock : public timeutils::Clock {
   std::chrono::milliseconds time_{0};
 };
 
+class ManualSteadyClock : public timeutils::SteadyClock {
+ public:
+  std::chrono::milliseconds timeSinceEpoch() const override { return time_; }
+  void advance(std::chrono::milliseconds elapsed_time) {
+    if (elapsed_time.count() < 0) {
+      throw std::logic_error("A steady clock can only be advanced forward");
+    }
+    time_ += elapsed_time;
+  }
+
+  std::chrono::steady_clock::time_point now() const override {
+    return std::chrono::steady_clock::time_point{time_};
+  }
+
+ private:
+  std::chrono::milliseconds time_{0};
+};
+
 
 #ifdef WIN32
 // The tzdata location is set as a global variable in date-tz library
diff --git a/libminifi/include/utils/TimeUtil.h 
b/libminifi/include/utils/TimeUtil.h
index dbcf31014..906d851fe 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -84,6 +84,10 @@ class SteadyClock : public Clock {
   std::chrono::milliseconds timeSinceEpoch() const override {
     return 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch());
   }
+
+  virtual std::chrono::time_point<std::chrono::steady_clock> now() const {
+    return std::chrono::steady_clock::now();
+  }
 };
 
 inline std::string getTimeStr(std::chrono::system_clock::time_point tp) {
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 1de108b73..440fddbb7 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -70,6 +70,15 @@ Connection::Connection(std::shared_ptr<core::Repository> 
flow_repository, std::s
   logger_->log_debug("Connection %s created", name_);
 }
 
+Connection::Connection(std::shared_ptr<core::Repository> flow_repository, 
std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<SwapManager> swap_manager,
+                       const std::string& name, const utils::Identifier& uuid)
+    : core::Connectable(name, uuid),
+      flow_repository_(std::move(flow_repository)),
+      content_repo_(std::move(content_repo)),
+      queue_(std::move(swap_manager)) {
+  logger_->log_debug("Connection %s created", name_);
+}
+
 bool Connection::isEmpty() const {
   std::lock_guard<std::mutex> lock(mutex_);
 
@@ -141,7 +150,11 @@ std::shared_ptr<core::FlowFile> 
Connection::poll(std::set<std::shared_ptr<core::
   std::lock_guard<std::mutex> lock(mutex_);
 
   while (queue_.isWorkAvailable()) {
-    std::shared_ptr<core::FlowFile> item = queue_.pop();
+    std::optional<std::shared_ptr<core::FlowFile>> opt_item = queue_.tryPop();
+    if (!opt_item) {
+      return nullptr;
+    }
+    std::shared_ptr<core::FlowFile> item = std::move(opt_item.value());
     queued_data_size_ -= item->getSize();
 
     if (expired_duration_.load() > 0ms) {
@@ -167,11 +180,16 @@ std::shared_ptr<core::FlowFile> 
Connection::poll(std::set<std::shared_ptr<core::
 
 void Connection::drain(bool delete_permanently) {
   std::lock_guard<std::mutex> lock(mutex_);
-
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.pop();
-    logger_->log_debug("Delete flow file UUID %s from connection %s, because 
it expired", item->getUUIDStr(), name_);
-    if (delete_permanently) {
+  if (!delete_permanently) {
+    // simply discard in-memory flow files
+    queue_.clear();
+  } else {
+    while (!queue_.empty()) {
+      auto opt_item = queue_.tryPop(std::chrono::milliseconds{100});
+      if (!opt_item) {
+        continue;
+      }
+      auto& item = opt_item.value();
       if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
         item->setStoredToRepository(false);
         auto claim = item->getResourceClaim();
@@ -179,6 +197,7 @@ void Connection::drain(bool delete_permanently) {
       }
     }
   }
+
   queued_data_size_ = 0;
   logger_->log_debug("Drain connection %s", name_);
 }
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index 6df2ed7f4..b35cda810 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -158,6 +158,15 @@ std::unique_ptr<core::ProcessGroup> 
FlowConfiguration::createRemoteProcessGroup(
 }
 
 std::unique_ptr<minifi::Connection> FlowConfiguration::createConnection(const std::string& name, const utils::Identifier& uuid) const {
+  // Swapping flow files is treated as the responsibility of the flow file repository: if
+  // flow_file_repo_ also implements SwapManager, the new connection swaps through it.
+  // The alternative would be to thread a separate swap manager through all the classes,
+  // which becomes unavoidable once other swappers are introduced.
+  // (dynamic_pointer_cast of an empty flow_file_repo_ yields an empty pointer.)
+  if (auto swap_manager = std::dynamic_pointer_cast<SwapManager>(flow_file_repo_)) {
+    return std::make_unique<minifi::Connection>(flow_file_repo_, content_repo_, std::move(swap_manager), name, uuid);
+  }
   return std::make_unique<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid);
 }
 
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 5cd7ed91c..9f9be9be6 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -569,6 +569,7 @@ void YamlConfiguration::parseConnectionYaml(const 
YAML::Node& connectionsNode, c
     
connectionParser.configureConnectionSourceRelationshipsFromYaml(*connection);
     connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml());
     
connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml());
+    connection->setSwapThreshold(connectionParser.getSwapThresholdFromYaml());
     connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml());
     
connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml());
     
connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml());
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp 
b/libminifi/src/core/yaml/YamlConnectionParser.cpp
index b29487275..b19e90eb0 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -75,7 +75,7 @@ uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() 
const {
       logger_->log_debug("Setting %" PRIu64 " as the max queue size.", 
max_work_queue_size);
       return max_work_queue_size;
     }
-    logger_->log_info("Invalid max queue size value: %s.", max_work_queue_str);
+    logger_->log_error("Invalid max queue size value: %s.", 
max_work_queue_str);
   }
   return 0;
 }
@@ -89,7 +89,21 @@ uint64_t 
YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
       logger_->log_debug("Setting %" PRIu64 "as the max as the max queue data 
size.", max_work_queue_data_size);
       return max_work_queue_data_size;
     }
-    logger_->log_info("Invalid max queue data size value: %s.", 
max_work_queue_str);
+    logger_->log_error("Invalid max queue data size value: %s.", 
max_work_queue_str);
+  }
+  return 0;
+}
+
+// Reads the optional "swap threshold" entry of the connection's YAML node.
+// Returns the parsed value, or 0 when the entry is absent or is not a valid integer
+// (an unparsable value is logged as an error).
+// NOTE(review): a threshold of 0 appears to leave swapping disabled (FlowFileQueue never
+// swaps out while its target size is 0) — confirm in Connection::setSwapThreshold.
+uint64_t YamlConnectionParser::getSwapThresholdFromYaml() const {
+  const YAML::Node swap_threshold_node = connectionNode_["swap threshold"];
+  if (swap_threshold_node) {
+    auto swap_threshold_str = swap_threshold_node.as<std::string>();
+    uint64_t swap_threshold;
+    if (core::Property::StringToInt(swap_threshold_str, swap_threshold)) {
+      logger_->log_debug("Setting %" PRIu64 " as the swap threshold.", 
swap_threshold);
+      return swap_threshold;
+    }
+    logger_->log_error("Invalid swap threshold value: %s.", 
swap_threshold_str);
   }
   return 0;
 }
diff --git a/libminifi/src/utils/FlowFileQueue.cpp 
b/libminifi/src/utils/FlowFileQueue.cpp
index 84b682d12..94414377f 100644
--- a/libminifi/src/utils/FlowFileQueue.cpp
+++ b/libminifi/src/utils/FlowFileQueue.cpp
@@ -16,59 +16,232 @@
  */
 
 #include "utils/FlowFileQueue.h"
+#include "core/logging/LoggerConfiguration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
-bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const 
value_type& left, const value_type& right) {
-  // this is operator< implemented using > so that top() is the element with 
the smallest key (earliest expiration)
-  // rather than the element with the largest key, which is the default for 
std::priority_queue
-  return left->getPenaltyExpiration() > right->getPenaltyExpiration();
+// Strict weak ordering by penalty expiration: the flow file that becomes due sooner is the
+// "smaller" one, so the min end of an ordered container holds the next one to process.
+bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_type& left, const value_type& right) const {
+  return right->getPenaltyExpiration() > left->getPenaltyExpiration();
 }
 
+// Orders swapped-out flow file records by the time they become processable, earliest first.
+bool FlowFileQueue::SwappedFlowFileComparator::operator()(const SwappedFlowFile& left, const SwappedFlowFile& right) const {
+  return right.to_be_processed_after > left.to_be_processed_after;
+}
+
+FlowFileQueue::FlowFileQueue(std::shared_ptr<SwapManager> swap_manager)
+  : swap_manager_(std::move(swap_manager)),
+    logger_(core::logging::LoggerFactory<FlowFileQueue>::getLogger()) {}
+
 FlowFileQueue::value_type FlowFileQueue::pop() {
-  if (empty()) {
-    throw std::logic_error{"pop() called on an empty FlowFileQueue"};
-  }
+  // Blocking pop: returns the earliest flow file regardless of its penalty.
+  // Throws std::logic_error when there is nothing to pop, as the pre-swapping implementation did,
+  // instead of letting a std::bad_optional_access escape from tryPopImpl().
+  std::optional<value_type> result = tryPopImpl({});
+  if (!result && load_task_) {
+    // The live queue was empty and the first attempt only initiated a swap-in of the
+    // swapped-out flow files; wait for that load instead of failing on a non-empty queue.
+    result = tryPopImpl({});
+  }
+  if (!result) {
+    throw std::logic_error{"pop() called on an empty FlowFileQueue"};
+  }
+  return std::move(result.value());
+}
 
-  value_type next_flow_file = queue_.top();
-  queue_.pop();
-  return next_flow_file;
+// Non-blocking pop: only checks (never waits) whether a pending swap-in has finished.
+std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPop() {
+  return tryPopImpl(std::chrono::milliseconds::zero());
+}
+
+// Pop that waits at most `timeout` for a pending swap-in to finish.
+std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPop(std::chrono::milliseconds timeout) {
+  const std::optional<std::chrono::milliseconds> max_wait{timeout};
+  return tryPopImpl(max_wait);
+}
+
+// Shared implementation of pop()/tryPop().
+// `timeout` bounds how long to wait for a pending swap-in when the live queue_ is empty:
+// std::nullopt waits until the load completes, zero only checks whether it already has.
+// Returns the earliest live flow file (its penalty is not checked here), or std::nullopt if
+// none is available yet. As a side effect a new swap-in may be initiated.
+std::optional<FlowFileQueue::value_type> 
FlowFileQueue::tryPopImpl(std::optional<std::chrono::milliseconds> timeout) {
+  std::optional<std::shared_ptr<core::FlowFile>> result;
+  if (!queue_.empty()) {
+    result = queue_.popMin();
+    // collect a finished swap-in without waiting; a new load may only start once none is pending
+    if (processLoadTaskWait(std::chrono::milliseconds{0})) {
+      initiateLoadIfNeeded();
+    }
+    return result;
+  }
+  if (load_task_) {
+    logger_->log_debug("Head is empty checking already running load task");
+    // the live queue is empty, so the next flow file can only come from the pending load
+    if (!processLoadTaskWait(timeout)) {
+      return std::nullopt;
+    }
+    if (!queue_.empty()) {
+      // load provided items
+      result = queue_.popMin();
+      initiateLoadIfNeeded();
+      return result;
+    }
+  }
+  // no pending load_task_ and no items in the queue_
+  initiateLoadIfNeeded();
+  return std::nullopt;
 }
 
-void FlowFileQueue::push(const value_type& element) {
-  if (!element->isPenalized()) {
-    element->penalize(std::chrono::milliseconds{0});
+// If a swap-in (load_task_) is pending, waits for it according to `timeout`
+// (std::nullopt: no bounded wait, get() below blocks until the load is done;
+// otherwise: wait at most that long). Once it is done, moves both the loaded flow files and
+// those pushed into its range meanwhile (intermediate_items) into the live queue_ and
+// clears load_task_.
+// Returns false only if the wait timed out; true if there was no pending load or it was processed.
+// NOTE(review): if the future holds an exception, get() rethrows it and load_task_ stays set with
+// an already-consumed future — confirm whether SwapManager::load can fail that way.
+bool 
FlowFileQueue::processLoadTaskWait(std::optional<std::chrono::milliseconds> 
timeout) {
+  if (!load_task_) {
+    return true;
   }
+  // without a timeout the load is treated as ready; items.get() then blocks until it is
+  std::future_status status = std::future_status::ready;
+  if (timeout) {
+    status = load_task_.value().items.wait_for(timeout.value());
+  }
+  if (status == std::future_status::timeout) {
+    logger_->log_debug("Load task is not yet completed");
+    return false;
+  }
+  gsl_Assert(status == std::future_status::ready);

-  queue_.push(element);
+  logger_->log_debug("Getting loaded flow files");
+  size_t swapped_in_count = 0;
+  size_t intermediate_count = 0;
+  for (auto&& item : load_task_->items.get()) {
+    ++swapped_in_count;
+    queue_.push(std::move(item));
+  }
+  // flow files pushed into the load's range while it was pending join the live queue as well
+  for (auto&& intermediate_item : load_task_->intermediate_items) {
+    ++intermediate_count;
+    queue_.push(std::move(intermediate_item));
+  }
+  load_task_.reset();
+  logger_->log_debug("Swapped in '%zu' flow files and committed '%zu' pending 
files", swapped_in_count, intermediate_count);
+  return true;
 }
 
-void FlowFileQueue::push(value_type&& element) {
-  if (!element->isPenalized()) {
-    element->penalize(std::chrono::milliseconds{0});
+// Inserts a flow file. It is routed to one of: the live queue_, the pending load_task_'s
+// intermediate_items (when it falls inside the range being swapped in), or the swapped-out set
+// (stored through swap_manager_) when it belongs after already swapped-out flow files.
+// Afterwards, if the live queue_ grew beyond max_size_, its latest flow files are swapped out
+// until it is back at target_size_ (this step is skipped while a load is pending).
+void FlowFileQueue::push(value_type element) {
+  // do not allow pushing elements in the past
+  element->setPenaltyExpiration(std::max(element->getPenaltyExpiration(), 
clock_->now()));
+
+  std::vector<value_type> flow_files_to_be_swapped_out;
+
+  if (load_task_) {
+    if (element->getPenaltyExpiration() <= load_task_->min) {
+      // flow file goes before load_task_
+      queue_.push(std::move(element));
+    } else if (load_task_->max <= element->getPenaltyExpiration()) {
+      // flow file goes after load_task_, i.e. immediately swapped out
+      flow_files_to_be_swapped_out.push_back(std::move(element));
+    } else {
+      // flow file belongs to the same range that is being swapped in
+      load_task_->intermediate_items.push_back(std::move(element));
+    }
+  } else if (!swapped_flow_files_.empty() && 
swapped_flow_files_.min().to_be_processed_after < 
element->getPenaltyExpiration()) {
+    // flow file goes into the swapped_flow_files_ set, i.e. immediately 
swapped out
+    flow_files_to_be_swapped_out.push_back(std::move(element));
+  } else {
+    queue_.push(std::move(element));
   }

-  queue_.push(std::move(element));
+  // trim the live queue back to target_size_ once it exceeds max_size_
+  size_t flow_file_count = shouldSwapOutCount();
+  if (flow_file_count != 0) {
+    if (!load_task_) {
+      // we cannot initiate a queue_ swap while a load_task_ is pending
+      flow_files_to_be_swapped_out.reserve(flow_files_to_be_swapped_out.size() 
+ flow_file_count);
+      for (size_t i = 0; i < flow_file_count; ++i) {
+        flow_files_to_be_swapped_out.push_back(queue_.popMax());
+      }
+    }
+  }
+  if (!flow_files_to_be_swapped_out.empty()) {
+    // remember only id + due time in memory; the flow files themselves go to the swap manager
+    for (const auto& flow_file : flow_files_to_be_swapped_out) {
+      swapped_flow_files_.push(SwappedFlowFile{flow_file->getUUID(), 
flow_file->getPenaltyExpiration()});
+    }
+    logger_->log_debug("Initiating store of %zu flow files", 
flow_files_to_be_swapped_out.size());
+    swap_manager_->store(std::move(flow_files_to_be_swapped_out));
+  }
 }
 
 bool FlowFileQueue::isWorkAvailable() const {
-  return !queue_.empty() && !queue_.top()->isPenalized();
+  auto now = clock_->now();
+  if (!queue_.empty()) {
+    return queue_.min()->getPenaltyExpiration() <= now;
+  }
+  if (load_task_) {
+    if (load_task_->min > now) {
+      return false;
+    }
+    auto status = load_task_->items.wait_for(std::chrono::milliseconds{0});
+    return status == std::future_status::ready;
+  }
+  return !swapped_flow_files_.empty() && 
swapped_flow_files_.min().to_be_processed_after <= now;
 }
 
 bool FlowFileQueue::empty() const {
-  return queue_.empty();
+  // counts swapped-out and currently loading flow files too, not just the live queue
+  return 0 == size();
 }
 
 size_t FlowFileQueue::size() const {
-  return queue_.size();
+  return queue_.size() + (load_task_ ? load_task_->size()  : 0) + 
swapped_flow_files_.size();
+}
+
+// Forgets every flow file held by the queue: the live ones, the pending swap-in (its result is
+// discarded) and the records of swapped-out ones. Nothing is removed from the swap manager here.
+void FlowFileQueue::clear() {
+  queue_.clear();
+  load_task_ = std::nullopt;
+  swapped_flow_files_.clear();
+}
+
+// Starts an asynchronous swap-in of the earliest swapped-out flow files if the live queue needs
+// refilling (see shouldSwapInCount). Must not be called while another load is pending.
+void FlowFileQueue::initiateLoadIfNeeded() {
+  if (load_task_) {
+    throw std::logic_error("There is already an active load task running");
+  }
+  const size_t count = shouldSwapInCount();
+  if (count == 0) {
+    return;
+  }
+  logger_->log_debug("Initiating load of %zu flow files", count);
+  std::vector<SwappedFlowFile> batch;
+  batch.reserve(count);
+  while (batch.size() < count) {
+    batch.push_back(swapped_flow_files_.popMin());
+  }
+  // popMin() hands out entries in ascending order, so the batch is sorted:
+  // its first element is the earliest one and its last element the latest one
+  const TimePoint min = batch.front().to_be_processed_after;
+  const TimePoint max = batch.back().to_be_processed_after;
+  load_task_ = {min, max, swap_manager_->load(std::move(batch)), count};
+}
+
+// Swap limits of the live queue (used by shouldSwapOutCount/shouldSwapInCount).
+// A 0 min/target disables the limit-based swap-in (everything swapped out is loaded back);
+// a 0 target/max disables swap-out.
+// NOTE(review): the "read once ... atomic" comments in shouldSwap*Count suggest these members
+// are atomics — confirm in the header.
+
+// swap-in starts once the live queue shrinks below this size
+void FlowFileQueue::setMinSize(size_t min_size) {
+  min_size_ = min_size;
+}
+
+// size the live queue is trimmed down to / refilled up to
+void FlowFileQueue::setTargetSize(size_t target_size) {
+  target_size_ = target_size;
+}
+
+// swap-out starts once the live queue grows beyond this size
+void FlowFileQueue::setMaxSize(size_t max_size) {
+  max_size_ = max_size;
+}
+
+// Number of flow files to move from the live queue to the swap manager: once the live queue
+// exceeds max_size_ it is trimmed back to target_size_. Zero limits or no swap manager: none.
+size_t FlowFileQueue::shouldSwapOutCount() const {
+  if (!swap_manager_) {
+    return 0;
+  }
+  // snapshot each limit once so the checks below see consistent values
+  const size_t max_size = max_size_;
+  const size_t target_size = target_size_;
+  const size_t live_count = queue_.size();
+  const bool limits_enabled = max_size != 0 && target_size != 0;
+  if (!limits_enabled || live_count <= max_size || live_count <= target_size) {
+    return 0;
+  }
+  return live_count - target_size;
 }
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+// Number of swapped-out flow files to load back: once the live queue drops below min_size_ it is
+// refilled up to target_size_ (bounded by what is swapped out). With a zero min/target limit
+// everything that is swapped out gets loaded back.
+size_t FlowFileQueue::shouldSwapInCount() const {
+  if (!swap_manager_) {
+    return 0;
+  }
+  // snapshot each limit once so the checks below see consistent values
+  const size_t min_size = min_size_;
+  const size_t target_size = target_size_;
+  const size_t swapped_count = swapped_flow_files_.size();
+  if (min_size == 0 || target_size == 0) {
+    if (swapped_count == 0) {
+      return 0;
+    }
+    logger_->log_info("Swapping in all the flow files");
+    return swapped_count;
+  }
+  const size_t live_count = queue_.size();
+  if (live_count >= min_size || live_count >= target_size) {
+    return 0;
+  }
+  return std::min(target_size - live_count, swapped_count);
+}
+
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/ThreadPool.cpp 
b/libminifi/src/utils/ThreadPool.cpp
index 640c2c5f4..8612f0dbf 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -253,7 +253,13 @@ void ThreadPool<T>::shutdown() {
       manager_thread_.join();
     }
 
-    delayed_task_available_.notify_all();
+    {
+      // this lock ensures that the delayed_scheduler_thread_
+      // is not between checking the running_ and before the cv_.wait*
+      // as then, it would survive the notify_all call
+      std::lock_guard<std::mutex> worker_lock(worker_queue_mutex_);
+      delayed_task_available_.notify_all();
+    }
     if (delayed_scheduler_thread_.joinable()) {
       delayed_scheduler_thread_.join();
     }
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index d11757793..09f86663e 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -26,6 +26,9 @@
 using namespace std::chrono_literals;
 
 #undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, 
which conflicts with rapidjson
+#include "Connection.h"
+#include "FlowFileQueue.h"
+#include "Catch.h"
 
 #define FIELD_ACCESSOR(field) \
   template<typename T> \
@@ -118,3 +121,17 @@ void sendMessagesViaTCP(const 
std::vector<std::string_view>& contents, uint64_t
   REQUIRE(!err);
   socket.close();
 }
+
+// Test-only access to Connection's private members: FIELD_ACCESSOR(queue_) provides
+// ConnectionTestAccessor::get_queue_(connection), used by the swap tests.
+struct ConnectionTestAccessor {
+  FIELD_ACCESSOR(queue_);
+};
+
+// Test-only access to FlowFileQueue's private state: FIELD_ACCESSOR(x) provides get_x(queue),
+// e.g. get_min_size_ for the swap limits or get_queue_ for the live (in-memory) queue.
+struct FlowFileQueueTestAccessor {
+  FIELD_ACCESSOR(min_size_);
+  FIELD_ACCESSOR(max_size_);
+  FIELD_ACCESSOR(target_size_);
+  FIELD_ACCESSOR(clock_);
+  FIELD_ACCESSOR(swapped_flow_files_);
+  FIELD_ACCESSOR(load_task_);
+  FIELD_ACCESSOR(queue_);
+};
diff --git a/libminifi/test/rocksdb-tests/SwapTests.cpp 
b/libminifi/test/rocksdb-tests/SwapTests.cpp
new file mode 100644
index 000000000..3739376bb
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/SwapTests.cpp
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../Catch.h"
+#include "core/RepositoryFactory.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "FlowFileRepository.h"
+#include "../TestBase.h"
+#include "../Utils.h"
+#include "StreamPipe.h"
+#include "IntegrationTestUtils.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "../unit/ProvenanceTestHelper.h"
+
+// Test source processor: each triggering creates a flow file whose "index" attribute and content
+// are a running counter (0, 1, 2, ...), transfers it to Success and remembers it in flow_files_
+// so the test can compare against what comes out of the connection.
+// NOTE(review): Description mentions "cycles" — looks copied from another test processor.
+class OutputProcessor : public core::Processor {
+ public:
+  using core::Processor::Processor;
+
+  static const core::Relationship Success;
+
+  using core::Processor::onTrigger;
+
+  static constexpr const char* Description = "Processor used for testing 
cycles";
+  static auto properties() { return std::array<core::Property, 0>{}; }
+  static auto relationships() { return std::array{Success}; }
+  static constexpr bool SupportsDynamicProperties = false;
+  static constexpr bool SupportsDynamicRelationships = false;
+  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
+  static constexpr bool IsSingleThreaded = false;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  void initialize() override {
+    setSupportedProperties(properties());
+    setSupportedRelationships(relationships());
+  }
+
+  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* 
session) override {
+    auto id = std::to_string(next_id_++);
+    auto ff = session->create();
+    ff->addAttribute("index", id);
+    // the content is the same counter value as the "index" attribute
+    session->write(ff, [&] (const std::shared_ptr<minifi::io::BaseStream>& 
output) -> int64_t {
+      auto ret = output->write(gsl::span<const char>(id.data(), 
id.size()).as_span<const std::byte>());
+      if (minifi::io::isError(ret)) {
+        return -1;
+      }
+      return gsl::narrow<int64_t>(ret);
+    });
+    session->transfer(ff, Success);
+    flow_files_.push_back(ff);
+  }
+
+  // every flow file created so far, in creation order
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files_;
+  int next_id_{0};
+};
+
+const core::Relationship OutputProcessor::Success{"success", ""};
+
+TEST_CASE("Connection will on-demand swap flow files") {
+  // Pushes more flow files into a connection than its swap threshold keeps in memory, checks that
+  // the excess was swapped out to the flow file repository, then polls everything back and
+  // verifies that order and content survived the round trip.
+  constexpr size_t flow_file_count = 200;
+  constexpr size_t swap_threshold = 50;
+  // a swap threshold of N lets the live queue grow up to N * 3 / 2 flow files before swapping out
+  constexpr size_t max_live_flow_files = swap_threshold * 3 / 2;
+
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+  LogTestController::getInstance().setTrace<minifi::utils::FlowFileQueue>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileLoader>();
+  LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
+
+  auto dir = testController.createTempDirectory();
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto prov_repo = std::make_shared<TestRepository>();
+  auto ff_repo = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  auto swap_manager = std::dynamic_pointer_cast<minifi::SwapManager>(ff_repo);
+  auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  ff_repo->initialize(config);
+  content_repo->initialize(config);
+
+  ff_repo->loadComponent(content_repo);
+  ff_repo->start();
+
+  auto processor = std::make_shared<OutputProcessor>("proc");
+
+  auto connection = std::make_shared<minifi::Connection>(ff_repo, content_repo, swap_manager, "conn", utils::IdGenerator::getIdGenerator()->generate());
+  connection->setSwapThreshold(swap_threshold);
+  connection->addRelationship(OutputProcessor::Success);
+  connection->setSourceUUID(processor->getUUID());
+  processor->addConnection(connection.get());
+
+  auto processor_node = std::make_shared<core::ProcessorNode>(processor.get());
+  auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, prov_repo, ff_repo, content_repo);
+  auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
+
+  for (size_t i = 0; i < flow_file_count; ++i) {
+    processor->onTrigger(context, session_factory);
+  }
+
+  REQUIRE(connection->getQueueSize() == processor->flow_files_.size());
+  utils::FlowFileQueue& queue = ConnectionTestAccessor::get_queue_(*connection);
+  // at most max_live_flow_files stay in memory, the rest has been swapped out
+  REQUIRE(FlowFileQueueTestAccessor::get_queue_(queue).size() <= max_live_flow_files);
+  REQUIRE(queue.size() == flow_file_count);
+
+  std::set<std::shared_ptr<core::FlowFile>> expired;
+  for (size_t i = 0; i < flow_file_count; ++i) {
+    std::shared_ptr<core::FlowFile> ff;
+    minifi::utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, [&] {
+      ff = connection->poll(expired);
+      return static_cast<bool>(ff);
+    });
+    // if polling timed out, fail here instead of dereferencing a null flow file below
+    REQUIRE(static_cast<bool>(ff));
+    REQUIRE(ff->getAttribute("index") == std::to_string(i));
+    REQUIRE(ff->getResourceClaim()->getContentFullPath() == processor->flow_files_[i]->getResourceClaim()->getContentFullPath());
+  }
+
+  REQUIRE(queue.empty());
+}
diff --git a/libminifi/test/unit/FlowFileQueueSwapTests.cpp 
b/libminifi/test/unit/FlowFileQueueSwapTests.cpp
new file mode 100644
index 000000000..a2e52cbc4
--- /dev/null
+++ b/libminifi/test/unit/FlowFileQueueSwapTests.cpp
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <random>
+
+#include "Connection.h"
+
+#include "../TestBase.h"
+#include "SwapTestController.h"
+
+// A swap threshold of N configures the connection's queue with target N, min N / 2 and
+// max N * 3 / 2.
+TEST_CASE("Setting swap threshold sets underlying queue limits", 
"[SwapTest1]") {
+  const size_t target_size = 4;
+  const size_t min_size = target_size / 2;
+  const size_t max_size = target_size * 3 / 2;
+
+  minifi::Connection conn(nullptr, nullptr, "");
+  conn.setSwapThreshold(target_size);
+  
REQUIRE(FlowFileQueueTestAccessor::get_min_size_(ConnectionTestAccessor::get_queue_(conn))
 == min_size);
+  
REQUIRE(FlowFileQueueTestAccessor::get_target_size_(ConnectionTestAccessor::get_queue_(conn))
 == target_size);
+  
REQUIRE(FlowFileQueueTestAccessor::get_max_size_(ConnectionTestAccessor::get_queue_(conn))
 == max_size);
+}
+
+// No setLimits() call: the test relies on the default (zero) limits disabling swapping, so
+// pushing and polling 100 flow files must not produce any swap event.
+TEST_CASE_METHOD(SwapTestController, "Default constructed FlowFileQueue won't 
swap", "[SwapTest2]") {
+  for (unsigned i = 0; i < 100; ++i) {
+    pushAll({i});
+  }
+
+  REQUIRE(queue_->impl.size() == 100);
+
+  // make every pushed flow file due
+  clock_->advance(std::chrono::seconds{200});
+  for (size_t i = 0; i < 100; ++i) {
+    queue_->poll();
+  }
+
+  REQUIRE(queue_->impl.empty());
+  verifySwapEvents({});
+}
+
+// Limits are (min 2, target 4, max 6): filling the queue exactly up to max keeps everything in
+// memory. None of the flow files is due yet (the clock was not advanced).
+TEST_CASE_METHOD(SwapTestController, "Up to max no swap-out is triggered", 
"[SwapTest3]") {
+  setLimits(2, 4, 6);
+  pushAll({50, 20, 30, 60, 10, 40});
+
+  REQUIRE_FALSE(queue_->isWorkAvailable());
+  verifySwapEvents({});
+}
+
+// The 7th flow file exceeds max (6), so the live queue is trimmed back to target (4) by swapping
+// out its latest flow files.
+TEST_CASE_METHOD(SwapTestController, "Pushing beyond max triggers a swap-out", 
"[SwapTest4]") {
+  setLimits(2, 4, 6);
+  pushAll({50, 20, 30, 60, 10, 40});
+
+  pushAll({28});
+  // size goes from 7 to 4, 3 largest must have been swapped out
+  verifySwapEvents({{Store, {60, 50, 40}}});
+  verifyQueue({10, 20, 28, 30}, {}, {40, 50, 60});
+}
+
+// Popping the live queue down to exactly min (2) does not start a swap-in.
+TEST_CASE_METHOD(SwapTestController, "Popping until min size does not trigger 
swap-in", "[SwapTest5]") {
+  setLimits(2, 4, 6);
+  pushAll({50, 20, 30, 60, 10, 40, 28});
+  clearSwapEvents();
+  clock_->advance(std::chrono::seconds{35});
+  REQUIRE(queue_->isWorkAvailable());
+  popAll({10, 20});
+  verifyQueue({28, 30}, {}, {40, 50, 60});
+  verifySwapEvents({});
+}
+
+// Dropping below min (1 < 2) starts a swap-in that refills towards target (4); only 3 flow files
+// are swapped out, so all of them are requested. The load stays pending (second verifyQueue arg).
+TEST_CASE_METHOD(SwapTestController, "Popping beyond min size triggers 
swap-in", "[SwapTest6]") {
+  setLimits(2, 4, 6);
+  pushAll({50, 20, 30, 60, 10, 40, 28});
+  clearSwapEvents();
+  clock_->advance(std::chrono::seconds{35});
+  popAll({10, 20, 28});
+
+  // trying to swap-in all three swapped flow files
+  verifyQueue({30}, {{}}, {});
+  verifySwapEvents({{Load, {40, 50, 60}}});
+}
+
+// While the swap-in of [40, 60] is pending, a pushed flow file is routed by its due time:
+// inside the range it waits with the load, at or before its start it goes to the live queue,
+// at or after its end it is swapped out immediately.
+TEST_CASE_METHOD(SwapTestController, "Pushing while a swap-in is pending", 
"[SwapTest7]") {
+  setLimits(2, 4, 6);
+  pushAll({50, 20, 30, 60, 10, 40, 28});
+  clock_->advance(std::chrono::seconds{35});
+  popAll({10, 20, 28});
+  verifyQueue({30}, {{}}, {});
+  clearSwapEvents();
+
+  SECTION("Pushing into the pending swap-in range") {
+    pushAll({45});
+    verifyQueue({30}, {{45}}, {});
+    verifySwapEvents({});
+  }
+
+  SECTION("Pushing before the pending swap-in range") {
+    pushAll({35});
+    verifyQueue({30, 35}, {{}}, {});
+    verifySwapEvents({});
+  }
+
+  SECTION("Pushing after the pending swap-in range") {
+    pushAll({65});
+    verifyQueue({30}, {{}}, {65});
+    verifySwapEvents({{Store, {65}}});
+  }
+}
+
+// With an empty live queue, work is available only if the pending load has finished AND its
+// earliest flow file (40) is already due.
+TEST_CASE_METHOD(SwapTestController, "isWorkAvailable depends on the swap-in 
task", "[SwapTest8]") {
+  setLimits(2, 4, 6);
+  pushAll({50, 20, 30, 60, 10, 40, 28});
+  popAll({10, 20, 28});
+  verifyQueue({30}, {{}}, {});
+
+  REQUIRE_FALSE(queue_->isWorkAvailable());
+
+  clock_->advance(std::chrono::seconds{35});
+
+  REQUIRE(queue_->isWorkAvailable());
+  popAll({30});
+  REQUIRE_FALSE(queue_->isWorkAvailable());
+
+  SECTION("Load is completed but the minimum of those files is still not 
viable") {
+    // the clock is at 35, the earliest loaded flow file is due at 40
+    flow_repo_->load_tasks_[0].complete();
+    REQUIRE_FALSE(queue_->isWorkAvailable());
+  }
+
+  SECTION("The minimum of the load task is viable but not yet completed") {
+    clock_->advance(std::chrono::seconds{35});
+    REQUIRE_FALSE(queue_->isWorkAvailable());
+    // completing the task renders it viable
+    flow_repo_->load_tasks_[0].complete();
+    REQUIRE(queue_->isWorkAvailable());
+  }
+}
+
+// Once the pending load completes and everything is due, all flow files come out in order,
+// including the one (45) that was pushed into the range being swapped in.
+// Tagged [SwapTest9]: the [SwapTest8] tag was duplicated from the previous test case.
+TEST_CASE_METHOD(SwapTestController, "Polling from load task", "[SwapTest9]") {
+  setLimits(2, 4, 6);
+  pushAll({50, 20, 30, 60, 10, 40, 28});
+  popAll({10, 20, 28});
+  pushAll({45});
+  verifyQueue({30}, {{45}}, {});
+
+  flow_repo_->load_tasks_[0].complete();
+
+  clock_->advance(std::chrono::seconds{100});
+
+  popAll({30, 40, 45, 50, 60}, true);
+  verifyQueue({}, {}, {});
+}
+
+// Limits (min 6, target 8, max 10): dropping below min starts a swap-in of 3 flow files.
+// A later pop, while the live queue is still non-empty, collects the finished load and
+// immediately starts loading the remaining swapped-out flow file.
+// Tagged [SwapTest10]: the [SwapTest8] tag was duplicated from an earlier test case.
+TEST_CASE_METHOD(SwapTestController, "Popping below min checks if the pending load is finished", "[SwapTest10]") {
+  setLimits(6, 8, 10);
+  pushAll({10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120});
+  verifyQueue({10, 20, 30, 40, 50, 60, 70, 80}, {}, {90, 100, 110, 120});
+  clock_->advance(std::chrono::seconds{200});
+  clearSwapEvents();
+  popAll({10, 20, 30});
+  verifySwapEvents({{Load, {90, 100, 110}}});
+  verifyQueue({40, 50, 60, 70, 80}, {{}}, {120});
+  clearSwapEvents();
+
+  popAll({40, 50});
+  verifyQueue({60, 70, 80}, {{}}, {120});
+  flow_repo_->load_tasks_[0].complete();
+  popAll({60});
+  // even though the live queue is not empty we check if
+  // the load_task is finished and initiate a load if need be
+  verifySwapEvents({{Load, {120}}});
+  verifyQueue({70, 80, 90, 100, 110}, {{}}, {});
+}
diff --git a/libminifi/test/unit/FlowFileQueueTests.cpp 
b/libminifi/test/unit/FlowFileQueueTests.cpp
index 51b287a62..0d0c1b2b9 100644
--- a/libminifi/test/unit/FlowFileQueueTests.cpp
+++ b/libminifi/test/unit/FlowFileQueueTests.cpp
@@ -28,7 +28,7 @@ TEST_CASE("After construction, a FlowFileQueue is empty", 
"[FlowFileQueue]") {
   utils::FlowFileQueue queue;
 
   REQUIRE(queue.empty());
-  REQUIRE(queue.size() == 0);
+  REQUIRE(queue.size() == 0);  // NOLINT(readability-container-size-empty)
   REQUIRE_FALSE(queue.isWorkAvailable());
   REQUIRE_THROWS(queue.pop());
 }
@@ -75,11 +75,26 @@ TEST_CASE("If three flow files are added to the 
FlowFileQueue, we can pop them i
   REQUIRE_FALSE(queue.isWorkAvailable());
 }
 
+// push() clamps a penalty expiration in the past to "now", so a flow file penalized into the
+// past cannot jump ahead of flow files that were queued before it.
+// NOTE(review): the expected order relies on the clock advancing between the two push() calls;
+// if both got the same expiration, the pop order might not be guaranteed — confirm tie-breaking.
+TEST_CASE("Cannot add flow files in the past preempting others", 
"[FlowFileQueue][pop]") {
+  utils::FlowFileQueue queue;
+  const auto flow_file_1 = std::make_shared<core::FlowFile>();
+  queue.push(flow_file_1);
+  const auto flow_file_2 = std::make_shared<core::FlowFile>();
+  flow_file_2->penalize(std::chrono::seconds{-10});
+  queue.push(flow_file_2);
+
+  REQUIRE(queue.isWorkAvailable());
+  REQUIRE(queue.pop() == flow_file_1);
+  REQUIRE(queue.isWorkAvailable());
+  REQUIRE(queue.pop() == flow_file_2);
+  REQUIRE_FALSE(queue.isWorkAvailable());
+}
+
 namespace {
 
 class PenaltyHasExpired {
  public:
-  explicit PenaltyHasExpired(const std::shared_ptr<core::FlowFile>& flow_file) 
: flow_file_(flow_file) {}
+  explicit PenaltyHasExpired(std::shared_ptr<core::FlowFile> flow_file) : 
flow_file_(std::move(flow_file)) {}
   bool operator()() { return !flow_file_->isPenalized(); }
 
  private:
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index 65c91bf1f..dd724061a 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -33,6 +33,7 @@
 #include "FlowController.h"
 #include "properties/Configure.h"
 #include "provenance/Provenance.h"
+#include "SwapManager.h"
 
 #if defined(__clang__)
 #pragma clang diagnostic push
@@ -227,7 +228,8 @@ class TestFlowRepository : public 
org::apache::nifi::minifi::core::Repository {
     }
   }
 
-  void loadComponent(const 
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& 
/*content_repo*/) override {
+  // Keeps the content repository in the protected content_repo_ member
+  // (presumably for derived test repositories such as SwappingFlowFileTestRepo — confirm).
+  void loadComponent(const 
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& 
content_repo) override {
+    content_repo_ = content_repo;
   }
 
   void run() override {
@@ -237,6 +239,7 @@ class TestFlowRepository : public 
org::apache::nifi::minifi::core::Repository {
  protected:
   mutable std::mutex repository_results_mutex_;
   std::map<std::string, std::string> repository_results_;
+  std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository> 
content_repo_;
 };
 
 class TestFlowController : public org::apache::nifi::minifi::FlowController {
diff --git a/libminifi/test/unit/SwapTestController.h 
b/libminifi/test/unit/SwapTestController.h
new file mode 100644
index 000000000..0a921a754
--- /dev/null
+++ b/libminifi/test/unit/SwapTestController.h
@@ -0,0 +1,240 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <span>
+#include <string>
+#include <vector>
+#include <utility>
+#include <memory>
+
+#include "../TestBase.h"
+#include "TestUtils.h"
+#include "../Utils.h"
+#include "../Catch.h"
+#include "../unit/ProvenanceTestHelper.h"
+
+
+// Steady-clock time point; the tests express penalty expirations as seconds since its epoch.
+using Timepoint = std::chrono::steady_clock::time_point;
+
+// Kind of swap operation recorded by SwappingFlowFileTestRepo.
+enum EventKind {
+  Store = 0,
+  Load = 1
+};
+
+// A recorded swap operation: whether flow files were stored or loaded, and which ones.
+struct SwapEvent {
+  EventKind kind;
+  std::vector<minifi::SwappedFlowFile> flow_files;
+
+  // REQUIREs that flow_files carry exactly the given penalty times
+  // (seconds since the clock epoch), in the given order.
+  void verifyTimes(std::initializer_list<unsigned> seconds) {
+    REQUIRE(flow_files.size() == seconds.size());
+    auto expected = seconds.begin();
+    for (const auto& flow_file : flow_files) {
+      REQUIRE(flow_file.to_be_processed_after == Timepoint{std::chrono::seconds{*expected}});
+      ++expected;
+    }
+  }
+};
+
+// Test flow repository that also acts as a SwapManager. Swapped-out flow files
+// are serialized into the repository under their UUID, and every store/load call
+// is recorded in swap_events_. Loads are not completed automatically: each one
+// is queued in load_tasks_ and the test calls LoadTask::complete() to fulfil it.
+class SwappingFlowFileTestRepo : public TestFlowRepository, public minifi::SwapManager {
+ public:
+  SwappingFlowFileTestRepo()
+      : core::SerializableComponent("ff") {}
+
+  // Serializes each flow file into the repository keyed by its UUID and records a Store event.
+  void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override {
+    std::vector<minifi::SwappedFlowFile> ids;
+    ids.reserve(flow_files.size());
+    for (const auto& ff : flow_files) {
+      ids.push_back(minifi::SwappedFlowFile{ff->getUUID(), ff->getPenaltyExpiration()});
+      minifi::io::BufferStream output;
+      std::static_pointer_cast<minifi::FlowFileRecord>(ff)->Serialize(output);
+      const bool stored = Put(ff->getUUIDStr().c_str(), reinterpret_cast<const uint8_t*>(output.getBuffer().data()), output.size());
+      REQUIRE(stored);
+    }
+    swap_events_.push_back({Store, std::move(ids)});
+  }
+
+  // Records a Load event and deserializes the requested flow files, restoring their
+  // penalty expiration. The returned future is fulfilled only when the test calls
+  // complete() on the matching entry of load_tasks_.
+  std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<minifi::SwappedFlowFile> flow_files) override {
+    swap_events_.push_back({Load, flow_files});
+    LoadTask load_task;
+    auto future = load_task.promise.get_future();
+    load_task.result.reserve(flow_files.size());
+    for (const auto& ff_id : flow_files) {
+      std::string value;
+      // fail the test explicitly rather than dereferencing a null flow file below
+      const bool found = Get(ff_id.id.to_string().c_str(), value);
+      REQUIRE(found);
+      utils::Identifier container_id;
+      auto ff = minifi::FlowFileRecord::DeSerialize(std::as_bytes(std::span(value)), content_repo_, container_id);
+      REQUIRE(ff != nullptr);
+      ff->setPenaltyExpiration(ff_id.to_be_processed_after);
+      load_task.result.push_back(std::move(ff));
+    }
+    load_tasks_.push_back(std::move(load_task));
+    return future;
+  }
+
+  // A pending load: the promise backing the future returned by load() and the
+  // already deserialized flow files it will be fulfilled with.
+  struct LoadTask {
+    std::promise<std::vector<std::shared_ptr<core::FlowFile>>> promise;
+    std::vector<std::shared_ptr<core::FlowFile>> result;
+
+    // Fulfils the future with a copy of result.
+    void complete() {
+      promise.set_value(result);
+    }
+  };
+
+  std::vector<LoadTask> load_tasks_;
+  std::vector<SwapEvent> swap_events_;
+};
+
+using FlowFilePtr = std::shared_ptr<core::FlowFile>;
+using FlowFilePtrVec = std::vector<FlowFilePtr>;
+
+// Orders flow files by penalty expiration, earliest first.
+struct FlowFileComparator {
+  bool operator()(const FlowFilePtr& lhs, const FlowFilePtr& rhs) const {
+    const auto lhs_expiration = lhs->getPenaltyExpiration();
+    const auto rhs_expiration = rhs->getPenaltyExpiration();
+    return lhs_expiration < rhs_expiration;
+  }
+};
+
+// Test wrapper around utils::FlowFileQueue. Every push/poll is mirrored into
+// ref_, a vector kept sorted by penalty expiration, and after each operation
+// the real queue and the reference are REQUIREd to agree.
+struct VerifiedQueue {
+  VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager, std::unique_ptr<utils::timeutils::SteadyClock> clock)
+    : impl(std::move(swap_manager)) {
+    // install the clock supplied by the caller in place of the queue's own
+    FlowFileQueueTestAccessor::get_clock_(impl) = std::move(clock);
+  }
+
+  // Pushes `ff` into the queue and inserts it into ref_ at its sorted position.
+  void push(FlowFilePtr ff) {
+    size();  // REQUIREs queue/reference size agreement before the operation
+    impl.push(ff);
+    const auto position = std::lower_bound(ref_.begin(), ref_.end(), ff, FlowFileComparator{});
+    ref_.insert(position, ff);
+    size();
+  }
+
+  // Pops from the queue and REQUIREs it matches the earliest reference entry by penalty expiration.
+  FlowFilePtr poll() {
+    size();
+    FlowFilePtr popped = impl.pop();
+    REQUIRE(!ref_.empty());
+    // flow files with equal penalties may come out in any order, so only the expiration is compared
+    REQUIRE(popped->getPenaltyExpiration() == ref_.front()->getPenaltyExpiration());
+    ref_.erase(ref_.begin());
+    size();
+    return popped;
+  }
+
+  // Checks the queue's internal state, all values being seconds since the clock epoch:
+  //  - live: the in-memory flow files, drained from a copy in ascending order;
+  //  - inter: the intermediate items of the pending load task, by index
+  //    (std::nullopt means there must be no load task at all);
+  //  - swapped: the swapped-out flow files, drained from a copy in ascending order.
+  void verify(std::initializer_list<unsigned> live, std::optional<std::initializer_list<unsigned>> inter, std::initializer_list<unsigned> swapped) const {
+    auto live_heap = FlowFileQueueTestAccessor::get_queue_(impl);
+    REQUIRE(live_heap.size() == live.size());
+    for (const unsigned expected_sec : live) {
+      const auto earliest = live_heap.popMin();
+      REQUIRE(earliest->getPenaltyExpiration() == Timepoint{std::chrono::seconds{expected_sec}});
+    }
+
+    if (inter.has_value()) {
+      auto& intermediate = FlowFileQueueTestAccessor::get_load_task_(impl)->intermediate_items;
+      REQUIRE(intermediate.size() == inter->size());
+      auto expected_it = inter->begin();
+      for (size_t i = 0; i < intermediate.size(); ++i) {
+        REQUIRE(intermediate[i]->getPenaltyExpiration() == Timepoint{std::chrono::seconds{*expected_it}});
+        ++expected_it;
+      }
+    } else {
+      REQUIRE_FALSE(FlowFileQueueTestAccessor::get_load_task_(impl).has_value());
+    }
+
+    auto swapped_heap = FlowFileQueueTestAccessor::get_swapped_flow_files_(impl);
+    REQUIRE(swapped_heap.size() == swapped.size());
+    for (const unsigned expected_sec : swapped) {
+      const auto earliest = swapped_heap.popMin();
+      REQUIRE(earliest.to_be_processed_after == Timepoint{std::chrono::seconds{expected_sec}});
+    }
+  }
+
+  bool isWorkAvailable() const {
+    return impl.isWorkAvailable();
+  }
+
+  // Returns the queue size, REQUIRE-ing that it equals the reference size.
+  size_t size() const {
+    const size_t queue_size = impl.size();
+    REQUIRE(queue_size == ref_.size());
+    return queue_size;
+  }
+
+  utils::FlowFileQueue impl;
+  // reference model: every queued flow file, sorted by penalty expiration
+  FlowFilePtrVec ref_;
+};
+
+// Fixture for FlowFileQueue swapping tests: wires a VerifiedQueue to a
+// SwappingFlowFileTestRepo (acting as its SwapManager) and to a manually
+// driven steady clock, and offers helpers to push/pop flow files by penalty
+// time (seconds since the clock epoch) and to check the recorded swap events.
+class SwapTestController : public TestController {
+ public:
+  SwapTestController() {
+    content_repo_ = std::make_shared<core::repository::VolatileContentRepository>();
+    flow_repo_ = std::make_shared<SwappingFlowFileTestRepo>();
+    flow_repo_->loadComponent(content_repo_);
+    auto clock = std::make_unique<utils::ManualSteadyClock>();
+    clock_ = clock.get();
+    queue_ = std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_), std::move(clock));
+  }
+
+  // Forwards the min/target/max size limits to the underlying FlowFileQueue.
+  void setLimits(size_t min_size, size_t target_size, size_t max_size) {
+    queue_->impl.setMinSize(min_size);
+    queue_->impl.setTargetSize(target_size);
+    queue_->impl.setMaxSize(max_size);
+  }
+
+  // Expected swap event: its kind and the penalty times (seconds) of the flow files involved.
+  // `seconds` does not own its elements, so a pattern must not outlive the braced list it was built from.
+  struct SwapEventPattern {
+    EventKind kind;
+    std::initializer_list<unsigned> seconds;
+  };
+
+  // REQUIREs that the recorded swap events match `events` one-to-one, in order.
+  void verifySwapEvents(const std::vector<SwapEventPattern>& events) {
+    REQUIRE(flow_repo_->swap_events_.size() == events.size());
+    for (size_t idx = 0; idx < events.size(); ++idx) {
+      // the idx-th expected pattern is checked against the idx-th recorded event
+      const SwapEventPattern& pattern = events[idx];
+      SwapEvent& actual = flow_repo_->swap_events_[idx];
+      REQUIRE(pattern.kind == actual.kind);
+      actual.verifyTimes(pattern.seconds);
+    }
+  }
+
+  // Forgets all swap events recorded so far.
+  void clearSwapEvents() {
+    flow_repo_->swap_events_.clear();
+  }
+
+  // Checks live, intermediate (std::nullopt: no pending load task) and swapped penalty times.
+  void verifyQueue(std::initializer_list<unsigned> live, std::optional<std::initializer_list<unsigned>> inter, std::initializer_list<unsigned> swapped) {
+    queue_->verify(live, inter, swapped);
+  }
+
+  // Pushes one new flow file per entry, penalized until that many seconds after the clock epoch.
+  void pushAll(std::initializer_list<unsigned> seconds) {
+    for (auto sec : seconds) {
+      auto ff = std::static_pointer_cast<core::FlowFile>(std::make_shared<minifi::FlowFileRecord>());
+      ff->setPenaltyExpiration(Timepoint{std::chrono::seconds{sec}});
+      queue_->push(std::move(ff));
+    }
+  }
+
+  // Pops one flow file per entry and REQUIREs its penalty expiration equals that many seconds;
+  // when check_is_work_available is set, also REQUIREs isWorkAvailable() before each pop.
+  void popAll(std::initializer_list<unsigned> seconds, bool check_is_work_available = false) {
+    for (auto sec : seconds) {
+      if (check_is_work_available) {
+        REQUIRE(queue_->isWorkAvailable());
+      }
+      auto ff = queue_->poll();
+      REQUIRE(ff->getPenaltyExpiration() == Timepoint{std::chrono::seconds{sec}});
+    }
+  }
+
+  std::shared_ptr<SwappingFlowFileTestRepo> flow_repo_;
+  std::shared_ptr<core::repository::VolatileContentRepository> content_repo_;
+  std::shared_ptr<VerifiedQueue> queue_;
+  // owned by the queue_
+  utils::ManualSteadyClock* clock_ = nullptr;
+};

Reply via email to