This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 67127a1 MINIFICPP-1300 - Extract conditional reload for the
schedulers of the FlowController
67127a1 is described below
commit 67127a18e3a1c4c388ac41e5c2ebc3d53e9a85af
Author: Adam Hunyadi <[email protected]>
AuthorDate: Fri Jul 31 15:17:54 2020 +0200
MINIFICPP-1300 - Extract conditional reload for the schedulers of the
FlowController
MINIFICPP-1300 - Decouple the ownership models of components using a
ControllerServiceProvider (eg. FlowController)
Signed-off-by: Arpad Boda <[email protected]>
This closes #858
---
extensions/coap/protocols/CoapC2Protocol.cpp | 2 +-
extensions/coap/protocols/CoapC2Protocol.h | 2 +-
.../expression-language/ProcessContextExpr.h | 4 ++--
extensions/http-curl/protocols/AgentPrinter.cpp | 2 +-
extensions/http-curl/protocols/AgentPrinter.h | 2 +-
extensions/http-curl/protocols/RESTReceiver.cpp | 2 +-
extensions/http-curl/protocols/RESTReceiver.h | 2 +-
extensions/http-curl/protocols/RESTSender.cpp | 2 +-
extensions/http-curl/protocols/RESTSender.h | 2 +-
.../http-curl/tests/unit/InvokeHTTPTests.cpp | 10 ++++----
extensions/mqtt/protocol/MQTTC2Protocol.cpp | 2 +-
extensions/mqtt/protocol/MQTTC2Protocol.h | 2 +-
.../tests/integration/TestExecuteProcess.cpp | 6 ++---
.../standard-processors/tests/unit/GetTCPTests.cpp | 15 +++++-------
.../tests/unit/ProcessorTests.cpp | 12 ++++------
libminifi/include/CronDrivenSchedulingAgent.h | 2 +-
libminifi/include/EventDrivenSchedulingAgent.h | 2 +-
libminifi/include/FlowController.h | 11 ++++++++-
libminifi/include/SchedulingAgent.h | 4 ++--
libminifi/include/ThreadedSchedulingAgent.h | 5 ++--
libminifi/include/TimerDrivenSchedulingAgent.h | 2 +-
libminifi/include/c2/C2Agent.h | 4 ++--
libminifi/include/c2/C2Protocol.h | 4 ++--
libminifi/include/c2/ControllerSocketProtocol.h | 2 +-
libminifi/include/c2/HeartBeatReporter.h | 4 ++--
libminifi/include/core/ProcessContext.h | 14 +++++------
libminifi/include/core/ProcessContextBuilder.h | 4 ++--
.../core/controller/ControllerServiceProvider.h | 2 +-
libminifi/src/FlowController.cpp | 27 +++++++---------------
libminifi/src/c2/C2Agent.cpp | 2 +-
libminifi/src/c2/ControllerSocketProtocol.cpp | 2 +-
libminifi/src/core/ProcessContextBuilder.cpp | 2 +-
libminifi/test/TestBase.cpp | 4 ++--
.../test/archive-tests/CompressContentTests.cpp | 3 +--
libminifi/test/archive-tests/MergeFileTests.cpp | 3 +--
libminifi/test/integration/IntegrationBase.h | 2 +-
.../test/persistence-tests/PersistenceTests.cpp | 6 ++---
libminifi/test/rocksdb-tests/RepoTests.cpp | 3 +--
nanofi/include/cxx/C2CallbackAgent.h | 2 +-
nanofi/include/cxx/Instance.h | 6 ++---
nanofi/src/cxx/C2CallbackAgent.cpp | 2 +-
nanofi/src/cxx/Plan.cpp | 4 ++--
42 files changed, 87 insertions(+), 108 deletions(-)
diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp
b/extensions/coap/protocols/CoapC2Protocol.cpp
index b9d195e..f51d5a3 100644
--- a/extensions/coap/protocols/CoapC2Protocol.cpp
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -39,7 +39,7 @@ CoapProtocol::CoapProtocol(const std::string &name, const
utils::Identifier &uui
CoapProtocol::~CoapProtocol() = default;
-void CoapProtocol::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) {
+void CoapProtocol::initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<Configure> &configure) {
RESTSender::initialize(controller, configure);
if (configure->get("nifi.c2.coap.connector.service",
controller_service_name_)) {
auto service = controller->getControllerService(controller_service_name_);
diff --git a/extensions/coap/protocols/CoapC2Protocol.h
b/extensions/coap/protocols/CoapC2Protocol.h
index a2a702c..ce74f35 100644
--- a/extensions/coap/protocols/CoapC2Protocol.h
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -84,7 +84,7 @@ class CoapProtocol : public minifi::c2::RESTSender {
// no op.
}
- void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) override;
+ void initialize(core::controller::ControllerServiceProvider* controller,
const std::shared_ptr<Configure> &configure) override;
// Supported Properties
diff --git a/extensions/expression-language/ProcessContextExpr.h
b/extensions/expression-language/ProcessContextExpr.h
index 4d0db54..191859f 100644
--- a/extensions/expression-language/ProcessContextExpr.h
+++ b/extensions/expression-language/ProcessContextExpr.h
@@ -37,14 +37,14 @@ class ProcessContextExpr : public core::ProcessContext {
/**
std::forward of argument list did not work on all platform.
**/
- ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor,
std::shared_ptr<controller::ControllerServiceProvider>
&controller_service_provider,
+ ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor,
controller::ControllerServiceProvider* controller_service_provider,
const std::shared_ptr<core::Repository> &repo, const
std::shared_ptr<core::Repository> &flow_repo,
const std::shared_ptr<core::ContentRepository>
&content_repo = std::make_shared<core::repository::FileSystemRepository>())
: core::ProcessContext(processor, controller_service_provider, repo,
flow_repo, content_repo),
logger_(logging::LoggerFactory<ProcessContextExpr>::getLogger()) {
}
- ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor,
std::shared_ptr<controller::ControllerServiceProvider>
&controller_service_provider,
+ ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor,
controller::ControllerServiceProvider* controller_service_provider,
const std::shared_ptr<core::Repository> &repo, const
std::shared_ptr<core::Repository> &flow_repo, const
std::shared_ptr<minifi::Configure> &configuration,
const std::shared_ptr<core::ContentRepository>
&content_repo = std::make_shared<core::repository::FileSystemRepository>())
: core::ProcessContext(processor, controller_service_provider, repo,
flow_repo, configuration, content_repo),
diff --git a/extensions/http-curl/protocols/AgentPrinter.cpp
b/extensions/http-curl/protocols/AgentPrinter.cpp
index b22331e..672652a 100644
--- a/extensions/http-curl/protocols/AgentPrinter.cpp
+++ b/extensions/http-curl/protocols/AgentPrinter.cpp
@@ -36,7 +36,7 @@ AgentPrinter::AgentPrinter(std::string name,
utils::Identifier uuid)
logger_(logging::LoggerFactory<AgentPrinter>::getLogger()) {
}
-void AgentPrinter::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+void AgentPrinter::initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure) {
HeartBeatReporter::initialize(controller, updateSink, configure);
}
diff --git a/extensions/http-curl/protocols/AgentPrinter.h
b/extensions/http-curl/protocols/AgentPrinter.h
index 50000fe..83c8353 100644
--- a/extensions/http-curl/protocols/AgentPrinter.h
+++ b/extensions/http-curl/protocols/AgentPrinter.h
@@ -45,7 +45,7 @@ class AgentPrinter : public RESTProtocol, public
HeartBeatReporter {
/**
* Initialize agent printer.
*/
- virtual void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure)
override;
/**
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp
b/extensions/http-curl/protocols/RESTReceiver.cpp
index ae1ebf6..10557e7 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -44,7 +44,7 @@ RESTReceiver::RESTReceiver(std::string name,
utils::Identifier uuid)
logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
}
-void RESTReceiver::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink, const
std::shared_ptr<Configure> &configure) {
+void RESTReceiver::initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink, const
std::shared_ptr<Configure> &configure) {
HeartBeatReporter::initialize(controller, updateSink, configure);
logger_->log_trace("Initializing rest receiver");
if (nullptr != configuration_) {
diff --git a/extensions/http-curl/protocols/RESTReceiver.h
b/extensions/http-curl/protocols/RESTReceiver.h
index 9c78a83..4bd6d51 100644
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -48,7 +48,7 @@ class RESTReceiver : public RESTProtocol, public
HeartBeatReporter {
public:
RESTReceiver(std::string name, utils::Identifier uuid = utils::Identifier());
- virtual void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure)
override;
virtual int16_t heartbeat(const C2Payload &heartbeat) override;
diff --git a/extensions/http-curl/protocols/RESTSender.cpp
b/extensions/http-curl/protocols/RESTSender.cpp
index 160f048..649910e 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -41,7 +41,7 @@ RESTSender::RESTSender(const std::string &name, const
utils::Identifier &uuid)
logger_(logging::LoggerFactory<Connectable>::getLogger()) {
}
-void RESTSender::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) {
+void RESTSender::initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<Configure> &configure) {
C2Protocol::initialize(controller, configure);
// base URL when one is not specified.
if (nullptr != configure) {
diff --git a/extensions/http-curl/protocols/RESTSender.h
b/extensions/http-curl/protocols/RESTSender.h
index 40a81f2..211716c 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -56,7 +56,7 @@ class RESTSender : public RESTProtocol, public C2Protocol {
virtual void update(const std::shared_ptr<Configure> &configure) override;
- virtual void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) override;
+ virtual void initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<Configure> &configure) override;
protected:
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 48e6710..93e1091 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -89,9 +89,8 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(listenhttp);
std::shared_ptr<core::ProcessorNode> node2 =
std::make_shared<core::ProcessorNode>(invokehttp);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, controller_services_provider,
repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo,
content_repo);
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port,
"8686");
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
"/testytesttest");
@@ -211,9 +210,8 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(listenhttp);
std::shared_ptr<core::ProcessorNode> node2 =
std::make_shared<core::ProcessorNode>(invokehttp);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, controller_services_provider,
repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo,
content_repo);
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port,
"8680");
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
"/testytesttest");
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
index 9637e20..2dcdf68 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
@@ -30,7 +30,7 @@ MQTTC2Protocol::MQTTC2Protocol(std::string name,
utils::Identifier uuid)
MQTTC2Protocol::~MQTTC2Protocol() = default;
-void MQTTC2Protocol::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) {
+void MQTTC2Protocol::initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<Configure> &configure) {
if (configure->get("nifi.c2.mqtt.connector.service",
controller_service_name_)) {
auto service = controller->getControllerService(controller_service_name_);
mqtt_service_ =
std::static_pointer_cast<controllers::MQTTControllerService>(service);
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h
b/extensions/mqtt/protocol/MQTTC2Protocol.h
index 47e04a6..1497b10 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.h
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.h
@@ -64,7 +64,7 @@ class MQTTC2Protocol : public C2Protocol {
// no op.
}
- virtual void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) override;
+ virtual void initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<Configure> &configure) override;
protected:
diff --git
a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
index 138e610..fa597ce 100644
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
@@ -82,16 +82,14 @@ int main(int argc, char **argv) {
std::vector<std::thread> processor_workers;
std::shared_ptr<core::ProcessorNode> node2 =
std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> contextset =
std::make_shared<core::ProcessContext>(node2, controller_services_provider,
test_repo, test_repo);
+ std::shared_ptr<core::ProcessContext> contextset =
std::make_shared<core::ProcessContext>(node2, nullptr, test_repo, test_repo);
core::ProcessSessionFactory factory(contextset);
processor->onSchedule(contextset.get(), &factory);
for (int i = 0; i < 1; i++) {
processor_workers.push_back(std::thread([processor, test_repo,
&is_ready]() {
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider,
test_repo, test_repo);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, nullptr, test_repo, test_repo);
context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command,
"sleep 0.5");
std::shared_ptr<core::ProcessSession> session =
std::make_shared<core::ProcessSession>(context);
while (!is_ready.load(std::memory_order_relaxed)) {
diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
index 10d5f77..5f86a51 100644
--- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
@@ -95,9 +95,8 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 =
std::make_shared<core::ProcessorNode>(logAttribute);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, controller_services_provider,
repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo,
content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" +
std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
"100 msec");
auto session = std::make_shared<core::ProcessSession>(context);
@@ -206,9 +205,8 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 =
std::make_shared<core::ProcessorNode>(logAttribute);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, controller_services_provider,
repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo,
content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" +
std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
"100 msec");
// we're using new lines above
@@ -329,9 +327,8 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 =
std::make_shared<core::ProcessorNode>(logAttribute);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider,
repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, controller_services_provider,
repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 =
std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo,
content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList,
org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" +
std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval,
"100 msec");
// we're using new lines above
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index a251b4e..a99d8b3 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -83,8 +83,7 @@ TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
REQUIRE(!dir.empty());
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
dir);
// replicate 10 threads
@@ -169,8 +168,7 @@ TEST_CASE("Test GetFile Ignore", "[getfileCreate3]") {
REQUIRE(!dir.empty());
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
dir);
// replicate 10 threads
@@ -265,8 +263,7 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
processor->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -574,8 +571,7 @@ void testRPGBypass(const std::string &host, const
std::string &port, const std::
REQUIRE(rpg->setProperty(minifi::RemoteProcessorGroupPort::hostName, host));
rpg->setProperty(minifi::RemoteProcessorGroupPort::port, port);
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(rpg);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
auto psf = std::make_shared<core::ProcessSessionFactory>(context);
if (hasException) {
auto expected_error = "Site2Site Protocol: HTTPClient not resolvable. No
peers configured or any port specific hostname and port -- cannot schedule";
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h
b/libminifi/include/CronDrivenSchedulingAgent.h
index 31ac37a..3f1e7e0 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -44,7 +44,7 @@ class CronDrivenSchedulingAgent : public
ThreadedSchedulingAgent {
/*!
* Create a new event driven scheduling agent.
*/
-
CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider, std::shared_ptr<core::Repository> repo,
+ CronDrivenSchedulingAgent(const
gsl::not_null<core::controller::ControllerServiceProvider*>
controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo>
&thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo,
content_repo, configuration, thread_pool) {
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h
b/libminifi/include/EventDrivenSchedulingAgent.h
index 65d25bb..d56e929 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -43,7 +43,7 @@ class EventDrivenSchedulingAgent : public
ThreadedSchedulingAgent {
/*!
* Create a new event driven scheduling agent.
*/
-
EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider, std::shared_ptr<core::Repository> repo,
+ EventDrivenSchedulingAgent(const
gsl::not_null<core::controller::ControllerServiceProvider*>
controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo>
&thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo,
content_repo, configuration, thread_pool) {
diff --git a/libminifi/include/FlowController.h
b/libminifi/include/FlowController.h
index c40c573..cd1804e 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -235,7 +235,7 @@ class FlowController : public
core::controller::ControllerServiceProvider, publi
* @param id service identifier
* @return shared pointer to the controller service node or nullptr if it
does not exist.
*/
- std::shared_ptr<core::controller::ControllerServiceNode>
getControllerServiceNode(const std::string &id) override;
+ std::shared_ptr<core::controller::ControllerServiceNode>
getControllerServiceNode(const std::string &id) const override;
void
verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
&serviceNode) override;
@@ -337,6 +337,15 @@ class FlowController : public
core::controller::ControllerServiceProvider, publi
utils::optional<std::chrono::milliseconds>
loadShutdownTimeoutFromConfiguration();
+ private:
+ template <typename T, typename = typename
std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type>
+ void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool
condition) {
+ if (condition) {
+ scheduler =
std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this),
provenance_repo_, flow_file_repo_, content_repo_, configuration_,
thread_pool_);
+ }
+ }
+
+ protected:
// flow controller mutex
std::recursive_mutex mutex_;
diff --git a/libminifi/include/SchedulingAgent.h
b/libminifi/include/SchedulingAgent.h
index 088c3e1..fb5532d 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -59,7 +59,7 @@ class SchedulingAgent {
/*!
* Create a new scheduling agent.
*/
- SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo,
+ SchedulingAgent(const
gsl::not_null<core::controller::ControllerServiceProvider*>
controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: admin_yield_duration_(),
bored_yield_duration_(0),
@@ -135,7 +135,7 @@ class SchedulingAgent {
// thread pool for components.
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
// controller service provider reference
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider_;
+ gsl::not_null<core::controller::ControllerServiceProvider*>
controller_service_provider_;
private:
struct SchedulingInfo {
diff --git a/libminifi/include/ThreadedSchedulingAgent.h
b/libminifi/include/ThreadedSchedulingAgent.h
index 8a35580..2e8edda 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -47,8 +47,9 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
/*!
* Create a new threaded scheduling agent.
*/
-
ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo,
- std::shared_ptr<core::ContentRepository>
content_repo, std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+ ThreadedSchedulingAgent(const
gsl::not_null<core::controller::ControllerServiceProvider*>
controller_service_provider, std::shared_ptr<core::Repository> repo,
+ std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo,
+ std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: SchedulingAgent(controller_service_provider, repo, flow_repo,
content_repo, configuration, thread_pool),
logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
}
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h
b/libminifi/include/TimerDrivenSchedulingAgent.h
index 160db1d..1481c18 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -39,7 +39,7 @@ class TimerDrivenSchedulingAgent : public
ThreadedSchedulingAgent {
/*!
* Create a new processor
*/
-
TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider, std::shared_ptr<core::Repository> repo,
+ TimerDrivenSchedulingAgent(const
gsl::not_null<core::controller::ControllerServiceProvider*>
controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configure,
utils::ThreadPool<utils::TaskRescheduleInfo>
&thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo,
content_repo, configure, thread_pool),
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 273fd43..edcf096 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -62,7 +62,7 @@ namespace c2 {
*/
class C2Agent : public state::UpdateController {
public:
- C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider>
&controller,
+ C2Agent(core::controller::ControllerServiceProvider* controller,
const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure);
virtual ~C2Agent() noexcept {
@@ -210,7 +210,7 @@ class C2Agent : public state::UpdateController {
std::shared_ptr<controllers::UpdatePolicyControllerService> update_service_;
// controller service provider reference.
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ core::controller::ControllerServiceProvider* controller_;
// shared pointer to the configuration of this agent
std::shared_ptr<Configure> configuration_;
diff --git a/libminifi/include/c2/C2Protocol.h
b/libminifi/include/c2/C2Protocol.h
index 2cbd0bd..23e7748 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -41,7 +41,7 @@ class C2Protocol : public core::Connectable {
running_(true) {
}
- virtual void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) {
+ virtual void initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<Configure> &configure) {
controller_ = controller;
configuration_ = configure;
}
@@ -102,7 +102,7 @@ class C2Protocol : public core::Connectable {
protected:
std::atomic<bool> running_;
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ core::controller::ControllerServiceProvider* controller_;
std::shared_ptr<Configure> configuration_;
};
diff --git a/libminifi/include/c2/ControllerSocketProtocol.h
b/libminifi/include/c2/ControllerSocketProtocol.h
index d24c8fd..1636477 100644
--- a/libminifi/include/c2/ControllerSocketProtocol.h
+++ b/libminifi/include/c2/ControllerSocketProtocol.h
@@ -51,7 +51,7 @@ class ControllerSocketProtocol : public HeartBeatReporter {
* @param updateSink update mechanism that will be used to stop/clear
elements
* @param configuration configuration class.
*/
- virtual void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configuration);
/**
diff --git a/libminifi/include/c2/HeartBeatReporter.h
b/libminifi/include/c2/HeartBeatReporter.h
index 9de242d..82d5f2a 100644
--- a/libminifi/include/c2/HeartBeatReporter.h
+++ b/libminifi/include/c2/HeartBeatReporter.h
@@ -45,7 +45,7 @@ class HeartBeatReporter : public core::Connectable {
configuration_(nullptr) {
}
- virtual void initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure) {
controller_ = controller;
update_sink_ = updateSink;
@@ -89,7 +89,7 @@ class HeartBeatReporter : public core::Connectable {
}
protected:
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ core::controller::ControllerServiceProvider* controller_;
std::shared_ptr<state::StateMonitor> update_sink_;
diff --git a/libminifi/include/core/ProcessContext.h
b/libminifi/include/core/ProcessContext.h
index 53d42aa..3e0da1a 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -55,7 +55,7 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
/*!
* Create a new process context associated with the processor/controller
service/state manager
*/
- ProcessContext(const std::shared_ptr<ProcessorNode> &processor,
std::shared_ptr<controller::ControllerServiceProvider>
&controller_service_provider, const std::shared_ptr<core::Repository> &repo,
+ ProcessContext(const std::shared_ptr<ProcessorNode> &processor,
controller::ControllerServiceProvider* controller_service_provider, const
std::shared_ptr<core::Repository> &repo,
const std::shared_ptr<core::Repository> &flow_repo, const
std::shared_ptr<core::ContentRepository> &content_repo =
std::make_shared<core::repository::FileSystemRepository>())
: VariableRegistry(std::make_shared<minifi::Configure>()),
controller_service_provider_(controller_service_provider),
@@ -73,7 +73,7 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
/*!
* Create a new process context associated with the processor/controller
service/state manager
*/
- ProcessContext(const std::shared_ptr<ProcessorNode> &processor,
std::shared_ptr<controller::ControllerServiceProvider>
&controller_service_provider, const std::shared_ptr<core::Repository> &repo,
+ ProcessContext(const std::shared_ptr<ProcessorNode> &processor,
controller::ControllerServiceProvider* controller_service_provider, const
std::shared_ptr<core::Repository> &repo,
const std::shared_ptr<core::Repository> &flow_repo, const
std::shared_ptr<minifi::Configure> &configuration, const
std::shared_ptr<core::ContentRepository> &content_repo =
std::make_shared<core::repository::FileSystemRepository>())
: VariableRegistry(configuration),
@@ -172,9 +172,7 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
* identifier
*/
std::shared_ptr<core::controller::ControllerService>
getControllerService(const std::string &identifier) {
- if (controller_service_provider_ != nullptr)
- return
controller_service_provider_->getControllerServiceForComponent(identifier,
processor_node_->getUUIDStr());
- return nullptr;
+ return controller_service_provider_ == nullptr ? nullptr :
controller_service_provider_->getControllerServiceForComponent(identifier,
processor_node_->getUUIDStr());
}
/**
@@ -228,7 +226,7 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
}
static std::shared_ptr<core::CoreComponentStateManagerProvider>
getOrCreateDefaultStateManagerProvider(
- std::shared_ptr<controller::ControllerServiceProvider>
controller_service_provider,
+ controller::ControllerServiceProvider* controller_service_provider,
std::shared_ptr<minifi::Configure> configuration,
const char *base_path = "") {
static std::mutex mutex;
@@ -300,7 +298,7 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
static std::shared_ptr<core::CoreComponentStateManagerProvider>
getStateManagerProvider(
std::shared_ptr<logging::Logger> logger,
- std::shared_ptr<controller::ControllerServiceProvider>
controller_service_provider,
+ controller::ControllerServiceProvider* controller_service_provider,
std::shared_ptr<minifi::Configure> configuration) {
if (controller_service_provider == nullptr) {
return nullptr;
@@ -330,7 +328,7 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
}
// controller service provider.
- std::shared_ptr<controller::ControllerServiceProvider>
controller_service_provider_;
+ controller::ControllerServiceProvider* controller_service_provider_;
// state manager provider
std::shared_ptr<core::CoreComponentStateManagerProvider>
state_manager_provider_;
// repository shared pointer.
diff --git a/libminifi/include/core/ProcessContextBuilder.h
b/libminifi/include/core/ProcessContextBuilder.h
index 155ab41..281be79 100644
--- a/libminifi/include/core/ProcessContextBuilder.h
+++ b/libminifi/include/core/ProcessContextBuilder.h
@@ -62,7 +62,7 @@ class ProcessContextBuilder : public core::CoreComponent,
public std::enable_sha
virtual ~ProcessContextBuilder() = default;
- std::shared_ptr<ProcessContextBuilder> withProvider(const
std::shared_ptr<controller::ControllerServiceProvider>
&controller_service_provider);
+ std::shared_ptr<ProcessContextBuilder>
withProvider(core::controller::ControllerServiceProvider*
controller_service_provider);
std::shared_ptr<ProcessContextBuilder> withProvenanceRepository(const
std::shared_ptr<core::Repository> &repo);
@@ -76,7 +76,7 @@ class ProcessContextBuilder : public core::CoreComponent,
public std::enable_sha
protected:
std::shared_ptr<minifi::Configure> configuration_;
- std::shared_ptr<controller::ControllerServiceProvider>
controller_service_provider_;
+ core::controller::ControllerServiceProvider* controller_service_provider_;
std::shared_ptr<core::Repository> prov_repo_;
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::ContentRepository> content_repo_;
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h
b/libminifi/include/core/controller/ControllerServiceProvider.h
index 8628f94..428b977 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -82,7 +82,7 @@ class ControllerServiceProvider : public CoreComponent,
public ConfigurableCompo
* @param id controller service identifier.
* @return shared pointer to the controller service node.
*/
- virtual std::shared_ptr<ControllerServiceNode>
getControllerServiceNode(const std::string &id) {
+ virtual std::shared_ptr<ControllerServiceNode>
getControllerServiceNode(const std::string &id) const {
return controller_map_->getControllerServiceNode(id);
}
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index e088628..01c9a1e 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -82,10 +82,6 @@
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
flow_file_repo_(flow_file_repo),
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
- timer_scheduler_(nullptr),
- event_scheduler_(nullptr),
- cron_scheduler_(nullptr),
- controller_service_provider_(nullptr),
flow_configuration_(std::move(flow_configuration)),
configuration_(configure),
content_repo_(content_repo),
@@ -267,9 +263,9 @@ int16_t FlowController::stop() {
this->root_->stopProcessing(timer_scheduler_, event_scheduler_,
cron_scheduler_);
}
// stop after we've attempted to stop the processors.
- this->timer_scheduler_->stop();
- this->event_scheduler_->stop();
- this->cron_scheduler_->stop();
+ timer_scheduler_->stop();
+ event_scheduler_->stop();
+ cron_scheduler_->stop();
thread_pool_.shutdown();
/* STOP! Before you change it, consider the following:
* -Stopping the schedulers doesn't actually quit the onTrigger functions
of processors
@@ -350,17 +346,10 @@ void FlowController::load(const
std::shared_ptr<core::ProcessGroup> &root, bool
thread_pool_.setControllerServiceProvider(base_shared_ptr);
thread_pool_.start();
}
- if (nullptr == timer_scheduler_ || reload) {
- timer_scheduler_ =
std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_,
flow_file_repo_, content_repo_, configuration_, thread_pool_);
- }
-
- if (nullptr == event_scheduler_ || reload) {
- event_scheduler_ =
std::make_shared<EventDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_,
flow_file_repo_, content_repo_, configuration_, thread_pool_);
- }
- if (nullptr == cron_scheduler_ || reload) {
- cron_scheduler_ =
std::make_shared<CronDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_,
flow_file_repo_, content_repo_, configuration_, thread_pool_);
- }
+ conditionalReloadScheduler<TimerDrivenSchedulingAgent>(timer_scheduler_,
!timer_scheduler_ || reload);
+ conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_,
!event_scheduler_ || reload);
+ conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_,
!cron_scheduler_ || reload);
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
@@ -591,7 +580,7 @@ void FlowController::initializeC2() {
loadC2ResponseConfiguration();
if (!c2_initialized_) {
- c2_agent_ = std::unique_ptr<c2::C2Agent>(new
c2::C2Agent(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+ c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(this,
std::dynamic_pointer_cast<FlowController>(shared_from_this()),
configuration_));
c2_agent_->start();
@@ -801,7 +790,7 @@ std::shared_ptr<core::controller::ControllerService>
FlowController::getControll
* @param id service identifier
* @return shared pointer to the controller service node or nullptr if it does
not exist.
*/
-std::shared_ptr<core::controller::ControllerServiceNode>
FlowController::getControllerServiceNode(const std::string &id) {
+std::shared_ptr<core::controller::ControllerServiceNode>
FlowController::getControllerServiceNode(const std::string &id) const {
return controller_service_provider_->getControllerServiceNode(id);
}
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 0ea31d2..9c0085d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -47,7 +47,7 @@ namespace nifi {
namespace minifi {
namespace c2 {
-C2Agent::C2Agent(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+C2Agent::C2Agent(core::controller::ControllerServiceProvider* controller,
const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configuration)
: heart_beat_period_(3000),
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp
b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 9b7162d..124490f 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -32,7 +32,7 @@ namespace nifi {
namespace minifi {
namespace c2 {
-void ControllerSocketProtocol::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+void
ControllerSocketProtocol::initialize(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure>
&configuration) {
HeartBeatReporter::initialize(controller, updateSink, configuration);
stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
diff --git a/libminifi/src/core/ProcessContextBuilder.cpp
b/libminifi/src/core/ProcessContextBuilder.cpp
index b1b794c..2cba8b7 100644
--- a/libminifi/src/core/ProcessContextBuilder.cpp
+++ b/libminifi/src/core/ProcessContextBuilder.cpp
@@ -48,7 +48,7 @@ ProcessContextBuilder::ProcessContextBuilder(const
std::string &name)
configuration_ = std::make_shared<minifi::Configure>();
}
-std::shared_ptr<ProcessContextBuilder>
ProcessContextBuilder::withProvider(const
std::shared_ptr<controller::ControllerServiceProvider>
&controller_service_provider) {
+std::shared_ptr<ProcessContextBuilder>
ProcessContextBuilder::withProvider(core::controller::ControllerServiceProvider*
controller_service_provider) {
controller_service_provider_ = controller_service_provider;
return this->shared_from_this();
}
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 6f415eb..c897459 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -54,7 +54,7 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository>
content_repo, std::s
} else {
state_dir_ = state_dir;
}
- state_manager_provider_ =
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_,
configuration_, state_dir_.c_str());
+ state_manager_provider_ =
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_.get(),
configuration_, state_dir_.c_str());
}
TestPlan::~TestPlan() {
@@ -125,7 +125,7 @@ std::shared_ptr<core::Processor>
TestPlan::addProcessor(const std::shared_ptr<co
auto contextBuilder =
core::ClassLoader::getDefaultClassLoader().instantiate<core::ProcessContextBuilder>("ProcessContextBuilder");
- contextBuilder =
contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_)->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
+ contextBuilder =
contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_.get())->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
auto context = contextBuilder->build(node);
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp
b/libminifi/test/archive-tests/CompressContentTests.cpp
index bb31681..5944b41 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -144,8 +144,7 @@ class CompressDecompressionTestController : public
TestController{
processor_->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor_);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- context_ = std::make_shared<core::ProcessContext>(node,
controller_services_provider, repo, repo, content_repo);
+ context_ = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
}
public:
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp
b/libminifi/test/archive-tests/MergeFileTests.cpp
index 1809d7c..6c1ec65 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -185,8 +185,7 @@ class MergeTestController : public TestController {
logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider = nullptr;
- context = std::make_shared<core::ProcessContext>(node,
controller_service_provider, repo, repo, content_repo);
+ context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
}
~MergeTestController() = default;
std::shared_ptr<core::ProcessContext> context;
diff --git a/libminifi/test/integration/IntegrationBase.h
b/libminifi/test/integration/IntegrationBase.h
index 3a24eef..ce22ef2 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -118,7 +118,7 @@ void IntegrationBase::run(std::string test_file_location) {
auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
state_dir =
utils::file::FileUtils::create_temp_directory(state_dir_name_template);
-
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider,
configuration, state_dir.c_str());
+
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(),
configuration, state_dir.c_str());
std::shared_ptr<core::ProcessGroup>
pg(yaml_config.getRoot(test_file_location));
queryRootProcessGroup(pg);
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp
b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 466110d..4844729 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -41,20 +41,18 @@ struct TestFlow{
TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>&
ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo,
const std::shared_ptr<core::Repository>& prov_repo,
const
std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>&
processorGenerator, const core::Relationship& relationshipToOutput)
: ff_repository(ff_repository), content_repo(content_repo),
prov_repo(prov_repo) {
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
-
// setup processor
{
processor = processorGenerator(mainProcUUID());
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
- processorContext = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo, ff_repository, content_repo);
+ processorContext = std::make_shared<core::ProcessContext>(node, nullptr,
prov_repo, ff_repository, content_repo);
}
// setup INPUT processor
{
inputProcessor = std::make_shared<core::Processor>("source",
inputProcUUID());
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(inputProcessor);
- inputContext = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo,
+ inputContext = std::make_shared<core::ProcessContext>(node, nullptr,
prov_repo,
ff_repository,
content_repo);
}
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index b48ebdd..aa2ad84 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -298,8 +298,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
input->setSourceUUID(uuid);
processor->addConnection(input);
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo, ff_repository, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr,
prov_repo, ff_repository, content_repo);
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow =
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
sessionGenFlowFile.importFrom(content, flow);
diff --git a/nanofi/include/cxx/C2CallbackAgent.h
b/nanofi/include/cxx/C2CallbackAgent.h
index cc8a665..81198bb 100644
--- a/nanofi/include/cxx/C2CallbackAgent.h
+++ b/nanofi/include/cxx/C2CallbackAgent.h
@@ -47,7 +47,7 @@ class C2CallbackAgent : public c2::C2Agent {
public:
- explicit C2CallbackAgent(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink, const
std::shared_ptr<Configure> &configure);
+ explicit C2CallbackAgent(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink, const
std::shared_ptr<Configure> &configure);
virtual ~C2CallbackAgent() = default;
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 0a594e4..c8347c7 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -101,13 +101,12 @@ class Instance {
}
void enableAsyncC2(C2_Server *server, c2_stop_callback *c1,
c2_start_callback *c2, c2_update_callback *c3) {
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider = nullptr;
running_ = true;
if (server->type != C2_Server_Type::MQTT) {
configure_->set("c2.rest.url", server->url);
configure_->set("c2.rest.url.ack", server->ack_url);
}
- agent_ =
std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr,
configure_);
+ agent_ = std::make_shared<c2::C2CallbackAgent>(nullptr, nullptr,
configure_);
listener_thread_pool_.start();
registerUpdateListener(agent_, 1000);
agent_->setStopCallback(c1);
@@ -133,8 +132,7 @@ class Instance {
}
void transfer(const std::shared_ptr<FlowFileRecord> &ff, const
std::shared_ptr<minifi::io::DataStream> &stream = nullptr) {
- std::shared_ptr<core::controller::ControllerServiceProvider>
controller_service_provider = nullptr;
- auto processContext = std::make_shared<core::ProcessContext>(proc_node_,
controller_service_provider, no_op_repo_, no_op_repo_, configure_,
content_repo_);
+ auto processContext = std::make_shared<core::ProcessContext>(proc_node_,
nullptr, no_op_repo_, no_op_repo_, configure_, content_repo_);
auto sessionFactory =
std::make_shared<core::ProcessSessionFactory>(processContext);
rpg_->onSchedule(processContext, sessionFactory);
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp
b/nanofi/src/cxx/C2CallbackAgent.cpp
index 3a2b0d1..b41a870 100644
--- a/nanofi/src/cxx/C2CallbackAgent.cpp
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -34,7 +34,7 @@ namespace nifi {
namespace minifi {
namespace c2 {
-C2CallbackAgent::C2CallbackAgent(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<state::StateMonitor> &updateSink,
+C2CallbackAgent::C2CallbackAgent(core::controller::ControllerServiceProvider*
controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure>
&configuration)
: C2Agent(controller, updateSink, configuration),
stop(nullptr),
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
index 72dc162..e6a0211 100644
--- a/nanofi/src/cxx/Plan.cpp
+++ b/nanofi/src/cxx/Plan.cpp
@@ -115,7 +115,7 @@ std::shared_ptr<core::Processor>
ExecutionPlan::addProcessor(const std::shared_p
processor_nodes_.push_back(node);
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider_,
prov_repo_, flow_repo_, content_repo_);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node,
controller_services_provider_.get(), prov_repo_, flow_repo_, content_repo_);
processor_contexts_.push_back(context);
processor_queue_.push_back(processor);
@@ -228,7 +228,7 @@ void ExecutionPlan::finalize() {
processor_nodes_.push_back(node);
- std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider_,
prov_repo_, flow_repo_, content_repo_);
+ std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node,
controller_services_provider_.get(), prov_repo_, flow_repo_, content_repo_);
processor_contexts_.push_back(context);
processor_queue_.push_back(failure_proc);