This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 67127a1  MINIFICPP-1300 - Extract conditional reload for the schedulers of the FlowController
67127a1 is described below

commit 67127a18e3a1c4c388ac41e5c2ebc3d53e9a85af
Author: Adam Hunyadi <[email protected]>
AuthorDate: Fri Jul 31 15:17:54 2020 +0200

    MINIFICPP-1300 - Extract conditional reload for the schedulers of the FlowController
    
    MINIFICPP-1300 - Decouple the ownership models of components using a ControllerServiceProvider (eg. FlowController)
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #858
---
 extensions/coap/protocols/CoapC2Protocol.cpp       |  2 +-
 extensions/coap/protocols/CoapC2Protocol.h         |  2 +-
 .../expression-language/ProcessContextExpr.h       |  4 ++--
 extensions/http-curl/protocols/AgentPrinter.cpp    |  2 +-
 extensions/http-curl/protocols/AgentPrinter.h      |  2 +-
 extensions/http-curl/protocols/RESTReceiver.cpp    |  2 +-
 extensions/http-curl/protocols/RESTReceiver.h      |  2 +-
 extensions/http-curl/protocols/RESTSender.cpp      |  2 +-
 extensions/http-curl/protocols/RESTSender.h        |  2 +-
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       | 10 ++++----
 extensions/mqtt/protocol/MQTTC2Protocol.cpp        |  2 +-
 extensions/mqtt/protocol/MQTTC2Protocol.h          |  2 +-
 .../tests/integration/TestExecuteProcess.cpp       |  6 ++---
 .../standard-processors/tests/unit/GetTCPTests.cpp | 15 +++++-------
 .../tests/unit/ProcessorTests.cpp                  | 12 ++++------
 libminifi/include/CronDrivenSchedulingAgent.h      |  2 +-
 libminifi/include/EventDrivenSchedulingAgent.h     |  2 +-
 libminifi/include/FlowController.h                 | 11 ++++++++-
 libminifi/include/SchedulingAgent.h                |  4 ++--
 libminifi/include/ThreadedSchedulingAgent.h        |  5 ++--
 libminifi/include/TimerDrivenSchedulingAgent.h     |  2 +-
 libminifi/include/c2/C2Agent.h                     |  4 ++--
 libminifi/include/c2/C2Protocol.h                  |  4 ++--
 libminifi/include/c2/ControllerSocketProtocol.h    |  2 +-
 libminifi/include/c2/HeartBeatReporter.h           |  4 ++--
 libminifi/include/core/ProcessContext.h            | 14 +++++------
 libminifi/include/core/ProcessContextBuilder.h     |  4 ++--
 .../core/controller/ControllerServiceProvider.h    |  2 +-
 libminifi/src/FlowController.cpp                   | 27 +++++++---------------
 libminifi/src/c2/C2Agent.cpp                       |  2 +-
 libminifi/src/c2/ControllerSocketProtocol.cpp      |  2 +-
 libminifi/src/core/ProcessContextBuilder.cpp       |  2 +-
 libminifi/test/TestBase.cpp                        |  4 ++--
 .../test/archive-tests/CompressContentTests.cpp    |  3 +--
 libminifi/test/archive-tests/MergeFileTests.cpp    |  3 +--
 libminifi/test/integration/IntegrationBase.h       |  2 +-
 .../test/persistence-tests/PersistenceTests.cpp    |  6 ++---
 libminifi/test/rocksdb-tests/RepoTests.cpp         |  3 +--
 nanofi/include/cxx/C2CallbackAgent.h               |  2 +-
 nanofi/include/cxx/Instance.h                      |  6 ++---
 nanofi/src/cxx/C2CallbackAgent.cpp                 |  2 +-
 nanofi/src/cxx/Plan.cpp                            |  4 ++--
 42 files changed, 87 insertions(+), 108 deletions(-)

diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp 
b/extensions/coap/protocols/CoapC2Protocol.cpp
index b9d195e..f51d5a3 100644
--- a/extensions/coap/protocols/CoapC2Protocol.cpp
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -39,7 +39,7 @@ CoapProtocol::CoapProtocol(const std::string &name, const 
utils::Identifier &uui
 
 CoapProtocol::~CoapProtocol() = default;
 
-void CoapProtocol::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) {
+void CoapProtocol::initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<Configure> &configure) {
   RESTSender::initialize(controller, configure);
   if (configure->get("nifi.c2.coap.connector.service", 
controller_service_name_)) {
     auto service = controller->getControllerService(controller_service_name_);
diff --git a/extensions/coap/protocols/CoapC2Protocol.h 
b/extensions/coap/protocols/CoapC2Protocol.h
index a2a702c..ce74f35 100644
--- a/extensions/coap/protocols/CoapC2Protocol.h
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -84,7 +84,7 @@ class CoapProtocol : public minifi::c2::RESTSender {
     // no op.
   }
 
-  void initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) override;
+  void initialize(core::controller::ControllerServiceProvider* controller, 
const std::shared_ptr<Configure> &configure) override;
 
   // Supported Properties
 
diff --git a/extensions/expression-language/ProcessContextExpr.h 
b/extensions/expression-language/ProcessContextExpr.h
index 4d0db54..191859f 100644
--- a/extensions/expression-language/ProcessContextExpr.h
+++ b/extensions/expression-language/ProcessContextExpr.h
@@ -37,14 +37,14 @@ class ProcessContextExpr : public core::ProcessContext {
   /**
    std::forward of argument list did not work on all platform.
    **/
-  ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, 
std::shared_ptr<controller::ControllerServiceProvider> 
&controller_service_provider,
+  ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, 
controller::ControllerServiceProvider* controller_service_provider,
                      const std::shared_ptr<core::Repository> &repo, const 
std::shared_ptr<core::Repository> &flow_repo,
                      const std::shared_ptr<core::ContentRepository> 
&content_repo = std::make_shared<core::repository::FileSystemRepository>())
       : core::ProcessContext(processor, controller_service_provider, repo, 
flow_repo, content_repo),
         logger_(logging::LoggerFactory<ProcessContextExpr>::getLogger()) {
   }
 
-  ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, 
std::shared_ptr<controller::ControllerServiceProvider> 
&controller_service_provider,
+  ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, 
controller::ControllerServiceProvider* controller_service_provider,
                      const std::shared_ptr<core::Repository> &repo, const 
std::shared_ptr<core::Repository> &flow_repo, const 
std::shared_ptr<minifi::Configure> &configuration,
                      const std::shared_ptr<core::ContentRepository> 
&content_repo = std::make_shared<core::repository::FileSystemRepository>())
       : core::ProcessContext(processor, controller_service_provider, repo, 
flow_repo, configuration, content_repo),
diff --git a/extensions/http-curl/protocols/AgentPrinter.cpp 
b/extensions/http-curl/protocols/AgentPrinter.cpp
index b22331e..672652a 100644
--- a/extensions/http-curl/protocols/AgentPrinter.cpp
+++ b/extensions/http-curl/protocols/AgentPrinter.cpp
@@ -36,7 +36,7 @@ AgentPrinter::AgentPrinter(std::string name, 
utils::Identifier uuid)
       logger_(logging::LoggerFactory<AgentPrinter>::getLogger()) {
 }
 
-void AgentPrinter::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink,
+void AgentPrinter::initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                               const std::shared_ptr<Configure> &configure) {
   HeartBeatReporter::initialize(controller, updateSink, configure);
 }
