This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a3835e193 MINIFICPP-1812 Clean up Repository threads
a3835e193 is described below

commit a3835e1938db1287b0063d159b453b8a70f7a48b
Author: Adam Markovics <[email protected]>
AuthorDate: Thu Sep 1 12:21:00 2022 +0200

    MINIFICPP-1812 Clean up Repository threads
    
    Closes #1328
    Signed-off-by: Marton Szasz <[email protected]>
---
 controller/Controller.h                            |  22 +++-
 extensions/coap/tests/CoapIntegrationBase.h        |   4 +-
 extensions/http-curl/tests/C2PauseResumeTest.cpp   |   2 +-
 .../tests/ControllerServiceIntegrationTests.cpp    |   5 +-
 extensions/http-curl/tests/VerifyInvokeHTTP.h      |   2 +-
 .../tests/PrometheusMetricsPublisherTest.cpp       |   4 +-
 .../rocksdb-repos/DatabaseContentRepository.h      |   2 +-
 extensions/rocksdb-repos/FlowFileRepository.cpp    |   6 +-
 extensions/rocksdb-repos/FlowFileRepository.h      |  70 +++++------
 extensions/rocksdb-repos/ProvenanceRepository.cpp  |   2 +-
 extensions/rocksdb-repos/ProvenanceRepository.h    |  77 +++++-------
 .../tests/integration/TestExecuteProcess.cpp       |   8 +-
 .../tests/unit/YamlConfigurationTests.cpp          |  56 ++++-----
 libminifi/include/core/ContentRepository.h         |  15 +--
 libminifi/include/core/Repository.h                | 116 ++++++-----------
 libminifi/include/core/RepositoryFactory.h         |  21 +---
 libminifi/include/core/ThreadedRepository.h        | 106 ++++++++++++++++
 .../include/core/repository/FileSystemRepository.h |  32 ++---
 .../core/repository/VolatileContentRepository.h    |  58 ++++-----
 .../core/repository/VolatileFlowFileRepository.h   |  23 ++--
 .../core/repository/VolatileProvenanceRepository.h |  25 ++--
 .../include/core/repository/VolatileRepository.h   | 140 ++++++++-------------
 libminifi/src/FlowController.cpp                   |  13 +-
 libminifi/src/c2/C2Client.cpp                      |   1 +
 libminifi/src/core/Repository.cpp                  |  64 ----------
 libminifi/src/core/RepositoryFactory.cpp           |  93 +++++++-------
 .../src/core/repository/FileSystemRepository.cpp   |  26 ++--
 .../core/repository/VolatileContentRepository.cpp  |  19 ---
 libminifi/test/flow-tests/SessionTests.cpp         |   3 +-
 libminifi/test/flow-tests/TestControllerWithFlow.h |   2 +-
 libminifi/test/integration/IntegrationBase.h       |   4 +-
 .../test/integration/ProvenanceReportingTest.cpp   |   5 +-
 .../test/persistence-tests/PersistenceTests.cpp    |   6 +-
 libminifi/test/rocksdb-tests/ProvenanceTests.cpp   |  16 +--
 libminifi/test/rocksdb-tests/RepoTests.cpp         |  11 +-
 libminifi/test/unit/MetricsTests.cpp               |   2 +-
 libminifi/test/unit/ProvenanceTestHelper.h         | 136 +++++++++-----------
 libminifi/test/unit/SchedulingAgentTests.cpp       |   4 +-
 minifi_main/MiNiFiMain.cpp                         |  10 +-
 nanofi/include/cxx/Instance.h                      |   3 +-
 40 files changed, 543 insertions(+), 671 deletions(-)

diff --git a/controller/Controller.h b/controller/Controller.h
index 0254d8ef6..b30f79748 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -26,6 +26,7 @@
 #include "io/ClientSocket.h"
 #include "c2/ControllerSocketProtocol.h"
 #include "utils/gsl.h"
+#include "Exception.h"
 #include "FlowController.h"
 
 /**
@@ -246,15 +247,22 @@ 
std::shared_ptr<org::apache::nifi::minifi::core::controller::ControllerService>
 
   
configuration->get(org::apache::nifi::minifi::Configure::nifi_provenance_repository_class_name,
 prov_repo_class);
   // Create repos for flow record and provenance
-  const std::shared_ptr prov_repo = 
org::apache::nifi::minifi::core::createRepository(prov_repo_class, true, 
"provenance");
+  const std::shared_ptr prov_repo = 
org::apache::nifi::minifi::core::createRepository(prov_repo_class, 
"provenance");
+  if (!prov_repo) {
+    throw 
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
 "Could not create provenance repository");
+  }
   prov_repo->initialize(configuration);
 
   
configuration->get(org::apache::nifi::minifi::Configure::nifi_flow_repository_class_name,
 flow_repo_class);
 
-  const std::shared_ptr flow_repo = 
org::apache::nifi::minifi::core::createRepository(flow_repo_class, true, 
"flowfile");
+  const std::shared_ptr flow_repo = 
org::apache::nifi::minifi::core::createRepository(flow_repo_class, "flowfile");
+  if (!flow_repo) {
+    throw 
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
 "Could not create flowfile repository");
+  }
 
   flow_repo->initialize(configuration);
 
+
   
configuration->get(org::apache::nifi::minifi::Configure::nifi_content_repository_class_name,
 content_repo_class);
 
   const std::shared_ptr content_repo = 
org::apache::nifi::minifi::core::createContentRepository(content_repo_class, 
true, "content");
@@ -295,12 +303,18 @@ void printManifest(const 
std::shared_ptr<org::apache::nifi::minifi::Configure> &
 
   
configuration->get(org::apache::nifi::minifi::Configure::nifi_provenance_repository_class_name,
 prov_repo_class);
   // Create repos for flow record and provenance
-  const std::shared_ptr prov_repo = 
org::apache::nifi::minifi::core::createRepository(prov_repo_class, true, 
"provenance");
+  const std::shared_ptr prov_repo = 
org::apache::nifi::minifi::core::createRepository(prov_repo_class, 
"provenance");
+  if (!prov_repo) {
+    throw 
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
 "Could not create provenance repository");
+  }
   prov_repo->initialize(configuration);
 
   
configuration->get(org::apache::nifi::minifi::Configure::nifi_flow_repository_class_name,
 flow_repo_class);
 
-  const std::shared_ptr flow_repo = 
org::apache::nifi::minifi::core::createRepository(flow_repo_class, true, 
"flowfile");
+  const std::shared_ptr flow_repo = 
org::apache::nifi::minifi::core::createRepository(flow_repo_class, "flowfile");
+  if (!flow_repo) {
+    throw 
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
 "Could not create flowfile repository");
+  }
 
   flow_repo->initialize(configuration);
 
diff --git a/extensions/coap/tests/CoapIntegrationBase.h 
b/extensions/coap/tests/CoapIntegrationBase.h
index 964b0de1a..7f62adf50 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -53,7 +53,7 @@ class CoapIntegrationBase : public IntegrationBase {
   void run(const std::optional<std::string>& test_file_location = {}, const 
std::optional<std::string>& = {}) override {
     testSetup();
 
-    std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+    std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
     std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
     if (test_file_location) {
@@ -72,8 +72,6 @@ class CoapIntegrationBase : public IntegrationBase {
 
     queryRootProcessGroup(pg);
 
-    std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
-
     std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
       std::make_shared<utils::file::FileSystem>(), []{});
 
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp 
b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index d8294d849..bc152618c 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -123,7 +123,7 @@ int main(int argc, char **argv) {
   harness.setKeyDir(args.key_dir);
   PauseResumeHandler responder{flow_resumed_successfully, 
harness.getConfiguration()};
 
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp 
b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 12525b06d..a89a5cf61 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -53,14 +53,12 @@ int main(int argc, char **argv) {
 
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
 
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, 
args.test_file);
-  std::string client_cert = "cn.crt.pem";
   std::string priv_key_file = "cn.ckey.pem";
   std::string passphrase = "cn.pass";
-  std::string ca_cert = "nifi-cert.pem";
   configuration->set(minifi::Configure::nifi_security_client_certificate, 
args.test_file);
   configuration->set(minifi::Configure::nifi_security_client_private_key, 
priv_key_file);
   configuration->set(minifi::Configure::nifi_security_client_pass_phrase, 
passphrase);
@@ -71,7 +69,6 @@ int main(int argc, char **argv) {
   content_repo->initialize(configuration);
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::make_unique<core::YamlConfiguration>(
       test_repo, test_repo, content_repo, stream_factory, configuration, 
args.test_file);
-  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
 
   const auto controller = std::make_shared<minifi::FlowController>(test_repo, 
test_flow_repo, configuration, std::move(yaml_ptr),
       content_repo,
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h 
b/extensions/http-curl/tests/VerifyInvokeHTTP.h
index fbca3f9b0..a4ce0196d 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTP.h
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -81,7 +81,7 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
   virtual void setupFlow(const std::optional<std::string>& flow_yml_path) {
     testSetup();
 
-    std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+    std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
     std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
     if (flow_yml_path) {
diff --git a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp 
b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
index 111089909..b10b779d0 100644
--- a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
+++ b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
@@ -49,8 +49,8 @@ class PrometheusPublisherTestFixture {
  public:
   explicit PrometheusPublisherTestFixture(bool user_dummy_exposer)
     : configuration_(std::make_shared<Configure>()),
-      provenance_repo_(core::createRepository("provenancerepository", true)),
-      flow_file_repo_(core::createRepository("flowfilerepository", true)),
+      provenance_repo_(core::createRepository("provenancerepository")),
+      flow_file_repo_(core::createRepository("flowfilerepository")),
       response_node_loader_(configuration_, provenance_repo_, flow_file_repo_, 
nullptr) {
     std::unique_ptr<DummyMetricsExposer> dummy_exposer;
     if (user_dummy_exposer) {
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h 
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index e93071589..08e6a4896 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -62,7 +62,7 @@ class DatabaseContentRepository : public 
core::ContentRepository, public core::C
 
   bool initialize(const std::shared_ptr<minifi::Configure> &configuration) 
override;
 
-  void stop() override;
+  void stop();
 
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, 
bool append = false) override;
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp 
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 39355dc80..2412bf97f 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -111,10 +111,10 @@ void FlowFileRepository::printStats() {
 
 void FlowFileRepository::run() {
   auto last = std::chrono::steady_clock::now();
-  if (running_) {
+  if (isRunning()) {
     prune_stored_flowfiles();
   }
-  while (running_) {
+  while (isRunning()) {
     std::this_thread::sleep_for(purge_period_);
     flush();
     auto now = std::chrono::steady_clock::now();
@@ -193,7 +193,7 @@ void FlowFileRepository::prune_stored_flowfiles() {
   }
 }
 
-bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()> 
operation) {
+bool FlowFileRepository::ExecuteWithRetry(const 
std::function<rocksdb::Status()>& operation) {
   std::chrono::milliseconds waitTime = 0ms;
   for (int i=0; i < 3; ++i) {
     auto status = operation();
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h 
b/extensions/rocksdb-repos/FlowFileRepository.h
index 487363cd1..86e9d652e 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -26,10 +26,10 @@
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/utilities/checkpoint.h"
-#include "core/Repository.h"
 #include "core/Core.h"
-#include "Connection.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "Connection.h"
 #include "concurrentqueue.h"
 #include "database/RocksDatabase.h"
 #include "encryption/RocksDbEncryptionProvider.h"
@@ -37,17 +37,18 @@
 #include "SwapManager.h"
 #include "FlowFileLoader.h"
 #include "range/v3/algorithm/all_of.hpp"
+#include "utils/Literals.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
 #ifdef WIN32
-#define FLOWFILE_REPOSITORY_DIRECTORY ".\\flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY ".\\flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = ".\\flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = ".\\flowfile_checkpoint";
 #else
-#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY "./flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = "./flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = "./flowfile_checkpoint";
 #endif
-#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024)  // 10M
+constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME = 
std::chrono::minutes(10);
 constexpr auto FLOWFILE_REPOSITORY_PURGE_PERIOD = std::chrono::seconds(2);
 constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = 
std::chrono::milliseconds(500);
@@ -56,23 +57,22 @@ constexpr auto 
FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill
  * Flow File repository
  * Design: Extends Repository and implements the run function, using rocksdb 
as the primary substrate.
  */
