This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f39859fda MINIFICPP-1896 Change C2 component-level start/stop commands
to use UUID
f39859fda is described below
commit f39859fdaf46d9120fd9466f67b47d9aabb43440
Author: Marton Szasz <[email protected]>
AuthorDate: Wed Aug 10 19:18:45 2022 +0200
MINIFICPP-1896 Change C2 component-level start/stop commands to use UUID
Signed-off-by: Gabor Gyimesi <[email protected]>
This closes #1388
---
.../tests/C2ClearCoreComponentStateTest.cpp | 4 +-
.../tests/C2DescribeCoreComponentStateTest.cpp | 4 +-
.../http-curl/tests/C2DescribeManifestTest.cpp | 7 ++-
.../http-curl/tests/C2VerifyHeartbeatAndStop.cpp | 2 +-
.../tests/C2VerifyLightweightHeartbeatAndStop.cpp | 2 +-
extensions/http-curl/tests/HTTPHandlers.h | 4 +-
extensions/http-curl/tests/VerifyInvokeHTTP.h | 4 +-
.../tests/integration/TailFileTest.cpp | 4 +-
libminifi/include/FlowController.h | 12 ++--
libminifi/include/core/Scheduling.h | 6 --
libminifi/include/core/state/ProcessorController.h | 31 +++-------
libminifi/include/core/state/UpdateController.h | 10 ++--
libminifi/src/FlowController.cpp | 70 ++++++++++------------
libminifi/src/c2/C2Agent.cpp | 2 +-
libminifi/src/core/state/ProcessorController.cpp | 19 ++----
.../src/core/state/nodes/SupportedOperations.cpp | 2 +-
.../integration/OnScheduleErrorHandlingTests.cpp | 2 +-
.../integration/StateTransactionalityTests.cpp | 4 +-
libminifi/test/pcap-tests/PcapTest.cpp | 37 ++----------
19 files changed, 87 insertions(+), 139 deletions(-)
diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index c39d80b3e..ece5a8798 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -58,8 +58,8 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
protected:
void updateProperties(minifi::FlowController& flow_controller) override {
auto setFileName = [] (const std::string& fileName,
minifi::state::StateController& component){
- auto* processor =
dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
- processor->setProperty(minifi::processors::TailFile::FileName, fileName);
+ auto& processor =
dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
+ processor.setProperty(minifi::processors::TailFile::FileName, fileName);
};
flow_controller.executeOnComponent("TailFile1",
diff --git a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
index 8754bfa0f..1439df089 100644
--- a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
@@ -45,8 +45,8 @@ class VerifyC2DescribeCoreComponentState : public
VerifyC2Describe {
protected:
void updateProperties(minifi::FlowController& flow_controller) override {
auto setFileName = [] (const std::string& fileName,
minifi::state::StateController& component){
- auto* processor =
dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
- processor->setProperty(minifi::processors::TailFile::FileName, fileName);
+ auto& processor =
dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
+ processor.setProperty(minifi::processors::TailFile::FileName, fileName);
};
flow_controller.executeOnComponent("TailFile1",
diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp
b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
index 42e7d1aac..9475b4086 100644
--- a/extensions/http-curl/tests/C2DescribeManifestTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -18,13 +18,16 @@
#undef NDEBUG
#include <string>
-#include "TestBase.h"
#include "Catch.h"
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
#include "properties/Configuration.h"
#include "ConfigTestAccessor.h"
+// from TestHTTPGet.yml
+constexpr auto invokehttp_uuid = "2438e3c8-015a-1000-79ca-83af40ec1991";
+constexpr auto logattribute_uuid = "2438e3c8-015a-1000-79ca-83af40ec1992";
+
class DescribeManifestHandler: public HeartbeatHandler {
public:
explicit DescribeManifestHandler(std::shared_ptr<minifi::Configure>
configuration, std::atomic<bool>& verified)
@@ -37,7 +40,7 @@ class DescribeManifestHandler: public HeartbeatHandler {
}
void handleAcknowledge(const rapidjson::Document& root) override {
- verifyJsonHasAgentManifest(root, {"InvokeHTTP", "LogAttribute"},
{"nifi.extension.path", "nifi.python.processor.dir"});
+ verifyJsonHasAgentManifest(root, {invokehttp_uuid, logattribute_uuid},
{"nifi.extension.path", "nifi.python.processor.dir"});
verified_ = true;
}
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index deaf1438d..b39f8dee2 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -42,7 +42,7 @@ class VerifyC2Heartbeat : public VerifyC2Base {
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
"Received Ack from Server",
- "C2Agent] [debug] Stopping component invoke",
+ "C2Agent] [debug] Stopping component
2438e3c8-015a-1000-79ca-83af40ec1991",
"C2Agent] [debug] Stopping component FlowController"));
}
diff --git a/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
b/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
index f0060538a..f371685b7 100644
--- a/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
@@ -66,7 +66,7 @@ class VerifyLightWeightC2Heartbeat : public VerifyC2Base {
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
"Received Ack from Server",
- "C2Agent] [debug] Stopping component invoke",
+ "C2Agent] [debug] Stopping component
2438e3c8-015a-1000-79ca-83af40ec1991",
"C2Agent] [debug] Stopping component FlowController"));
}
diff --git a/extensions/http-curl/tests/HTTPHandlers.h
b/extensions/http-curl/tests/HTTPHandlers.h
index cb523bf84..a49090e72 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -590,7 +590,7 @@ class HeartbeatHandler : public ServerAwareHandler {
case minifi::c2::Operation::STOP: {
auto operands = getOperandsOfProperties(operation_node);
assert(operands.find("c2") != operands.end());
- assert(operands.find("FlowController") != operands.end());
+ // FlowController is also present, but this handler has no way of
knowing its UUID to test it
for (const auto& component : verify_components) {
assert(operands.find(component) != operands.end());
}
@@ -630,7 +630,7 @@ class StoppingHeartbeatHandler : public HeartbeatHandler {
private:
static void sendStopOperation(struct mg_connection *conn) {
- std::string resp = "{\"operation\" : \"heartbeat\",
\"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\",
\"operand\" : \"invoke\" }, "
+ std::string resp = "{\"operation\" : \"heartbeat\",
\"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\",
\"operand\" : \"2438e3c8-015a-1000-79ca-83af40ec1991\" }, "
"{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" :
\"FlowController\" } ]}";
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h
b/extensions/http-curl/tests/VerifyInvokeHTTP.h
index a47a800d8..bb5efefad 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTP.h
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -70,8 +70,8 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
flowController_->executeOnComponent("InvokeHTTP",
[&](minifi::state::StateController& component) {
const auto processorController =
dynamic_cast<minifi::state::ProcessorController*>(&component);
assert(processorController);
- auto proc = processorController->getProcessor();
- proc->setProperty(property, value);
+ auto& proc = processorController->getProcessor();
+ proc.setProperty(property, value);
executed = true;
});
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp
b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index c1c7ff678..01df3836b 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -78,8 +78,8 @@ class TailFileTestHarness : public IntegrationBase {
fc.executeOnComponent("tf", [this] (minifi::state::StateController&
component) {
auto proc =
dynamic_cast<minifi::state::ProcessorController*>(&component);
if (nullptr != proc) {
-
proc->getProcessor()->setProperty(minifi::processors::TailFile::FileName,
ss.str());
-
proc->getProcessor()->setProperty(minifi::processors::TailFile::StateFile,
statefile);
+
proc->getProcessor().setProperty(minifi::processors::TailFile::FileName,
ss.str());
+
proc->getProcessor().setProperty(minifi::processors::TailFile::StateFile,
statefile);
}
});
}
diff --git a/libminifi/include/FlowController.h
b/libminifi/include/FlowController.h
index 01edc0154..f1354adbb 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -115,7 +115,7 @@ class FlowController : public
core::controller::ForwardingControllerServiceProvi
return -1;
}
- void executeOnComponent(const std::string &name,
std::function<void(state::StateController&)> func) override;
+ void executeOnComponent(const std::string& id_or_name,
std::function<void(state::StateController&)> func) override;
void executeOnAllComponents(std::function<void(state::StateController&)>
func) override;
int16_t clearConnection(const std::string &connection) override;
@@ -238,15 +238,15 @@ class FlowController : public
core::controller::ForwardingControllerServiceProvi
private:
std::vector<state::StateController*> getAllComponents();
- state::StateController* getComponent(const std::string &name);
+ state::StateController* getComponent(const std::string& id_or_name);
- state::StateController* getProcessorController(const std::string& name,
- const
std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>&
controllerFactory);
+ state::StateController* getProcessorController(const std::string& id_or_name,
+ const
std::function<gsl::not_null<std::unique_ptr<state::ProcessorController>>(core::Processor&)>&
controllerFactory);
std::vector<state::StateController*> getAllProcessorControllers(
- const
std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>&
controllerFactory);
+ const
std::function<gsl::not_null<std::unique_ptr<state::ProcessorController>>(core::Processor&)>&
controllerFactory);
- std::unique_ptr<state::ProcessorController>
createController(core::Processor& processor);
+ gsl::not_null<std::unique_ptr<state::ProcessorController>>
createController(core::Processor& processor);
std::chrono::milliseconds shutdown_check_interval_{1000};
std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<FlowController>::getLogger();
diff --git a/libminifi/include/core/Scheduling.h
b/libminifi/include/core/Scheduling.h
index 7f5f9ca5c..9a46b6cb7 100644
--- a/libminifi/include/core/Scheduling.h
+++ b/libminifi/include/core/Scheduling.h
@@ -43,15 +43,9 @@ enum ScheduledState {
RUNNING
};
-/*
- * Scheduling Strategy
- */
enum SchedulingStrategy {
- // Event driven
EVENT_DRIVEN,
- // Timer driven
TIMER_DRIVEN,
- // Cron Driven
CRON_DRIVEN
};
diff --git a/libminifi/include/core/state/ProcessorController.h
b/libminifi/include/core/state/ProcessorController.h
index fe1de138a..109d5c490 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,8 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_
-#define LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_
+#pragma once
#include <string>
#include <memory>
@@ -24,11 +22,7 @@
#include "SchedulingAgent.h"
#include "UpdateController.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
+namespace org::apache::nifi::minifi::state {
/**
* Purpose, Justification, & Design: ProcessController is the state control
mechanism for processors.
@@ -38,20 +32,20 @@ namespace state {
*/
class ProcessorController : public StateController {
public:
- ProcessorController(core::Processor* processor, const
std::shared_ptr<SchedulingAgent> &scheduler);
+ ProcessorController(core::Processor& processor,
std::shared_ptr<SchedulingAgent> scheduler);
~ProcessorController() override;
- std::string getComponentName() const override {
+ [[nodiscard]] std::string getComponentName() const override {
return processor_->getName();
}
- utils::Identifier getComponentUUID() const override {
+ [[nodiscard]] utils::Identifier getComponentUUID() const override {
return processor_->getUUID();
}
- core::Processor* getProcessor() {
- return processor_;
+ core::Processor& getProcessor() {
+ return *processor_;
}
/**
* Start the client
@@ -69,15 +63,8 @@ class ProcessorController : public StateController {
int16_t resume() override;
protected:
- core::Processor* processor_;
+ gsl::not_null<core::Processor*> processor_;
std::shared_ptr<SchedulingAgent> scheduler_;
};
-} // namespace state
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_CORE_STATE_PROCESSORCONTROLLER_H_
-
+} // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/include/core/state/UpdateController.h
b/libminifi/include/core/state/UpdateController.h
index a4c9ad525..496b523a2 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -114,11 +114,11 @@ class Pausable {
class StateController : public Pausable {
public:
- virtual ~StateController() = default;
+ ~StateController() override = default;
- virtual std::string getComponentName() const = 0;
+ [[nodiscard]] virtual std::string getComponentName() const = 0;
- virtual utils::Identifier getComponentUUID() const = 0;
+ [[nodiscard]] virtual utils::Identifier getComponentUUID() const = 0;
/**
* Start the client
*/
@@ -141,8 +141,8 @@ class StateMonitor : public StateController {
public:
~StateMonitor() override = default;
- // Execute callback func on the named component. Thread safe, locking
mutex_, preventing concurrent flow update
- virtual void executeOnComponent(const std::string &name,
std::function<void(state::StateController&)> func) = 0;
+ // Execute callback func on the component. Thread safe, locking mutex_,
preventing concurrent flow update
+ virtual void executeOnComponent(const std::string &id_or_name,
std::function<void(state::StateController&)> func) = 0;
// Execute callback func on the all components. Thread safe, locking mutex_,
preventing concurrent flow update
virtual void
executeOnAllComponents(std::function<void(state::StateController&)> func) = 0;
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index a7f9a5ce0..2339e968c 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -462,52 +462,46 @@ void
FlowController::executeOnAllComponents(std::function<void(state::StateContr
}
}
-void FlowController::executeOnComponent(const std::string &name,
std::function<void(state::StateController&)> func) {
+void FlowController::executeOnComponent(const std::string &id_or_name,
std::function<void(state::StateController&)> func) {
if (updating_) {
return;
}
std::lock_guard<std::recursive_mutex> lock(mutex_);
- if (auto* component = getComponent(name); component != nullptr) {
+ if (auto* component = getComponent(id_or_name); component != nullptr) {
func(*component);
} else {
- logger_->log_error("Could not get execute requested callback for component
\"%s\", because component was not found", name);
+ logger_->log_error("Could not get execute requested callback for component
\"%s\", because component was not found", id_or_name);
}
}
std::vector<state::StateController*> FlowController::getAllComponents() {
if (root_) {
- auto controllerFactory = [this] (core::Processor& p) {
- return createController(p);
- };
- return getAllProcessorControllers(controllerFactory);
+ return getAllProcessorControllers([this](core::Processor& p) { return
createController(p); });
}
return {this};
}
-state::StateController* FlowController::getComponent(const std::string& name) {
- if (name == "FlowController") {
+state::StateController* FlowController::getComponent(const std::string&
id_or_name) {
+ if (id_or_name == getUUIDStr() || id_or_name == "FlowController") {
return this;
} else if (root_) {
- auto controllerFactory = [this] (core::Processor& p) {
- return createController(p);
- };
- return getProcessorController(name, controllerFactory);
+ return getProcessorController(id_or_name, [this](core::Processor& p) {
return createController(p); });
}
return nullptr;
}
-std::unique_ptr<state::ProcessorController>
FlowController::createController(core::Processor& processor) {
- switch (processor.getSchedulingStrategy()) {
- case core::SchedulingStrategy::TIMER_DRIVEN:
- return std::make_unique<state::ProcessorController>(&processor,
timer_scheduler_);
- case core::SchedulingStrategy::EVENT_DRIVEN:
- return std::make_unique<state::ProcessorController>(&processor,
event_scheduler_);
- case core::SchedulingStrategy::CRON_DRIVEN:
- return std::make_unique<state::ProcessorController>(&processor,
cron_scheduler_);
- }
- return {};
+gsl::not_null<std::unique_ptr<state::ProcessorController>>
FlowController::createController(core::Processor& processor) {
+ const auto scheduler = [this, &processor]() ->
std::shared_ptr<SchedulingAgent> {
+ switch (processor.getSchedulingStrategy()) {
+ case core::SchedulingStrategy::TIMER_DRIVEN: return timer_scheduler_;
+ case core::SchedulingStrategy::EVENT_DRIVEN: return event_scheduler_;
+ case core::SchedulingStrategy::CRON_DRIVEN: return cron_scheduler_;
+ }
+ gsl_Assert(false);
+ };
+ return
gsl::make_not_null(std::make_unique<state::ProcessorController>(processor,
scheduler()));
}
uint64_t FlowController::getUptime() {
@@ -541,7 +535,7 @@ std::map<std::string, std::unique_ptr<io::InputStream>>
FlowController::getDebug
}
std::vector<state::StateController*>
FlowController::getAllProcessorControllers(
- const
std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>&
controllerFactory) {
+ const
std::function<gsl::not_null<std::unique_ptr<state::ProcessorController>>(core::Processor&)>&
controllerFactory) {
std::vector<state::StateController*> controllerVec{this};
std::vector<core::Processor*> processorVec;
root_->getAllProcessors(processorVec);
@@ -558,19 +552,21 @@ std::vector<state::StateController*>
FlowController::getAllProcessorControllers(
return controllerVec;
}
-state::StateController* FlowController::getProcessorController(const
std::string& name, const
std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>&
controllerFactory) {
- auto* processor = root_->findProcessorByName(name);
- if (processor == nullptr) {
- logger_->log_error("Could not get processor controller for requested name
\"%s\", because processor was not found either", name);
- return nullptr;
- }
-
- // reference to the existing or newly created controller
- auto& foundController = processor_to_controller_[processor->getUUID()];
- if (!foundController) {
- foundController = controllerFactory(*processor);
- }
- return foundController.get();
+state::StateController* FlowController::getProcessorController(const
std::string& id_or_name,
+ const
std::function<gsl::not_null<std::unique_ptr<state::ProcessorController>>(core::Processor&)>&
controllerFactory) {
+ return utils::Identifier::parse(id_or_name)
+ | utils::flatMap([this](utils::Identifier id) { return
utils::optional_from_ptr(root_->findProcessorById(id)); })
+ | utils::orElse([this, &id_or_name] { return
utils::optional_from_ptr(root_->findProcessorByName(id_or_name)); })
+ | utils::map([this, &controllerFactory](gsl::not_null<core::Processor*>
proc) -> gsl::not_null<state::ProcessorController*> {
+ return
utils::optional_from_ptr(processor_to_controller_[proc->getUUID()].get())
+ | utils::valueOrElse([this, proc, &controllerFactory] {
+ return
gsl::make_not_null((processor_to_controller_[proc->getUUID()] =
controllerFactory(*proc)).get());
+ });
+ })
+ | utils::valueOrElse([this, &id_or_name]() ->
state::ProcessorController* {
+ logger_->log_error("Could not get processor controller for requested
id/name \"%s\", because the processor was not found", id_or_name);
+ return nullptr;
+ });
}
void FlowController::loadMetricsPublisher() {
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 38663576b..b22dce01d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -358,7 +358,7 @@ void C2Agent::handle_c2_server_response(const
C2ContentResponse &resp) {
// stop all referenced components.
update_sink_->executeOnComponent(resp.name, [this, &resp]
(state::StateController& component) {
- logger_->log_debug("Stopping component %s",
component.getComponentName());
+ logger_->log_debug("Stopping component %s", resp.name);
if (resp.op == Operation::STOP) {
component.stop();
} else {
diff --git a/libminifi/src/core/state/ProcessorController.cpp
b/libminifi/src/core/state/ProcessorController.cpp
index 47a1512f6..c5686de77 100644
--- a/libminifi/src/core/state/ProcessorController.cpp
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -18,16 +18,13 @@
#include "core/state/ProcessorController.h"
#include <memory>
+#include <utility>
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
+namespace org::apache::nifi::minifi::state {
-ProcessorController::ProcessorController(core::Processor* processor, const
std::shared_ptr<SchedulingAgent> &scheduler)
- : processor_(processor),
- scheduler_(scheduler) {
+ProcessorController::ProcessorController(core::Processor& processor,
std::shared_ptr<SchedulingAgent> scheduler)
+ : processor_(&processor),
+ scheduler_(std::move(scheduler)) {
}
ProcessorController::~ProcessorController() = default;
@@ -59,8 +56,4 @@ int16_t ProcessorController::resume() {
return start();
}
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp
b/libminifi/src/core/state/nodes/SupportedOperations.cpp
index 2ab0217f6..252103116 100644
--- a/libminifi/src/core/state/nodes/SupportedOperations.cpp
+++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp
@@ -112,7 +112,7 @@ void
SupportedOperations::fillProperties(SerializedResponseNode& properties, min
addProperty(properties, "c2");
if (monitor_) {
monitor_->executeOnAllComponents([&properties](StateController&
component){
- addProperty(properties, component.getComponentName());
+ addProperty(properties, component.getComponentUUID().to_string());
});
}
break;
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 010ffcb68..b20d34164 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -93,7 +93,7 @@ class EventDriverScheduleErrorHandlingTests: public
IntegrationBase {
auto process_controller =
dynamic_cast<org::apache::nifi::minifi::state::ProcessorController*>(&component);
assert(process_controller != nullptr);
-
process_controller->getProcessor()->setSchedulingStrategy(org::apache::nifi::minifi::core::SchedulingStrategy::EVENT_DRIVEN);
+
process_controller->getProcessor().setSchedulingStrategy(org::apache::nifi::minifi::core::SchedulingStrategy::EVENT_DRIVEN);
}
++controllerVecIdx;
diff --git a/libminifi/test/integration/StateTransactionalityTests.cpp
b/libminifi/test/integration/StateTransactionalityTests.cpp
index 18bba7ca7..e3cd7983b 100644
--- a/libminifi/test/integration/StateTransactionalityTests.cpp
+++ b/libminifi/test/integration/StateTransactionalityTests.cpp
@@ -72,8 +72,8 @@ class StatefulIntegrationTest : public IntegrationBase {
// set hooks
const auto processController =
dynamic_cast<ProcessorController*>(&component);
assert(processController != nullptr);
- stateful_processor_ =
dynamic_cast<StatefulProcessor*>(processController->getProcessor());
- assert(stateful_processor_ != nullptr);
+ stateful_processor_ =
dynamic_cast<StatefulProcessor*>(&processController->getProcessor());
+ assert(stateful_processor_);
stateful_processor_->setHooks(on_schedule_hook_, on_trigger_hooks_);
}
diff --git a/libminifi/test/pcap-tests/PcapTest.cpp
b/libminifi/test/pcap-tests/PcapTest.cpp
index 23df72a8d..a65c13676 100644
--- a/libminifi/test/pcap-tests/PcapTest.cpp
+++ b/libminifi/test/pcap-tests/PcapTest.cpp
@@ -16,29 +16,13 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
-#include <utility>
#include <chrono>
-#include <fstream>
-#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
#include "core/ConfigurableComponent.h"
#include "core/state/ProcessorController.h"
#include "../integration/IntegrationBase.h"
@@ -47,9 +31,7 @@
class PcapTestHarness : public IntegrationBase {
public:
- PcapTestHarness() {
- dir = testController.createTempDirectory();
- }
+ PcapTestHarness() = default;
void testSetup() override {
LogTestController::getInstance().setTrace<minifi::processors::CapturePacket>();
@@ -81,33 +63,26 @@ class PcapTestHarness : public IntegrationBase {
fc.executeOnComponent("pcap", [this] (minifi::state::StateController&
component) {
auto proccontroller =
dynamic_cast<minifi::state::ProcessorController*>(&component);
if (proccontroller) {
- auto processor = proccontroller->getProcessor();
-
processor->setProperty(minifi::processors::CapturePacket::BaseDir.getName(),
dir);
-
processor->setProperty(minifi::processors::CapturePacket::NetworkControllers.getName(),
".*");
+ auto& processor = proccontroller->getProcessor();
+
processor.setProperty(minifi::processors::CapturePacket::BaseDir.getName(),
dir);
+
processor.setProperty(minifi::processors::CapturePacket::NetworkControllers.getName(),
".*");
}
});
}
protected:
- std::string dir;
TestController testController;
+ std::string dir = testController.createTempDirectory();
};
int main(int argc, char **argv) {
- std::string key_dir;
std::string test_file_location;
- std::string url;
-
if (argc > 1) {
test_file_location = argv[1];
}
-
PcapTestHarness harness;
-
- harness.setKeyDir(key_dir);
-
+ harness.setKeyDir("");
harness.run(test_file_location);
-
return 0;
}