diff --git a/extensions/http-curl/protocols/AgentPrinter.h 
b/extensions/http-curl/protocols/AgentPrinter.h
index 50000fe..83c8353 100644
--- a/extensions/http-curl/protocols/AgentPrinter.h
+++ b/extensions/http-curl/protocols/AgentPrinter.h
@@ -45,7 +45,7 @@ class AgentPrinter : public RESTProtocol, public 
HeartBeatReporter {
   /**
    * Initialize agent printer.
    */
-  virtual void initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink,
+  virtual void initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                           const std::shared_ptr<Configure> &configure) 
override;
 
   /**
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp 
b/extensions/http-curl/protocols/RESTReceiver.cpp
index ae1ebf6..10557e7 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -44,7 +44,7 @@ RESTReceiver::RESTReceiver(std::string name, 
utils::Identifier uuid)
       logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
 }
 
-void RESTReceiver::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink, const 
std::shared_ptr<Configure> &configure) {
+void RESTReceiver::initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<state::StateMonitor> &updateSink, const 
std::shared_ptr<Configure> &configure) {
   HeartBeatReporter::initialize(controller, updateSink, configure);
   logger_->log_trace("Initializing rest receiver");
   if (nullptr != configuration_) {
diff --git a/extensions/http-curl/protocols/RESTReceiver.h 
b/extensions/http-curl/protocols/RESTReceiver.h
index 9c78a83..4bd6d51 100644
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -48,7 +48,7 @@ class RESTReceiver : public RESTProtocol, public 
HeartBeatReporter {
  public:
   RESTReceiver(std::string name, utils::Identifier uuid = utils::Identifier());
 
-  virtual void initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink,
+  virtual void initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                           const std::shared_ptr<Configure> &configure) 
override;
   virtual int16_t heartbeat(const C2Payload &heartbeat) override;
 
diff --git a/extensions/http-curl/protocols/RESTSender.cpp 
b/extensions/http-curl/protocols/RESTSender.cpp
index 160f048..649910e 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -41,7 +41,7 @@ RESTSender::RESTSender(const std::string &name, const 
utils::Identifier &uuid)
       logger_(logging::LoggerFactory<Connectable>::getLogger()) {
 }
 
-void RESTSender::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) {
+void RESTSender::initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<Configure> &configure) {
   C2Protocol::initialize(controller, configure);
   // base URL when one is not specified.
   if (nullptr != configure) {
diff --git a/extensions/http-curl/protocols/RESTSender.h 
b/extensions/http-curl/protocols/RESTSender.h
index 40a81f2..211716c 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -56,7 +56,7 @@ class RESTSender : public RESTProtocol, public C2Protocol {
 
   virtual void update(const std::shared_ptr<Configure> &configure) override;
 
-  virtual void initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) override;
+  virtual void initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<Configure> &configure) override;
 
  protected:
 
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp 
b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 48e6710..93e1091 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -89,9 +89,8 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(listenhttp);
   std::shared_ptr<core::ProcessorNode> node2 = 
std::make_shared<core::ProcessorNode>(invokehttp);
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-  std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider, 
repo, repo, content_repo);
-  std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, controller_services_provider, 
repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, 
content_repo);
   
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, 
"8686");
   
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
 "/testytesttest");
 
@@ -211,9 +210,8 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(listenhttp);
   std::shared_ptr<core::ProcessorNode> node2 = 
std::make_shared<core::ProcessorNode>(invokehttp);
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-  std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider, 
repo, repo, content_repo);
-  std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, controller_services_provider, 
repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, 
content_repo);
   
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, 
"8680");
   
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
 "/testytesttest");
 
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp 
b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
index 9637e20..2dcdf68 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
@@ -30,7 +30,7 @@ MQTTC2Protocol::MQTTC2Protocol(std::string name, 
utils::Identifier uuid)
 
 MQTTC2Protocol::~MQTTC2Protocol() = default;
 
-void MQTTC2Protocol::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) {
+void MQTTC2Protocol::initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<Configure> &configure) {
   if (configure->get("nifi.c2.mqtt.connector.service", 
controller_service_name_)) {
     auto service = controller->getControllerService(controller_service_name_);
     mqtt_service_ = 
std::static_pointer_cast<controllers::MQTTControllerService>(service);
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h 
b/extensions/mqtt/protocol/MQTTC2Protocol.h
index 47e04a6..1497b10 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.h
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.h
@@ -64,7 +64,7 @@ class MQTTC2Protocol : public C2Protocol {
     // no op.
   }
 
-  virtual void initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) override;
+  virtual void initialize(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<Configure> &configure) override;
 
  protected:
 
diff --git 
a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp 
b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
index 138e610..fa597ce 100644
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
@@ -82,16 +82,14 @@ int main(int argc, char **argv) {
   std::vector<std::thread> processor_workers;
 
   std::shared_ptr<core::ProcessorNode> node2 = 
std::make_shared<core::ProcessorNode>(processor);
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-  std::shared_ptr<core::ProcessContext> contextset = 
std::make_shared<core::ProcessContext>(node2, controller_services_provider, 
test_repo, test_repo);
+  std::shared_ptr<core::ProcessContext> contextset = 
std::make_shared<core::ProcessContext>(node2, nullptr, test_repo, test_repo);
   core::ProcessSessionFactory factory(contextset);
   processor->onSchedule(contextset.get(), &factory);
 
   for (int i = 0; i < 1; i++) {
     processor_workers.push_back(std::thread([processor, test_repo, 
&is_ready]() {
       std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-      std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-      std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider, 
test_repo, test_repo);
+      std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, nullptr, test_repo, test_repo);
       
context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command,
 "sleep 0.5");
       std::shared_ptr<core::ProcessSession> session = 
std::make_shared<core::ProcessSession>(context);
       while (!is_ready.load(std::memory_order_relaxed)) {
diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp 
b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
index 10d5f77..5f86a51 100644
--- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
@@ -95,9 +95,8 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::ProcessorNode> node2 = 
std::make_shared<core::ProcessorNode>(logAttribute);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider, 
repo, repo, content_repo);
-    std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, controller_services_provider, 
repo, repo, content_repo);
+    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+    std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, 
content_repo);
   
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
 org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + 
std::to_string(server.getPort()));
   
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
 "100 msec");
   auto session = std::make_shared<core::ProcessSession>(context);
@@ -206,9 +205,8 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::ProcessorNode> node2 = 
std::make_shared<core::ProcessorNode>(logAttribute);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider, 
repo, repo, content_repo);
-    std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, controller_services_provider, 
repo, repo, content_repo);
+    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+    std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, 
content_repo);
   
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
 org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + 
std::to_string(server.getPort()));
   
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
 "100 msec");
   // we're using new lines above
@@ -329,9 +327,8 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::ProcessorNode> node2 = 
std::make_shared<core::ProcessorNode>(logAttribute);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider, 
repo, repo, content_repo);
-    std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, controller_services_provider, 
repo, repo, content_repo);
+    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+    std::shared_ptr<core::ProcessContext> context2 = 
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, 
content_repo);
   
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
 org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + 
std::to_string(server.getPort()));
   
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
 "100 msec");
   // we're using new lines above
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp 
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index a251b4e..a99d8b3 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -83,8 +83,7 @@ TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
   REQUIRE(!dir.empty());
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-  auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
+  auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
 
   
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, 
dir);
   // replicate 10 threads
@@ -169,8 +168,7 @@ TEST_CASE("Test GetFile Ignore", "[getfileCreate3]") {
   REQUIRE(!dir.empty());
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-  auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
+  auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
 
   
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, 
dir);
   // replicate 10 threads
@@ -265,8 +263,7 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
   processor->setScheduledState(core::ScheduledState::RUNNING);
 
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-  auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
+  auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
 
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
 
@@ -574,8 +571,7 @@ void testRPGBypass(const std::string &host, const 
std::string &port, const std::
   REQUIRE(rpg->setProperty(minifi::RemoteProcessorGroupPort::hostName, host));
   rpg->setProperty(minifi::RemoteProcessorGroupPort::port, port);
   std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(rpg);
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-  auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
+  auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
   auto psf = std::make_shared<core::ProcessSessionFactory>(context);
   if (hasException) {
     auto expected_error = "Site2Site Protocol: HTTPClient not resolvable. No 
peers configured or any port specific hostname and port -- cannot schedule";
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h 
b/libminifi/include/CronDrivenSchedulingAgent.h
index 31ac37a..3f1e7e0 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -44,7 +44,7 @@ class CronDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
   /*!
    * Create a new event driven scheduling agent.
    */