-class FlowFileRepository : public core::Repository, public SwapManager, public 
std::enable_shared_from_this<FlowFileRepository> {
+class FlowFileRepository : public ThreadedRepository, public SwapManager {
  public:
   static constexpr const char* ENCRYPTION_KEY_NAME = 
"nifi.flowfile.repository.encryption.key";
-  // Constructor
 
   FlowFileRepository(const std::string& name, const utils::Identifier& 
/*uuid*/)
       : FlowFileRepository(name) {
   }
 
-  explicit FlowFileRepository(const std::string repo_name = "",
-                     const std::string& checkpoint_dir = 
FLOWFILE_CHECKPOINT_DIRECTORY,
+  explicit FlowFileRepository(const std::string& repo_name = "",
+                     std::string checkpoint_dir = 
FLOWFILE_CHECKPOINT_DIRECTORY,
                      std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
                      std::chrono::milliseconds maxPartitionMillis = 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
                      int64_t maxPartitionBytes = 
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
                      std::chrono::milliseconds purgePeriod = 
FLOWFILE_REPOSITORY_PURGE_PERIOD)
       : core::SerializableComponent(repo_name),
-        Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<FlowFileRepository>(), std::move(directory), 
maxPartitionMillis, maxPartitionBytes, purgePeriod),
+        ThreadedRepository(repo_name.length() > 0 ? repo_name : 
core::getClassName<FlowFileRepository>(), std::move(directory), 
maxPartitionMillis, maxPartitionBytes, purgePeriod),
         checkpoint_dir_(std::move(checkpoint_dir)),
         content_repo_(nullptr),
         checkpoint_(nullptr),
@@ -88,7 +88,7 @@ class FlowFileRepository : public core::Repository, public 
SwapManager, public s
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
 
-  bool isNoop() override {
+  bool isNoop() const override {
     return false;
   }
 
@@ -96,7 +96,6 @@ class FlowFileRepository : public core::Repository, public 
SwapManager, public s
 
   virtual void printStats();
 
-  // initialize
   bool initialize(const std::shared_ptr<Configure> &configure) override {
     config_ = configure;
     std::string value;
@@ -141,9 +140,7 @@ class FlowFileRepository : public core::Repository, public 
SwapManager, public s
     }
   }
 
-  void run() override;
-
-  bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override 
{
     // persistent to the DB
     auto opendb = db_->open();
     if (!opendb) {
@@ -172,16 +169,15 @@ class FlowFileRepository : public core::Repository, 
public SwapManager, public s
     return ExecuteWithRetry(operation);
   }
 
-
   /**
-   *
    * Deletes the key
    * @return status of the delete operation
    */
-  bool Delete(std::string key) override {
+  bool Delete(const std::string& key) override {
     keys_to_delete.enqueue(key);
     return true;
   }
+
   /**
    * Sets the value from the provided key
    * @return status of the get operation.
@@ -196,29 +192,22 @@ class FlowFileRepository : public core::Repository, 
public SwapManager, public s
 
   void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo) override;
 
-  void start() override {
-    if (this->purge_period_ <= std::chrono::milliseconds(0)) {
-      return;
-    }
-    if (running_) {
-      return;
-    }
-    running_ = true;
-    thread_ = std::thread(&FlowFileRepository::run, this);
-    logger_->log_debug("%s Repository Monitor Thread Start", getName());
+  bool start() override {
+    const bool ret = ThreadedRepository::start();
     if (swap_loader_) {
       swap_loader_->start();
     }
+    return ret;
   }
 
-  void stop() override {
+  bool stop() override {
     if (swap_loader_) {
       swap_loader_->stop();
     }
-    core::Repository::stop();
+    return ThreadedRepository::stop();
   }
 
-  void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override 
{
+  void store([[maybe_unused]] std::vector<std::shared_ptr<core::FlowFile>> 
flow_files) override {
     gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
     // pass, flowfiles are already persisted in the repository
   }
@@ -228,11 +217,10 @@ class FlowFileRepository : public core::Repository, 
public SwapManager, public s
   }
 
  private:
-  bool ExecuteWithRetry(std::function<rocksdb::Status()> operation);
+  void run() override;
+
+  bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
 
-  /**
-   * Initialize the repository
-   */
   void initialize_repository();
 
   /**
@@ -241,11 +229,12 @@ class FlowFileRepository : public core::Repository, 
public SwapManager, public s
    */
   static bool need_checkpoint(minifi::internal::OpenRocksDb& opendb);
 
-  /**
-   * Prunes stored flow files.
-   */
   void prune_stored_flowfiles();
 
+  std::thread& getThread() override {
+    return thread_;
+  }
+
   std::string checkpoint_dir_;
   moodycamel::ConcurrentQueue<std::string> keys_to_delete;
   std::shared_ptr<core::ContentRepository> content_repo_;
@@ -254,6 +243,7 @@ class FlowFileRepository : public core::Repository, public 
SwapManager, public s
   std::unique_ptr<FlowFileLoader> swap_loader_;
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<minifi::Configure> config_;
+  std::thread thread_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp 
b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index ee30c9f39..d4619ad6c 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -39,7 +39,7 @@ void ProvenanceRepository::printStats() {
 
 void ProvenanceRepository::run() {
   size_t count = 0;
-  while (running_) {
+  while (isRunning()) {
     std::this_thread::sleep_for(std::chrono::seconds(1));
     count++;
     // Hack, to be removed in scope of 
https://issues.apache.org/jira/browse/MINIFICPP-1145
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h 
b/extensions/rocksdb-repos/ProvenanceRepository.h
index 713fce870..0ad58ed21 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -26,28 +26,32 @@
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
-#include "core/Repository.h"
 #include "core/Core.h"
-#include "provenance/Provenance.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "provenance/Provenance.h"
+#include "utils/Literals.h"
 
 namespace org::apache::nifi::minifi::provenance {
 
-#define PROVENANCE_DIRECTORY "./provenance_repository"
-#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024)  // 10M
+constexpr auto PROVENANCE_DIRECTORY = "./provenance_repository";
+constexpr auto MAX_PROVENANCE_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
 constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class ProvenanceRepository : public core::Repository {
+class ProvenanceRepository : public core::ThreadedRepository {
  public:
   ProvenanceRepository(const std::string& name, const utils::Identifier& 
/*uuid*/)
       : ProvenanceRepository(name) {
   }
-  explicit ProvenanceRepository(const std::string& repo_name = "", std::string 
directory = PROVENANCE_DIRECTORY, std::chrono::milliseconds maxPartitionMillis 
= MAX_PROVENANCE_ENTRY_LIFE_TIME,
-      int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, 
std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name),
-        Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, 
maxPartitionBytes, purgePeriod) {
-    db_ = nullptr;
+
+  explicit ProvenanceRepository(const std::string& repo_name = "", std::string 
directory = PROVENANCE_DIRECTORY,
+      std::chrono::milliseconds maxPartitionMillis = 
MAX_PROVENANCE_ENTRY_LIFE_TIME,
+      int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
+      std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
+    : core::SerializableComponent(repo_name),
+      ThreadedRepository(repo_name.length() > 0 ? repo_name : 
core::getClassName<ProvenanceRepository>(), directory,
+        maxPartitionMillis, maxPartitionBytes, purgePeriod) {
   }
 
   ~ProvenanceRepository() override {
@@ -60,19 +64,10 @@ class ProvenanceRepository : public core::Repository {
 
   void printStats();
 
-  bool isNoop() override {
+  bool isNoop() const override {
     return false;
   }
 
-  void start() override {
-    if (running_)
-      return;
-    running_ = true;
-    thread_ = std::thread(&ProvenanceRepository::run, this);
-    logger_->log_debug("%s Repository Monitor Thread Start", name_);
-  }
-
-  // initialize
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> 
&config) override {
     std::string value;
     if (config->get(Configure::nifi_provenance_repository_directory_default, 
value)) {
@@ -85,9 +80,10 @@ class ProvenanceRepository : public core::Repository {
     logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d", 
max_partition_bytes_);
     if (config->get(Configure::nifi_provenance_repository_max_storage_time, 
value)) {
       if (auto max_partition = 
utils::timeutils::StringToDuration<std::chrono::milliseconds>(value))
-          max_partition_millis_ = *max_partition;
+        max_partition_millis_ = *max_partition;
     }
-    logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms", 
int64_t{max_partition_millis_.count()});
+    logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms",
+                       int64_t{max_partition_millis_.count()});
     rocksdb::Options options;
     options.create_if_missing = true;
     options.use_direct_io_for_flush_and_compaction = true;
@@ -121,8 +117,8 @@ class ProvenanceRepository : public core::Repository {
 
     return true;
   }
-  // Put
-  bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override 
{
     // persist to the DB
     rocksdb::Slice value((const char *) buf, bufLen);
     return db_->Put(rocksdb::WriteOptions(), key, value).ok();
@@ -140,12 +136,11 @@ class ProvenanceRepository : public core::Repository {
     return db_->Write(rocksdb::WriteOptions(), &batch).ok();
   }
 
-  // Delete
-  bool Delete(std::string /*key*/) override {
+  bool Delete(const std::string& /*key*/) override {
     // The repo is cleaned up by itself, there is no need to delete items.
     return true;
   }
-  // Get
+
   bool Get(const std::string &key, std::string &value) override {
     return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
   }
@@ -168,7 +163,8 @@ class ProvenanceRepository : public core::Repository {
     return true;
   }
 
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> 
&records, size_t &max_size, 
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override {
+  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> 
&records, size_t &max_size,
+                   
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override {
     std::unique_ptr<rocksdb::Iterator> 
it(db_->NewIterator(rocksdb::ReadOptions()));
     size_t requested_batch = max_size;
     max_size = 0;
@@ -185,19 +181,6 @@ class ProvenanceRepository : public core::Repository {
     return max_size > 0;
   }
 
-  void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> 
&records, int maxSize) {
-    std::unique_ptr<rocksdb::Iterator> 
it(db_->NewIterator(rocksdb::ReadOptions()));
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      std::shared_ptr<ProvenanceEventRecord> eventRead = 
std::make_shared<ProvenanceEventRecord>();
-      std::string key = it->key().ToString();
-      if (records.size() >= (uint64_t)maxSize)
-        break;
-      if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const 
std::byte>())) {
-        records.push_back(eventRead);
-      }
-    }
-  }
-
   bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> 
&store, size_t &max_size) override {
     std::unique_ptr<rocksdb::Iterator> 
it(db_->NewIterator(rocksdb::ReadOptions()));
     max_size = 0;
@@ -214,12 +197,9 @@ class ProvenanceRepository : public core::Repository {
     return max_size > 0;
   }
 
-  // destroy
   void destroy() {
     db_.reset();
   }
-  // Run function for the thread
-  void run() override;
 
   uint64_t getKeyCount() const {
     std::string key_count;
@@ -231,11 +211,20 @@ class ProvenanceRepository : public core::Repository {
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   ProvenanceRepository(const ProvenanceRepository &parent) = delete;
+
   ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
 
  private:
+  // Run function for the thread
+  void run() override;
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
   std::unique_ptr<rocksdb::DB> db_;
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<ProvenanceRepository>::getLogger();
+  std::thread thread_;
 };
 
 }  // namespace org::apache::nifi::minifi::provenance
diff --git 
a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp 
b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
index 7b932244e..9d6ae3da1 100644
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
@@ -50,13 +50,9 @@ int main(int /*argc*/, char ** /*argv*/) {
   std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
   processor->setMaxConcurrentTasks(1);
 
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller =
-      std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
+  std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
 
   utils::Identifier processoruuid = processor->getUUID();
   assert(processoruuid);
diff --git 
a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp 
b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index dad668e31..30bdd9b38 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -34,8 +34,8 @@ using namespace std::literals::chrono_literals;
 TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
   TestController test_controller;
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -225,8 +225,8 @@ Provenance Reporting:
 TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
   TestController test_controller;
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -351,8 +351,8 @@ NiFi Properties Overrides: {}
 TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
   TestController test_controller;
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -505,8 +505,8 @@ TEST_CASE("Test Dynamic Unsupported", 
"[YamlConfigurationDynamicUnsupported]") {
   logTestController.setDebug<TestPlan>();
   logTestController.setTrace<core::YamlConfiguration>();
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -541,8 +541,8 @@ TEST_CASE("Test Required Property", 
"[YamlConfigurationRequiredProperty]") {
   logTestController.setDebug<TestPlan>();
   logTestController.setDebug<core::YamlConfiguration>();
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -586,8 +586,8 @@ TEST_CASE("Test Required Property 2", 
"[YamlConfigurationRequiredProperty2]") {
   logTestController.setDebug<core::YamlConfiguration>();
   logTestController.setDebug<core::Processor>();
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -635,8 +635,8 @@ TEST_CASE("Test Dependent Property", 
"[YamlConfigurationDependentProperty]") {
   logTestController.setDebug<TestPlan>();
   logTestController.setDebug<core::YamlConfiguration>();
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -657,8 +657,8 @@ TEST_CASE("Test Dependent Property 2", 
"[YamlConfigurationDependentProperty2]")
   logTestController.setDebug<TestPlan>();
   logTestController.setDebug<core::YamlConfiguration>();
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -687,8 +687,8 @@ TEST_CASE("Test Exclusive Property", 
"[YamlConfigurationExclusiveProperty]") {
   LogTestController &logTestController = LogTestController::getInstance();
   logTestController.setDebug<TestPlan>();
   logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -707,8 +707,8 @@ TEST_CASE("Test Regex Property", 
"[YamlConfigurationRegexProperty]") {
   LogTestController &logTestController = LogTestController::getInstance();
   logTestController.setDebug<TestPlan>();
   logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -728,8 +728,8 @@ TEST_CASE("Test Exclusive Property 2", 
"[YamlConfigurationExclusiveProperty2]")
   LogTestController &logTestController = LogTestController::getInstance();
   logTestController.setDebug<TestPlan>();
   logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -756,8 +756,8 @@ TEST_CASE("Test Regex Property 2", 
"[YamlConfigurationRegexProperty2]") {
   LogTestController &logTestController = LogTestController::getInstance();
   logTestController.setDebug<TestPlan>();
   logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -784,8 +784,8 @@ TEST_CASE("Test Regex Property 2", 
"[YamlConfigurationRegexProperty2]") {
 TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
   TestController test_controller;
 
-  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> testProvRepo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> testFlowFileRepo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -875,8 +875,8 @@ Remote Process Groups: []
 
 TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
   TestController test_controller;
-  std::shared_ptr<core::Repository> test_prov_repo = 
core::createRepository("provenancerepository", true);
-  std::shared_ptr<core::Repository> test_flow_file_repo = 
core::createRepository("flowfilerepository", true);
+  std::shared_ptr<core::Repository> test_prov_repo = 
core::createRepository("provenancerepository");
+  std::shared_ptr<core::Repository> test_flow_file_repo = 
core::createRepository("flowfilerepository");
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
diff --git a/libminifi/include/core/ContentRepository.h 
b/libminifi/include/core/ContentRepository.h
index 68818c17a..e109eb97a 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -42,29 +42,24 @@ namespace core {
  */
 class ContentRepository : public StreamManager<minifi::ResourceClaim>, public 
utils::EnableSharedFromThis<ContentRepository> {
  public:
-  virtual ~ContentRepository() = default;
+  ~ContentRepository() override = default;
 
   /**
    * initialize this content repository using the provided configuration.
    */
   virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
 
-  virtual std::string getStoragePath() const;
+  std::string getStoragePath() const override;
 
   virtual std::shared_ptr<ContentSession> createSession();
 
-  /**
-   * Stops this repository.
-   */
-  virtual void stop() = 0;
-
   void reset();
 
-  virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId);
+  uint32_t getStreamCount(const minifi::ResourceClaim &streamId) override;
 
-  virtual void incrementStreamCount(const minifi::ResourceClaim &streamId);
+  void incrementStreamCount(const minifi::ResourceClaim &streamId) override;
 
-  virtual StreamState decrementStreamCount(const minifi::ResourceClaim 
&streamId);
+  StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) 
override;
 
  protected:
   std::string directory_;
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
index 5e265d091..5e2169187 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -20,30 +20,29 @@
 #ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_H_
 #define LIBMINIFI_INCLUDE_CORE_REPOSITORY_H_
 
-#include <memory>
-#include <utility>
 #include <atomic>
 #include <cstdint>
 #include <cstring>
-#include <iostream>
 #include <map>
+#include <memory>
 #include <set>
 #include <string>
-#include <thread>
+#include <utility>
 #include <vector>
+
+#include "Core.h"
+#include "ResourceClaim.h"
+#include "core/Connectable.h"
 #include "core/ContentRepository.h"
+#include "core/Property.h"
 #include "core/SerializableComponent.h"
-#include "properties/Configure.h"
 #include "core/logging/LoggerFactory.h"
-#include "core/Property.h"
-#include "ResourceClaim.h"
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "Core.h"
-#include "core/Connectable.h"
-#include "core/TraceableResource.h"
+#include "properties/Configure.h"
 #include "utils/BackTrace.h"
 #include "SwapManager.h"
+#include "utils/Literals.h"
+#include "utils/StringUtils.h"
+#include "utils/TimeUtil.h"
 
 #ifndef WIN32
 #include <sys/stat.h>
@@ -51,50 +50,42 @@
 
 namespace org::apache::nifi::minifi::core {
 
-#define REPOSITORY_DIRECTORY "./repo"
-#define MAX_REPOSITORY_STORAGE_SIZE (10*1024*1024)  // 10M
+constexpr auto REPOSITORY_DIRECTORY = "./repo";
+constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
 constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class Repository : public virtual core::SerializableComponent, public 
core::TraceableResource {
+class Repository : public virtual core::SerializableComponent {
  public:
-  /*
-   * Constructor for the repository
-   */
-  explicit Repository(std::string repo_name = "Repository",
+  explicit Repository(const std::string& repo_name = "Repository",
              std::string directory = REPOSITORY_DIRECTORY,
              std::chrono::milliseconds maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME,
              int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
              std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
       : core::SerializableComponent(repo_name),
-        thread_(),
+        directory_(std::move(directory)),
+        max_partition_millis_(maxPartitionMillis),
+        max_partition_bytes_(maxPartitionBytes),
+        purge_period_(purgePeriod),
+        repo_full_(false),
         repo_size_(0),
         logger_(logging::LoggerFactory<Repository>::getLogger()) {
-    directory_ = directory;
-    max_partition_millis_ = maxPartitionMillis;
-    max_partition_bytes_ = maxPartitionBytes;
-    purge_period_ = purgePeriod;
-    running_ = false;
-    repo_full_ = false;
   }
 
-  // Destructor
-  ~Repository() override {
-    stop();
-  }
+  virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
 
-  virtual bool isNoop() {
-    return true;
-  }
+  virtual bool start() = 0;
 
-  virtual void flush();
+  virtual bool stop() = 0;
 
-  // initialize
-  virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) {
+  virtual bool isNoop() const {
     return true;
   }
-  // Put
-  virtual bool Put(std::string /*key*/, const uint8_t* /*buf*/, size_t 
/*bufLen*/) {
+
+  virtual void flush() {
+  }
+
+  virtual bool Put(const std::string& /*key*/, const uint8_t* /*buf*/, size_t 
/*bufLen*/) {
     return true;
   }
 
@@ -102,8 +93,7 @@ class Repository : public virtual 
core::SerializableComponent, public core::Trac
     return true;
   }
 
-  // Delete
-  virtual bool Delete(std::string /*key*/) {
+  virtual bool Delete(const std::string& /*key*/) {
     return true;
   }
 
@@ -127,30 +117,10 @@ class Repository : public virtual 
core::SerializableComponent, public core::Trac
     return false;
   }
 
-  // Run function for the thread
-  virtual void run() {
-    // no op
-  }
-
-  /**
-   * Since SerializableComponents represent a runnable object, we should 
return traces
-   */
-  BackTrace getTraces() override {
-    return TraceResolver::getResolver().getBackTrace(getName(), 
thread_.native_handle());
-  }
-
-  // Start the repository monitor thread
-  virtual void start();
-  // Stop the repository monitor thread
-  virtual void stop();
   // whether the repo is full
   virtual bool isFull() {
     return repo_full_;
   }
-  // whether the repo is enable
-  bool isRunning() override {
-    return running_;
-  }
 
   /**
    * Specialization that allows us to serialize max_size objects into store.
@@ -219,14 +189,12 @@ class Repository : public virtual 
core::SerializableComponent, public core::Trac
     return Put(key, buffer, bufferSize);
   }
 
-  uint64_t incrementSize(const char* /*fpath*/, const struct stat *sb, int 
/*typeflag*/) {
-    return (repo_size_ += sb->st_size);
-  }
-
   virtual void loadComponent(const std::shared_ptr<core::ContentRepository>& 
/*content_repo*/) {
   }
 
-  virtual uint64_t getRepoSize();
+  virtual uint64_t getRepoSize() const {
+    return repo_size_;
+  }
 
   std::string getDirectory() const {
     return directory_;
@@ -241,30 +209,16 @@ class Repository : public virtual 
core::SerializableComponent, public core::Trac
   std::map<std::string, core::Connectable*> containers_;
 
   std::map<std::string, core::Connectable*> connection_map_;
-  // Mutex for protection
-  std::mutex mutex_;
-  // repository directory
   std::string directory_;
-  // max db entry life time
+  // max db entry lifetime
   std::chrono::milliseconds max_partition_millis_;
   // max db size
   int64_t max_partition_bytes_;
-  // purge period
   std::chrono::milliseconds purge_period_;
-  // thread
-  std::thread thread_;
-  // whether the monitoring thread is running for the repo while it was enabled
-  std::atomic<bool> running_;
-  // whether stop accepting provenace event
+  // whether to stop accepting provenance event
   std::atomic<bool> repo_full_;
-  // repoSize
 
-  // size of the directory
   std::atomic<uint64_t> repo_size_;
-  // Run function for the thread
-  void threadExecutor() {
-    run();
-  }
 
  private:
   std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/RepositoryFactory.h 
b/libminifi/include/core/RepositoryFactory.h
index 2a56728b2..60ba5775a 100644
--- a/libminifi/include/core/RepositoryFactory.h
+++ b/libminifi/include/core/RepositoryFactory.h
@@ -26,33 +26,24 @@
 #include "core/Repository.h"
 #include "Core.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
 /**
- * Create a repository represented by the configuration class name
+ * Create a context repository
  * @param configuration_class_name configuration class name
  * @param fail_safe determines whether or not to make the default class if 
configuration_class_name is invalid
  * @param repo_name name of the repository
  */
-std::unique_ptr<core::Repository> createRepository(const std::string& 
configuration_class_name, bool fail_safe = false, const std::string& repo_name 
= "");
+std::unique_ptr<core::ContentRepository> createContentRepository(const 
std::string& configuration_class_name, bool fail_safe = false, const 
std::string& repo_name = "");
 
 /**
- * Create a context repository
+ * Create a repository represented by the configuration class name
  * @param configuration_class_name configuration class name
  * @param fail_safe determines whether or not to make the default class if 
configuration_class_name is invalid
  * @param repo_name name of the repository
  */
-std::unique_ptr<core::ContentRepository> createContentRepository(const 
std::string& configuration_class_name, bool fail_safe = false, const 
std::string& repo_name = "");
+std::unique_ptr<core::Repository> createRepository(const std::string& 
configuration_class_name, const std::string& repo_name = "");
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core
 
 #endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
diff --git a/libminifi/include/core/ThreadedRepository.h 
b/libminifi/include/core/ThreadedRepository.h
new file mode 100644
index 000000000..e03aed9e5
--- /dev/null
+++ b/libminifi/include/core/ThreadedRepository.h
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <thread>
+
+#include "Repository.h"
+#include "TraceableResource.h"
+
+namespace org::apache::nifi::minifi::core {
+
+class ThreadedRepository : public core::Repository, public 
core::TraceableResource {
+ public:
+  using Repository::Repository;
+
+  ~ThreadedRepository() override {
+    if (running_state_.load() != RunningState::Stopped) {
+      logger_->log_error("Thread of %s should have been stopped in subclass 
before ThreadedRepository's destruction", name_);
+    }
+  }
+
+  bool initialize(const std::shared_ptr<Configure>& /*configure*/) override {
+    return true;
+  }
+
+  // Starts repository monitor thread
+  bool start() override {
+    // if Stopped, turn to Starting, otherwise return
+    RunningState expected{RunningState::Stopped};
+    if (!running_state_.compare_exchange_strong(expected, 
RunningState::Starting)) {
+      return false;
+    }
+    if (purge_period_ <= std::chrono::milliseconds(0)) {
+      running_state_.store(RunningState::Running);
+      return true;
+    }
+    getThread() = std::thread(&ThreadedRepository::run, this);
+    running_state_.store(RunningState::Running);
+
+    logger_->log_debug("%s ThreadedRepository monitor thread start", name_);
+    return true;
+  }
+
+  // Stops repository monitor thread
+  bool stop() override {
+    // if RUNNING, turn to STOPPING, otherwise return
+    RunningState expected{RunningState::Running};
+    if (!running_state_.compare_exchange_strong(expected, 
RunningState::Stopping)) {
+      return false;
+    }
+    if (getThread().joinable()) {
+      getThread().join();
+    }
+    running_state_.store(RunningState::Stopped);
+    logger_->log_debug("%s ThreadedRepository monitor thread stop", name_);
+    return true;
+  }
+
+  bool isRunning() override {
+    return running_state_.load() == RunningState::Running;
+  }
+
+  BackTrace getTraces() override {
+    return TraceResolver::getResolver().getBackTrace(getName(), 
getThread().native_handle());
+  }
+
+ private:
+  virtual void run() = 0;
+
+  /**
+   * READ BEFORE USING!
+   * @returns repository monitor thread
+   * Thread-owning overriding subclasses MUST also call stop() in their 
destructor
+   * to prevent the thread still using their members after they are are 
destructed (it's too late in the destructor of this base class)
+   */
+  virtual std::thread& getThread() = 0;
+
+  enum class RunningState : uint8_t {
+    Starting,
+    Running,
+    Stopping,
+    Stopped
+  };
+
+  std::atomic<RunningState> running_state_{RunningState::Stopped};
+  std::shared_ptr<logging::Logger> logger_ 
{logging::LoggerFactory<ThreadedRepository>::getLogger()};
+};
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/repository/FileSystemRepository.h 
b/libminifi/include/core/repository/FileSystemRepository.h
index 01e231c2a..54ce36b6c 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -25,49 +25,39 @@
 #include "../ContentRepository.h"
 #include "properties/Configure.h"
 #include "core/logging/LoggerFactory.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+
+namespace org::apache::nifi::minifi::core::repository {
 
 /**
  * FileSystemRepository is a content repository that stores data onto the 
local file system.
  */
 class FileSystemRepository : public core::ContentRepository, public 
core::CoreComponent {
  public:
-  FileSystemRepository(std::string name = 
getClassName<FileSystemRepository>()) // NOLINT
+  explicit FileSystemRepository(const std::string& name = 
getClassName<FileSystemRepository>())
       : core::CoreComponent(name),
         logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
   }
-  virtual ~FileSystemRepository() = default;
 
-  virtual bool initialize(const std::shared_ptr<minifi::Configure> 
&configuration);
+  virtual ~FileSystemRepository() = default;
 
-  virtual void stop();
+  virtual bool initialize(const std::shared_ptr<minifi::Configure>& 
configuration);
 
-  bool exists(const minifi::ResourceClaim &streamId);
+  bool exists(const minifi::ResourceClaim& streamId);
 
-  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim 
&claim, bool append = false);
+  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& 
claim, bool append = false);
 
-  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim 
&claim);
+  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& 
claim);
 
-  virtual bool close(const minifi::ResourceClaim &claim) {
+  virtual bool close(const minifi::ResourceClaim& claim) {
     return remove(claim);
   }
 
-  virtual bool remove(const minifi::ResourceClaim &claim);
+  virtual bool remove(const minifi::ResourceClaim& claim);
 
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
-}  // namespace repository
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core::repository
 
 #endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h 
b/libminifi/include/core/repository/VolatileContentRepository.h
index 52122ebad..2759af81c 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
 #define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
 
@@ -32,23 +32,13 @@
 #include "core/logging/LoggerFactory.h"
 #include "utils/GeneralUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
 
 /**
- * Purpose: Stages content into a volatile area of memory. Note that   when 
the maximum number
+ * Purpose: Stages content into a volatile area of memory. Note that when the 
maximum number
  * of entries is consumed we will rollback a session to wait for others to be 
freed.
  */
-class VolatileContentRepository :
-    public core::ContentRepository,
-    public core::repository::VolatileRepository<ResourceClaim::Path>,
-    public utils::EnableSharedFromThis<VolatileContentRepository> {
-  using utils::EnableSharedFromThis<VolatileContentRepository>::sharedFromThis;
-
+class VolatileContentRepository : public core::ContentRepository, public 
core::repository::VolatileRepository<ResourceClaim::Path> {
  public:
   static const char *minimal_locking;
 
@@ -59,7 +49,8 @@ class VolatileContentRepository :
         
logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {
     max_count_ = 15000;
   }
-  virtual ~VolatileContentRepository() {
+
+  ~VolatileContentRepository() override {
     logger_->log_debug("Clearing repository");
     if (!minimize_locking_) {
       std::lock_guard<std::mutex> lock(map_mutex_);
@@ -70,38 +61,41 @@ class VolatileContentRepository :
     }
   }
 
+  bool start() override {
+    return true;
+  }
+
+  bool stop() override {
+    return true;
+  }
+
   /**
    * Initialize the volatile content repo
    * @param configure configuration
    */
-  virtual bool initialize(const std::shared_ptr<Configure> &configure);
-
-  /**
-   * Stop any thread associated with the volatile content repository.
-   */
-  virtual void stop();
+  bool initialize(const std::shared_ptr<Configure> &configure) override;
 
   /**
    * Creates writable stream.
    * @param claim resource claim
    * @return BaseStream shared pointer that represents the stream the consumer 
will write to.
    */
-  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim 
&claim, bool append);
+  std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, 
bool append) override;
 
   /**
    * Creates readable stream.
    * @param claim resource claim
    * @return BaseStream shared pointer that represents the stream from which 
the consumer will read..
    */
-  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim 
&claim);
+  std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) 
override;
 
-  virtual bool exists(const minifi::ResourceClaim &claim);
+  bool exists(const minifi::ResourceClaim &claim) override;
 
   /**
    * Closes the claim.
    * @return whether or not the claim is associated with content stored in 
volatile memory.
    */
-  virtual bool close(const minifi::ResourceClaim &claim) {
+  bool close(const minifi::ResourceClaim &claim) override {
     return remove(claim);
   }
 
@@ -109,12 +103,7 @@ class VolatileContentRepository :
    * Closes the claim.
    * @return whether or not the claim is associated with content stored in 
volatile memory.
    */
-  virtual bool remove(const minifi::ResourceClaim &claim);
-
- protected:
-  virtual void start();
-
-  virtual void run();
+  bool remove(const minifi::ResourceClaim &claim) override;
 
  private:
   bool minimize_locking_;
@@ -129,11 +118,6 @@ class VolatileContentRepository :
   std::shared_ptr<logging::Logger> logger_;
 };
 
-}  // namespace repository
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core::repository
 
 #endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h 
b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index ddf93810b..79fbf7671 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -23,6 +23,7 @@
 
 #include "VolatileRepository.h"
 #include "FlowFileRecord.h"
+#include "core/ThreadedRepository.h"
 #include "utils/gsl.h"
 
 namespace org {
@@ -36,12 +37,10 @@ namespace repository {
  * Volatile flow file repository. keeps a running counter of the current 
location, freeing
  * those which we no longer hold.
  */
-class VolatileFlowFileRepository : public VolatileRepository<std::string>, 
public utils::EnableSharedFromThis<VolatileFlowFileRepository> {
-  using 
utils::EnableSharedFromThis<VolatileFlowFileRepository>::sharedFromThis;
-
+class VolatileFlowFileRepository : public VolatileRepository<std::string, 
core::ThreadedRepository> {
  public:
-  explicit VolatileFlowFileRepository(std::string repo_name = "",
-                                      std::string /*dir*/ = 
REPOSITORY_DIRECTORY,
+  explicit VolatileFlowFileRepository(const std::string& repo_name = "",
+                                      const std::string& /*dir*/ = 
REPOSITORY_DIRECTORY,
                                       std::chrono::milliseconds 
maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
                                       int64_t maxPartitionBytes = 
MAX_REPOSITORY_STORAGE_SIZE,
                                       std::chrono::milliseconds purgePeriod = 
REPOSITORY_PURGE_PERIOD)
@@ -51,15 +50,23 @@ class VolatileFlowFileRepository : public 
VolatileRepository<std::string>, publi
     content_repo_ = nullptr;
   }
 
+  ~VolatileFlowFileRepository() override {
+    stop();
+  }
+
+ private:
   void run() override {
-    repo_full_ = false;
-    while (running_) {
+    while (isRunning()) {
       std::this_thread::sleep_for(purge_period_);
       flush();
     }
     flush();
   }
 
+  std::thread& getThread() override {
+    return thread_;
+  }
+
   void flush() override {
     if (purge_required_ && nullptr != content_repo_) {
       std::lock_guard<std::mutex> lock(purge_mutex_);
@@ -80,7 +87,6 @@ class VolatileFlowFileRepository : public 
VolatileRepository<std::string>, publi
     content_repo_ = content_repo;
   }
 
- protected:
   void emplace(RepoValue<std::string> &old_value) override {
     std::string buffer;
     old_value.emplace(buffer);
@@ -89,6 +95,7 @@ class VolatileFlowFileRepository : public 
VolatileRepository<std::string>, publi
   }
 
   std::shared_ptr<core::ContentRepository> content_repo_;
+  std::thread thread_;
 };
 }  // namespace repository
 }  // namespace core
diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h 
b/libminifi/include/core/repository/VolatileProvenanceRepository.h
index 14ab1c035..a8f03f198 100644
--- a/libminifi/include/core/repository/VolatileProvenanceRepository.h
+++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h
@@ -21,6 +21,7 @@
 #include <string>
 
 #include "VolatileRepository.h"
+#include "core/ThreadedRepository.h"
 
 namespace org {
 namespace apache {
@@ -29,10 +30,7 @@ namespace minifi {
 namespace core {
 namespace repository {
 
-/**
- * Volatile provenance repository.
- */
-class VolatileProvenanceRepository : public VolatileRepository<std::string> {
+class VolatileProvenanceRepository : public VolatileRepository<std::string, 
core::ThreadedRepository> {
  public:
   explicit VolatileProvenanceRepository(std::string repo_name = "",
                                         std::string /*dir*/ = 
REPOSITORY_DIRECTORY,
@@ -43,14 +41,23 @@ class VolatileProvenanceRepository : public 
VolatileRepository<std::string> {
     purge_required_ = false;
   }
 
-  virtual void run() {
-    repo_full_ = false;
+  ~VolatileProvenanceRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
   }
- protected:
-  virtual void emplace(RepoValue<std::string> &old_value) {
+
+  void emplace(RepoValue<std::string> &old_value) override {
     purge_list_.push_back(old_value.getKey());
   }
- private:
+
+  std::thread thread_;
 };
 
 }  // namespace repository
diff --git a/libminifi/include/core/repository/VolatileRepository.h 
b/libminifi/include/core/repository/VolatileRepository.h
index 67f48bc64..d3771e612 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -39,23 +39,16 @@ namespace nifi {
 namespace minifi {
 namespace core {
 namespace repository {
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Woverloaded-virtual"
-#endif
+
 /**
  * Flow File repository
  * Design: Extends Repository and implements the run function, using RocksDB 
as the primary substrate.
  */
-template<typename T>
-class VolatileRepository : public core::Repository {
+template<typename KeyType, typename RepositoryType = core::Repository>
+class VolatileRepository : public RepositoryType {
  public:
   static const char *volatile_repo_max_count;
   static const char *volatile_repo_max_bytes;
-  // Constructor
 
   explicit VolatileRepository(std::string repo_name = "",
                               std::string /*dir*/ = REPOSITORY_DIRECTORY,
@@ -63,7 +56,7 @@ class VolatileRepository : public core::Repository {
                               int64_t maxPartitionBytes = 
MAX_REPOSITORY_STORAGE_SIZE,
                               std::chrono::milliseconds purgePeriod = 
REPOSITORY_PURGE_PERIOD)
       : core::SerializableComponent(repo_name),
-        Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<VolatileRepository>(), "", maxPartitionMillis, 
maxPartitionBytes, purgePeriod),
+        RepositoryType(repo_name.length() > 0 ? repo_name : 
core::getClassName<VolatileRepository>(), "", maxPartitionMillis, 
maxPartitionBytes, purgePeriod),
         current_size_(0),
         current_index_(0),
         max_count_(10000),
@@ -75,14 +68,11 @@ class VolatileRepository : public core::Repository {
   ~VolatileRepository() override;
 
   /**
-   * Initialize thevolatile repsitory
+   * Initialize the volatile repository
    **/
-
   bool initialize(const std::shared_ptr<Configure> &configure) override;
 
-  void run() override = 0;
-
-  bool isNoop() override {
+  bool isNoop() const override {
     return false;
   }
 
@@ -91,26 +81,26 @@ class VolatileRepository : public core::Repository {
    * @param key key to add to the repository
    * @param buf buffer
    **/
-  bool Put(T key, const uint8_t *buf, size_t bufLen) override;
+  bool Put(const KeyType& key, const uint8_t *buf, size_t bufLen) override;
 
   /**
    * Places new objects into the volatile memory area
    * @param data the key-value pairs to add to the repository
    **/
-  bool MultiPut(const std::vector<std::pair<T, 
std::unique_ptr<io::BufferStream>>>& data) override;
+  bool MultiPut(const std::vector<std::pair<KeyType, 
std::unique_ptr<io::BufferStream>>>& data) override;
 
   /**
    * Deletes the key
    * @return status of the delete operation
    */
-  bool Delete(T key) override;
+  bool Delete(const KeyType& key) override;
 
   /**
    * Sets the value from the provided key. Once the item is retrieved
    * it may not be retrieved again.
    * @return status of the get operation.
    */
-  bool Get(const T &key, std::string &value) override;
+  bool Get(const KeyType& key, std::string &value) override;
   /**
    * Deserializes objects into store
    * @param store vector in which we will store newly created objects.
@@ -131,37 +121,23 @@ class VolatileRepository : public core::Repository {
    */
   void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo) override;
 
-  void start() override;
-
-  uint64_t getRepoSize() override {
+  uint64_t getRepoSize() const override {
     return current_size_;
   }
 
  protected:
-  virtual void emplace(RepoValue<T> &old_value) {
+  virtual void emplace(RepoValue<KeyType> &old_value) {
     std::lock_guard<std::mutex> lock(purge_mutex_);
     purge_list_.push_back(old_value.getKey());
   }
 
-  /**
-   * Tests whether or not the current size exceeds the capacity
-   * if the new prospectiveSize is inserted.
-   * @param prospectiveSize size of item to be added.
-   */
-  inline bool exceedsCapacity(uint32_t prospectiveSize) {
-    if (current_size_ + prospectiveSize > max_size_)
-      return true;
-    else
-      return false;
-  }
-
   // current size of the volatile repo.
   std::atomic<size_t> current_size_;
   // current index.
   std::atomic<uint16_t> current_index_;
   // value vector that exists for non blocking iteration over
   // objects that store data for this repo instance.
-  std::vector<AtomicEntry<T>*> value_vector_;
+  std::vector<AtomicEntry<KeyType>*> value_vector_;
 
   // max count we are allowed to store.
   uint32_t max_count_;
@@ -172,42 +148,40 @@ class VolatileRepository : public core::Repository {
 
   std::mutex purge_mutex_;
   // purge list
-  std::vector<T> purge_list_;
+  std::vector<KeyType> purge_list_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
-template<typename T>
-const char *VolatileRepository<T>::volatile_repo_max_count = "max.count";
-template<typename T>
-const char *VolatileRepository<T>::volatile_repo_max_bytes = "max.bytes";
+template<typename KeyType, typename RepositoryType>
+const char *VolatileRepository<KeyType, 
RepositoryType>::volatile_repo_max_count = "max.count";
+template<typename KeyType, typename RepositoryType>
+const char *VolatileRepository<KeyType, 
RepositoryType>::volatile_repo_max_bytes = "max.bytes";
 
-template<typename T>
-void VolatileRepository<T>::loadComponent(const 
std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
+template<typename KeyType, typename RepositoryType>
+void VolatileRepository<KeyType, RepositoryType>::loadComponent(const 
std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
 }
 
 // Destructor
-template<typename T>
-VolatileRepository<T>::~VolatileRepository() {
+template<typename KeyType, typename RepositoryType>
+VolatileRepository<KeyType, RepositoryType>::~VolatileRepository() {
   for (auto ent : value_vector_) {
     delete ent;
   }
-
-  stop();
 }
 
 /**
- * Initialize the volatile repsitory
+ * Initialize the volatile repository
  **/
-template<typename T>
-bool VolatileRepository<T>::initialize(const std::shared_ptr<Configure> 
&configure) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::initialize(const 
std::shared_ptr<Configure> &configure) {
   std::string value = "";
 
   if (configure != nullptr) {
     int64_t max_cnt = 0;
     std::stringstream strstream;
-    strstream << Configure::nifi_volatile_repository_options << getName() << 
"." << volatile_repo_max_count;
+    strstream << Configure::nifi_volatile_repository_options << 
RepositoryType::getName() << "." << volatile_repo_max_count;
     if (configure->get(strstream.str(), value)) {
       if (core::Property::StringToInt(value, max_cnt)) {
         max_count_ = gsl::narrow<uint32_t>(max_cnt);
@@ -217,7 +191,7 @@ bool VolatileRepository<T>::initialize(const 
std::shared_ptr<Configure> &configu
     strstream.str("");
     strstream.clear();
     int64_t max_bytes = 0;
-    strstream << Configure::nifi_volatile_repository_options << getName() << 
"." << volatile_repo_max_bytes;
+    strstream << Configure::nifi_volatile_repository_options << 
RepositoryType::getName() << "." << volatile_repo_max_bytes;
     if (configure->get(strstream.str(), value)) {
       if (core::Property::StringToInt(value, max_bytes)) {
         if (max_bytes <= 0) {
@@ -229,11 +203,11 @@ bool VolatileRepository<T>::initialize(const 
std::shared_ptr<Configure> &configu
     }
   }
 
-  logging::LOG_INFO(logger_) << "Resizing value_vector_ for " << getName() << 
" count is " << max_count_;
-  logging::LOG_INFO(logger_) << "Using a maximum size for " << getName() << " 
of  " << max_size_;
+  logging::LOG_INFO(logger_) << "Resizing value_vector_ for " << 
RepositoryType::getName() << " count is " << max_count_;
+  logging::LOG_INFO(logger_) << "Using a maximum size for " << 
RepositoryType::getName() << " of  " << max_size_;
   value_vector_.reserve(max_count_);
   for (uint32_t i = 0; i < max_count_; i++) {
-    value_vector_.emplace_back(new AtomicEntry<T>(&current_size_, &max_size_));
+    value_vector_.emplace_back(new AtomicEntry<KeyType>(&current_size_, 
&max_size_));
   }
   return true;
 }
@@ -243,14 +217,14 @@ bool VolatileRepository<T>::initialize(const 
std::shared_ptr<Configure> &configu
  * @param key key to add to the repository
  * @param buf buffer
  **/
-template<typename T>
-bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) {
-  RepoValue<T> new_value(key, buf, bufLen);
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, 
const uint8_t *buf, size_t bufLen) {
+  RepoValue<KeyType> new_value(key, buf, bufLen);
 
   const size_t size = new_value.size();
   bool updated = false;
   size_t reclaimed_size = 0;
-  RepoValue<T> old_value;
+  RepoValue<KeyType> old_value;
   do {
     uint16_t private_index = current_index_.fetch_add(1);
     // round robin through the beginning
@@ -266,7 +240,6 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, 
size_t bufLen) {
     updated = value_vector_.at(private_index)->setRepoValue(new_value, 
old_value, reclaimed_size);
     logger_->log_debug("Set repo value at %u out of %u updated %u current_size 
%u, adding %u to  %u", private_index, max_count_, updated == true, 
reclaimed_size, size, current_size_.load());
     if (updated && reclaimed_size > 0) {
-      std::lock_guard<std::mutex> lock(mutex_);
       emplace(old_value);
     }
     if (reclaimed_size > 0) {
@@ -287,8 +260,8 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, 
size_t bufLen) {
   return true;
 }
 
-template<typename T>
-bool VolatileRepository<T>::MultiPut(const std::vector<std::pair<T, 
std::unique_ptr<io::BufferStream>>>& data) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::MultiPut(const 
std::vector<std::pair<KeyType, std::unique_ptr<io::BufferStream>>>& data) {
   for (const auto& item : data) {
     if (!Put(item.first, item.second->getBuffer().template as_span<const 
uint8_t>().data(), item.second->size())) {
       return false;
@@ -301,12 +274,12 @@ bool VolatileRepository<T>::MultiPut(const 
std::vector<std::pair<T, std::unique_
  * Deletes the key
  * @return status of the delete operation
  */
-template<typename T>
-bool VolatileRepository<T>::Delete(T key) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::Delete(const KeyType& key) {
   logger_->log_debug("Delete from volatile");
   for (auto ent : value_vector_) {
     // let the destructor do the cleanup
-    RepoValue<T> value;
+    RepoValue<KeyType> value;
     if (ent->getValue(key, value)) {
       current_size_ -= value.size();
       logger_->log_debug("Delete and pushed into purge_list from volatile");
@@ -321,11 +294,11 @@ bool VolatileRepository<T>::Delete(T key) {
  * it may not be retrieved again.
  * @return status of the get operation.
  */
-template<typename T>
-bool VolatileRepository<T>::Get(const T &key, std::string &value) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::Get(const KeyType &key, 
std::string &value) {
   for (auto ent : value_vector_) {
     // let the destructor do the cleanup
-    RepoValue<T> repo_value;
+    RepoValue<KeyType> repo_value;
     if (ent->getValue(key, repo_value)) {
       current_size_ -= value.size();
       repo_value.emplace(value);
@@ -335,13 +308,14 @@ bool VolatileRepository<T>::Get(const T &key, std::string 
&value) {
   return false;
 }
 
-template<typename T>
-bool 
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store, size_t &max_size, 
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, 
RepositoryType>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store,
+                                                      size_t &max_size, 
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
   size_t requested_batch = max_size;
   max_size = 0;
   for (auto ent : value_vector_) {
     // let the destructor do the cleanup
-    RepoValue<T> repo_value;
+    RepoValue<KeyType> repo_value;
 
     if (ent->getValue(repo_value)) {
       std::shared_ptr<core::SerializableComponent> newComponent = lambda();
@@ -364,13 +338,13 @@ bool 
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::Serial
   }
 }
 
-template<typename T>
-bool 
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store, size_t &max_size) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, 
RepositoryType>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store, size_t &max_size) {
   logger_->log_debug("VolatileRepository -- DeSerialize %u", 
current_size_.load());
   max_size = 0;
   for (auto ent : value_vector_) {
     // let the destructor do the cleanup
-    RepoValue<T> repo_value;
+    RepoValue<KeyType> repo_value;
 
     if (ent->getValue(repo_value)) {
       // we've taken ownership of this repo value
@@ -388,22 +362,6 @@ bool 
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::Serial
   }
 }
 
-template<typename T>
-void VolatileRepository<T>::start() {
-  if (this->purge_period_ <= std::chrono::milliseconds(0))
-    return;
-  if (running_)
-    return;
-  running_ = true;
-  thread_ = std::thread(&VolatileRepository<T>::run, this);
-  logger_->log_debug("%s Repository Monitor Thread Start", name_);
-}
-#if defined(__clang__)
-#pragma clang diagnostic pop
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic pop
-#endif
-
 }  // namespace repository
 }  // namespace core
 }  // namespace minifi
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index e19acb5d0..02ef0914b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -43,6 +43,7 @@
 #include "io/NetworkPrioritizer.h"
 #include "io/FileStream.h"
 #include "core/ClassLoader.h"
+#include "core/ThreadedRepository.h"
 
 namespace org::apache::nifi::minifi {
 
@@ -513,10 +514,14 @@ uint64_t FlowController::getUptime() {
 
 std::vector<BackTrace> FlowController::getTraces() {
   std::vector<BackTrace> traces{thread_pool_.getTraces()};
-  auto prov_repo_trace = provenance_repo_->getTraces();
-  traces.emplace_back(std::move(prov_repo_trace));
-  auto flow_repo_trace = flow_file_repo_->getTraces();
-  traces.emplace_back(std::move(flow_repo_trace));
+  if (auto provenance_repo = 
std::dynamic_pointer_cast<core::ThreadedRepository>(provenance_repo_)) {
+    auto prov_repo_trace = provenance_repo->getTraces();
+    traces.emplace_back(std::move(prov_repo_trace));
+  }
+  if (auto flow_file_repo = 
std::dynamic_pointer_cast<core::ThreadedRepository>(flow_file_repo_)) {
+    auto flow_repo_trace = flow_file_repo->getTraces();
+    traces.emplace_back(std::move(flow_repo_trace));
+  }
   auto my_traces = TraceResolver::getResolver().getBackTrace("main");
   traces.emplace_back(std::move(my_traces));
   return traces;
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 139661909..667a429ed 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -30,6 +30,7 @@
 #include "core/controller/ControllerServiceProvider.h"
 #include "c2/C2Agent.h"
 #include "core/state/nodes/FlowInformation.h"
+#include "properties/Configuration.h"
 #include "utils/file/FileSystem.h"
 #include "utils/file/FileUtils.h"
 #include "utils/gsl.h"
diff --git a/libminifi/src/core/Repository.cpp 
b/libminifi/src/core/Repository.cpp
deleted file mode 100644
index a6703092c..000000000
--- a/libminifi/src/core/Repository.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "core/Repository.h"
-#include <cstdint>
-
-#include "io/BufferStream.h"
-#include "core/logging/Logger.h"
-#include "provenance/Provenance.h"
-
-using namespace std::literals::chrono_literals;
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-void Repository::start() {
-  if (this->purge_period_ <= 0ms)
-    return;
-  if (running_)
-    return;
-  running_ = true;
-  thread_ = std::thread(&Repository::threadExecutor, this);
-  logger_->log_debug("%s Repository Monitor Thread Start", name_);
-}
-
-void Repository::stop() {
-  if (!running_)
-    return;
-  running_ = false;
-  if (thread_.joinable())
-    thread_.join();
-  logger_->log_debug("%s Repository Monitor Thread Stop", name_);
-}
-
-// repoSize
-uint64_t Repository::getRepoSize() {
-  return repo_size_;
-}
-
-void Repository::flush() {
-}
-
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/core/RepositoryFactory.cpp 
b/libminifi/src/core/RepositoryFactory.cpp
index 47ff0bb0b..6ee1e6af1 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -17,7 +17,6 @@
 #include "core/RepositoryFactory.h"
 #include <memory>
 #include <string>
-#include <utility>
 #include <algorithm>
 #include "core/ContentRepository.h"
 #include "core/repository/VolatileContentRepository.h"
@@ -29,75 +28,81 @@
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
-std::unique_ptr<core::Repository> createRepository(const std::string& 
configuration_class_name, bool fail_safe, const std::string& repo_name) {
+std::unique_ptr<core::ContentRepository>
+createContentRepository(const std::string& configuration_class_name, bool 
fail_safe, const std::string& repo_name) {
   std::string class_name_lc = configuration_class_name;
   std::transform(class_name_lc.begin(), class_name_lc.end(), 
class_name_lc.begin(), ::tolower);
   try {
-    auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::Repository>(class_name_lc,
 class_name_lc);
+    auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc,
+                                                                               
                       class_name_lc);
     if (return_obj) {
-      return_obj->setName(repo_name);
       return return_obj;
     }
-    // if the desired repos don't exist, we can try doing string matches and 
reoly on volatile repositories
-    if (class_name_lc == "flowfilerepository" || class_name_lc == 
"volatileflowfilerepository") {
-      return_obj = 
instantiate<repository::VolatileFlowFileRepository>(repo_name);
-    } else if (class_name_lc == "provenancerepository" || class_name_lc == 
"volatileprovenancefilerepository") {
-      return_obj = 
instantiate<repository::VolatileProvenanceRepository>(repo_name);
-    } else if (class_name_lc == "nooprepository") {
-      return_obj = instantiate<core::Repository>(repo_name);
-    }
-    if (return_obj) {
-      return return_obj;
+    if (class_name_lc == "volatilecontentrepository") {
+      return 
std::make_unique<core::repository::VolatileContentRepository>(repo_name);
+    } else if (class_name_lc == "filesystemrepository") {
+      return 
std::make_unique<core::repository::FileSystemRepository>(repo_name);
     }
     if (fail_safe) {
-      return std::make_unique<core::Repository>("fail_safe", "fail_safe", 1ms, 
1, 1ms);
+      return 
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
     } else {
       throw std::runtime_error("Support for the provided configuration class 
could not be found");
     }
-  } catch (const std::runtime_error &) {
+  } catch (const std::runtime_error&) {
     if (fail_safe) {
-      return std::make_unique<core::Repository>("fail_safe", "fail_safe", 1ms, 
1, 1ms);
+      return 
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
     }
   }
 
   throw std::runtime_error("Support for the provided configuration class could 
not be found");
 }
 
-std::unique_ptr<core::ContentRepository> createContentRepository(const 
std::string& configuration_class_name, bool fail_safe, const std::string& 
repo_name) {
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+          : core::SerializableComponent(repo_name),
+            ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::Repository> createRepository(const std::string& 
configuration_class_name, const std::string& repo_name) {
   std::string class_name_lc = configuration_class_name;
   std::transform(class_name_lc.begin(), class_name_lc.end(), 
class_name_lc.begin(), ::tolower);
   try {
-    auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc,
 class_name_lc);
+    auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc,
+                                                                               
                        class_name_lc);
     if (return_obj) {
+      return_obj->setName(repo_name);
       return return_obj;
     }
-    if (class_name_lc == "volatilecontentrepository") {
-      return 
std::make_unique<core::repository::VolatileContentRepository>(repo_name);
-    } else if (class_name_lc == "filesystemrepository") {
-      return 
std::make_unique<core::repository::FileSystemRepository>(repo_name);
-    }
-    if (fail_safe) {
-      return 
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
-    } else {
-      throw std::runtime_error("Support for the provided configuration class 
could not be found");
-    }
-  } catch (const std::runtime_error &) {
-    if (fail_safe) {
-      return 
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
+    // if the desired repos don't exist, we can try doing string matches and 
rely on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == 
"volatileflowfilerepository") {
+      return instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == 
"volatileprovenancefilerepository") {
+      return instantiate<repository::VolatileProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      return std::make_unique<core::NoOpThreadedRepository>(repo_name);
     }
+    return {};
+  } catch (const std::runtime_error&) {
+    throw;
   }
-
-  throw std::runtime_error("Support for the provided configuration class could 
not be found");
 }
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp 
b/libminifi/src/core/repository/FileSystemRepository.cpp
index 0b233cd8a..85f42bfb4 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -22,14 +22,9 @@
 #include "io/FileStream.h"
 #include "utils/file/FileUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
 
-bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> 
&configuration) {
+bool FileSystemRepository::initialize(const 
std::shared_ptr<minifi::Configure>& configuration) {
   std::string value;
   if 
(configuration->get(Configure::nifi_dbcontent_repository_directory_default, 
value)) {
     directory_ = value;
@@ -39,31 +34,24 @@ bool FileSystemRepository::initialize(const 
std::shared_ptr<minifi::Configure> &
   utils::file::create_dir(directory_);
   return true;
 }
-void FileSystemRepository::stop() {
-}
 
-std::shared_ptr<io::BaseStream> FileSystemRepository::write(const 
minifi::ResourceClaim &claim, bool append) {
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const 
minifi::ResourceClaim& claim, bool append) {
   return std::make_shared<io::FileStream>(claim.getContentFullPath(), append);
 }
 
-bool FileSystemRepository::exists(const minifi::ResourceClaim &streamId) {
+bool FileSystemRepository::exists(const minifi::ResourceClaim& streamId) {
   std::ifstream file(streamId.getContentFullPath());
   return file.good();
 }
 
-std::shared_ptr<io::BaseStream> FileSystemRepository::read(const 
minifi::ResourceClaim &claim) {
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const 
minifi::ResourceClaim& claim) {
   return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0, 
false);
 }
 
-bool FileSystemRepository::remove(const minifi::ResourceClaim &claim) {
+bool FileSystemRepository::remove(const minifi::ResourceClaim& claim) {
   logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
   std::remove(claim.getContentFullPath().c_str());
   return true;
 }
 
-} /* namespace repository */
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp 
b/libminifi/src/core/repository/VolatileContentRepository.cpp
index a6fd0bb39..db642e973 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -50,29 +50,10 @@ bool VolatileContentRepository::initialize(const 
std::shared_ptr<Configure> &con
     }
     value_vector_.clear();
   }
-  start();
 
   return true;
 }
 
-void VolatileContentRepository::stop() {
-  running_ = false;
-}
-
-void VolatileContentRepository::run() {
-}
-
-void VolatileContentRepository::start() {
-  if (this->purge_period_ <= 0ms)
-    return;
-  if (running_)
-    return;
-  thread_ = std::thread(&VolatileContentRepository::run, sharedFromThis());
-  thread_.detach();
-  running_ = true;
-  logger_->log_info("%s Repository Monitor Thread Start", getName());
-}
-
 std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const 
minifi::ResourceClaim &claim, bool /*append*/) {
   logger_->log_info("enter write for %s", claim.getContentFullPath());
   {
diff --git a/libminifi/test/flow-tests/SessionTests.cpp 
b/libminifi/test/flow-tests/SessionTests.cpp
index ed317d4c2..36a15a08d 100644
--- a/libminifi/test/flow-tests/SessionTests.cpp
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -30,6 +30,7 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessorNode.h"
 #include "core/Processor.h"
+#include "core/RepositoryFactory.h"
 #include "repository/VolatileContentRepository.h"
 
 namespace {
@@ -68,7 +69,7 @@ TEST_CASE("Import null data") {
   config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
 
-  auto prov_repo = std::make_shared<core::Repository>();
+  std::shared_ptr<core::Repository> prov_repo = 
core::createRepository("nooprepository");
   std::shared_ptr<core::Repository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", 
SESSIONTEST_FLOWFILE_CHECKPOINT_DIR);
   std::shared_ptr<core::ContentRepository> content_repo;
   SECTION("VolatileContentRepository") {
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h 
b/libminifi/test/flow-tests/TestControllerWithFlow.h
index b834b8374..b311b15c3 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -55,7 +55,7 @@ class TestControllerWithFlow: public TestController{
   }
 
   void setupFlow() {
-    std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
+    std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestThreadedRepository>();
     std::shared_ptr<core::Repository> ff_repo = 
std::make_shared<TestFlowRepository>();
     std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
 
diff --git a/libminifi/test/integration/IntegrationBase.h 
b/libminifi/test/integration/IntegrationBase.h
index 3c58d67cc..58c4a790a 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -140,7 +140,7 @@ void IntegrationBase::run(const std::optional<std::string>& 
test_file_location,
   using namespace std::literals::chrono_literals;
   testSetup();
 
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
   if (test_file_location) {
@@ -187,8 +187,6 @@ void IntegrationBase::run(const std::optional<std::string>& 
test_file_location,
     std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
     queryRootProcessGroup(pg);
 
-    std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
-
     const auto request_restart = [&, this] {
       ++restart_requested_count_;
       running = true;
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp 
b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 80aad7d0e..902d81671 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -50,15 +50,14 @@ int main(int argc, char **argv) {
   LogTestController::getInstance().setDebug<core::ProcessGroup>();
 
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
+  auto test_repo = std::make_shared<TestThreadedRepository>();
+  auto test_flow_repo = std::make_shared<TestFlowRepository>();
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::make_unique<core::YamlConfiguration>(
       test_repo, test_repo, content_repo, stream_factory, configuration, 
test_file_location);
-  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
 
   const auto controller = std::make_shared<minifi::FlowController>(
       test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
content_repo, DEFAULT_ROOT_GROUP_NAME,
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp 
b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 5fe5cf24a..a0369062a 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -179,8 +179,8 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
   config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
 
-  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
-  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", 
PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestThreadedRepository>();
+  auto ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", 
PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
   ff_repository->initialize(config);
   content_repo->initialize(config);
@@ -288,7 +288,7 @@ TEST_CASE("Persisted flowFiles are updated on 
modification", "[TestP1]") {
   config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
 
-  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::Repository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", 
PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
   std::shared_ptr<core::ContentRepository> content_repo;
   SECTION("VolatileContentRepository") {
diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp 
b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
index 817b85f7b..7a3f06557 100644
--- a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
+++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
@@ -24,8 +24,8 @@
 #include "core/Core.h"
 #include "core/repository/AtomicRepoEntries.h"
 #include "core/repository/VolatileProvenanceRepository.h"
+#include "core/RepositoryFactory.h"
 #include "FlowFileRecord.h"
-#include "FlowFileRepository.h"
 #include "provenance/Provenance.h"
 #include "../unit/ProvenanceTestHelper.h"
 #include "../TestBase.h"
@@ -36,7 +36,7 @@ using namespace std::literals::chrono_literals;
 
 TEST_CASE("Test Provenance record create", 
"[Testprovenance::ProvenanceEventRecord]") {
   provenance::ProvenanceEventRecord 
record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", 
"blahblah");
-  REQUIRE(record1.getAttributes().size() == 0);
+  REQUIRE(record1.getAttributes().empty());
   REQUIRE(record1.getAlternateIdentifierUri().length() == 0);
 }
 
@@ -86,7 +86,7 @@ TEST_CASE("Test Flowfile record added to provenance", 
"[TestFlowAndProv1]") {
   utils::Identifier childId = record2.getChildrenUuids().at(0);
   REQUIRE(childId == ffr1->getUUID());
   record2.removeChildUuid(childId);
-  REQUIRE(record2.getChildrenUuids().size() == 0);
+  REQUIRE(record2.getChildrenUuids().empty());
 }
 
 TEST_CASE("Test Provenance record serialization Volatile", 
"[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
@@ -100,7 +100,7 @@ TEST_CASE("Test Provenance record serialization Volatile", 
"[Testprovenance::Pro
   auto sample = 65555ms;
 
   std::shared_ptr<core::Repository> testRepository = 
std::make_shared<core::repository::VolatileProvenanceRepository>();
-  testRepository->initialize(0);
+  testRepository->initialize(nullptr);
   record1.setEventDuration(sample);
 
   record1.Serialize(testRepository);
@@ -126,7 +126,7 @@ TEST_CASE("Test Flowfile record added to provenance using 
Volatile Repo", "[Test
 
   auto sample = 65555ms;
   std::shared_ptr<core::Repository> testRepository = 
std::make_shared<core::repository::VolatileProvenanceRepository>();
-  testRepository->initialize(0);
+  testRepository->initialize(nullptr);
   record1.setEventDuration(sample);
 
   record1.Serialize(testRepository);
@@ -138,7 +138,7 @@ TEST_CASE("Test Flowfile record added to provenance using 
Volatile Repo", "[Test
   utils::Identifier childId = record2.getChildrenUuids().at(0);
   REQUIRE(childId == ffr1->getUUID());
   record2.removeChildUuid(childId);
-  REQUIRE(record2.getChildrenUuids().size() == 0);
+  REQUIRE(record2.getChildrenUuids().empty());
 }
 
 TEST_CASE("Test Provenance record serialization NoOp", 
"[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
@@ -151,8 +151,8 @@ TEST_CASE("Test Provenance record serialization NoOp", 
"[Testprovenance::Provena
 
   auto sample = 65555ms;
 
-  std::shared_ptr<core::Repository> testRepository = 
std::make_shared<core::Repository>();
-  testRepository->initialize(0);
+  std::shared_ptr<core::Repository> testRepository = 
core::createRepository("nooprepository");
+  testRepository->initialize(nullptr);
   record1.setEventDuration(sample);
 
   REQUIRE(record1.Serialize(testRepository) == true);
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp 
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index cfd1b9ac6..f02dff669 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -59,10 +59,10 @@ class TestProcessor : public minifi::core::Processor {
 }  // namespace
 
 TEST_CASE("Test Repo Names", "[TestFFR1]") {
-  auto repoA = minifi::core::createRepository("FlowFileRepository", false, 
"flowfile");
+  auto repoA = minifi::core::createRepository("FlowFileRepository", 
"flowfile");
   REQUIRE("flowfile" == repoA->getName());
 
-  auto repoB = minifi::core::createRepository("ProvenanceRepository", false, 
"provenance");
+  auto repoB = minifi::core::createRepository("ProvenanceRepository", 
"provenance");
   REQUIRE("provenance" == repoB->getName());
 }
 
@@ -287,8 +287,8 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
   config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "content_repository"));
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
 
-  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestRepository>();
-  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", 
REPOTEST_FLOWFILE_CHECKPOINT_DIR);
+  std::shared_ptr<core::Repository> prov_repo = 
std::make_shared<TestThreadedRepository>();
+  auto ff_repository = 
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", 
REPOTEST_FLOWFILE_CHECKPOINT_DIR);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
   ff_repository->initialize(config);
   content_repo->initialize(config);
@@ -366,7 +366,8 @@ TEST_CASE("Flush deleted flowfiles before shutdown", 
"[TestFFR7]") {
    public:
     explicit TestFlowFileRepository(const std::string& name)
         : core::SerializableComponent(name),
-          FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, 
FLOWFILE_REPOSITORY_DIRECTORY, 10min, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 
1ms) {}
+          FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, 
core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
+                             10min, 
core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
     void flush() override {
       FlowFileRepository::flush();
       if (onFlush_) {
diff --git a/libminifi/test/unit/MetricsTests.cpp 
b/libminifi/test/unit/MetricsTests.cpp
index 6c4faafbb..a3c96128b 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -95,7 +95,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
 
   REQUIRE("RepositoryMetrics" == metrics.getName());
 
-  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+  auto repo = std::make_shared<TestThreadedRepository>();
 
   metrics.addRepository(repo);
   {
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index dd724061a..194db5c96 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -29,53 +29,36 @@
 #include <vector>
 #include "core/repository/VolatileContentRepository.h"
 #include "core/Processor.h"
+#include "core/ThreadedRepository.h"
 #include "Connection.h"
 #include "FlowController.h"
 #include "properties/Configure.h"
 #include "provenance/Provenance.h"
 #include "SwapManager.h"
 
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Woverloaded-virtual"
-#endif
-
 using namespace std::literals::chrono_literals;
 
-/**
- * Test repository
- */
-class TestRepository : public org::apache::nifi::minifi::core::Repository {
+template <typename T_BaseRepository>
+class TestRepositoryBase : public T_BaseRepository {
  public:
-  TestRepository()
+  TestRepositoryBase()
       : org::apache::nifi::minifi::core::SerializableComponent("repo_name"),
-        Repository("repo_name", "./dir", 1s, 100, 0ms) {
-  }
-
-  ~TestRepository() override {
-    stop();
+        T_BaseRepository("repo_name", "./dir", 1s, 100, 0ms) {
   }
 
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> 
&) override {
     return true;
   }
 
-  void start() override {
-    running_ = true;
-  }
-
   void setFull() {
-    repo_full_ = true;
+    T_BaseRepository::repo_full_ = true;
   }
 
-  bool isNoop() override {
+  bool isNoop() const override {
     return false;
   }
 
-  bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override 
{
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     repository_results_.emplace(key, std::string{reinterpret_cast<const 
char*>(buf), bufLen});
     return true;
@@ -95,7 +78,7 @@ class TestRepository : public 
org::apache::nifi::minifi::core::Repository {
     return Put(key, buffer, bufferSize);
   }
 
-  bool Delete(std::string key) override {
+  bool Delete(const std::string& key) override {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     repository_results_.erase(key);
     return true;
@@ -124,7 +107,7 @@ class TestRepository : public 
org::apache::nifi::minifi::core::Repository {
         break;
       }
       const auto eventRead = store.at(max_size);
-      eventRead->DeSerialize(gsl::make_span(entry.second).as_span<const 
std::byte>());
+      eventRead->DeSerialize(gsl::make_span(entry.second).template 
as_span<const std::byte>());
       ++max_size;
     }
     return true;
@@ -150,50 +133,66 @@ class TestRepository : public 
org::apache::nifi::minifi::core::Repository {
     return repository_results_;
   }
 
-  void 
getProvenanceRecord(std::vector<std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>>
 &records, int maxSize) {
-    std::lock_guard<std::mutex> lock{repository_results_mutex_};
-    for (const auto &entry : repository_results_) {
-      if (records.size() >= static_cast<uint64_t>(maxSize))
-        break;
-      const auto eventRead = 
std::make_shared<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>();
+ protected:
+  mutable std::mutex repository_results_mutex_;
+  std::map<std::string, std::string> repository_results_;
+};
 
-      if (eventRead->DeSerialize(gsl::make_span(entry.second).as_span<const 
std::byte>())) {
-        records.push_back(eventRead);
-      }
-    }
+class TestRepository : public 
TestRepositoryBase<org::apache::nifi::minifi::core::Repository> {
+ public:
+  TestRepository()
+    : org::apache::nifi::minifi::core::SerializableComponent("repo_name") {
+  }
+
+  bool start() override {
+    return true;
+  }
+
+  bool stop() override {
+    return true;
+  }
+};
+
+class TestThreadedRepository : public 
TestRepositoryBase<org::apache::nifi::minifi::core::ThreadedRepository> {
+ public:
+  TestThreadedRepository()
+    : org::apache::nifi::minifi::core::SerializableComponent("repo_name") {
   }
 
+  ~TestThreadedRepository() override {
+    stop();
+  }
+
+ private:
   void run() override {
     // do nothing
   }
 
- protected:
-  mutable std::mutex repository_results_mutex_;
-  std::map<std::string, std::string> repository_results_;
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
 };
 
-class TestFlowRepository : public org::apache::nifi::minifi::core::Repository {
+class TestFlowRepository : public 
org::apache::nifi::minifi::core::ThreadedRepository {
  public:
   TestFlowRepository()
       : org::apache::nifi::minifi::core::SerializableComponent("ff"),
-        org::apache::nifi::minifi::core::Repository("ff", "./dir", 1s, 100, 
0ms) {
-  }
-
-  ~TestFlowRepository() override {
-    stop();
+        org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s, 
100, 0ms) {
   }
 
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> 
&) override {
     return true;
   }
 
-  bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override 
{
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     repository_results_.emplace(key, std::string{reinterpret_cast<const 
char*>(buf), bufLen});
     return true;
   }
-  // Delete
-  bool Delete(std::string key) override {
+
+  bool Delete(const std::string& key) override {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     repository_results_.erase(key);
     return true;
@@ -210,36 +209,22 @@ class TestFlowRepository : public 
org::apache::nifi::minifi::core::Repository {
     }
   }
 
-  std::map<std::string, std::string> getRepoMap() const {
-    std::lock_guard<std::mutex> lock{repository_results_mutex_};
-    return repository_results_;
-  }
-
-  void 
getProvenanceRecord(std::vector<std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>>
 &records, int maxSize) {
-    std::lock_guard<std::mutex> lock{repository_results_mutex_};
-    for (const auto &entry : repository_results_) {
-      if (records.size() >= static_cast<uint64_t>(maxSize))
-        break;
-      const auto eventRead = 
std::make_shared<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>();
-
-      if (eventRead->DeSerialize(gsl::make_span(entry.second).as_span<const 
std::byte>())) {
-        records.push_back(eventRead);
-      }
-    }
+  void loadComponent(const 
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& 
/*content_repo*/) override {
   }
 
-  void loadComponent(const 
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& 
content_repo) override {
-    content_repo_ = content_repo;
+ private:
+  void run() override {
   }
 
-  void run() override {
-    // do nothing
+  std::thread& getThread() override {
+    return thread_;
   }
 
  protected:
   mutable std::mutex repository_results_mutex_;
   std::map<std::string, std::string> repository_results_;
   std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository> 
content_repo_;
+  std::thread thread_;
 };
 
 class TestFlowController : public org::apache::nifi::minifi::FlowController {
@@ -286,23 +271,18 @@ class TestFlowController : public 
org::apache::nifi::minifi::FlowController {
   }
 
   std::shared_ptr<org::apache::nifi::minifi::core::Processor> 
createProcessor(const std::string& /*name*/, const 
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
-    return 0;
+    return nullptr;
   }
 
   org::apache::nifi::minifi::core::ProcessGroup *createRootProcessGroup(const 
std::string& /*name*/, const org::apache::nifi::minifi::utils::Identifier& 
/*uuid*/) {
-    return 0;
+    return nullptr;
   }
 
   org::apache::nifi::minifi::core::ProcessGroup 
*createRemoteProcessGroup(const std::string& /*name*/, const 
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
-    return 0;
+    return nullptr;
   }
 
   std::shared_ptr<org::apache::nifi::minifi::Connection> 
createConnection(const std::string& /*name*/, const 
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
-    return 0;
+    return nullptr;
   }
 };
-#if defined(__clang__)
-#pragma clang diagnostic pop
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic pop
-#endif
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp 
b/libminifi/test/unit/SchedulingAgentTests.cpp
index 6cb2fc68d..b80c7563c 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -46,9 +46,9 @@ class CountOnTriggersProcessor : public 
minifi::core::Processor {
 
 
 TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
+  auto repo = std::static_pointer_cast<TestThreadedRepository>(test_repo);
   std::shared_ptr<minifi::FlowController> controller =
       std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
 
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 1d90cef9c..8dee11d0f 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -286,18 +286,18 @@ int main(int argc, char **argv) {
 
     configure->get(minifi::Configure::nifi_provenance_repository_class_name, 
prov_repo_class);
     // Create repos for flow record and provenance
-    std::shared_ptr<core::Repository> prov_repo = 
core::createRepository(prov_repo_class, true, "provenance");
+    std::shared_ptr prov_repo = core::createRepository(prov_repo_class, 
"provenance");
 
-    if (!prov_repo->initialize(configure)) {
+    if (!prov_repo || !prov_repo->initialize(configure)) {
       logger->log_error("Provenance repository failed to initialize, 
exiting..");
       exit(1);
     }
 
     configure->get(minifi::Configure::nifi_flow_repository_class_name, 
flow_repo_class);
 
-    std::shared_ptr<core::Repository> flow_repo = 
core::createRepository(flow_repo_class, true, "flowfile");
+    std::shared_ptr flow_repo = core::createRepository(flow_repo_class, 
"flowfile");
 
-    if (!flow_repo->initialize(configure)) {
+    if (!flow_repo || !flow_repo->initialize(configure)) {
       logger->log_error("Flow file repository failed to initialize, 
exiting..");
       exit(1);
     }
@@ -343,7 +343,7 @@ int main(int argc, char **argv) {
           std::vector<std::string> repo_paths;
           repo_paths.reserve(3);
           // REPOSITORY_DIRECTORY is a dummy path used by noop repositories
-          const auto path_valid = [](const std::string& p) { return !p.empty() 
&& p != REPOSITORY_DIRECTORY; };
+          const auto path_valid = [](const std::string& p) { return !p.empty() 
&& p != org::apache::nifi::minifi::core::REPOSITORY_DIRECTORY; };
           auto prov_repo_path = prov_repo->getDirectory();
           auto flow_repo_path = flow_repo->getDirectory();
           auto content_repo_storage_path = content_repo->getStoragePath();
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index dac38ca76..40d0f039f 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -29,6 +29,7 @@
 #include "core/repository/VolatileContentRepository.h"
 #include "core/repository/FileSystemRepository.h"
 #include "core/Repository.h"
+#include "core/RepositoryFactory.h"
 
 #include "C2CallbackAgent.h"
 #include "core/Connectable.h"
@@ -67,7 +68,7 @@ class Instance {
  public:
 
   explicit Instance(const std::string &url, const std::string &port, const 
std::string &repo_class_name = "")
-      : no_op_repo_(std::make_shared<minifi::core::Repository>()),
+      : no_op_repo_(minifi::core::createRepository("nooprepository")),
         url_(url),
         configure_(std::make_shared<Configure>()) {
     if (repo_class_name == "filesystemrepository") {

Reply via email to