This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e8736bd04414090890f948dc02de406a40b12103 Author: Gabor Gyimesi <[email protected]> AuthorDate: Tue Mar 29 16:59:20 2022 +0200 MINIFICPP-1780 Restart agent after C2 property update Co-authored-by: Marton Szasz <[email protected]> Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1299 --- extensions/coap/tests/CoapIntegrationBase.h | 3 +- extensions/http-curl/protocols/RESTSender.cpp | 2 +- extensions/http-curl/tests/C2PauseResumeTest.cpp | 3 +- .../http-curl/tests/C2PropertiesUpdateTests.cpp | 23 +- .../tests/ControllerServiceIntegrationTests.cpp | 12 +- extensions/http-curl/tests/HTTPHandlers.h | 2 +- extensions/http-curl/tests/HTTPIntegrationBase.h | 4 + extensions/systemd/CMakeLists.txt | 2 +- extensions/systemd/ConsumeJournald.cpp | 2 +- extensions/systemd/ConsumeJournald.h | 4 +- libminifi/include/FlowController.h | 21 +- libminifi/include/agent/build_description.h | 36 +- libminifi/include/c2/C2Agent.h | 10 +- libminifi/include/c2/C2Client.h | 14 +- libminifi/include/c2/C2Payload.h | 2 +- libminifi/include/core/state/Value.h | 124 +++--- .../include/core/state/nodes/AgentInformation.h | 207 ++++------ .../include/core/state/nodes/FlowInformation.h | 12 +- libminifi/include/core/state/nodes/MetricsBase.h | 2 +- libminifi/include/properties/Configuration.h | 1 + .../include/utils/FifoExecutor.h | 13 +- libminifi/include/utils/SmallString.h | 4 + libminifi/include/utils/file/FileUtils.h | 6 +- .../include/utils/meta/type_list.h | 36 +- libminifi/src/Configuration.cpp | 1 + libminifi/src/FlowController.cpp | 25 +- libminifi/src/c2/C2Agent.cpp | 48 +-- libminifi/src/c2/C2Client.cpp | 30 +- libminifi/src/core/state/Value.cpp | 48 ++- .../src/utils/FifoExecutor.cpp | 15 +- libminifi/src/utils/file/FileUtils.cpp | 7 + libminifi/test/aws-tests/FetchS3ObjectTests.cpp | 8 +- libminifi/test/flow-tests/TestControllerWithFlow.h | 3 +- libminifi/test/integration/IntegrationBase.h | 116 ++++-- .../test/integration/ProvenanceReportingTest.cpp | 18 +- .../test/persistence-tests/PersistenceTests.cpp | 4 +- libminifi/test/rocksdb-tests/RepoTests.cpp | 2 +- libminifi/test/unit/ProvenanceTestHelper.h | 3 +- main/AgentDocs.cpp | 40 +- main/AgentDocs.h | 19 +- main/MiNiFiMain.cpp | 449 +++++++++++---------- nanofi/src/cxx/C2CallbackAgent.cpp | 14 +- 42 files changed, 689 insertions(+), 706 deletions(-) diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h index 74f7c344f..520aaaf76 100644 --- a/extensions/coap/tests/CoapIntegrationBase.h +++ b/extensions/coap/tests/CoapIntegrationBase.h @@ -74,7 +74,8 @@ class CoapIntegrationBase : public IntegrationBase { std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME); + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + std::make_shared<utils::file::FileSystem>(), []{}); controller->load(); controller->start(); diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index 64c7408ea..af15b4dbf 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -141,7 +141,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi } if (payload.getOperation() == Operation::TRANSFER) { - file_callback = std::unique_ptr<utils::ByteOutputCallback>(new utils::ByteOutputCallback(std::numeric_limits<size_t>::max())); + file_callback = std::make_unique<utils::ByteOutputCallback>(std::numeric_limits<size_t>::max()); read.pos = 0; read.ptr = file_callback.get(); client.setReadCallback(&read); diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp index d4cbbf83c..bfb69fd18 100644 --- a/extensions/http-curl/tests/C2PauseResumeTest.cpp +++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp @@ -131,7 +131,8 @@ int main(int argc, char **argv) { test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file); std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>( - test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME); + test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + std::make_shared<utils::file::FileSystem>(), []{}); core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file); diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp index 6d96de6ba..d92497c0b 100644 --- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp +++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp @@ -80,7 +80,12 @@ class C2HeartbeatHandler : public ServerAwareHandler { class VerifyPropertyUpdate : public HTTPIntegrationBase { public: + VerifyPropertyUpdate() :fn_{[]{}} {} explicit VerifyPropertyUpdate(std::function<void()> fn) : fn_(std::move(fn)) {} + VerifyPropertyUpdate(const VerifyPropertyUpdate&) = delete; + VerifyPropertyUpdate(VerifyPropertyUpdate&&) = default; + VerifyPropertyUpdate& operator=(const VerifyPropertyUpdate&) = delete; + VerifyPropertyUpdate& operator=(VerifyPropertyUpdate&&) = default; void testSetup() {} @@ -89,6 +94,8 @@ class VerifyPropertyUpdate : public HTTPIntegrationBase { } std::function<void()> fn_; + + [[nodiscard]] int getRestartRequestedCount() const noexcept { return restart_requested_count_; } }; static const std::string properties_file = @@ -96,7 +103,7 @@ static const std::string properties_file = "nifi.c2.agent.protocol.class=RESTSender\n" "nifi.c2.enable=true\n" "nifi.c2.agent.class=test\n" - "nifi.c2.agent.heartbeat.period=100\n"; + "nifi.c2.agent.heartbeat.period=500\n"; static const std::string log_properties_file = "logger.root=INFO,ostream\n"; @@ -151,10 +158,18 @@ int main() { assert(!log_test_controller->contains("DummyClass3::before", 0s)); } - VerifyPropertyUpdate harness([&] { + // On msvc, the passed lambda can't capture a reference to the object under construction, so we need to late-init harness. + VerifyPropertyUpdate harness; + harness = VerifyPropertyUpdate([&] { assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.isAcknowledged("79");})); - assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("FULLY_APPLIED") == 1;})); - assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("NO_OPERATION") > 0;})); + assert(utils::verifyEventHappenedInPollTime(3s, [&] { + return ack_handler.getApplyCount("FULLY_APPLIED") == 1 + && harness.getRestartRequestedCount() == 1; + })); + assert(utils::verifyEventHappenedInPollTime(3s, [&] { + return ack_handler.getApplyCount("NO_OPERATION") > 0 + && harness.getRestartRequestedCount() == 1; // only one, i.e. no additional restart requests compared to the previous update. + })); // update operation acknowledged { // verify final log levels diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp index 839769788..72669b64b 100644 --- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp +++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp @@ -69,13 +69,15 @@ int main(int argc, char **argv) { std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); content_repo->initialize(configuration); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file)); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>( + test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - content_repo, - DEFAULT_ROOT_GROUP_NAME); + const auto controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), + content_repo, + DEFAULT_ROOT_GROUP_NAME, + std::make_shared<utils::file::FileSystem>(), + []{}); disabled = false; std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>(); diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h index 92478a00a..7bae42aed 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -476,7 +476,7 @@ class HeartbeatHandler : public ServerAwareHandler { classes.push_back(proc["type"].GetString()); } - auto group = minifi::BuildDescription::getClassDescriptions(str); + auto group = minifi::BuildDescription{}.getClassDescriptions(str); for (const auto& proc : group.processors_) { assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes)); (void)proc; diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h index 1136f217b..6870a14fb 100644 --- a/extensions/http-curl/tests/HTTPIntegrationBase.h +++ b/extensions/http-curl/tests/HTTPIntegrationBase.h @@ -47,6 +47,10 @@ class HTTPIntegrationBase : public IntegrationBase { : IntegrationBase(waitTime), server(nullptr) { } + HTTPIntegrationBase(const HTTPIntegrationBase&) = delete; + HTTPIntegrationBase(HTTPIntegrationBase&&) = default; + HTTPIntegrationBase& operator=(const HTTPIntegrationBase&) = delete; + HTTPIntegrationBase& operator=(HTTPIntegrationBase&&) = default; virtual void setUrl(const std::string &url, ServerAwareHandler *handler); diff --git a/extensions/systemd/CMakeLists.txt b/extensions/systemd/CMakeLists.txt index 1cb668ba5..41bd52bf1 100644 --- a/extensions/systemd/CMakeLists.txt +++ b/extensions/systemd/CMakeLists.txt @@ -19,7 +19,7 @@ include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) -add_library(minifi-systemd SHARED ConsumeJournald.cpp WorkerThread.cpp libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp) +add_library(minifi-systemd SHARED ConsumeJournald.cpp libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp) target_link_libraries(minifi-systemd ${LIBMINIFI} Threads::Threads date::date) diff --git a/extensions/systemd/ConsumeJournald.cpp b/extensions/systemd/ConsumeJournald.cpp index 94f3ca8b1..98e800ae2 100644 --- a/extensions/systemd/ConsumeJournald.cpp +++ b/extensions/systemd/ConsumeJournald.cpp @@ -81,7 +81,7 @@ void ConsumeJournald::initialize() { setSupportedProperties({BatchSize, PayloadFormat, IncludeTimestamp, JournalType, ProcessOldMessages, TimestampFormat}); setSupportedRelationships({Success}); - worker_ = std::make_unique<Worker>(); + worker_ = std::make_unique<utils::FifoExecutor>(); } void ConsumeJournald::notifyStop() { diff --git a/extensions/systemd/ConsumeJournald.h b/extensions/systemd/ConsumeJournald.h index a96c07e03..830e657e2 100644 --- a/extensions/systemd/ConsumeJournald.h +++ b/extensions/systemd/ConsumeJournald.h @@ -35,7 +35,7 @@ #include "libwrapper/LibWrapper.h" #include "utils/Deleters.h" #include "utils/gsl.h" -#include "WorkerThread.h" +#include "utils/FifoExecutor.h" namespace org { namespace apache { namespace nifi { namespace minifi { namespace extensions { namespace systemd { @@ -97,7 +97,7 @@ class ConsumeJournald final : public core::Processor { std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConsumeJournald>::getLogger(); core::CoreComponentStateManager* state_manager_; std::unique_ptr<libwrapper::LibWrapper> libwrapper_; - std::unique_ptr<Worker> worker_; + std::unique_ptr<utils::FifoExecutor> worker_; std::unique_ptr<libwrapper::Journal> journal_; std::size_t batch_size_ = 1000; diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 0d5f6d7cd..92fa9aeb6 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -22,6 +22,7 @@ #include <algorithm> #include <atomic> +#include <functional> #include <map> #include <memory> #include <mutex> @@ -57,10 +58,7 @@ #include "utils/Id.h" #include "utils/file/FileSystem.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { namespace state { class ProcessorController; @@ -77,12 +75,14 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi public: FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, - std::shared_ptr<core::ContentRepository> content_repo, std::string name = DEFAULT_ROOT_GROUP_NAME, - std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>()); + std::shared_ptr<core::ContentRepository> content_repo, const std::string& name = DEFAULT_ROOT_GROUP_NAME, + std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(), + std::function<void()> request_restart = []{}); FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, - std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem); + std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem, + std::function<void()> request_restart = []{}); ~FlowController() override; @@ -181,7 +181,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest * @return the agent manifest response node */ - std::shared_ptr<state::response::ResponseNode> getAgentManifest() const override; + std::shared_ptr<state::response::ResponseNode> getAgentManifest() override; uint64_t getUptime() override; @@ -254,9 +254,6 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi std::map<utils::Identifier, std::unique_ptr<state::ProcessorController>> processor_to_controller_; }; -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi #endif // LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_ diff --git a/libminifi/include/agent/build_description.h b/libminifi/include/agent/build_description.h index 7eec05062..6017fd71f 100644 --- a/libminifi/include/agent/build_description.h +++ b/libminifi/include/agent/build_description.h @@ -32,10 +32,7 @@ #include "core/Annotation.h" #include "io/validation.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { class ClassDescription { public: @@ -71,6 +68,10 @@ struct Components { std::vector<ClassDescription> processors_; std::vector<ClassDescription> controller_services_; std::vector<ClassDescription> other_components_; + + [[nodiscard]] bool empty() const noexcept { + return processors_.empty() && controller_services_.empty() && other_components_.empty(); + } }; struct BundleDetails { @@ -121,13 +122,8 @@ class ExternalBuildDescription { class BuildDescription { public: - static struct Components getClassDescriptions(const std::string& group = "minifi-system") { - static std::map<std::string, struct Components> class_mappings; -#ifndef WIN32 - if (UNLIKELY(IsNullOrEmpty(class_mappings[group].processors_) && IsNullOrEmpty(class_mappings[group].processors_))) { -#else - if (class_mappings[group].processors_.empty()) { -#endif + struct Components getClassDescriptions(const std::string& group = "minifi-system") { + if (class_mappings_[group].empty()) { for (const auto& clazz : core::ClassLoader::getDefaultClassLoader().getClasses(group)) { std::string class_name = clazz; auto lastOfIdx = clazz.find_last_of("::"); @@ -158,22 +154,22 @@ class BuildDescription { description.inputRequirement_ = processor->getInputRequirementAsString(); description.isSingleThreaded_ = processor->isSingleThreaded(); description.class_relationships_ = processor->getSupportedRelationships(); - class_mappings[group].processors_.emplace_back(description); + class_mappings_[group].processors_.emplace_back(description); } else if (is_controller_service) { - class_mappings[group].controller_services_.emplace_back(description); + class_mappings_[group].controller_services_.emplace_back(description); } else { - class_mappings[group].other_components_.emplace_back(description); + class_mappings_[group].other_components_.emplace_back(description); } } } } - return class_mappings[group]; + return class_mappings_[group]; } -}; // NOLINT -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org + private: + std::map<std::string, struct Components> class_mappings_; +}; + +} // namespace org::apache::nifi::minifi #endif // LIBMINIFI_INCLUDE_AGENT_BUILD_DESCRIPTION_H_ diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index 38b33c03a..b32b029f6 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -68,8 +68,9 @@ class C2Agent : public state::UpdateController { C2Agent(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* updateSink, - const std::shared_ptr<Configure> &configure, - const std::shared_ptr<utils::file::FileSystem> &filesystem = std::make_shared<utils::file::FileSystem>()); + std::shared_ptr<Configure> configure, + std::shared_ptr<utils::file::FileSystem> filesystem, + std::function<void()> request_restart); ~C2Agent() noexcept override { delete protocol_.load(); @@ -93,8 +94,6 @@ class C2Agent : public state::UpdateController { std::optional<std::string> fetchFlow(const std::string& uri) const; protected: - void restart_agent(); - /** * Check the collection of triggers for any updates that need to be handled. * This is an optional step @@ -246,6 +245,9 @@ class C2Agent : public state::UpdateController { bool manifest_sent_; const uint64_t C2RESPONSE_POLL_MS = 100; + + std::atomic<bool> restart_needed_ = false; + std::function<void()> request_restart_; }; } // namespace c2 diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h index 14d4dcdd1..7180b281e 100644 --- a/libminifi/include/c2/C2Client.h +++ b/libminifi/include/c2/C2Client.h @@ -36,11 +36,7 @@ #include "core/Flow.h" #include "utils/file/FileSystem.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { +namespace org::apache::nifi::minifi::c2 { class C2Client : public core::Flow, public state::response::NodeReporter { public: @@ -48,6 +44,7 @@ class C2Client : public core::Flow, public state::response::NodeReporter { std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<utils::file::FileSystem> filesystem, + std::function<void()> request_restart, std::shared_ptr<core::logging::Logger> logger = core::logging::LoggerFactory<C2Client>::getLogger()); void initialize(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* update_sink); @@ -84,10 +81,7 @@ class C2Client : public core::Flow, public state::response::NodeReporter { protected: std::atomic<bool> flow_update_{false}; + std::function<void()> request_restart_; }; -} // namespace c2 -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h index dfb21d457..badc5e0eb 100644 --- a/libminifi/include/c2/C2Payload.h +++ b/libminifi/include/c2/C2Payload.h @@ -230,7 +230,7 @@ class C2Payload : public state::Update { friend std::ostream& operator<<(std::ostream& out, const C2Payload& payload); - std::string str() const { + [[nodiscard]] std::string str() const { std::stringstream ss; ss << *this; return std::move(ss).str(); diff --git a/libminifi/include/core/state/Value.h b/libminifi/include/core/state/Value.h index 230eb6f65..6d8b77219 100644 --- a/libminifi/include/core/state/Value.h +++ b/libminifi/include/core/state/Value.h @@ -24,18 +24,15 @@ #include <iostream> #include <memory> #include <string> +#include <utility> #include <vector> #include <typeinfo> #include "utils/ValueParser.h" #include "utils/ValueCaster.h" #include "utils/Export.h" +#include "utils/meta/type_list.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace response { +namespace org::apache::nifi::minifi::state::response { /** * Purpose: Represents an AST value @@ -48,17 +45,18 @@ class Value { using ParseException = utils::internal::ParseException; public: - explicit Value(const std::string &value) - : string_value(value), + explicit Value(std::string value) + : string_value(std::move(value)), type_id(std::type_index(typeid(std::string))) { } virtual ~Value() = default; - std::string getStringValue() const { + + [[nodiscard]] std::string getStringValue() const { return string_value; } - const char* c_str() const { + [[nodiscard]] const char* c_str() const { return string_value.c_str(); } @@ -67,7 +65,7 @@ class Value { return convertValueImpl<typename std::common_type<T>::type>(ref); } - bool empty() { + [[nodiscard]] bool empty() const noexcept { return string_value.empty(); } @@ -178,7 +176,7 @@ class UInt32Value : public Value { setTypeId<uint32_t>(); } - uint32_t getValue() const { + [[nodiscard]] uint32_t getValue() const { return value; } @@ -210,7 +208,7 @@ class UInt32Value : public Value { return utils::internal::cast_if_in_range(value, ref); } - uint32_t value; + uint32_t value{}; }; class IntValue : public Value { @@ -225,7 +223,7 @@ class IntValue : public Value { : Value(strvalue) { utils::internal::ValueParser(strvalue).parse(value).parseEnd(); } - int getValue() const { + [[nodiscard]] int getValue() const { return value; } @@ -256,7 +254,7 @@ class IntValue : public Value { return utils::internal::cast_if_in_range(value, ref); } - int value; + int value{}; }; class BoolValue : public Value { @@ -272,7 +270,7 @@ class BoolValue : public Value { utils::internal::ValueParser(strvalue).parse(value).parseEnd(); } - bool getValue() const { + [[nodiscard]] bool getValue() const { return value; } @@ -302,7 +300,7 @@ class BoolValue : public Value { return true; } - bool value; + bool value{}; private: template<typename T> @@ -329,7 +327,7 @@ class UInt64Value : public Value { setTypeId<uint64_t>(); } - uint64_t getValue() const { + [[nodiscard]] uint64_t getValue() const { return value; } @@ -358,7 +356,7 @@ class UInt64Value : public Value { return utils::internal::cast_if_in_range(value, ref); } - uint64_t value; + uint64_t value{}; }; class Int64Value : public Value { @@ -374,7 +372,7 @@ class Int64Value : public Value { setTypeId<int64_t>(); } - int64_t getValue() { + [[nodiscard]] int64_t getValue() const { return value; } @@ -404,7 +402,7 @@ class Int64Value : public Value { return utils::internal::cast_if_in_range(value, ref); } - int64_t value; + int64_t value{}; }; class DoubleValue : public Value { @@ -420,37 +418,37 @@ class DoubleValue : public Value { setTypeId<double>(); } - double getValue() { + [[nodiscard]] double getValue() const { return value; } protected: - virtual bool getValue(int& ref) { + bool getValue(int& ref) override { return utils::internal::cast_if_in_range(value, ref); } - virtual bool getValue(uint32_t& ref) { + bool getValue(uint32_t& ref) override { return utils::internal::cast_if_in_range(value, ref); } - virtual bool getValue(int64_t& ref ) { + bool getValue(int64_t& ref) override { return utils::internal::cast_if_in_range(value, ref); } - virtual bool getValue(uint64_t& ref) { + bool getValue(uint64_t& ref) override { return utils::internal::cast_if_in_range(value, ref); } - virtual bool getValue(bool&) { + bool getValue(bool&) override { return false; } - virtual bool getValue(double& ref) { + bool getValue(double& ref) override { ref = value; return true; } - double value; + double value{}; }; static inline std::shared_ptr<Value> createValue(const bool &object) { @@ -497,56 +495,50 @@ static inline std::shared_ptr<Value> createValue(const double &object) { * Purpose: ValueNode is the AST container for a value */ class ValueNode { + using supported_types = utils::meta::type_list<int, uint32_t, size_t, int64_t, uint64_t, bool, char*, const char*, double, std::string>; + public: - ValueNode() - : value_(nullptr) { - } + ValueNode() = default; - ValueNode(ValueNode &&vn) = default; - ValueNode(const ValueNode &vn) = default; + template<typename T> + requires (supported_types::contains<T>()) // NOLINT + /* implicit, because it doesn't change the meaning, and it simplifies construction of maps */ + ValueNode(const T value) // NOLINT + :value_{createValue(value)} + {} /** * Define the representations and eventual storage relationships through * createValue */ template<typename T> - auto operator=(const T ref) -> typename std::enable_if<std::is_same<T, int >::value || - std::is_same<T, uint32_t >::value || - std::is_same<T, size_t >::value || - std::is_same<T, int64_t>::value || - std::is_same<T, uint64_t >::value || - std::is_same<T, bool >::value || - std::is_same<T, char* >::value || - std::is_same<T, const char* >::value || - std::is_same<T, double>::value || - std::is_same<T, std::string>::value, ValueNode&>::type { + requires (supported_types::contains<T>()) // NOLINT + ValueNode& operator=(const T ref) { value_ = createValue(ref); return *this; } - ValueNode &operator=(const ValueNode &ref) = default; - inline bool operator==(const ValueNode &rhs) const { return to_string() == rhs.to_string(); } - inline bool operator==(const char*rhs) const { + inline bool operator==(const char* rhs) const { return to_string() == rhs; } - friend bool operator==(const char *lhs, const ValueNode& rhs) { + friend bool operator==(const char* lhs, const ValueNode& rhs) { return lhs == rhs.to_string(); } - std::string to_string() const { + [[nodiscard]] std::string to_string() const { return value_ ? value_->getStringValue() : ""; } - std::shared_ptr<Value> getValue() const { + [[nodiscard]] std::shared_ptr<Value> getValue() const { return value_; } - bool empty() const { + [[nodiscard]] bool empty() const noexcept { return value_ == nullptr || value_->empty(); } @@ -556,33 +548,23 @@ class ValueNode { struct SerializedResponseNode { std::string name; - ValueNode value; - bool array; - bool collapsible; + ValueNode value{}; + bool array = false; + bool collapsible = true; bool keep_empty = false; - std::vector<SerializedResponseNode> children; - - SerializedResponseNode(bool collapsible = true) // NOLINT - : array(false), - collapsible(collapsible) { - } + std::vector<SerializedResponseNode> children{}; - SerializedResponseNode(const SerializedResponseNode &other) = default; - - SerializedResponseNode &operator=(const SerializedResponseNode &other) = default; - - bool empty() const { + [[nodiscard]] bool empty() const noexcept { return value.empty() && children.empty(); } + + [[nodiscard]] std::string to_string() const; }; +inline std::string to_string(const SerializedResponseNode& node) { return node.to_string(); } + std::string hashResponseNodes(const std::vector<SerializedResponseNode>& nodes); -} // namespace response -} // namespace state -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::state::response #endif // LIBMINIFI_INCLUDE_CORE_STATE_VALUE_H_ diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h index 904ada129..8520c1e40 100644 --- a/libminifi/include/core/state/nodes/AgentInformation.h +++ b/libminifi/include/core/state/nodes/AgentInformation.h @@ -64,12 +64,7 @@ #include "utils/Export.h" #include "SupportedOperations.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace response { +namespace org::apache::nifi::minifi::state::response { #define GROUP_STR "org.apache.nifi.minifi" @@ -91,7 +86,7 @@ class ComponentManifest : public DeviceInformation { std::vector<SerializedResponseNode> serialized; SerializedResponseNode resp; resp.name = "componentManifest"; - struct Components group = BuildDescription::getClassDescriptions(getName()); + struct Components group = build_description_.getClassDescriptions(getName()); serializeClassDescription(group.processors_, "processors", resp); serializeClassDescription(group.controller_services_, "controllerServices", resp); serialized.push_back(resp); @@ -316,6 +311,9 @@ class ComponentManifest : public DeviceInformation { response.children.push_back(type); } } + + private: + BuildDescription build_description_; }; class ExternalManifest : public ComponentManifest { @@ -499,11 +497,13 @@ class AgentStatus : public StateMonitorNode { } SerializedResponseNode serializeComponents() const { - SerializedResponseNode components_node(false); + SerializedResponseNode components_node; + components_node.collapsible = false; components_node.name = "components"; if (monitor_ != nullptr) { monitor_->executeOnAllComponents([&components_node](StateController& component){ - SerializedResponseNode component_node(false); + SerializedResponseNode component_node; + component_node.collapsible = false; component_node.name = component.getComponentName(); SerializedResponseNode uuid_node; @@ -585,7 +585,7 @@ class AgentMonitor { } } - void setStateMonitor(state::StateMonitor* &monitor) { + void setStateMonitor(state::StateMonitor* monitor) { monitor_ = monitor; } @@ -599,15 +599,15 @@ class AgentMonitor { */ class AgentManifest : public DeviceInformation { public: - AgentManifest(std::string name, const utils::Identifier& uuid) - : DeviceInformation(std::move(name), uuid) { + AgentManifest(const std::string& name, const utils::Identifier& uuid) + : DeviceInformation(name, uuid) { } - explicit AgentManifest(std::string name) - : DeviceInformation(std::move(name)) { + explicit AgentManifest(const std::string& name) + : DeviceInformation(name) { } - std::string getName() const { + std::string getName() const override { return "agentManifest"; } @@ -620,80 +620,38 @@ class AgentManifest : public DeviceInformation { } void setConfigurationReader(std::function<std::optional<std::string>(const std::string&)> configuration_reader) { - configuration_reader_ = configuration_reader; - } - - std::vector<SerializedResponseNode> serialize() { - static std::vector<SerializedResponseNode> serialized; - if (serialized.empty()) { - SerializedResponseNode ident; - - ident.name = "identifier"; - ident.value = AgentBuild::BUILD_IDENTIFIER; - - SerializedResponseNode type; - - type.name = "agentType"; - type.value = "cpp"; - - SerializedResponseNode version; - - version.name = "version"; - version.value = AgentBuild::VERSION; - - SerializedResponseNode buildInfo; - buildInfo.name = "buildInfo"; - - SerializedResponseNode build_version; - build_version.name = "version"; - build_version.value = AgentBuild::VERSION; - - SerializedResponseNode build_rev; - build_rev.name = "revision"; - build_rev.value = AgentBuild::BUILD_REV; - - SerializedResponseNode build_date; - build_date.name = "timestamp"; - build_date.value = (uint64_t) std::stoull(AgentBuild::BUILD_DATE); - - SerializedResponseNode compiler_command; - compiler_command.name = "compiler"; - compiler_command.value = AgentBuild::COMPILER; - - SerializedResponseNode compiler_flags; - compiler_flags.name = "flags"; - compiler_flags.value = AgentBuild::COMPILER_FLAGS; - - buildInfo.children.push_back(compiler_flags); - buildInfo.children.push_back(compiler_command); - - buildInfo.children.push_back(build_version); - buildInfo.children.push_back(build_rev); - buildInfo.children.push_back(build_date); - - Bundles bundles("bundles"); - - serialized.push_back(ident); - serialized.push_back(type); - serialized.push_back(buildInfo); - // serialize the bundle information. - for (auto bundle : bundles.serialize()) { - serialized.push_back(bundle); - } - - SchedulingDefaults defaults("schedulingDefaults"); - - for (auto defaultNode : defaults.serialize()) { - serialized.push_back(defaultNode); - } - - SupportedOperations supported_operations("supportedOperations"); - supported_operations.setStateMonitor(monitor_); - supported_operations.setUpdatePolicyController(update_policy_controller_); - supported_operations.setConfigurationReader(configuration_reader_); - for (const auto& operation : supported_operations.serialize()) { - serialized.push_back(operation); - } + configuration_reader_ = std::move(configuration_reader); + } + + std::vector<SerializedResponseNode> serialize() override { + std::vector<SerializedResponseNode> serialized = { + {.name = "identifier", .value = AgentBuild::BUILD_IDENTIFIER}, + {.name = "agentType", .value = "cpp"}, + {.name = "buildInfo", .children = { + {.name = "flags", .value = AgentBuild::COMPILER_FLAGS}, + {.name = "compiler", .value = AgentBuild::COMPILER}, + {.name = "version", .value = AgentBuild::VERSION}, + {.name = "revision", .value = AgentBuild::BUILD_REV}, + {.name = "timestamp", .value = static_cast<uint64_t>(std::stoull(AgentBuild::BUILD_DATE))} + }} + }; + { + auto bundles = Bundles{"bundles"}.serialize(); + std::move(std::begin(bundles), std::end(bundles), std::back_inserter(serialized)); + } + { + auto schedulingDefaults = SchedulingDefaults{"schedulingDefaults"}.serialize(); + std::move(std::begin(schedulingDefaults), std::end(schedulingDefaults), std::back_inserter(serialized)); + } + { + auto supportedOperations = [this]() { + SupportedOperations supported_operations("supportedOperations"); + supported_operations.setStateMonitor(monitor_); + supported_operations.setUpdatePolicyController(update_policy_controller_); + supported_operations.setConfigurationReader(configuration_reader_); + return supported_operations.serialize(); + }(); + std::move(std::begin(supportedOperations), std::end(supportedOperations), std::back_inserter(serialized)); } return serialized; } @@ -721,52 +679,42 @@ class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIde } void setConfigurationReader(std::function<std::optional<std::string>(const std::string&)> configuration_reader) { - configuration_reader_ = configuration_reader; + configuration_reader_ = std::move(configuration_reader); } protected: - std::vector<SerializedResponseNode> serialize() { - std::vector<SerializedResponseNode> serialized; - - SerializedResponseNode ident; - - ident.name = "identifier"; - ident.value = provider_->getAgentIdentifier(); - serialized.push_back(ident); + std::vector<SerializedResponseNode> serialize() override { + std::vector<SerializedResponseNode> serialized = { + {.name = "identifier", .value = provider_->getAgentIdentifier()}, + }; const auto agent_class = provider_->getAgentClass(); if (agent_class) { - SerializedResponseNode agentClass; - agentClass.name = "agentClass"; - agentClass.value = *agent_class; - serialized.push_back(agentClass); + serialized.push_back({.name = "agentClass", .value = *agent_class}); } - SerializedResponseNode agentManifestHash; - agentManifestHash.name = "agentManifestHash"; - agentManifestHash.value = getAgentManifestHash(); - serialized.push_back(agentManifestHash); - + serialized.push_back({.name = "agentManifestHash", .value = getAgentManifestHash()}); return serialized; } std::vector<SerializedResponseNode> getAgentManifest() const { - SerializedResponseNode agentManifest; - agentManifest.name = "agentManifest"; - AgentManifest manifest{"manifest"}; - manifest.setStateMonitor(monitor_); - manifest.setUpdatePolicyController(update_policy_controller_); - manifest.setConfigurationReader(configuration_reader_); - agentManifest.children = manifest.serialize(); - return std::vector<SerializedResponseNode>{ agentManifest }; - } - - std::string getAgentManifestHash() { - if (!agentManifestHash_.has_value()) { - agentManifestHash_ = hashResponseNodes(getAgentManifest()); + if (agent_manifest_cache_) { return std::vector{*agent_manifest_cache_}; } + agent_manifest_cache_ = {.name = "agentManifest", .children = [this] { + AgentManifest manifest{"manifest"}; + manifest.setStateMonitor(monitor_); + manifest.setUpdatePolicyController(update_policy_controller_); + manifest.setConfigurationReader(configuration_reader_); + return manifest.serialize(); + }()}; + agent_manifest_hash_cache_.clear(); + return std::vector{ *agent_manifest_cache_ }; + } + + std::string getAgentManifestHash() const { + if (agent_manifest_hash_cache_.empty()) { + agent_manifest_hash_cache_ = hashResponseNodes(getAgentManifest()); } - - return *agentManifestHash_; + return agent_manifest_hash_cache_; } std::vector<SerializedResponseNode> getAgentStatus() const { @@ -787,9 +735,11 @@ class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIde } private: - std::optional<std::string> agentManifestHash_; + mutable std::optional<SerializedResponseNode> agent_manifest_cache_; + mutable std::string agent_manifest_hash_cache_; controllers::UpdatePolicyControllerService* update_policy_controller_ = nullptr; std::function<std::optional<std::string>(const std::string&)> configuration_reader_; + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AgentNode>::getLogger(); }; /** @@ -811,7 +761,7 @@ class AgentInformation : public AgentNode { setArray(false); } - std::string getName() const { + std::string getName() const override { return "agentInfo"; } @@ -819,7 +769,7 @@ class AgentInformation : public AgentNode { include_agent_status_ = include; } - std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialize() override { std::vector<SerializedResponseNode> serialized(AgentNode::serialize()); if (include_agent_manifest_) { auto manifest = getAgentManifest(); @@ -837,11 +787,6 @@ class AgentInformation : public AgentNode { bool include_agent_status_; }; -} // namespace response -} // namespace state -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::state::response #endif // LIBMINIFI_INCLUDE_CORE_STATE_NODES_AGENTINFORMATION_H_ diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index b10e504fb..aae2e355c 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -194,11 +194,13 @@ class FlowInformation : public FlowMonitor { serialized.push_back(uri); if (!connections_.empty()) { - SerializedResponseNode queues(false); + SerializedResponseNode queues; + queues.collapsible = false; queues.name = "queues"; for (auto &queue : connections_) { - SerializedResponseNode repoNode(false); + SerializedResponseNode repoNode; + repoNode.collapsible = false; repoNode.name = queue.second->getName(); SerializedResponseNode queueUUIDNode; @@ -233,11 +235,13 @@ class FlowInformation : public FlowMonitor { } if (nullptr != monitor_) { - SerializedResponseNode componentsNode(false); + SerializedResponseNode componentsNode; + componentsNode.collapsible = false; componentsNode.name = "components"; monitor_->executeOnAllComponents([&componentsNode](StateController& component){ - SerializedResponseNode componentNode(false); + SerializedResponseNode componentNode; + componentNode.collapsible = false; componentNode.name = component.getComponentName(); SerializedResponseNode uuidNode; diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h index e0af1043a..d16c2bcc4 100644 --- a/libminifi/include/core/state/nodes/MetricsBase.h +++ b/libminifi/include/core/state/nodes/MetricsBase.h @@ -207,7 +207,7 @@ class NodeReporter { * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest * @return the agent manifest response node */ - virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() const = 0; + virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() = 0; }; /** diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index d0b7343ae..f7db69cd4 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -107,6 +107,7 @@ class Configuration : public Properties { static constexpr const char *nifi_c2_agent_coap_port = "nifi.c2.agent.coap.port"; static constexpr const char *nifi_c2_agent_protocol_class = "nifi.c2.agent.protocol.class"; static constexpr const char *nifi_c2_agent_identifier = "nifi.c2.agent.identifier"; + static constexpr const char *nifi_c2_agent_identifier_fallback = "nifi.c2.agent.identifier.fallback"; static constexpr const char *nifi_c2_agent_trigger_classes = "nifi.c2.agent.trigger.classes"; static constexpr const char *nifi_c2_root_classes = "nifi.c2.root.classes"; static constexpr const char *nifi_c2_root_class_definitions = "nifi.c2.root.class.definitions"; diff --git a/extensions/systemd/WorkerThread.h b/libminifi/include/utils/FifoExecutor.h similarity index 81% rename from extensions/systemd/WorkerThread.h rename to libminifi/include/utils/FifoExecutor.h index 503fcb1b4..1aca59d56 100644 --- a/extensions/systemd/WorkerThread.h +++ b/libminifi/include/utils/FifoExecutor.h @@ -23,7 +23,7 @@ #include "utils/MinifiConcurrentQueue.h" -namespace org { namespace apache { namespace nifi { namespace minifi { namespace extensions { namespace systemd { +namespace org::apache::nifi::minifi::utils { namespace detail { class WorkerThread final { @@ -48,9 +48,9 @@ class WorkerThread final { } // namespace detail /** - * A worker that executes arbitrary functions with no parameters asynchronously on an internal thread, returning a future to the result. + * Executes arbitrary functions with no parameters asynchronously on an internal thread, returning a future to the result. */ -class Worker final { +class FifoExecutor final { public: template<typename Func> auto enqueue(Func func) -> std::future<decltype(func())> { @@ -64,9 +64,4 @@ class Worker final { detail::WorkerThread worker_thread_; }; -} // namespace systemd -} // namespace extensions -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/SmallString.h b/libminifi/include/utils/SmallString.h index 94259c197..31c149e32 100644 --- a/libminifi/include/utils/SmallString.h +++ b/libminifi/include/utils/SmallString.h @@ -34,6 +34,10 @@ class SmallString : public std::array<char, N + 1> { return {c_str()}; } + [[nodiscard]] std::string_view view() const noexcept { + return std::string_view{this->data(), N}; + } + constexpr size_t length() const noexcept { return N; } diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index 75ad4e833..c4b1b68cb 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -607,12 +607,14 @@ inline std::error_code hide_file(const char* const file_name) { uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position); -inline std::string get_file_content(const std::string &file_name) { - std::ifstream file(file_name); +inline std::string get_content(const std::string &file_name) { + std::ifstream file(file_name, std::ifstream::binary); std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>()); return content; } +void put_content(const std::filesystem::path& filename, std::string_view new_contents); + bool contains(const std::filesystem::path& file_path, std::string_view text_to_search); diff --git a/main/AgentDocs.h b/libminifi/include/utils/meta/type_list.h similarity index 59% copy from main/AgentDocs.h copy to libminifi/include/utils/meta/type_list.h index 0402adf11..a94899abd 100644 --- a/main/AgentDocs.h +++ b/libminifi/include/utils/meta/type_list.h @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,30 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef MAIN_AGENTDOCS_H_ -#define MAIN_AGENTDOCS_H_ - -#include <iostream> - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace docs { +#pragma once +#include <type_traits> -class AgentDocs { - public: - AgentDocs() = default; - ~AgentDocs() = default; - void generate(const std::string &docsdir, std::ostream &genStream); - private: - inline std::string extractClassName(const std::string &processor) const; +namespace org::apache::nifi::minifi::utils::meta { +template<typename... Types> +struct type_list { + template<typename T> + [[nodiscard]] constexpr static bool contains() noexcept { + return (std::is_same_v<T, Types> || ...); + } }; - -} /* namespace docs */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif // MAIN_AGENTDOCS_H_ +} // namespace org::apache::nifi::minifi::utils::meta diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 78e3e36af..4e50b2447 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -83,6 +83,7 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP core::ConfigurationProperty{Configuration::nifi_c2_agent_coap_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, core::ConfigurationProperty{Configuration::nifi_c2_agent_protocol_class}, core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier}, + core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier_fallback}, core::ConfigurationProperty{Configuration::nifi_c2_agent_trigger_classes}, core::ConfigurationProperty{Configuration::nifi_c2_root_classes}, core::ConfigurationProperty{Configuration::nifi_c2_root_class_definitions}, diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index b6fe4cc3e..ef76b9f8c 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -45,18 +45,16 @@ #include "io/NetworkPrioritizer.h" #include "io/FileStream.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, - std::shared_ptr<core::ContentRepository> content_repo, const std::string /*name*/, - std::shared_ptr<utils::file::FileSystem> filesystem) + std::shared_ptr<core::ContentRepository> content_repo, const std::string& /*name*/, + std::shared_ptr<utils::file::FileSystem> filesystem, std::function<void()> request_restart) : core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()), c2::C2Client(std::move(configure), std::move(provenance_repo), std::move(flow_file_repo), - std::move(content_repo), std::move(flow_configuration), std::move(filesystem)), + std::move(content_repo), std::move(flow_configuration), std::move(filesystem), + std::move(request_restart), core::logging::LoggerFactory<c2::C2Client>::getLogger()), running_(false), updating_(false), initialized_(false), @@ -76,9 +74,10 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, - std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem) + std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem, + std::function<void()> request_restart) : FlowController(std::move(provenance_repo), std::move(flow_file_repo), std::move(configure), std::move(flow_configuration), - std::move(content_repo), DEFAULT_ROOT_GROUP_NAME, std::move(filesystem)) {} + std::move(content_repo), DEFAULT_ROOT_GROUP_NAME, std::move(filesystem), std::move(request_restart)) {} std::optional<std::chrono::milliseconds> FlowController::loadShutdownTimeoutFromConfiguration() { std::string shutdown_timeout_str; @@ -426,13 +425,14 @@ int16_t FlowController::clearConnection(const std::string &connection) { return -1; } -std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest() const { +std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest() { auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo"); agentInfo->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(getControllerService(c2::C2Agent::UPDATE_NAME)).get()); agentInfo->setAgentIdentificationProvider(configuration_); agentInfo->setConfigurationReader([this](const std::string& key){ return configuration_->getString(key); }); + agentInfo->setStateMonitor(this); agentInfo->includeAgentStatus(false); return agentInfo; } @@ -552,7 +552,4 @@ state::StateController* FlowController::getProcessorController(const std::string return foundController.get(); } -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 4d1d55df6..dbc349ea2 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -47,27 +47,25 @@ using namespace std::literals::chrono_literals; -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { +namespace org::apache::nifi::minifi::c2 { C2Agent::C2Agent(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* updateSink, - const std::shared_ptr<Configure> &configuration, - const std::shared_ptr<utils::file::FileSystem> &filesystem) + std::shared_ptr<Configure> configuration, + std::shared_ptr<utils::file::FileSystem> filesystem, + std::function<void()> request_restart) : heart_beat_period_(3s), max_c2_responses(5), update_sink_(updateSink), update_service_(nullptr), controller_(controller), pause_handler_(pause_handler), - configuration_(configuration), - filesystem_(filesystem), + configuration_(std::move(configuration)), + filesystem_(std::move(filesystem)), protocol_(nullptr), - thread_pool_(2, false, nullptr, "C2 threadpool") { + thread_pool_(2, false, nullptr, "C2 threadpool"), + request_restart_(std::move(request_restart)) { manifest_sent_ = false; last_run_ = std::chrono::steady_clock::now(); @@ -80,7 +78,7 @@ C2Agent::C2Agent(core::controller::ControllerServiceProvider *controller, // create a stubbed service for updating the flow identifier } - configure(configuration, false); + configure(configuration_, false); functions_.emplace_back([this] {return produce();}); functions_.emplace_back([this] {return consume();}); @@ -347,7 +345,7 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { update_sink_->stop(); C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true); protocol_.load()->consumePayload(std::move(response)); - restart_agent(); + restart_needed_ = true; } break; case Operation::START: @@ -634,6 +632,7 @@ void C2Agent::handlePropertyUpdate(const C2ContentResponse &resp) { } C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true); enqueue_c2_response(std::move(response)); + if (result != state::UpdateState::NO_OPERATION) { restart_needed_ = true; } } /** @@ -715,19 +714,6 @@ void C2Agent::handle_transfer(const C2ContentResponse &resp) { } } -void C2Agent::restart_agent() { - std::string cwd = utils::Environment::getCurrentWorkingDirectory(); - if (cwd.empty()) { - logger_->log_error("Could not restart the agent because the working directory could not be determined"); - return; - } - - std::string command = cwd + "/bin/minifi.sh restart"; - if (system(command.c_str()) != 0) { - logger_->log_error("System command '%s' failed", command); - } -} - utils::TaskRescheduleInfo C2Agent::produce() { // place priority on messages to send to the c2 server if (protocol_.load() != nullptr) { @@ -755,6 +741,12 @@ utils::TaskRescheduleInfo C2Agent::produce() { } }); + if (restart_needed_ && requests.empty()) { + configuration_->commitChanges(); + request_restart_(); + return utils::TaskRescheduleInfo::Done(); + } + try { performHeartBeat(); } @@ -911,8 +903,4 @@ void C2Agent::enqueue_c2_server_response(C2Payload &&resp) { responses.enqueue(std::move(resp)); } -} // namespace c2 -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp index 14009c5b2..a4f7dd887 100644 --- a/libminifi/src/c2/C2Client.cpp +++ b/libminifi/src/c2/C2Client.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ +#include <filesystem> #include <memory> #include <map> #include "c2/C2Client.h" @@ -30,22 +31,20 @@ #include "c2/C2Agent.h" #include "core/state/nodes/FlowInformation.h" #include "utils/file/FileSystem.h" +#include "utils/file/FileUtils.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { +namespace org::apache::nifi::minifi::c2 { C2Client::C2Client( std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<utils::file::FileSystem> filesystem, - std::shared_ptr<core::logging::Logger> logger) + std::function<void()> request_restart, std::shared_ptr<core::logging::Logger> logger) : core::Flow(std::move(provenance_repo), std::move(flow_file_repo), std::move(content_repo), std::move(flow_configuration)), configuration_(std::move(configuration)), filesystem_(std::move(filesystem)), - logger_(std::move(logger)) {} + logger_(std::move(logger)), + request_restart_(std::move(request_restart)) {} void C2Client::stopC2() { if (c2_agent_) { @@ -68,7 +67,14 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle logger_->log_info("Agent class is not predefined"); } - configuration_->setFallbackAgentIdentifier(getControllerUUID().to_string()); + // Set a persistent fallback agent id. This is needed so that the C2 server can identify the same agent after a restart, even if nifi.c2.agent.identifier is not specified. + if (auto id = configuration_->get(Configuration::nifi_c2_agent_identifier_fallback)) { + configuration_->setFallbackAgentIdentifier(*id); + } else { + const auto agent_id = getControllerUUID().to_string(); + configuration_->setFallbackAgentIdentifier(agent_id); + configuration_->set(Configuration::nifi_c2_agent_identifier_fallback, agent_id, PropertyChangeLifetime::PERSISTENT); + } { std::lock_guard<std::mutex> lock(initialization_mutex_); @@ -141,7 +147,7 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle if (!initialized_) { // C2Agent is initialized once, meaning that a C2-triggered flow/configuration update // might not be equal to a fresh restart - c2_agent_ = std::make_unique<c2::C2Agent>(controller, pause_handler, update_sink, configuration_, filesystem_); + c2_agent_ = std::make_unique<c2::C2Agent>(controller, pause_handler, update_sink, configuration_, filesystem_, request_restart_); c2_agent_->start(); initialized_ = true; } @@ -355,8 +361,4 @@ void C2Client::updateResponseNodeConnections() { } } -} // namespace c2 -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/src/core/state/Value.cpp b/libminifi/src/core/state/Value.cpp index 49cc8fda7..120b228a8 100644 --- a/libminifi/src/core/state/Value.cpp +++ b/libminifi/src/core/state/Value.cpp @@ -20,13 +20,11 @@ #include <openssl/sha.h> #include <utility> #include <string> +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace response { +namespace org::apache::nifi::minifi::state::response { const std::type_index Value::UINT64_TYPE = std::type_index(typeid(uint64_t)); const std::type_index Value::INT64_TYPE = std::type_index(typeid(int64_t)); @@ -58,10 +56,36 @@ std::string hashResponseNodes(const std::vector<SerializedResponseNode>& nodes) return utils::StringUtils::to_hex(digest, true /*uppercase*/); } -} /* namespace response */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +namespace { +rapidjson::Value nodeToJson(const SerializedResponseNode& node, rapidjson::MemoryPoolAllocator<rapidjson::CrtAllocator>& alloc) { + if (node.value.empty()) { + if (node.array) { + rapidjson::Value result(rapidjson::kArrayType); + for (const auto& elem: node.children) { + result.PushBack(nodeToJson(elem, alloc), alloc); + } + return result; + } else { + rapidjson::Value result(rapidjson::kObjectType); + for (const auto& elem: node.children) { + result.AddMember(rapidjson::Value(elem.name.c_str(), alloc), nodeToJson(elem, alloc), alloc); + } + return result; + } + } else { + return rapidjson::Value(node.value.to_string().c_str(), alloc); + } +} +} // namespace + +std::string SerializedResponseNode::to_string() const { + rapidjson::Document doc; + doc.SetObject(); + doc.AddMember(rapidjson::Value(name.c_str(), doc.GetAllocator()), nodeToJson(*this, doc.GetAllocator()), doc.GetAllocator()); + rapidjson::StringBuffer buf; + rapidjson::Writer<rapidjson::StringBuffer> writer{buf}; + doc.Accept(writer); + return buf.GetString(); +} +} // namespace org::apache::nifi::minifi::state::response diff --git a/extensions/systemd/WorkerThread.cpp b/libminifi/src/utils/FifoExecutor.cpp similarity index 77% rename from extensions/systemd/WorkerThread.cpp rename to libminifi/src/utils/FifoExecutor.cpp index 4213d29f8..f7bf0a807 100644 --- a/extensions/systemd/WorkerThread.cpp +++ b/libminifi/src/utils/FifoExecutor.cpp @@ -15,11 +15,9 @@ * limitations under the License. */ -#include "WorkerThread.h" +#include "utils/FifoExecutor.h" -namespace org { namespace apache { namespace nifi { namespace minifi { namespace extensions { namespace systemd { - -namespace detail { +namespace org::apache::nifi::minifi::utils::detail { WorkerThread::WorkerThread() : thread_{&WorkerThread::run, this} {} @@ -33,11 +31,4 @@ void WorkerThread::run() noexcept { task_queue_.consumeWait([](std::packaged_task<void()>&& f) { f(); }); } } -} // namespace detail - -} // namespace systemd -} // namespace extensions -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::utils::detail diff --git a/libminifi/src/utils/file/FileUtils.cpp b/libminifi/src/utils/file/FileUtils.cpp index 456211130..2dc6f41f8 100644 --- a/libminifi/src/utils/file/FileUtils.cpp +++ b/libminifi/src/utils/file/FileUtils.cpp @@ -106,6 +106,13 @@ std::chrono::time_point<std::chrono::system_clock> to_sys_time_point(const std:: #endif } +void put_content(const std::filesystem::path& filename, std::string_view new_contents) { + std::ofstream ofs; + ofs.exceptions(std::ofstream::badbit | std::ofstream::failbit); + ofs.open(filename, std::ofstream::binary); + ofs.write(new_contents.data(), gsl::narrow<std::streamsize>(new_contents.size())); +} + } // namespace file } // namespace utils } // namespace minifi diff --git a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp index 844f33d0a..06999e321 100644 --- a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp +++ b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp @@ -26,7 +26,7 @@ namespace { using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; -using org::apache::nifi::minifi::utils::file::get_file_content; +using org::apache::nifi::minifi::utils::file::get_content; using org::apache::nifi::minifi::utils::file::get_separator; class FetchS3ObjectTestsFixture : public FlowProcessorS3TestsFixture<minifi::aws::processors::FetchS3Object> { @@ -120,7 +120,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test default properties", "[awsS3Co REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expirationTimeRuleId value:" + S3_EXPIRATION_TIME_RULE_ID)); REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR)); REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION_1)); - REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME) == S3_CONTENT); + REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME) == S3_CONTENT); REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetVersionId().empty()); REQUIRE(!mock_s3_request_sender_ptr->get_object_request.VersionIdHasBeenSet()); REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::NOT_SET); @@ -140,7 +140,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test empty optional S3 results", "[ REQUIRE(!LogTestController::getInstance().contains("key:s3.expirationTimeRuleId", std::chrono::seconds(0), std::chrono::milliseconds(0))); REQUIRE(!LogTestController::getInstance().contains("key:s3.sseAlgorithm", std::chrono::seconds(0), std::chrono::milliseconds(0))); REQUIRE(!LogTestController::getInstance().contains("key:s3.version", std::chrono::seconds(0), std::chrono::milliseconds(0))); - REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME).empty()); + REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME).empty()); } TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test subdirectories on AWS", "[awsS3Config]") { @@ -150,7 +150,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test subdirectories on AWS", "[awsS REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:filename value:logs.txt")); REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:path value:dir1/dir2")); REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:absolute.path value:dir1/dir2/logs.txt")); - REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME).empty()); + REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME).empty()); } TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test optional values are set in request", "[awsS3Config]") { diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h index 9841ea8ba..9151c6209 100644 --- a/libminifi/test/flow-tests/TestControllerWithFlow.h +++ b/libminifi/test/flow-tests/TestControllerWithFlow.h @@ -60,7 +60,8 @@ class TestControllerWithFlow: public TestController{ controller_ = std::make_shared<minifi::FlowController>( prov_repo, ff_repo, configuration_, std::move(flow), - content_repo, DEFAULT_ROOT_GROUP_NAME); + content_repo, DEFAULT_ROOT_GROUP_NAME, + std::make_shared<utils::file::FileSystem>(), []{}); controller_->load(std::move(root)); } diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h index d247bd4bd..1c43656db 100644 --- a/libminifi/test/integration/IntegrationBase.h +++ b/libminifi/test/integration/IntegrationBase.h @@ -19,6 +19,7 @@ #define DEFAULT_WAITTIME_MSECS 3000 +#include <future> #include <memory> #include <optional> #include <string> @@ -34,6 +35,7 @@ #include "core/ConfigurableComponent.h" #include "controllers/SSLContextService.h" #include "HTTPUtils.h" +#include "utils/FifoExecutor.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -42,7 +44,30 @@ namespace utils = minifi::utils; class IntegrationBase { public: explicit IntegrationBase(std::chrono::milliseconds waitTime = std::chrono::milliseconds(DEFAULT_WAITTIME_MSECS)); - + IntegrationBase(const IntegrationBase&) = delete; + IntegrationBase(IntegrationBase&& other) noexcept + :configuration{std::move(other.configuration)}, + flowController_{std::move(other.flowController_)}, + wait_time_{other.wait_time_}, + port{std::move(other.port)}, + scheme{std::move(other.scheme)}, + key_dir{std::move(other.key_dir)}, + state_dir{std::move(other.state_dir)}, + restart_requested_count_{other.restart_requested_count_.load()} + {} + IntegrationBase& operator=(const IntegrationBase&) = delete; + IntegrationBase& operator=(IntegrationBase&& other) noexcept { + if (&other == this) return *this; + configuration = std::move(other.configuration); + flowController_ = std::move(other.flowController_); + wait_time_ = other.wait_time_; + port = std::move(other.port); + scheme = std::move(other.scheme); + key_dir = std::move(other.key_dir); + state_dir = std::move(other.state_dir); + restart_requested_count_ = other.restart_requested_count_.load(); + return *this; + } virtual ~IntegrationBase() = default; virtual void run(const std::optional<std::string>& test_file_location = {}, const std::optional<std::string>& home_path = {}); @@ -93,6 +118,7 @@ class IntegrationBase { std::string port, scheme; std::string key_dir; std::string state_dir; + std::atomic<int> restart_requested_count_{0}; }; IntegrationBase::IntegrationBase(std::chrono::milliseconds waitTime) @@ -111,6 +137,7 @@ void IntegrationBase::configureSecurity() { } void IntegrationBase::run(const std::optional<std::string>& test_file_location, const std::optional<std::string>& home_path) { + using namespace std::literals::chrono_literals; testSetup(); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); @@ -126,46 +153,67 @@ void IntegrationBase::run(const std::optional<std::string>& test_file_location, std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); content_repo->initialize(configuration); - std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); - - bool should_encrypt_flow_config = (configuration->get(minifi::Configure::nifi_flow_configuration_encrypt) - | utils::flatMap(utils::StringUtils::toBool)).value_or(false); - std::shared_ptr<utils::file::FileSystem> filesystem; - if (home_path) { - filesystem = std::make_shared<utils::file::FileSystem>( - should_encrypt_flow_config, - utils::crypto::EncryptionProvider::create(*home_path)); - } else { - filesystem = std::make_shared<utils::file::FileSystem>(); - } - - std::unique_ptr<core::FlowConfiguration> flow_config = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem)); + std::atomic<bool> running = true; + utils::FifoExecutor assertion_runner; + std::future<void> assertions_done; + while (running) { + running = false; // Stop running after this iteration, unless restart is explicitly requested - auto controller_service_provider = flow_config->getControllerServiceProvider(); - char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX"; - state_dir = utils::file::create_temp_directory(state_dir_name_template); - if (!configuration->get(minifi::Configure::nifi_state_management_provider_local_path)) { - configuration->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir); - } - core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(), configuration); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot()); - queryRootProcessGroup(pg); + bool should_encrypt_flow_config = (configuration->get(minifi::Configure::nifi_flow_configuration_encrypt) + | utils::flatMap(utils::StringUtils::toBool)).value_or(false); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + std::shared_ptr<utils::file::FileSystem> filesystem; + if (home_path) { + filesystem = std::make_shared<utils::file::FileSystem>( + should_encrypt_flow_config, + utils::crypto::EncryptionProvider::create(*home_path)); + } else { + filesystem = std::make_shared<utils::file::FileSystem>(); + } - flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, DEFAULT_ROOT_GROUP_NAME); - flowController_->load(); - updateProperties(*flowController_); - flowController_->start(); + auto flow_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem); - runAssertions(); + auto controller_service_provider = flow_config->getControllerServiceProvider(); + char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX"; + state_dir = utils::file::create_temp_directory(state_dir_name_template); + if (!configuration->get(minifi::Configure::nifi_state_management_provider_local_path)) { + configuration->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir); + } + core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(), configuration); + + std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot()); + queryRootProcessGroup(pg); + + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + const auto request_restart = [&, this] { + ++restart_requested_count_; + running = true; + }; + flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, DEFAULT_ROOT_GROUP_NAME, + std::make_shared<utils::file::FileSystem>(), request_restart); + flowController_->load(); + updateProperties(*flowController_); + flowController_->start(); + + assertions_done = assertion_runner.enqueue([this] { runAssertions(); }); + std::future_status status = std::future_status::ready; + while (!running && (status = assertions_done.wait_for(10ms)) == std::future_status::timeout) { /* wait */ } + if (running && status != std::future_status::timeout) { + // cancel restart, because assertions have finished running + running = false; + } - shutdownBeforeFlowController(); - flowController_->unload(); - flowController_->stopC2(); + if (!running) { + // Only stop servers if we're shutting down + shutdownBeforeFlowController(); + } + flowController_->unload(); + flowController_->stopC2(); + } cleanup(); } diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp index 0a78b2fd5..80aad7d0e 100644 --- a/libminifi/test/integration/ProvenanceReportingTest.cpp +++ b/libminifi/test/integration/ProvenanceReportingTest.cpp @@ -20,17 +20,11 @@ #undef NDEBUG #include <cassert> #include <chrono> -#include <fstream> -#include <utility> #include <memory> +#include <utility> #include <string> #include <thread> -#include <type_traits> -#include <vector> #include "utils/file/FileUtils.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "core/logging/Logger.h" #include "core/ProcessGroup.h" #include "core/yaml/YamlConfiguration.h" #include "FlowController.h" @@ -38,7 +32,6 @@ #include "../unit/ProvenanceTestHelper.h" #include "io/StreamFactory.h" #include "../TestBase.h" -#include "../Catch.h" #include "utils/IntegrationTestUtils.h" int main(int argc, char **argv) { @@ -63,12 +56,13 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>( + test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>( - test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME); + const auto controller = std::make_shared<minifi::FlowController>( + test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + std::make_shared<utils::file::FileSystem>(), []{}); core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp index d28011596..276a06445 100644 --- a/libminifi/test/persistence-tests/PersistenceTests.cpp +++ b/libminifi/test/persistence-tests/PersistenceTests.cpp @@ -176,7 +176,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") { auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, ""); auto flowController = std::make_shared<minifi::FlowController>( - prov_repo, ff_repository, config, std::move(flowConfig), content_repo, ""); + prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{}); { TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge); @@ -290,7 +290,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") { auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, ""); auto flowController = std::make_shared<minifi::FlowController>( - prov_repo, ff_repository, config, std::move(flowConfig), content_repo, ""); + prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{}); { TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"}); diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp index 3ca9f950b..5e96aec40 100644 --- a/libminifi/test/rocksdb-tests/RepoTests.cpp +++ b/libminifi/test/rocksdb-tests/RepoTests.cpp @@ -290,7 +290,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, ""); auto flowController = std::make_shared<minifi::FlowController>( - prov_repo, ff_repository, config, std::move(flowConfig), content_repo, ""); + prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{}); std::string data = "banana"; minifi::io::BufferStream content(data); diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 760904bd8..65c91bf1f 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -244,7 +244,8 @@ class TestFlowController : public org::apache::nifi::minifi::FlowController { TestFlowController(std::shared_ptr<org::apache::nifi::minifi::core::Repository> repo, std::shared_ptr<org::apache::nifi::minifi::core::Repository> flow_file_repo, const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& /*content_repo*/) :org::apache::nifi::minifi::FlowController(repo, flow_file_repo, std::make_shared<org::apache::nifi::minifi::Configure>(), nullptr, - std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(), "") { + std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(), "", + std::make_shared<org::apache::nifi::minifi::utils::file::FileSystem>(), []{}) { } ~TestFlowController() override = default; diff --git a/main/AgentDocs.cpp b/main/AgentDocs.cpp index 222e76a2e..bd5b834af 100644 --- a/main/AgentDocs.cpp +++ b/main/AgentDocs.cpp @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -34,18 +33,13 @@ #include "core/Relationship.h" #include "io/validation.h" #include "utils/file/FileUtils.h" -#include "agent/build_description.h" #include "agent/agent_docs.h" #include "agent/agent_version.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace docs { +namespace org::apache::nifi::minifi::docs { std::string AgentDocs::extractClassName(const std::string &processor) const { - auto positionOfLastDot = processor.find_last_of("."); + auto positionOfLastDot = processor.find_last_of('.'); if (positionOfLastDot != std::string::npos) { return processor.substr(positionOfLastDot + 1); } @@ -55,7 +49,7 @@ std::string AgentDocs::extractClassName(const std::string &processor) const { void AgentDocs::generate(const std::string &docsdir, std::ostream &genStream) { std::map<std::string, ClassDescription> processorSet; for (const auto &group : minifi::AgentBuild::getExtensions()) { - struct Components descriptions = BuildDescription::getClassDescriptions(group); + struct Components descriptions = build_description_.getClassDescriptions(group); for (const auto &processorName : descriptions.processors_) { processorSet.insert(std::make_pair(extractClassName(processorName.class_name_), processorName)); } @@ -64,18 +58,18 @@ void AgentDocs::generate(const std::string &docsdir, std::ostream &genStream) { const std::string &filename = docsdir + utils::file::get_separator() + processor.first; std::ofstream outfile(filename); - std::string description; - - bool foundDescription = minifi::AgentDocs::getDescription(processor.first, description); - - if (!foundDescription) { - foundDescription = minifi::AgentDocs::getDescription(processor.second.class_name_, description); - } + { + std::string description; + bool foundDescription = minifi::AgentDocs::getDescription(processor.first, description); + if (!foundDescription) { + foundDescription = minifi::AgentDocs::getDescription(processor.second.class_name_, description); + } - outfile << "## " << processor.first << std::endl << std::endl; - if (foundDescription) { - outfile << "### Description " << std::endl << std::endl; - outfile << description << std::endl; + outfile << "## " << processor.first << std::endl << std::endl; + if (foundDescription) { + outfile << "### Description " << std::endl << std::endl; + outfile << description << std::endl; + } } outfile << "### Properties " << std::endl << std::endl; @@ -165,8 +159,4 @@ void AgentDocs::generate(const std::string &docsdir, std::ostream &genStream) { } } -} /* namespace docs */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} // namespace org::apache::nifi::minifi::docs diff --git a/main/AgentDocs.h b/main/AgentDocs.h index 0402adf11..1b880b3eb 100644 --- a/main/AgentDocs.h +++ b/main/AgentDocs.h @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -19,26 +18,18 @@ #define MAIN_AGENTDOCS_H_ #include <iostream> +#include "agent/build_description.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace docs { +namespace org::apache::nifi::minifi::docs { class AgentDocs { public: - AgentDocs() = default; - ~AgentDocs() = default; void generate(const std::string &docsdir, std::ostream &genStream); private: - inline std::string extractClassName(const std::string &processor) const; + [[nodiscard]] inline std::string extractClassName(const std::string &processor) const; + BuildDescription build_description_; }; -} /* namespace docs */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} // namespace org::apache::nifi::minifi::docs #endif // MAIN_AGENTDOCS_H_ diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index e90b8665d..70afd8085 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -43,6 +43,7 @@ #include <signal.h> #include <sodium.h> +#include <atomic> #include <cstdlib> #include <iostream> #include <memory> @@ -68,8 +69,8 @@ namespace core = minifi::core; namespace utils = minifi::utils; // Variables that allow us to avoid a timed wait. -sem_t *running; -//! Flow Controller +static sem_t *flow_controller_running; +static sem_t *process_running; /** * Removed the stop command from the signal handler so that we could trigger @@ -83,24 +84,27 @@ sem_t *running; #ifdef WIN32 BOOL WINAPI consoleSignalHandler(DWORD signal) { + if (!process_running) { exit(0); return TRUE; } if (signal == CTRL_C_EVENT || signal == CTRL_BREAK_EVENT) { - sem_post(running); - if (sem_wait(running) == -1) - perror("sem_wait"); + int ret = ETIMEDOUT; + while (ret == ETIMEDOUT) { + if (flow_controller_running) { sem_post(flow_controller_running); } + const struct timespec timeout_100ms { .tv_sec = 0, .tv_nsec = 100000000}; + ret = sem_timedwait(process_running, &timeout_100ms); + } + return TRUE; } - - return TRUE; + return FALSE; } void SignalExitProcess() { - sem_post(running); + sem_post(flow_controller_running); } #endif void sigHandler(int signal) { if (signal == SIGINT || signal == SIGTERM) { - // avoid stopping the controller here. - sem_post(running); + sem_post(flow_controller_running); } } @@ -148,20 +152,6 @@ int main(int argc, char **argv) { return -1; } - uint16_t stop_wait_time = STOP_WAIT_TIME_MS; - - std::string graceful_shutdown_seconds; - std::string prov_repo_class = "provenancerepository"; - std::string flow_repo_class = "flowfilerepository"; - std::string nifi_configuration_class_name = "yamlconfiguration"; - std::string content_repo_class = "filesystemrepository"; - - running = sem_open("/MiNiFiMain", O_CREAT, 0644, 0); - if (running == SEM_FAILED || running == 0) { - logger->log_error("could not initialize semaphore"); - perror("initialization failure"); - } - #ifdef WIN32 if (!SetConsoleCtrlHandler(consoleSignalHandler, TRUE)) { logger->log_error("Cannot install signal handler"); @@ -184,237 +174,274 @@ int main(int argc, char **argv) { return -1; } #endif - // Determine MINIFI_HOME const std::string minifiHome = determineMinifiHome(logger); if (minifiHome.empty()) { // determineMinifiHome already logged everything we need return -1; } - // chdir to MINIFI_HOME if (!utils::Environment::setCurrentWorkingDirectory(minifiHome.c_str())) { logger->log_error("Failed to change working directory to MINIFI_HOME (%s)", minifiHome); return -1; } + const auto flow_controller_semaphore_path = "/MiNiFiMain"; + const auto process_semaphore_path = "/MiNiFiProc"; - const auto log_properties = std::make_shared<core::logging::LoggerProperties>(); - log_properties->setHome(minifiHome); - log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE); - core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties); - - std::shared_ptr<minifi::Properties> uid_properties = std::make_shared<minifi::Properties>("UID properties"); - uid_properties->setHome(minifiHome); - uid_properties->loadConfigureFile(DEFAULT_UID_PROPERTIES_FILE); - utils::IdGenerator::getIdGenerator()->initialize(uid_properties); - - // Make a record of minifi home in the configured log file. - logger->log_info("MINIFI_HOME=%s", minifiHome); - - auto decryptor = minifi::Decryptor::create(minifiHome); - if (decryptor) { - logger->log_info("Found encryption key, will decrypt sensitive properties in the configuration"); - } else { - logger->log_info("No encryption key found, will not decrypt sensitive properties in the configuration"); + process_running = sem_open(process_semaphore_path, O_CREAT, 0644, 0); + if (process_running == SEM_FAILED) { + logger->log_error("could not initialize process semaphore"); + perror("sem_open"); + return -1; } - const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(std::move(decryptor), std::move(log_properties)); - configure->setHome(minifiHome); - configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); - - minifi::core::extension::ExtensionManager::get().initialize(configure); - - if (argc >= 3 && std::string("docs") == argv[1]) { - if (utils::file::create_dir(argv[2]) != 0) { - std::cerr << "Working directory doesn't exist and cannot be created: " << argv[2] << std::endl; - exit(1); - } - - std::cerr << "Dumping docs to " << argv[2] << std::endl; - if (argc == 4) { - std::string filepath, filename; - utils::file::PathUtils::getFileNameAndPath(argv[3], filepath, filename); - if (filepath == argv[2]) { - std::cerr << "Target file should be out of the working directory: " << filepath << std::endl; - exit(1); - } - std::ofstream outref(argv[3]); - dumpDocs(configure, argv[2], outref); - } else { - dumpDocs(configure, argv[2], std::cout); + std::atomic<bool> restart_token{false}; + const auto request_restart = [&] { + if (!restart_token.exchange(true)) { + // only do sem_post if a restart is not already in progress (the flag was unset before the exchange) + sem_post(flow_controller_running); + logger->log_info("Initiating restart..."); } - exit(0); - } - + }; - if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, graceful_shutdown_seconds)) { - try { - stop_wait_time = std::stoi(graceful_shutdown_seconds); - } - catch (const std::out_of_range &e) { - logger->log_error("%s is out of range. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what()); - } - catch (const std::invalid_argument &e) { - logger->log_error("%s contains an invalid argument set. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what()); + do { + flow_controller_running = sem_open(flow_controller_semaphore_path, O_CREAT, 0644, 0); + if (flow_controller_running == SEM_FAILED) { + logger->log_error("could not initialize flow controller semaphore"); + perror("sem_open"); + return -1; } - } - else { - logger->log_debug("%s not set, defaulting to %d", minifi::Configure::nifi_graceful_shutdown_seconds, - STOP_WAIT_TIME_MS); - } - - configure->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class); - // Create repos for flow record and provenance - std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance"); - if (!prov_repo->initialize(configure)) { - logger->log_error("Provenance repository failed to initialize, exiting.."); - exit(1); - } - - configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class); - - std::shared_ptr<core::Repository> flow_repo = core::createRepository(flow_repo_class, true, "flowfile"); - - if (!flow_repo->initialize(configure)) { - logger->log_error("Flow file repository failed to initialize, exiting.."); - exit(1); - } + uint16_t stop_wait_time = STOP_WAIT_TIME_MS; - configure->get(minifi::Configure::nifi_content_repository_class_name, content_repo_class); + std::string graceful_shutdown_seconds; + std::string prov_repo_class = "provenancerepository"; + std::string flow_repo_class = "flowfilerepository"; + std::string nifi_configuration_class_name = "yamlconfiguration"; + std::string content_repo_class = "filesystemrepository"; - std::shared_ptr<core::ContentRepository> content_repo = core::createContentRepository(content_repo_class, true, "content"); + const auto log_properties = std::make_shared<core::logging::LoggerProperties>(); + log_properties->setHome(minifiHome); + log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE); + core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties); - if (!content_repo->initialize(configure)) { - logger->log_error("Content repository failed to initialize, exiting.."); - exit(1); - } - - std::string content_repo_path; - if (configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_path)) { - core::logging::LOG_INFO(logger) << "setting default dir to " << content_repo_path; - minifi::setDefaultDirectory(content_repo_path); - } + std::shared_ptr<minifi::Properties> uid_properties = std::make_shared<minifi::Properties>("UID properties"); + uid_properties->setHome(minifiHome); + uid_properties->loadConfigureFile(DEFAULT_UID_PROPERTIES_FILE); + utils::IdGenerator::getIdGenerator()->initialize(uid_properties); - configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); + // Make a record of minifi home in the configured log file. + logger->log_info("MINIFI_HOME=%s", minifiHome); - std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configure); + auto decryptor = minifi::Decryptor::create(minifiHome); + if (decryptor) { + logger->log_info("Found encryption key, will decrypt sensitive properties in the configuration"); + } else { + logger->log_info("No encryption key found, will not decrypt sensitive properties in the configuration"); + } - bool should_encrypt_flow_config = (configure->get(minifi::Configure::nifi_flow_configuration_encrypt) - | utils::flatMap(utils::StringUtils::toBool)).value_or(false); + const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(std::move(decryptor), std::move(log_properties)); + configure->setHome(minifiHome); + configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); - auto filesystem = std::make_shared<utils::file::FileSystem>( - should_encrypt_flow_config, - utils::crypto::EncryptionProvider::create(minifiHome)); + minifi::core::extension::ExtensionManager::get().initialize(configure); - std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration( - prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name, - configure->get(minifi::Configure::nifi_flow_configuration_file), filesystem); + if (argc >= 3 && std::string("docs") == argv[1]) { + if (utils::file::create_dir(argv[2]) != 0) { + std::cerr << "Working directory doesn't exist and cannot be created: " << argv[2] << std::endl; + exit(1); + } - const auto controller = std::make_unique<minifi::FlowController>( - prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, filesystem); + std::cerr << "Dumping docs to " << argv[2] << std::endl; + if (argc == 4) { + std::string filepath, filename; + utils::file::PathUtils::getFileNameAndPath(argv[3], filepath, filename); + if (filepath == argv[2]) { + std::cerr << "Target file should be out of the working directory: " << filepath << std::endl; + exit(1); + } + std::ofstream outref(argv[3]); + dumpDocs(configure, argv[2], outref); + } else { + dumpDocs(configure, argv[2], std::cout); + } + exit(0); + } - const bool disk_space_watchdog_enable = (configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::map([](const std::string& v){ return v == "true"; })).value_or(true); - std::unique_ptr<utils::CallBackTimer> disk_space_watchdog; - if (disk_space_watchdog_enable) { - try { - const auto repo_paths = [&] { - std::vector<std::string> repo_paths; - repo_paths.reserve(3); - // REPOSITORY_DIRECTORY is a dummy path used by noop repositories - const auto path_valid = [](const std::string& p) { return !p.empty() && p != REPOSITORY_DIRECTORY; }; - auto prov_repo_path = prov_repo->getDirectory(); - auto flow_repo_path = flow_repo->getDirectory(); - auto content_repo_storage_path = content_repo->getStoragePath(); - if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { repo_paths.push_back(std::move(prov_repo_path)); } - if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { repo_paths.push_back(std::move(flow_repo_path)); } - if (path_valid(content_repo_storage_path)) { repo_paths.push_back(std::move(content_repo_storage_path)); } - return repo_paths; - }(); - const auto available_spaces = minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get()); - const auto config = minifi::disk_space_watchdog::read_config(*configure); - const auto min_space = [](const std::vector<std::uintmax_t>& spaces) { - const auto it = std::min_element(std::begin(spaces), std::end(spaces)); - return it != spaces.end() ? *it : (std::numeric_limits<std::uintmax_t>::max)(); - }; - if (min_space(available_spaces) <= config.stop_threshold_bytes) { - logger->log_error("Cannot start MiNiFi due to insufficient available disk space"); - return -1; + if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, graceful_shutdown_seconds)) { + try { + stop_wait_time = std::stoi(graceful_shutdown_seconds); } - auto interval_switch = minifi::disk_space_watchdog::disk_space_interval_switch(config); - disk_space_watchdog = std::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, min_space, repo_paths, logger, &controller]() mutable { - const auto stop = [&]{ controller->stop(); controller->unload(); }; - const auto restart = [&]{ controller->load(); controller->start(); }; - const auto switch_state = interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get()))); - if (switch_state.state == utils::IntervalSwitchState::LOWER && switch_state.switched) { - logger->log_warn("Stopping flow controller due to insufficient disk space"); - stop(); - } else if (switch_state.state == utils::IntervalSwitchState::UPPER && switch_state.switched) { - logger->log_info("Restarting flow controller"); - restart(); - } - }); - } catch (const std::runtime_error& error) { - logger->log_error(error.what()); - return -1; + catch (const std::out_of_range& e) { + logger->log_error("%s is out of range. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what()); + } + catch (const std::invalid_argument& e) { + logger->log_error("%s contains an invalid argument set. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what()); + } + } else { + logger->log_debug("%s not set, defaulting to %d", minifi::Configure::nifi_graceful_shutdown_seconds, + STOP_WAIT_TIME_MS); } - } - logger->log_info("Loading FlowController"); + configure->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class); + // Create repos for flow record and provenance + std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance"); - // Load flow from specified configuration file - try { - controller->load(); - } - catch (std::exception &e) { - logger->log_error("Failed to load configuration due to exception: %s", e.what()); - return -1; - } - catch (...) { - logger->log_error("Failed to load configuration due to unknown exception"); - return -1; - } + if (!prov_repo->initialize(configure)) { + logger->log_error("Provenance repository failed to initialize, exiting.."); + exit(1); + } - // Start Processing the flow - controller->start(); + configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class); - if (disk_space_watchdog) { disk_space_watchdog->start(); } + std::shared_ptr<core::Repository> flow_repo = core::createRepository(flow_repo_class, true, "flowfile"); - logger->log_info("MiNiFi started"); + if (!flow_repo->initialize(configure)) { + logger->log_error("Flow file repository failed to initialize, exiting.."); + exit(1); + } - /** - * Sem wait provides us the ability to have a controlled - * yield without the need for a more complex construct and - * a spin lock - */ - int ret_val; - while ((ret_val = sem_wait(running)) == -1 && errno == EINTR); - if (ret_val == -1) perror("sem_wait"); + configure->get(minifi::Configure::nifi_content_repository_class_name, content_repo_class); - while ((ret_val = sem_close(running)) == -1 && errno == EINTR); - if (ret_val == -1) perror("sem_close"); + std::shared_ptr<core::ContentRepository> content_repo = core::createContentRepository(content_repo_class, true, "content"); - while ((ret_val = sem_unlink("/MiNiFiMain")) == -1 && errno == EINTR); - if (ret_val == -1) perror("sem_unlink"); + if (!content_repo->initialize(configure)) { + logger->log_error("Content repository failed to initialize, exiting.."); + exit(1); + } - disk_space_watchdog = nullptr; + std::string content_repo_path; + if (configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_path)) { + core::logging::LOG_INFO(logger) << "setting default dir to " << content_repo_path; + minifi::setDefaultDirectory(content_repo_path); + } - /** - * Trigger unload -- wait stop_wait_time - */ - controller->waitUnload(stop_wait_time); + configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configure); + + bool should_encrypt_flow_config = (configure->get(minifi::Configure::nifi_flow_configuration_encrypt) + | utils::flatMap(utils::StringUtils::toBool)).value_or(false); + + auto filesystem = std::make_shared<utils::file::FileSystem>( + should_encrypt_flow_config, + utils::crypto::EncryptionProvider::create(minifiHome)); + + std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration( + prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name, + configure->get(minifi::Configure::nifi_flow_configuration_file), filesystem); + + const auto controller = std::make_unique<minifi::FlowController>( + prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, filesystem, request_restart); + + const bool disk_space_watchdog_enable = (configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::map([](const std::string& v) { return v == "true"; })).value_or(true); + std::unique_ptr<utils::CallBackTimer> disk_space_watchdog; + if (disk_space_watchdog_enable) { + try { + const auto repo_paths = [&] { + std::vector<std::string> repo_paths; + repo_paths.reserve(3); + // REPOSITORY_DIRECTORY is a dummy path used by noop repositories + const auto path_valid = [](const std::string& p) { return !p.empty() && p != REPOSITORY_DIRECTORY; }; + auto prov_repo_path = prov_repo->getDirectory(); + auto flow_repo_path = flow_repo->getDirectory(); + auto content_repo_storage_path = content_repo->getStoragePath(); + if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { repo_paths.push_back(std::move(prov_repo_path)); } + if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { repo_paths.push_back(std::move(flow_repo_path)); } + if (path_valid(content_repo_storage_path)) { repo_paths.push_back(std::move(content_repo_storage_path)); } + return repo_paths; + }(); + const auto available_spaces = minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get()); + const auto config = minifi::disk_space_watchdog::read_config(*configure); + const auto min_space = [](const std::vector<std::uintmax_t>& spaces) { + const auto it = std::min_element(std::begin(spaces), std::end(spaces)); + return it != spaces.end() ? *it : (std::numeric_limits<std::uintmax_t>::max)(); + }; + if (min_space(available_spaces) <= config.stop_threshold_bytes) { + logger->log_error("Cannot start MiNiFi due to insufficient available disk space"); + return -1; + } + auto interval_switch = minifi::disk_space_watchdog::disk_space_interval_switch(config); + disk_space_watchdog = std::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, min_space, repo_paths, logger, &controller]() mutable { + const auto stop = [&] { + controller->stop(); + controller->unload(); + }; + const auto restart = [&] { + controller->load(); + controller->start(); + }; + const auto switch_state = interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get()))); + if (switch_state.state == utils::IntervalSwitchState::LOWER && switch_state.switched) { + logger->log_warn("Stopping flow controller due to insufficient disk space"); + stop(); + } else if (switch_state.state == utils::IntervalSwitchState::UPPER && switch_state.switched) { + logger->log_info("Restarting flow controller"); + restart(); + } + }); + } catch (const std::runtime_error& error) { + logger->log_error(error.what()); + return -1; + } + } - controller->stopC2(); + logger->log_info("Loading FlowController"); - flow_repo = nullptr; + // Load flow from specified configuration file + try { + controller->load(); + } + catch (std::exception& e) { + logger->log_error("Failed to load configuration due to exception: %s", e.what()); + return -1; + } + catch (...) { + logger->log_error("Failed to load configuration due to unknown exception"); + return -1; + } - prov_repo = nullptr; + // Start Processing the flow + controller->start(); + + if (disk_space_watchdog) { disk_space_watchdog->start(); } + + logger->log_info("MiNiFi started"); + + /** + * Sem wait provides us the ability to have a controlled + * yield without the need for a more complex construct and + * a spin lock + */ + int ret_val; + while ((ret_val = sem_wait(flow_controller_running)) == -1 && errno == EINTR) {} + if (ret_val == -1) perror("sem_wait"); + + while ((ret_val = sem_close(flow_controller_running)) == -1 && errno == EINTR) {} + if (ret_val == -1) perror("sem_close"); + flow_controller_running = nullptr; + + while ((ret_val = sem_unlink(flow_controller_semaphore_path)) == -1 && errno == EINTR) {} + if (ret_val == -1) perror("sem_unlink"); + + disk_space_watchdog = nullptr; + + /** + * Trigger unload -- wait stop_wait_time + */ + controller->waitUnload(stop_wait_time); + controller->stopC2(); + flow_repo = nullptr; + prov_repo = nullptr; + } while ([&] { + const auto restart_token_temp = restart_token.exchange(false); + if (restart_token_temp) { + logger->log_info("Restarting MiNiFi"); + } + return restart_token_temp; + }()); + if (process_running) { sem_post(process_running); } logger->log_info("MiNiFi exit"); - -#ifdef WIN32 - sem_post(running); -#endif - return 0; } diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp b/nanofi/src/cxx/C2CallbackAgent.cpp index 9a7c755c9..d4af51260 100644 --- a/nanofi/src/cxx/C2CallbackAgent.cpp +++ b/nanofi/src/cxx/C2CallbackAgent.cpp @@ -28,15 +28,11 @@ #include "core/logging/LoggerConfiguration.h" #include "utils/file/FileUtils.h" #include "utils/file/FileManager.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace c2 { +namespace org::apache::nifi::minifi::c2 { C2CallbackAgent::C2CallbackAgent(core::controller::ControllerServiceProvider* controller, state::Pausable* pause_handler, state::StateMonitor* updateSink, const std::shared_ptr<Configure> &configuration) - : C2Agent(controller, pause_handler, updateSink, configuration), + : C2Agent(controller, pause_handler, updateSink, configuration, std::make_shared<utils::file::FileSystem>(), []{}), stop(nullptr) { } @@ -72,8 +68,4 @@ void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) { } } -} /* namespace c2 */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} /* namespace org::apache::nifi::minifi::c2 */