-  
CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo,
+  CronDrivenSchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo,
                             std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration,
                             utils::ThreadPool<utils::TaskRescheduleInfo> 
&thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configuration, thread_pool) {
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h 
b/libminifi/include/EventDrivenSchedulingAgent.h
index 65d25bb..d56e929 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -43,7 +43,7 @@ class EventDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
   /*!
    * Create a new event driven scheduling agent.
    */
-  
EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo,
+  EventDrivenSchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration,
                              utils::ThreadPool<utils::TaskRescheduleInfo> 
&thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configuration, thread_pool) {
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index c40c573..cd1804e 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -235,7 +235,7 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
    * @param id service identifier
    * @return shared pointer to the controller service node or nullptr if it 
does not exist.
    */
-  std::shared_ptr<core::controller::ControllerServiceNode> 
getControllerServiceNode(const std::string &id) override;
+  std::shared_ptr<core::controller::ControllerServiceNode> 
getControllerServiceNode(const std::string &id) const override;
 
   void 
verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
 
@@ -337,6 +337,15 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
 
   utils::optional<std::chrono::milliseconds> 
loadShutdownTimeoutFromConfiguration();
 
+ private:
+  template <typename T, typename = typename 
std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type>
+  void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool 
condition) {
+    if (condition) {
+      scheduler = 
std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this),
 provenance_repo_, flow_file_repo_, content_repo_, configuration_, 
thread_pool_);
+    }
+  }
+
+ protected:
   // flow controller mutex
   std::recursive_mutex mutex_;
 
diff --git a/libminifi/include/SchedulingAgent.h 
b/libminifi/include/SchedulingAgent.h
index 088c3e1..fb5532d 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -59,7 +59,7 @@ class SchedulingAgent {
   /*!
    * Create a new scheduling agent.
    */
-  SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_repo,
+  SchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_repo,
                   std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration, 
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
       : admin_yield_duration_(),
         bored_yield_duration_(0),
@@ -135,7 +135,7 @@ class SchedulingAgent {
   // thread pool for components.
   utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
   // controller service provider reference
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider_;
+  gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider_;
 
  private:
   struct SchedulingInfo {
diff --git a/libminifi/include/ThreadedSchedulingAgent.h 
b/libminifi/include/ThreadedSchedulingAgent.h
index 8a35580..2e8edda 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -47,8 +47,9 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   /*!
    * Create a new threaded scheduling agent.
    */
-  
ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_repo,
-                          std::shared_ptr<core::ContentRepository> 
content_repo, std::shared_ptr<Configure> configuration,  
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+  ThreadedSchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo,
+        std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::ContentRepository> content_repo,
+        std::shared_ptr<Configure> configuration,  
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configuration, thread_pool),
         logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
   }
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h 
b/libminifi/include/TimerDrivenSchedulingAgent.h
index 160db1d..1481c18 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -39,7 +39,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   /*!
    * Create a new processor
    */
-  TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
+  TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
                              utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool),
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 273fd43..edcf096 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -62,7 +62,7 @@ namespace c2 {
  */
 class C2Agent : public state::UpdateController {
  public:
-  C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+  C2Agent(core::controller::ControllerServiceProvider* controller,
           const std::shared_ptr<state::StateMonitor> &updateSink,
           const std::shared_ptr<Configure> &configure);
   virtual ~C2Agent() noexcept {
@@ -210,7 +210,7 @@ class C2Agent : public state::UpdateController {
   std::shared_ptr<controllers::UpdatePolicyControllerService> update_service_;
 
   // controller service provider reference.
-  std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+  core::controller::ControllerServiceProvider* controller_;
 
   // shared pointer to the configuration of this agent
   std::shared_ptr<Configure> configuration_;
diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h
index 2cbd0bd..23e7748 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -41,7 +41,7 @@ class C2Protocol : public core::Connectable {
         running_(true) {
   }
 
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
     controller_ = controller;
     configuration_ = configure;
   }
@@ -102,7 +102,7 @@ class C2Protocol : public core::Connectable {
  protected:
   std::atomic<bool> running_;
 
-  std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+  core::controller::ControllerServiceProvider* controller_;
 
   std::shared_ptr<Configure> configuration_;
 };
diff --git a/libminifi/include/c2/ControllerSocketProtocol.h b/libminifi/include/c2/ControllerSocketProtocol.h
index d24c8fd..1636477 100644
--- a/libminifi/include/c2/ControllerSocketProtocol.h
+++ b/libminifi/include/c2/ControllerSocketProtocol.h
@@ -51,7 +51,7 @@ class ControllerSocketProtocol : public HeartBeatReporter {
    * @param updateSink update mechanism that will be used to stop/clear elements
    * @param configuration configuration class.
    */
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+  virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                           const std::shared_ptr<Configure> &configuration);
 
   /**
diff --git a/libminifi/include/c2/HeartBeatReporter.h b/libminifi/include/c2/HeartBeatReporter.h
index 9de242d..82d5f2a 100644
--- a/libminifi/include/c2/HeartBeatReporter.h
+++ b/libminifi/include/c2/HeartBeatReporter.h
@@ -45,7 +45,7 @@ class HeartBeatReporter : public core::Connectable {
         configuration_(nullptr) {
   }
 
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+  virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                           const std::shared_ptr<Configure> &configure) {
     controller_ = controller;
     update_sink_ = updateSink;
@@ -89,7 +89,7 @@ class HeartBeatReporter : public core::Connectable {
   }
 
  protected:
-  std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+  core::controller::ControllerServiceProvider* controller_;
 
   std::shared_ptr<state::StateMonitor> update_sink_;
 
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 53d42aa..3e0da1a 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -55,7 +55,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
   /*!
    * Create a new process context associated with the processor/controller service/state manager
    */
-  ProcessContext(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, const std::shared_ptr<core::Repository> &repo,
+  ProcessContext(const std::shared_ptr<ProcessorNode> &processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository> &repo,
                  const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<core::ContentRepository> &content_repo = std::make_shared<core::repository::FileSystemRepository>())
       : VariableRegistry(std::make_shared<minifi::Configure>()),
         controller_service_provider_(controller_service_provider),
@@ -73,7 +73,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
   /*!
    * Create a new process context associated with the processor/controller service/state manager
    */
-  ProcessContext(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, const std::shared_ptr<core::Repository> &repo,
+  ProcessContext(const std::shared_ptr<ProcessorNode> &processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository> &repo,
                  const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<minifi::Configure> &configuration, const std::shared_ptr<core::ContentRepository> &content_repo =
                      std::make_shared<core::repository::FileSystemRepository>())
       : VariableRegistry(configuration),
@@ -172,9 +172,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
    * identifier
    */
   std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) {
-    if (controller_service_provider_ != nullptr)
-      return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
-    return nullptr;
+    return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
   }
 
   /**
@@ -228,7 +226,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
   }
 
   static std::shared_ptr<core::CoreComponentStateManagerProvider> getOrCreateDefaultStateManagerProvider(
-      std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+      controller::ControllerServiceProvider* controller_service_provider,
       std::shared_ptr<minifi::Configure> configuration,
       const char *base_path = "") {
     static std::mutex mutex;
@@ -300,7 +298,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
 
   static std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider(
       std::shared_ptr<logging::Logger> logger,
-      std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+      controller::ControllerServiceProvider* controller_service_provider,
       std::shared_ptr<minifi::Configure> configuration) {
     if (controller_service_provider == nullptr) {
       return nullptr;
@@ -330,7 +328,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
   }
 
   // controller service provider.
-  std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_;
+  controller::ControllerServiceProvider* controller_service_provider_;
   // state manager provider
   std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
   // repository shared pointer.
diff --git a/libminifi/include/core/ProcessContextBuilder.h b/libminifi/include/core/ProcessContextBuilder.h
index 155ab41..281be79 100644
--- a/libminifi/include/core/ProcessContextBuilder.h
+++ b/libminifi/include/core/ProcessContextBuilder.h
@@ -62,7 +62,7 @@ class ProcessContextBuilder : public core::CoreComponent, public std::enable_sha
 
   virtual ~ProcessContextBuilder() = default;
 
-  std::shared_ptr<ProcessContextBuilder> withProvider(const std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider);
+  std::shared_ptr<ProcessContextBuilder> withProvider(core::controller::ControllerServiceProvider* controller_service_provider);
 
   std::shared_ptr<ProcessContextBuilder> withProvenanceRepository(const std::shared_ptr<core::Repository> &repo);
 
@@ -76,7 +76,7 @@ class ProcessContextBuilder : public core::CoreComponent, public std::enable_sha
 
  protected:
   std::shared_ptr<minifi::Configure> configuration_;
-  std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_;
+  core::controller::ControllerServiceProvider* controller_service_provider_;
   std::shared_ptr<core::Repository> prov_repo_;
   std::shared_ptr<core::Repository> flow_repo_;
   std::shared_ptr<core::ContentRepository> content_repo_;
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 8628f94..428b977 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -82,7 +82,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
    * @param id controller service identifier.
    * @return shared pointer to the controller service node.
    */
-  virtual std::shared_ptr<ControllerServiceNode> getControllerServiceNode(const std::string &id) {
+  virtual std::shared_ptr<ControllerServiceNode> getControllerServiceNode(const std::string &id) const {
     return controller_map_->getControllerServiceNode(id);
   }
 
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index e088628..01c9a1e 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -82,10 +82,6 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
       flow_file_repo_(flow_file_repo),
       
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
       thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
-      timer_scheduler_(nullptr),
-      event_scheduler_(nullptr),
-      cron_scheduler_(nullptr),
-      controller_service_provider_(nullptr),
       flow_configuration_(std::move(flow_configuration)),
       configuration_(configure),
       content_repo_(content_repo),
@@ -267,9 +263,9 @@ int16_t FlowController::stop() {
       this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
     }
     // stop after we've attempted to stop the processors.
-    this->timer_scheduler_->stop();
-    this->event_scheduler_->stop();
-    this->cron_scheduler_->stop();
+    timer_scheduler_->stop();
+    event_scheduler_->stop();
+    cron_scheduler_->stop();
     thread_pool_.shutdown();
     /* STOP! Before you change it, consider the following:
      * -Stopping the schedulers doesn't actually quit the onTrigger functions of processors
@@ -350,17 +346,10 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool
       thread_pool_.setControllerServiceProvider(base_shared_ptr);
       thread_pool_.start();
     }
-    if (nullptr == timer_scheduler_ || reload) {
-      timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
-    }
-
-    if (nullptr == event_scheduler_ || reload) {
-      event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
-    }
 
-    if (nullptr == cron_scheduler_ || reload) {
-      cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
-    }
+    conditionalReloadScheduler<TimerDrivenSchedulingAgent>(timer_scheduler_, !timer_scheduler_ || reload);
+    conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload);
+    conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload);
 
     
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
     
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
@@ -591,7 +580,7 @@ void FlowController::initializeC2() {
   loadC2ResponseConfiguration();
 
   if (!c2_initialized_) {
-    c2_agent_ = std::unique_ptr<c2::C2Agent>(new 
c2::C2Agent(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+    c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(this,
                                                              
std::dynamic_pointer_cast<FlowController>(shared_from_this()),
                                                              configuration_));
     c2_agent_->start();
@@ -801,7 +790,7 @@ std::shared_ptr<core::controller::ControllerService> 
FlowController::getControll
  * @param id service identifier
  * @return shared pointer to the controller service node or nullptr if it does 
not exist.
  */
-std::shared_ptr<core::controller::ControllerServiceNode> 
FlowController::getControllerServiceNode(const std::string &id) {
+std::shared_ptr<core::controller::ControllerServiceNode> 
FlowController::getControllerServiceNode(const std::string &id) const {
   return controller_service_provider_->getControllerServiceNode(id);
 }
 
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 0ea31d2..9c0085d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -47,7 +47,7 @@ namespace nifi {
 namespace minifi {
 namespace c2 {
 
-C2Agent::C2Agent(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+C2Agent::C2Agent(core::controller::ControllerServiceProvider* controller,
                  const std::shared_ptr<state::StateMonitor> &updateSink,
                  const std::shared_ptr<Configure> &configuration)
     : heart_beat_period_(3000),
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp 
b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 9b7162d..124490f 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -32,7 +32,7 @@ namespace nifi {
 namespace minifi {
 namespace c2 {
 
-void ControllerSocketProtocol::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink,
+void 
ControllerSocketProtocol::initialize(core::controller::ControllerServiceProvider*
 controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                                           const std::shared_ptr<Configure> 
&configuration) {
   HeartBeatReporter::initialize(controller, updateSink, configuration);
   stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
diff --git a/libminifi/src/core/ProcessContextBuilder.cpp 
b/libminifi/src/core/ProcessContextBuilder.cpp
index b1b794c..2cba8b7 100644
--- a/libminifi/src/core/ProcessContextBuilder.cpp
+++ b/libminifi/src/core/ProcessContextBuilder.cpp
@@ -48,7 +48,7 @@ ProcessContextBuilder::ProcessContextBuilder(const 
std::string &name)
   configuration_ = std::make_shared<minifi::Configure>();
 }
 
-std::shared_ptr<ProcessContextBuilder> 
ProcessContextBuilder::withProvider(const 
std::shared_ptr<controller::ControllerServiceProvider> 
&controller_service_provider) {
+std::shared_ptr<ProcessContextBuilder> 
ProcessContextBuilder::withProvider(core::controller::ControllerServiceProvider*
 controller_service_provider) {
   controller_service_provider_ = controller_service_provider;
   return this->shared_from_this();
 }
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 6f415eb..c897459 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -54,7 +54,7 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> 
content_repo, std::s
   } else {
     state_dir_ = state_dir;
   }
-  state_manager_provider_ = 
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_,
 configuration_, state_dir_.c_str());
+  state_manager_provider_ = 
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_.get(),
 configuration_, state_dir_.c_str());
 }
 
 TestPlan::~TestPlan() {
@@ -125,7 +125,7 @@ std::shared_ptr<core::Processor> 
TestPlan::addProcessor(const std::shared_ptr<co
 
   auto contextBuilder = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ProcessContextBuilder>("ProcessContextBuilder");
 
-  contextBuilder = 
contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_)->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
+  contextBuilder = 
contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_.get())->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
 
   auto context = contextBuilder->build(node);
 
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp 
b/libminifi/test/archive-tests/CompressContentTests.cpp
index bb31681..5944b41 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -144,8 +144,7 @@ class CompressDecompressionTestController : public 
TestController{
     processor_->setScheduledState(core::ScheduledState::RUNNING);
 
     std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor_);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    context_ = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, repo, repo, content_repo);
+    context_ = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
   }
 
  public:
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp 
b/libminifi/test/archive-tests/MergeFileTests.cpp
index 1809d7c..6c1ec65 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -185,8 +185,7 @@ class MergeTestController : public TestController {
     logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
 
     node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider = nullptr;
-    context = std::make_shared<core::ProcessContext>(node, 
controller_service_provider, repo, repo, content_repo);
+    context = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
   }
   ~MergeTestController() = default;
   std::shared_ptr<core::ProcessContext> context;
diff --git a/libminifi/test/integration/IntegrationBase.h 
b/libminifi/test/integration/IntegrationBase.h
index 3a24eef..ce22ef2 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -118,7 +118,7 @@ void IntegrationBase::run(std::string test_file_location) {
   auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
   char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
   state_dir = 
utils::file::FileUtils::create_temp_directory(state_dir_name_template);
-  
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider,
 configuration, state_dir.c_str());
+  
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(),
 configuration, state_dir.c_str());
 
   std::shared_ptr<core::ProcessGroup> 
pg(yaml_config.getRoot(test_file_location));
   queryRootProcessGroup(pg);
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp 
b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 466110d..4844729 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -41,20 +41,18 @@ struct TestFlow{
   TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& 
ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, 
const std::shared_ptr<core::Repository>& prov_repo,
         const 
std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>& 
processorGenerator, const core::Relationship& relationshipToOutput)
       : ff_repository(ff_repository), content_repo(content_repo), 
prov_repo(prov_repo) {
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-
     // setup processor
     {
       processor = processorGenerator(mainProcUUID());
       std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-      processorContext = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, prov_repo, ff_repository, content_repo);
+      processorContext = std::make_shared<core::ProcessContext>(node, nullptr, 
prov_repo, ff_repository, content_repo);
     }
 
     // setup INPUT processor
     {
       inputProcessor = std::make_shared<core::Processor>("source", 
inputProcUUID());
       std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(inputProcessor);
-      inputContext = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, prov_repo,
+      inputContext = std::make_shared<core::ProcessContext>(node, nullptr, 
prov_repo,
                                                             ff_repository, 
content_repo);
     }
 
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp 
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index b48ebdd..aa2ad84 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -298,8 +298,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
     input->setSourceUUID(uuid);
     processor->addConnection(input);
     std::shared_ptr<core::ProcessorNode> node = 
std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider, prov_repo, ff_repository, content_repo);
+    auto context = std::make_shared<core::ProcessContext>(node, nullptr, 
prov_repo, ff_repository, content_repo);
     core::ProcessSession sessionGenFlowFile(context);
     std::shared_ptr<core::FlowFile> flow = 
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     sessionGenFlowFile.importFrom(content, flow);
diff --git a/nanofi/include/cxx/C2CallbackAgent.h 
b/nanofi/include/cxx/C2CallbackAgent.h
index cc8a665..81198bb 100644
--- a/nanofi/include/cxx/C2CallbackAgent.h
+++ b/nanofi/include/cxx/C2CallbackAgent.h
@@ -47,7 +47,7 @@ class C2CallbackAgent : public c2::C2Agent {
 
  public:
 
-  explicit C2CallbackAgent(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink, const 
std::shared_ptr<Configure> &configure);
+  explicit C2CallbackAgent(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<state::StateMonitor> &updateSink, const 
std::shared_ptr<Configure> &configure);
 
   virtual ~C2CallbackAgent() = default;
 
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 0a594e4..c8347c7 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -101,13 +101,12 @@ class Instance {
   }
 
   void enableAsyncC2(C2_Server *server, c2_stop_callback *c1, 
c2_start_callback *c2, c2_update_callback *c3) {
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider = nullptr;
     running_ = true;
     if (server->type != C2_Server_Type::MQTT) {
       configure_->set("c2.rest.url", server->url);
       configure_->set("c2.rest.url.ack", server->ack_url);
     }
-    agent_ = 
std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, 
configure_);
+    agent_ = std::make_shared<c2::C2CallbackAgent>(nullptr, nullptr, 
configure_);
     listener_thread_pool_.start();
     registerUpdateListener(agent_, 1000);
     agent_->setStopCallback(c1);
@@ -133,8 +132,7 @@ class Instance {
   }
 
   void transfer(const std::shared_ptr<FlowFileRecord> &ff, const 
std::shared_ptr<minifi::io::DataStream> &stream = nullptr) {
-    std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider = nullptr;
-    auto processContext = std::make_shared<core::ProcessContext>(proc_node_, 
controller_service_provider, no_op_repo_, no_op_repo_, configure_, 
content_repo_);
+    auto processContext = std::make_shared<core::ProcessContext>(proc_node_, 
nullptr, no_op_repo_, no_op_repo_, configure_, content_repo_);
     auto sessionFactory = 
std::make_shared<core::ProcessSessionFactory>(processContext);
 
     rpg_->onSchedule(processContext, sessionFactory);
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp 
b/nanofi/src/cxx/C2CallbackAgent.cpp
index 3a2b0d1..b41a870 100644
--- a/nanofi/src/cxx/C2CallbackAgent.cpp
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -34,7 +34,7 @@ namespace nifi {
 namespace minifi {
 namespace c2 {
 
-C2CallbackAgent::C2CallbackAgent(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink,
+C2CallbackAgent::C2CallbackAgent(core::controller::ControllerServiceProvider* 
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                                  const std::shared_ptr<Configure> 
&configuration)
     : C2Agent(controller, updateSink, configuration),
       stop(nullptr),
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
index 72dc162..e6a0211 100644
--- a/nanofi/src/cxx/Plan.cpp
+++ b/nanofi/src/cxx/Plan.cpp
@@ -115,7 +115,7 @@ std::shared_ptr<core::Processor> 
ExecutionPlan::addProcessor(const std::shared_p
 
   processor_nodes_.push_back(node);
 
-  std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider_, 
prov_repo_, flow_repo_, content_repo_);
+  std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, 
controller_services_provider_.get(), prov_repo_, flow_repo_, content_repo_);
   processor_contexts_.push_back(context);
 
   processor_queue_.push_back(processor);
@@ -228,7 +228,7 @@ void ExecutionPlan::finalize() {
 
     processor_nodes_.push_back(node);
 
-    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, controller_services_provider_, 
prov_repo_, flow_repo_, content_repo_);
+    std::shared_ptr<core::ProcessContext> context = 
std::make_shared<core::ProcessContext>(node, 
controller_services_provider_.get(), prov_repo_, flow_repo_, content_repo_);
     processor_contexts_.push_back(context);
 
     processor_queue_.push_back(failure_proc);

Reply via email to