MINIFI-226: Add controller services capabilities along with unit tests Fix test failures Update Travis YML Update readme to link to MiNiFi licensing information
MINIFI-253 : Add basic Process Loader capability This closes #83. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/c9940e94 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/c9940e94 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/c9940e94 Branch: refs/heads/master Commit: c9940e9454b02408a75ac47daae78d99cb9ef13d Parents: 372f2d6 Author: Marc Parisi <[email protected]> Authored: Sat Apr 1 08:20:42 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Thu May 18 08:59:57 2017 -0400 ---------------------------------------------------------------------- .gitignore | 3 +- .travis.yml | 2 +- CMakeLists.txt | 72 ++-- README.md | 31 ++ libminifi/CMakeLists.txt | 3 +- libminifi/include/EventDrivenSchedulingAgent.h | 10 +- libminifi/include/FlowController.h | 132 +++++- libminifi/include/SchedulingAgent.h | 33 +- libminifi/include/ThreadedSchedulingAgent.h | 11 +- libminifi/include/TimerDrivenSchedulingAgent.h | 7 +- .../include/controllers/SSLContextService.h | 174 ++++++++ libminifi/include/core/ClassLoader.h | 259 ++++++++++++ libminifi/include/core/ConfigurableComponent.h | 11 + libminifi/include/core/ConfigurationFactory.h | 7 +- libminifi/include/core/Connectable.h | 12 +- libminifi/include/core/Core.h | 3 +- libminifi/include/core/FlowConfiguration.h | 31 +- libminifi/include/core/ProcessContext.h | 59 ++- libminifi/include/core/ProcessGroup.h | 23 ++ libminifi/include/core/Property.h | 29 +- libminifi/include/core/Resource.h | 54 +++ .../include/core/controller/ControllerService.h | 142 +++++++ .../core/controller/ControllerServiceLookup.h | 89 ++++ .../core/controller/ControllerServiceMap.h | 120 ++++++ .../core/controller/ControllerServiceNode.h | 133 ++++++ .../core/controller/ControllerServiceProvider.h | 306 ++++++++++++++ .../controller/StandardControllerServiceNode.h | 107 +++++ 
.../StandardControllerServiceProvider.h | 229 +++++++++++ libminifi/include/core/yaml/YamlConfiguration.h | 54 ++- libminifi/include/io/StreamFactory.h | 6 +- libminifi/include/io/TLSSocket.h | 197 +++++++++ libminifi/include/io/validation.h | 14 +- libminifi/include/processors/AppendHostInfo.h | 4 + libminifi/include/processors/ExecuteProcess.h | 3 + libminifi/include/processors/GenerateFlowFile.h | 3 + libminifi/include/processors/GetFile.h | 3 + libminifi/include/processors/InvokeHTTP.h | 30 +- libminifi/include/processors/ListenHTTP.h | 6 +- libminifi/include/processors/ListenSyslog.h | 3 + libminifi/include/processors/LoadProcessors.h | 34 ++ libminifi/include/processors/LogAttribute.h | 3 + libminifi/include/processors/PutFile.h | 3 + libminifi/include/processors/TailFile.h | 6 +- libminifi/include/properties/Configure.h | 11 +- libminifi/include/utils/ThreadPool.h | 355 ++++++++-------- libminifi/src/Configure.cpp | 17 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 4 +- libminifi/src/FlowController.cpp | 233 ++++++++++- libminifi/src/SchedulingAgent.cpp | 35 +- libminifi/src/ThreadedSchedulingAgent.cpp | 26 +- libminifi/src/TimerDrivenSchedulingAgent.cpp | 4 +- libminifi/src/controllers/SSLContextService.cpp | 226 ++++++++++ libminifi/src/core/ClassLoader.cpp | 70 ++++ libminifi/src/core/ConfigurableComponent.cpp | 42 +- libminifi/src/core/ConfigurationFactory.cpp | 13 +- libminifi/src/core/Connectable.cpp | 6 +- libminifi/src/core/FlowConfiguration.cpp | 66 +-- libminifi/src/core/ProcessGroup.cpp | 16 + libminifi/src/core/Processor.cpp | 20 +- libminifi/src/core/Property.cpp | 24 +- .../core/controller/ControllerServiceNode.cpp | 49 +++ .../controller/ControllerServiceProvider.cpp | 50 +++ .../StandardControllerServiceNode.cpp | 69 ++++ libminifi/src/core/yaml/YamlConfiguration.cpp | 212 +++++++--- libminifi/src/processors/GetFile.cpp | 4 + libminifi/src/processors/InvokeHTTP.cpp | 75 ++-- libminifi/test/FlowFileRecordTest.cpp | 28 -- 
libminifi/test/HttpGetIntegrationTest.cpp | 120 ------ libminifi/test/HttpPostIntegrationTest.cpp | 120 ------ libminifi/test/ProcessorTests.cpp | 408 ------------------ libminifi/test/SocketTests.cpp | 185 --------- libminifi/test/TestExecuteProcess.cpp | 131 ------ .../ControllerServiceIntegrationTests.cpp | 186 +++++++++ .../test/integration/HttpGetIntegrationTest.cpp | 128 ++++++ .../integration/HttpPostIntegrationTest.cpp | 126 ++++++ libminifi/test/integration/ProcessorTests.cpp | 411 +++++++++++++++++++ libminifi/test/integration/SocketTests.cpp | 185 +++++++++ .../test/integration/TestExecuteProcess.cpp | 137 +++++++ .../test/resources/TestControllerServices.yml | 62 +++ libminifi/test/resources/TestHTTPGet.yml | 73 ++++ libminifi/test/resources/TestHTTPGetSecure.yml | 88 ++++ libminifi/test/resources/TestHTTPPost.yml | 87 ++++ libminifi/test/resources/cn.ckey.pem | 31 ++ libminifi/test/resources/cn.crt.pem | 25 ++ libminifi/test/resources/cn.pass | 0 libminifi/test/resources/nifi-cert.pem | 20 + libminifi/test/unit/ClassLoaderTests.cpp | 35 ++ libminifi/test/unit/ControllerServiceTests.cpp | 89 ++++ libminifi/test/unit/InvokeHTTPTests.cpp | 198 ++++----- libminifi/test/unit/MockClasses.h | 139 +++++++ libminifi/test/unit/ProvenanceTestHelper.h | 6 +- libminifi/test/unit/ProvenanceTests.cpp | 2 +- libminifi/test/unit/RepoTests.cpp | 2 +- libminifi/test/unit/SerializationTests.cpp | 3 +- libminifi/test/unit/Site2SiteTests.cpp | 3 +- libminifi/test/unit/ThreadPoolTests.cpp | 38 ++ libminifi/test/unit/YamlCongifurationTests.cpp | 4 +- libminifi/test/unit/resource/TestHTTPGet.yml | 73 ---- libminifi/test/unit/resource/TestHTTPPost.yml | 87 ---- main/CMakeLists.txt | 2 +- main/MiNiFiMain.cpp | 4 +- 101 files changed, 5593 insertions(+), 1731 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/.gitignore 
---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 2da62d5..0f533cc 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,8 @@ bin target thirdparty/**/*.o thirdparty/**/*.a +libminifi/test/**/*.a docs/generated # Ignore source files that have been placed in the docker directory during build -docker/minificppsource \ No newline at end of file +docker/minificppsource http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 0fa8aec..b93301c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -63,4 +63,4 @@ matrix: - package='graphviz'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package} script: - - mkdir ./build && cd ./build && cmake .. && make && make test && make linter && make docs + - mkdir ./build && cd ./build && cmake .. 
&& make VERBOSE=1 && make test ARGS="-V" && make linter && make docs http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 066169d..e441c44 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -143,29 +143,48 @@ include(CPack) enable_testing(test) file(GLOB LIBMINIFI_TEST_SOURCES "libminifi/test/unit/*.cpp") - add_executable(tests ${LIBMINIFI_TEST_SOURCES} ${SPD_SOURCES}) - target_include_directories(tests PRIVATE BEFORE "thirdparty/catch") - target_include_directories(tests PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") - target_include_directories(tests PRIVATE BEFORE "thirdparty/jsoncpp/include") - target_include_directories(tests PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) - target_include_directories(tests PRIVATE BEFORE "include") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/core") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/core/repository") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/core/yaml") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/io") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/processors") - target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance") - target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) + add_executable(unit_tests ${LIBMINIFI_TEST_SOURCES} ${SPD_SOURCES}) + target_include_directories(unit_tests PRIVATE BEFORE "thirdparty/catch") + target_include_directories(unit_tests PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") + target_include_directories(unit_tests 
PRIVATE BEFORE "thirdparty/jsoncpp/include") + target_include_directories(unit_tests PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) + target_include_directories(unit_tests PRIVATE BEFORE "include") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/core") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/core/controller") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/core/repository") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/core/yaml") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/io") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/utils") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/processors") + target_include_directories(unit_tests PRIVATE BEFORE "libminifi/include/provenance") + target_link_libraries(unit_tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) if (CURL_FOUND) - target_include_directories(tests PRIVATE BEFORE ${CURL_INCLUDE_DIRS}) - target_link_libraries(tests ${CURL_LIBRARIES}) + target_include_directories(unit_tests PRIVATE BEFORE ${CURL_INCLUDE_DIRS}) + target_link_libraries(unit_tests ${CURL_LIBRARIES}) endif(CURL_FOUND) - add_test(NAME LibMinifiTests COMMAND tests) + add_test(NAME LibMinifiUnitTests COMMAND unit_tests) + + + file(GLOB LIBMINIFI_TEST_CS "libminifi/test/integration/ControllerServiceIntegrationTests.cpp") + add_executable(testControllerServices ${LIBMINIFI_TEST_CS} ${SPD_SOURCES}) + target_include_directories(testControllerServices PRIVATE BEFORE "thirdparty/catch") + target_include_directories(testControllerServices PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") + target_include_directories(testControllerServices PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) + 
target_include_directories(testControllerServices PRIVATE BEFORE "include") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/core") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/core/controller") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/core/repository") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/io") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/utils") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/processors") + target_include_directories(testControllerServices PRIVATE BEFORE "libminifi/include/provenance") + target_link_libraries(testControllerServices ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) + add_test(NAME TestControllerServices COMMAND testControllerServices "${CMAKE_SOURCE_DIR}/libminifi/test/resources/TestControllerServices.yml" "${CMAKE_SOURCE_DIR}/libminifi/test/resources/") - file(GLOB LIBMINIFI_TEST_CS "libminifi/test/HttpGetIntegrationTest.cpp") + file(GLOB LIBMINIFI_TEST_CS "libminifi/test/integration/HttpGetIntegrationTest.cpp") add_executable(testHttpGet ${LIBMINIFI_TEST_CS} ${SPD_SOURCES}) target_include_directories(testHttpGet PRIVATE BEFORE "thirdparty/catch") target_include_directories(testHttpGet PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") @@ -184,9 +203,10 @@ enable_testing(test) target_include_directories(testHttpGet PRIVATE BEFORE ${CURL_INCLUDE_DIRS}) target_link_libraries(testHttpGet ${CURL_LIBRARIES}) endif(CURL_FOUND) - add_test(NAME testHttpGet COMMAND testHttpGet "${CMAKE_SOURCE_DIR}/libminifi/test/unit/resource/TestHTTPGet.yml") + add_test(NAME testHttpGet COMMAND testHttpGet 
"${CMAKE_SOURCE_DIR}/libminifi/test/resources/TestHTTPGet.yml" "${CMAKE_SOURCE_DIR}/libminifi/test/resources/") + add_test(NAME testHttpGetSecure COMMAND testHttpGet "${CMAKE_SOURCE_DIR}/libminifi/test/resources/TestHTTPGetSecure.yml" "${CMAKE_SOURCE_DIR}/libminifi/test/resources/") - file(GLOB LIBMINIFI_TEST_CS "libminifi/test/HttpPostIntegrationTest.cpp") + file(GLOB LIBMINIFI_TEST_CS "libminifi/test/integration/HttpPostIntegrationTest.cpp") add_executable(testHttpPost ${LIBMINIFI_TEST_CS} ${SPD_SOURCES}) target_include_directories(testHttpPost PRIVATE BEFORE "thirdparty/catch") target_include_directories(testHttpPost PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") @@ -205,16 +225,17 @@ enable_testing(test) target_include_directories(testHttpPost PRIVATE BEFORE ${CURL_INCLUDE_DIRS}) target_link_libraries(testHttpPost ${CURL_LIBRARIES}) endif(CURL_FOUND) - add_test(NAME testHttpPost COMMAND testHttpPost "${CMAKE_SOURCE_DIR}/libminifi/test/unit/resource/TestHTTPPost.yml") + add_test(NAME testHttpPost COMMAND testHttpPost "${CMAKE_SOURCE_DIR}/libminifi/test/resources/TestHTTPPost.yml") - file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/TestExecuteProcess.cpp") + file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/integration/TestExecuteProcess.cpp") add_executable(testExecuteProcess ${LIBMINIFI_TEST_EXECUTE_PROCESS} ${SPD_SOURCES}) target_include_directories(testExecuteProcess PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") target_include_directories(testExecuteProcess PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) target_include_directories(testExecuteProcess PRIVATE BEFORE "include") target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/") target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core/controller") target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/core/repository") target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core/yaml") target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/io") @@ -228,7 +249,7 @@ enable_testing(test) target_link_libraries(testExecuteProcess ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) add_test(NAME ExecuteProcess COMMAND testExecuteProcess) - file(GLOB LIBMINIFI_TEST_SOCKETS "libminifi/test/SocketTests.cpp") + file(GLOB LIBMINIFI_TEST_SOCKETS "libminifi/test/integration/SocketTests.cpp") add_executable(testSockets ${LIBMINIFI_TEST_SOCKETS} ${SPD_SOURCES}) target_include_directories(testSockets PRIVATE BEFORE "thirdparty/catch") target_include_directories(testSockets PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") @@ -248,7 +269,7 @@ enable_testing(test) target_link_libraries(testSockets ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) add_test(NAME TestSockets COMMAND testSockets) - file(GLOB LIBMINIFI_TEST_PROCESSORS "libminifi/test/ProcessorTests.cpp") + file(GLOB LIBMINIFI_TEST_PROCESSORS "libminifi/test/integration/ProcessorTests.cpp") add_executable(testProcessors ${LIBMINIFI_TEST_PROCESSORS} ${SPD_SOURCES}) target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/catch") target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") @@ -271,6 +292,7 @@ enable_testing(test) add_test(NAME TestProcessors COMMAND testProcessors) + # Create a custom build target called "docker" that will invoke DockerBuild.sh and create the NiFi-MiNiFi-CPP Docker image add_custom_target( docker http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md 
index a8b95b7..df89fb8 100644 --- a/README.md +++ b/README.md @@ -310,6 +310,24 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc host: localhost port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204 batch size: 100 + +### Controller Services + If you need to reference a controller service in your config.yml file, use the following template. In the example below, ControllerServiceClass is the name of the class defining the controller service. ControllerService1 + is linked to ControllerService2, and requires the latter to be started for ControllerService1 to start. + + Controller Services: + - name: ControllerService1 + id: 2438e3c8-015a-1000-79ca-83af40ec1974 + class: ControllerServiceClass + Properties: + Property one: value + Linked Services: + - value: ControllerService2 + - name: ControllerService2 + id: 2438e3c8-015a-1000-79ca-83af40ec1992 + class: ControllerServiceClass + Properties: + ### Running After completing a [build](#building), the application can be run by issuing the following from : @@ -341,6 +359,16 @@ guide. It is located [here](https://github.com/google/styleguide/blob/gh-pages/e New contributions are expected to follow the Google style guide when it is reasonable. Additionally, all new files must include a copy of the Apache License Header. +MiNiFi C++ contains a dynamic loading mechanism that loads arbitrary objects. To maintain +consistency of development amongst the NiFi ecosystem, it is called a class loader. If you +are contributing a custom Processor or Controller Service, the mechanism to register your class +into the default class loader is a pragma definition named: + + REGISTER_RESOURCE(CLASSNAME); + +To use this, include REGISTER_RESOURCE(YourClassName); in your header file. The default class +loader will make instances of YourClassName available for inclusion. 
+ Once you have completed your changes, including source code and tests, you can verify that you follow the Google style guide by running the following command: $ make linter. @@ -350,6 +378,9 @@ This will provide output for all source files. Except as otherwise noted this software is licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) +For additional information regarding the source of included projects and +the corresponding licenses, you may visit the following [website](https://cwiki.apache.org/confluence/display/MINIFI/Licensing+Information) + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index a8da0e0..7e260d3 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -61,7 +61,8 @@ include_directories(../thirdparty/civetweb-1.9.1/include) include_directories(../thirdparty/jsoncpp/include) include_directories(include) -file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp") +file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp") + file(GLOB SPD_SOURCES "../include/spdlog/*") # Workaround the limitations of having a http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/EventDrivenSchedulingAgent.h 
---------------------------------------------------------------------- diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index 22a68f3..2e49ddf 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -36,10 +36,14 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { public: // Constructor /*! - * Create a new processor + * Create a new event driven scheduling agent. */ - EventDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) - : ThreadedSchedulingAgent(repo, configure) { + EventDrivenSchedulingAgent( + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, + std::shared_ptr<core::Repository> repo, + std::shared_ptr<Configure> configuration) + : ThreadedSchedulingAgent(controller_service_provider, repo, + configuration) { } // Destructor virtual ~EventDrivenSchedulingAgent() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 187448b..6c87653 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -38,6 +38,8 @@ #include "core/ProcessSession.h" #include "core/ProcessGroup.h" #include "core/FlowConfiguration.h" +#include "core/controller/ControllerServiceNode.h" +#include "core/controller/ControllerServiceProvider.h" #include "TimerDrivenSchedulingAgent.h" #include "EventDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" @@ -56,7 +58,8 @@ namespace minifi { * Flow Controller class. Generally used by FlowController factory * as a singleton. 
*/ -class FlowController : public core::CoreComponent { +class FlowController : public core::controller::ControllerServiceProvider, + public std::enable_shared_from_this<FlowController> { public: static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; @@ -133,6 +136,122 @@ class FlowController : public core::CoreComponent { protocol_->setSerialNumber(number); } + /** + * Creates a controller service through the controller service provider impl. + * @param type class name + * @param id service identifier + * @param firstTimeAdded first time this CS was added + */ + virtual std::shared_ptr<core::controller::ControllerServiceNode> createControllerService( + const std::string &type, const std::string &id, + bool firstTimeAdded); + + /** + * controller service provider + */ + /** + * removes controller service + * @param serviceNode service node to be removed. + */ + + virtual void removeControllerService( + const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Enables the controller service + * @param serviceNode service node which will be enabled, along with linked services. + */ + virtual void enableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Enables controller services + * @param serviceNodes vector of service nodes which will be enabled, along with linked services. + */ + virtual void enableControllerServices( + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes); + + /** + * Disables controller services + * @param serviceNode service node which will be disabled, along with linked services. + */ + virtual void disableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Gets all controller services. 
+ */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices(); + + /** + * Gets controller service node specified by <code>id</code> + * @param id service identifier + * @return shared pointer to the controller service node or nullptr if it does not exist. + */ + virtual std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode( + const std::string &id); + + virtual void verifyCanStopReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Unschedules referencing components. + */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Verify can disable referencing components + * @param serviceNode service node whose referenced components will be scheduled. + */ + virtual void verifyCanDisableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Disables referencing components + * @param serviceNode service node whose referenced components will be scheduled. + */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Verify can enable referencing components + * @param serviceNode service node whose referenced components will be scheduled. + */ + virtual void verifyCanEnableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Determines if the controller service specified by identifier is enabled. + */ + bool isControllerServiceEnabled(const std::string &identifier); + + /** + * Enables referencing components + * @param serviceNode service node whose referenced components will be scheduled. 
+ */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Schedules referencing components + * @param serviceNode service node whose referenced components will be scheduled. + */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + + /** + * Returns controller service components referenced by serviceIdentifier from the embedded + * controller service provider; + */ + std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent( + const std::string &serviceIdentifier, const std::string &componentId); + + /** + * Enables all controller services for the provider. + */ + virtual void enableAllControllerServices(); + protected: // function to load the flow file repo. @@ -151,7 +270,7 @@ class FlowController : public core::CoreComponent { // NiFi property File Name std::string properties_file_name_; // Root Process Group - std::unique_ptr<core::ProcessGroup> root_; + std::shared_ptr<core::ProcessGroup> root_; // MAX Timer Driven Threads int max_timer_driven_threads_; // MAX Event Driven Threads @@ -171,9 +290,9 @@ class FlowController : public core::CoreComponent { // Flow Engines // Flow Timer Scheduler - TimerDrivenSchedulingAgent _timerScheduler; + std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_; // Flow Event Scheduler - EventDrivenSchedulingAgent _eventScheduler; + std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_; // Controller Service // Config // Site to Site Server Listener @@ -181,6 +300,11 @@ class FlowController : public core::CoreComponent { // FlowControl Protocol FlowControlProtocol *protocol_; + std::shared_ptr<Configure> configuration_; + + std::shared_ptr<core::controller::ControllerServiceMap> controller_service_map_; + + 
std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_; // flow configuration object. std::unique_ptr<core::FlowConfiguration> flow_configuration_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 30df071..1198896 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -28,6 +28,7 @@ #include <algorithm> #include <thread> #include "utils/TimeUtil.h" +#include "utils/ThreadPool.h" #include "core/Core.h" #include "core/logging/Logger.h" #include "properties/Configure.h" @@ -35,6 +36,8 @@ #include "core/logging/Logger.h" #include "core/Processor.h" #include "core/ProcessContext.h" +#include "core/controller/ControllerServiceProvider.h" +#include "core/controller/ControllerServiceNode.h" #include "provenance/ProvenanceRepository.h" namespace org { @@ -47,12 +50,22 @@ class SchedulingAgent { public: // Constructor /*! - * Create a new processor + * Create a new scheduling agent. 
*/ - SchedulingAgent(std::shared_ptr<core::Repository> repo) { + SchedulingAgent( + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, + std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) + : configure_(configuration), + admin_yield_duration_(0), + bored_yield_duration_(0), + controller_service_provider_(controller_service_provider) { logger_ = logging::Logger::getLogger(); running_ = false; repo_ = repo; + utils::ThreadPool<bool> pool = utils::ThreadPool<bool>( + configure_->getInt(Configure::nifi_flow_engine_threads, 8), true); + component_lifecycle_thread_pool_ = std::move(pool); + component_lifecycle_thread_pool_.start(); } // Destructor virtual ~SchedulingAgent() { @@ -69,13 +82,19 @@ class SchedulingAgent { // start void start() { running_ = true; + } // stop void stop() { running_ = false; + component_lifecycle_thread_pool_.shutdown(); } public: + virtual void enableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void disableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); // schedule, overwritten by different DrivenSchedulingAgent virtual void schedule(std::shared_ptr<core::Processor> processor) = 0; // unschedule, overwritten by different DrivenSchedulingAgent @@ -91,11 +110,17 @@ class SchedulingAgent { // Whether it is running std::atomic<bool> running_; // AdministrativeYieldDuration - int64_t _administrativeYieldDuration; + int64_t admin_yield_duration_; // BoredYieldDuration - int64_t _boredYieldDuration; + int64_t bored_yield_duration_; + + std::shared_ptr<Configure> configure_; std::shared_ptr<core::Repository> repo_; + // thread pool for components. 
+ utils::ThreadPool<bool> component_lifecycle_thread_pool_; + // controller service provider reference + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_; private: // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index 044b3c3..bf6f480 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -40,11 +40,13 @@ class ThreadedSchedulingAgent : public SchedulingAgent { public: // Constructor /*! - * Create a new processor + * Create a new threaded scheduling agent. */ - ThreadedSchedulingAgent(std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) - : SchedulingAgent(repo) { - configure_ = configure; + ThreadedSchedulingAgent( + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, + std::shared_ptr<core::Repository> repo, + std::shared_ptr<Configure> configuration) + : SchedulingAgent(controller_service_provider, repo, configuration) { } // Destructor virtual ~ThreadedSchedulingAgent() { @@ -70,7 +72,6 @@ class ThreadedSchedulingAgent : public SchedulingAgent { // Only support pass by reference or pointer ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent); ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent); - std::shared_ptr<Configure> configure_; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 31d089b..74096ee 
100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -37,8 +37,11 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { /*! * Create a new processor */ - TimerDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) - : ThreadedSchedulingAgent(repo, configure) { + TimerDrivenSchedulingAgent( + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, + std::shared_ptr<core::Repository> repo, + std::shared_ptr<Configure> configure) + : ThreadedSchedulingAgent(controller_service_provider, repo, configure) { } // Destructor virtual ~TimerDrivenSchedulingAgent() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/controllers/SSLContextService.h ---------------------------------------------------------------------- diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h new file mode 100644 index 0000000..7b1c5b0 --- /dev/null +++ b/libminifi/include/controllers/SSLContextService.h @@ -0,0 +1,174 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_SSLCONTEXTSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_SSLCONTEXTSERVICE_H_
+
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <atomic>
+#include <cerrno>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <string>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "../core/controller/ControllerService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Owning wrapper around an OpenSSL SSL_CTX handle.
+ *
+ * The wrapped context is released with SSL_CTX_free when the wrapper
+ * is destroyed.
+ */
+class SSLContext {
+ public:
+  SSLContext(SSL_CTX *context)
+      : context_(context) {
+
+  }
+  ~SSLContext() {
+    if (context_) {
+      SSL_CTX_free(context_);
+    }
+  }
+  // The destructor frees context_, so a copy would free the same
+  // SSL_CTX a second time. Copying is therefore disabled.
+  SSLContext(const SSLContext &other) = delete;
+  SSLContext &operator=(const SSLContext &other) = delete;
+ protected:
+  SSL_CTX *context_;
+};
+
+/**
+ * SSLContextService provides a configurable controller service from
+ * which we can provide an SSL Context or component parts that go
+ * into creating one.
+ *
+ * Justification: Abstracts SSL support out of processors into a
+ * configurable controller service.
+ */
+class SSLContextService : public core::controller::ControllerService {
+ public:
+  explicit SSLContextService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        initialized_(false),
+        valid_(false) {
+  }
+
+  explicit SSLContextService(const std::string &name, uuid_t uuid = 0)
+      : ControllerService(name, uuid),
+        initialized_(false),
+        valid_(false) {
+  }
+
+  virtual void initialize();
+
+  std::unique_ptr<SSLContext> createSSLContext();
+
+  const std::string &getCertificateFile();
+
+  const std::string &getPassphrase();
+
+  const std::string &getPassphraseFile();
+
+  const std::string &getPrivateKeyFile();
+
+  const std::string &getCACertificate();
+
+  void yield() {
+
+  }
+
+  /**
+   * @return true when the service state is ENABLED.
+   */
+  bool isRunning() {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() {
+    return false;
+  }
+
+  /**
+   * Applies the configured certificate, optional passphrase callback,
+   * private key and CA certificate to the supplied context.
+   * @param ctx context to configure.
+   * @return true when every step succeeded; false (after logging the
+   * failing step) otherwise.
+   */
+  bool configure_ssl_context(SSL_CTX *ctx)
+  {
+    if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
+        <= 0) {
+      logger_->log_error("Could not create load certificate, error : %s",
+                         std::strerror(errno));
+      return false;
+    }
+    if (!IsNullOrEmpty(passphrase_)) {
+      SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
+      SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+    }
+
+    int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(),
+                                           SSL_FILETYPE_PEM);
+    if (retp != 1) {
+      // %s needs a C string; handing it the std::string object itself is
+      // not a valid argument for a printf-style format.
+      logger_->log_error("Could not create load private key,%i on %s error : %s",
+                         retp, private_key_.c_str(), std::strerror(errno));
+      return false;
+    }
+
+    if (!SSL_CTX_check_private_key(ctx)) {
+      logger_->log_error(
+          "Private key does not match the public certificate, error : %s",
+          std::strerror(errno));
+      return false;
+    }
+
+    retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
+    if (retp == 0) {
+      logger_->log_error("Can not load CA certificate, Exiting, error : %s",
+                         std::strerror(errno));
+      return false;
+    }
+
+    return true;
+  }
+
+ protected:
+
+  /**
+   * Password callback registered with SSL_CTX_set_default_passwd_cb.
+   * @param buf destination buffer supplied by OpenSSL.
+   * @param size capacity of buf in bytes.
+   * @param rwflag unused.
+   * @param userdata pointer to the std::string holding the passphrase.
+   * @return number of bytes written into buf, 0 when there is nothing
+   * to provide.
+   */
+  static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) {
+
+    std::string *pass = (std::string*) userdata;
+    if (pass == nullptr || buf == nullptr || size <= 0) {
+      return 0;
+    }
+    if (pass->length() > 0) {
+
+      memset(buf, 0x00, size);
+      // NOTE(review): the final byte of the passphrase is dropped, exactly
+      // as before; presumably it is a trailing newline from the passphrase
+      // file -- confirm against the code that fills passphrase_.
+      size_t length = pass->length() - 1;
+      // Never write past the buffer handed to us by OpenSSL.
+      if (length > static_cast<size_t>(size)) {
+        length = static_cast<size_t>(size);
+      }
+      memcpy(buf, pass->c_str(), length);
+
+      return static_cast<int>(length);
+    }
+    return 0;
+  }
+
+  virtual void initializeTLS();
+
+  virtual void onEnable();
+
+  std::mutex initialization_mutex_;
+  std::atomic<bool> initialized_;
+  std::atomic<bool> valid_;
+  std::string certificate;
+  std::string private_key_;
+  std::string passphrase_;
+  std::string passphrase_file_;
+  std::string ca_certificate_;
+};
+typedef int (SSLContextService::*ptr)(char *, int, int, void *);
+REGISTER_RESOURCE(SSLContextService);
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_SSLCONTEXTSERVICE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/ClassLoader.h
----------------------------------------------------------------------
diff --git
a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h new file mode 100644 index 0000000..94c4425 --- /dev/null +++ b/libminifi/include/core/ClassLoader.h @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_INCLUDE_CORE_CLASSLOADER_H_ +#define LIBMINIFI_INCLUDE_CORE_CLASSLOADER_H_ + +#include <mutex> +#include <vector> +#include <map> +#include "Connectable.h" +#include "utils/StringUtils.h" +#include <dlfcn.h> +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "io/DataStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +#define RESOURCE_FAILURE -1 + +#define RESOURCE_SUCCESS 1 + +/** + * Factory that is used as an interface for + * creating processors from shared objects. + */ +class ObjectFactory { + + public: + /** + * Virtual destructor. + */ + virtual ~ObjectFactory() { + + } + + /** + * Create a shared pointer to a new processor. + */ + virtual std::shared_ptr<Connectable> create(const std::string &name) { + return nullptr; + } + + /** + * Create a shared pointer to a new processor. 
+ */ + virtual std::shared_ptr<Connectable> create(const std::string &name, + uuid_t uuid) { + return nullptr; + } + + /** + * Gets the name of the object. + * @return class name of processor + */ + virtual std::string getName() = 0; + + /** + * Gets the class name for the object + * @return class name for the processor. + */ + virtual std::string getClassName() = 0; + +}; + +/** + * Factory that is used as an interface for + * creating processors from shared objects. + */ +template<class T> +class DefautObjectFactory : public ObjectFactory { + + public: + + DefautObjectFactory() { + className = core::getClassName<T>(); + } + /** + * Virtual destructor. + */ + virtual ~DefautObjectFactory() { + + } + + /** + * Create a shared pointer to a new processor. + */ + virtual std::shared_ptr<Connectable> create(const std::string &name) { + std::shared_ptr<T> ptr = std::make_shared<T>(name); + return std::static_pointer_cast<Connectable>(ptr); + } + + /** + * Create a shared pointer to a new processor. + */ + virtual std::shared_ptr<Connectable> create(const std::string &name, + uuid_t uuid) { + std::shared_ptr<T> ptr = std::make_shared<T>(name, uuid); + return std::static_pointer_cast<Connectable>(ptr); + } + + /** + * Gets the name of the object. + * @return class name of processor + */ + virtual std::string getName() { + return className; + } + + /** + * Gets the class name for the object + * @return class name for the processor. + */ + virtual std::string getClassName() { + return className; + } + + protected: + std::string className; + +}; + +/** + * Function that is used to create the + * processor factory from the shared object. + */ +typedef ObjectFactory* createFactory(); + +/** + * Processor class loader that accepts + * a variety of mechanisms to load in shared + * objects. + */ +class ClassLoader { + + public: + + static ClassLoader &getDefaultClassLoader(); + + /** + * Constructor. 
+ */
+  ClassLoader()
+      : logger_(logging::Logger::getLogger()) {
+
+  }
+
+  /**
+   * Drops every registered factory, then closes each handle held in
+   * dl_handles_.
+   */
+  ~ClassLoader() {
+    loaded_factories_.clear();
+    for (auto ptr : dl_handles_) {
+      dlclose(ptr);
+    }
+  }
+
+  /**
+   * Register the file system resource.
+   * This will attempt to load objects within this resource.
+   * @return return code: RESOURCE_FAILURE or RESOURCE_SUCCESS
+   */
+  uint16_t registerResource(const std::string &resource);
+
+  /**
+   * Register a class with the given ObjectFactory.
+   * The first registration for a name wins; later registrations under
+   * the same name are ignored and their factory is discarded.
+   * @param name key under which the factory is stored.
+   * @param factory factory used to create instances for that key.
+   */
+  void registerClass(const std::string &name,
+                     std::unique_ptr<ObjectFactory> factory) {
+    // Take the lock before touching the map: the lookup has to be
+    // covered too, otherwise it races with concurrent registrations
+    // and with instantiate().
+    std::lock_guard<std::mutex> lock(internal_mutex_);
+
+    if (loaded_factories_.find(name) != loaded_factories_.end()) {
+      return;
+    }
+
+    loaded_factories_.insert(std::make_pair(name, std::move(factory)));
+  }
+
+  /**
+   * Instantiate object based on class_name
+   * @param class_name class to create
+   * @param name name handed to the factory for the new object
+   * @return nullptr or object created from class_name definition.
+   */
+  template<class T = Connectable>
+  std::shared_ptr<T> instantiate(const std::string &class_name,
+                                 const std::string &name);
+
+  /**
+   * Instantiate object based on class_name
+   * @param class_name class to create
+   * @param uuid uuid of object
+   * @return nullptr or object created from class_name definition.
+ */ + template<class T = Connectable> + std::shared_ptr<T> instantiate(const std::string &class_name, uuid_t uuid); + + protected: + + // logger shared ptr + std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_; + + std::map<std::string, std::unique_ptr<ObjectFactory>> loaded_factories_; + + std::mutex internal_mutex_; + + std::vector<void *> dl_handles_; + +}; + +template<class T> +std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, + const std::string &name) { + std::lock_guard<std::mutex> lock(internal_mutex_); + auto factory_entry = loaded_factories_.find(class_name); + if (factory_entry != loaded_factories_.end()) { + auto obj = factory_entry->second->create(name); + return std::static_pointer_cast<T>(obj); + } else { + return nullptr; + } +} + +template<class T> +std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, + uuid_t uuid) { + std::lock_guard<std::mutex> lock(internal_mutex_); + auto factory_entry = loaded_factories_.find(class_name); + if (factory_entry != loaded_factories_.end()) { + auto obj = factory_entry->second->create(class_name, uuid); + return std::static_pointer_cast<T>(obj); + } else { + return nullptr; + } +} + +}/* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CLASSLOADER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/ConfigurableComponent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index d46216b..bf886e8 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -54,6 +54,11 @@ class ConfigurableComponent { * @return result of getting property. 
*/ bool getProperty(const std::string name, std::string &value); + + /** + * Provides a reference for the property. + */ + bool getProperty(const std::string &name, Property &prop); /** * Sets the property using the provided name * @param property name @@ -61,6 +66,12 @@ class ConfigurableComponent { * @return result of setting property. */ bool setProperty(const std::string name, std::string value); + + /** + * Updates the Property from the key (name), adding value + * to the collection of values within the Property. + */ + bool updateProperty(const std::string &name, const std::string &value); /** * Sets the property using the provided name * @param property name http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/ConfigurationFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h index bf631d4..ed0bdb5 100644 --- a/libminifi/include/core/ConfigurationFactory.h +++ b/libminifi/include/core/ConfigurationFactory.h @@ -31,7 +31,8 @@ namespace core { template<typename T> typename std::enable_if<!class_operations<T>::value, T*>::type instantiate( const std::shared_ptr<core::Repository> &repo, - const std::shared_ptr<core::Repository> &flow_file_repo, const std::string path) { + const std::shared_ptr<core::Repository> &flow_file_repo, + std::shared_ptr<Configure> configuration, const std::string path) { throw std::runtime_error("Cannot instantiate class"); } @@ -40,8 +41,8 @@ typename std::enable_if<class_operations<T>::value, T*>::type instantiate( const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo, const std::shared_ptr<io::StreamFactory> &stream_factory, - const std::string path) { - return new T(repo, flow_file_repo, stream_factory, path); + std::shared_ptr<Configure> configuration, const std::string path) { + return new T(repo, flow_file_repo, 
stream_factory, configuration, path); } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/Connectable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h index f7e425e..f690533 100644 --- a/libminifi/include/core/Connectable.h +++ b/libminifi/include/core/Connectable.h @@ -120,6 +120,16 @@ class Connectable : public CoreComponent { */ virtual bool isWorkAvailable() = 0; + /** + * Verify that this connectable can be stopped. + * @return bool. + */ + virtual bool verifyCanStop() { + if (isRunning()) + return true; + return false; + } + protected: // Penalization Period in MilliSecond @@ -137,7 +147,7 @@ class Connectable : public CoreComponent { // Incoming connections std::set<std::shared_ptr<Connectable>> _incomingConnections; // Outgoing connections map based on Relationship name - std::map<std::string, std::set<std::shared_ptr<Connectable>>>_outGoingConnections; + std::map<std::string, std::set<std::shared_ptr<Connectable>>> out_going_connections_; // Mutex for protection std::mutex relationship_mutex_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index 1010be7..453a6a5 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -60,7 +60,7 @@ struct class_operations { typedef decltype(canDestruct<T>(0)) type; - static const bool value = type::value; /* Which is it? */ + static const bool value = type::value; }; template<typename T> @@ -99,6 +99,7 @@ class CoreComponent { uuidStr_ = uuidStr; } + /** * Move Constructor. 
*/ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 96f0b4a..2e704b5 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -21,6 +21,8 @@ #include "core/Core.h" #include "Connection.h" #include "RemoteProcessorGroupPort.h" +#include "core/controller/ControllerServiceNode.h" +#include "core/controller/StandardControllerServiceProvider.h" #include "provenance/Provenance.h" #include "core/reporting/SiteToSiteProvenanceReportingTask.h" #include "processors/GetFile.h" @@ -56,14 +58,19 @@ class FlowConfiguration : public CoreComponent { * Constructor that will be used for configuring * the flow controller. */ - FlowConfiguration(std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, - std::shared_ptr<io::StreamFactory> stream_factory, - const std::string path) + explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<io::StreamFactory> stream_factory, + std::shared_ptr<Configure> configuration, + const std::string path) : CoreComponent(core::getClassName<FlowConfiguration>()), flow_file_repo_(flow_file_repo), config_path_(path) { - + controller_services_ = std::make_shared< + core::controller::ControllerServiceMap>(); + service_provider_ = std::make_shared< + core::controller::StandardControllerServiceProvider>( + controller_services_, nullptr, configuration); } virtual ~FlowConfiguration(); @@ -74,6 +81,10 @@ class FlowConfiguration : public CoreComponent { // Create Root Processor Group std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid); + + std::shared_ptr<core::controller::ControllerServiceNode> createControllerService( + 
const std::string &class_name, const std::string &name, uuid_t uuid); + // Create Remote Processor Group std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, uuid_t uuid); @@ -105,11 +116,21 @@ class FlowConfiguration : public CoreComponent { return nullptr; } + std::shared_ptr<core::controller::StandardControllerServiceProvider> &getControllerServiceProvider() { + return service_provider_; + } + protected: + + // service provider reference. + std::shared_ptr<core::controller::StandardControllerServiceProvider> service_provider_; + // based, shared controller service map. + std::shared_ptr<core::controller::ControllerServiceMap> controller_services_; // configuration path std::string config_path_; // flow file repo std::shared_ptr<core::Repository> flow_file_repo_; + // stream factory std::shared_ptr<io::StreamFactory> stream_factory_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index 1da85cd..79b7704 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -27,6 +27,8 @@ #include <algorithm> #include "Property.h" +#include "core/controller/ControllerServiceProvider.h" +#include "core/controller/ControllerServiceLookup.h" #include "core/logging/Logger.h" #include "ProcessorNode.h" #include "core/Repository.h" @@ -38,15 +40,18 @@ namespace minifi { namespace core { // ProcessContext Class -class ProcessContext { +class ProcessContext : public controller::ControllerServiceLookup { public: // Constructor /*! 
* Create a new process context associated with the processor/controller service/state manager */ - ProcessContext(ProcessorNode &processor, - std::shared_ptr<core::Repository> repo) - : processor_node_(processor) { + ProcessContext( + ProcessorNode &processor, + std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, + std::shared_ptr<core::Repository> repo) + : processor_node_(processor), + controller_service_provider_(controller_service_provider) { logger_ = logging::Logger::getLogger(); repo_ = repo; } @@ -95,8 +100,54 @@ class ProcessContext { ProcessContext(const ProcessContext &parent) = delete; ProcessContext &operator=(const ProcessContext &parent) = delete; + // controller services + + /** + * @param identifier of controller service + * @return the ControllerService that is registered with the given + * identifier + */ + std::shared_ptr<core::controller::ControllerService> getControllerService( + const std::string &identifier) { + return controller_service_provider_->getControllerServiceForComponent( + identifier, processor_node_.getUUIDStr()); + } + + /** + * @param identifier identifier of service to check + * @return <code>true</code> if the Controller Service with the given + * identifier is enabled, <code>false</code> otherwise. If the given + * identifier is not known by this ControllerServiceLookup, returns + * <code>false</code> + */ + bool isControllerServiceEnabled(const std::string &identifier) { + return controller_service_provider_->isControllerServiceEnabled(identifier); + } + + /** + * @param identifier identifier of service to check + * @return <code>true</code> if the Controller Service with the given + * identifier has been enabled but is still in the transitioning state, + * otherwise returns <code>false</code>. 
If the given identifier is not + * known by this ControllerServiceLookup, returns <code>false</code> + */ + bool isControllerServiceEnabling(const std::string &identifier) { + return controller_service_provider_->isControllerServiceEnabling(identifier); + } + + /** + * @param identifier identifier to look up + * @return the name of the Controller service with the given identifier. If + * no service can be found with this identifier, returns {@code null} + */ + const std::string getControllerServiceName(const std::string &identifier) { + return controller_service_provider_->getControllerServiceName(identifier); + } + private: + // controller service provider. + std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_; // repository shared pointer. std::shared_ptr<core::Repository> repo_; // Processor http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index 75bb0ba..f2f9a63 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -32,6 +32,8 @@ #include "TimerDrivenSchedulingAgent.h" #include "EventDrivenSchedulingAgent.h" #include "core/logging/Logger.h" +#include "controller/ControllerServiceNode.h" +#include "controller/ControllerServiceMap.h" namespace org { namespace apache { @@ -140,6 +142,23 @@ class ProcessGroup { std::shared_ptr<Processor> findProcessor(uuid_t uuid); // findProcessor based on name std::shared_ptr<Processor> findProcessor(const std::string &processorName); + /** + * Add controller service + * @param nodeId node identifier + * @param node controller service node. 
+ */ + void addControllerService( + const std::string &nodeId, + std::shared_ptr<core::controller::ControllerServiceNode> &node); + + /** + * Find controllerservice node will search child groups until the nodeId is found. + * @param node node identifier + * @return controller service node, if it exists. + */ + std::shared_ptr<core::controller::ControllerServiceNode> findControllerService( + const std::string &nodeId); + // removeConnection void removeConnection(std::shared_ptr<Connection> connection); // update property value @@ -171,6 +190,10 @@ class ProcessGroup { // Transmitting std::atomic<bool> transmitting_; + // controller services + + core::controller::ControllerServiceMap controller_service_map_; + private: // Mutex for protection http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/Property.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h index c681449..fd940c5 100644 --- a/libminifi/include/core/Property.h +++ b/libminifi/include/core/Property.h @@ -30,6 +30,7 @@ #include <set> #include <stdlib.h> #include <math.h> +#include "utils/StringUtils.h" namespace org { namespace apache { @@ -58,11 +59,24 @@ class Property { Property(const std::string name, const std::string description, const std::string value) : name_(name), - description_(description), - value_(value) { + isCollection(false), + description_(description) { + values_.push_back(std::string(value.c_str())); } - Property() { + + Property(const std::string name, const std::string description) + : name_(name), + isCollection(true), + description_(description) { + } + + Property() + : isCollection(false), + name_(""), + description_("") { + } + // Destructor virtual ~Property() { } @@ -72,8 +86,14 @@ class Property { std::string getDescription(); // Get value for the property std::string getValue() const; + std::vector<std::string> &getValues(); + // Set 
value for the property void setValue(std::string value); + /** + * Add value to the collection of values. + */ + void addValue(const std::string &value); const Property &operator=(const Property &other); // Compare bool operator <(const Property & right) const; @@ -244,12 +264,13 @@ class Property { } protected: + bool isCollection; // Name std::string name_; // Description std::string description_; // Value - std::string value_; + std::vector<std::string> values_; private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/Resource.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Resource.h b/libminifi/include/core/Resource.h new file mode 100644 index 0000000..8f110a1 --- /dev/null +++ b/libminifi/include/core/Resource.h @@ -0,0 +1,54 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_RESOURCE_H_ +#define LIBMINIFI_INCLUDE_CORE_RESOURCE_H_ + +#include "ClassLoader.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +template<class T> +class StaticClassType { + public: + + StaticClassType(const std::string &name) { + // Notify when the static member is created + ClassLoader::getDefaultClassLoader().registerClass( + name, std::unique_ptr<ObjectFactory>(new DefautObjectFactory<T>())); + } +}; + +#define REGISTER_RESOURCE(CLASSNAME) \ + static core::StaticClassType<CLASSNAME> \ + CLASSNAME##_registrar( #CLASSNAME ); + +#define REGISTER_RESOURCE_AS(CLASSNAME,NAME) \ + static core::StaticClassType<CLASSNAME> \ + CLASSNAME##_registrar( #NAME ); + +}/* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_RESOURCE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/ControllerService.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerService.h b/libminifi/include/core/controller/ControllerService.h new file mode 100644 index 0000000..86c8f7a --- /dev/null +++ b/libminifi/include/core/controller/ControllerService.h @@ -0,0 +1,142 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICE_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICE_H_ + +#include <set> +#include "properties/Configure.h" +#include "core/Core.h" +#include "core/ConfigurableComponent.h" +#include "core/Connectable.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +enum ControllerServiceState { + /** + * Controller Service is disabled and cannot be used. + */ + DISABLED, + /** + * Controller Service is in the process of being disabled. + */ + DISABLING, + /** + * Controller Service is being enabled. + */ + ENABLING, + /** + * Controller Service is enabled. + */ + ENABLED +}; + +/** + * Controller Service base class that contains some pure virtual methods. + * The current ControllerServiceState is held in an atomic member, and every + * constructor starts the service in the DISABLED state. + * + * Design: onEnable() is executed when the controller service is being enabled. + * Note that keeping state here must be protected in this function. + */ +class ControllerService : public ConfigurableComponent, public Connectable { + public: + + /** + * Default Controller Service constructor. + * Passes the ControllerService class name (and 0 in place of a UUID) to + * Connectable, creates a new Configure object and sets the state to DISABLED. + */ + explicit ControllerService() + : Connectable(core::getClassName<ControllerService>(), 0), + ConfigurableComponent(logging::Logger::getLogger()), + configuration_(std::make_shared<Configure>()) { + current_state_ = DISABLED; + } + + /** + * Controller Service constructor taking a textual identifier. + * @param name name passed to Connectable. + * @param id UUID string; it is parsed with uuid_parse into uuid_ and its + * lower-case form is stored in uuidStr_. The result of uuid_parse is not checked. 
+ */ + explicit ControllerService(const std::string &name, const std::string &id) + : Connectable(name, 0), + ConfigurableComponent(logging::Logger::getLogger()), + configuration_(std::make_shared<Configure>()) { + current_state_ = DISABLED; + uuid_parse(id.c_str(), uuid_); + char uuidStr[37]; + uuid_unparse_lower(uuid_, uuidStr); + uuidStr_ = uuidStr; + } + + /** + * Controller Service constructor taking a binary UUID. + * @param name name passed to Connectable. + * @param uuid UUID passed to Connectable. + */ + explicit ControllerService(const std::string &name, uuid_t uuid) + : Connectable(name, uuid), + ConfigurableComponent(logging::Logger::getLogger()), + configuration_(std::make_shared<Configure>()) { + current_state_ = DISABLED; + } + + /** + * Sets the supported properties to the single base property "Linked Services" + * and then marks the service as ENABLED. + */ + virtual void initialize() { + // set base supported properties + Property property("Linked Services", "Referenced Controller Services"); + std::set<Property> supportedProperties; + supportedProperties.insert(property); + setSupportedProperties(supportedProperties); + current_state_ = ENABLED; + } + + /** + * Replaces the configuration object within the controller service. + * @param configuration shared pointer that becomes the new configuration. 
+ */ + void setConfiguration(const std::shared_ptr<Configure> &configuration) { + configuration_ = configuration; + } + + /** + * Gets the current state. + * @return value loaded from the atomic current state. + */ + ControllerServiceState getState() { + return current_state_.load(); + } + + /** + * Function is called when Controller Services are enabled and being run. + * The base implementation is empty. + */ + virtual void onEnable() { + + } + + /** + * Stores the given state as the current state. + * @param state new controller service state. + */ + void setState(ControllerServiceState state) { + current_state_ = state; + } + protected: + + std::shared_ptr<Configure> configuration_; + std::atomic<ControllerServiceState> current_state_; + /** + * @return always true in this base implementation. + */ + virtual bool canEdit() { + return true; + } +}; + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/ControllerServiceLookup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceLookup.h b/libminifi/include/core/controller/ControllerServiceLookup.h new file mode 100644 index 0000000..6f23e34 --- /dev/null +++ b/libminifi/include/core/controller/ControllerServiceLookup.h @@ -0,0 +1,89 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICELOOKUP_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICELOOKUP_H_ + +#include <map> +#include "core/Core.h" +#include "core/ConfigurableComponent.h" +#include "ControllerService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +/** + * Controller Service Lookup pure virtual class. + * + * Purpose: Provide a mechanism through which controllers can look up + * information about controller services by identifier. + * + */ +class ControllerServiceLookup { + public: + + ControllerServiceLookup() { + + } + + virtual ~ControllerServiceLookup() { + + } + + /** + * Gets the controller service via the provided identifier. + * @param identifier reference string for controller service. + * @return controller service reference. + */ + virtual std::shared_ptr<ControllerService> getControllerService( + const std::string &identifier) = 0; + + /** + * Detects if controller service is enabled. + * @param identifier reference string for controller service. + * @return true if controller service is enabled. + */ + virtual bool isControllerServiceEnabled(const std::string &identifier) = 0; + + /** + * Detects if controller service is being enabled. + * @param identifier reference string for controller service. + * @return true if controller service is being enabled. + */ + virtual bool isControllerServiceEnabling(const std::string &identifier) = 0; + + /** + * Gets the controller service name for the provided reference identifier. + * @param identifier reference string for the controller service. + * @return name of the controller service, returned by value. 
+ */ + virtual const std::string getControllerServiceName( + const std::string &identifier) = 0; + +}; + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICELOOKUP_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/ControllerServiceMap.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceMap.h b/libminifi/include/core/controller/ControllerServiceMap.h new file mode 100644 index 0000000..f1cc2cf --- /dev/null +++ b/libminifi/include/core/controller/ControllerServiceMap.h @@ -0,0 +1,120 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEMAP_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEMAP_H_ + +#include <map> +#include <string> +#include "ControllerServiceNode.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +/** + * Purpose: Controller service map is the mapping between service names + * and ControllerService Nodes. + * Justification: This abstracts the map, the controller for the map, and the + * accounting into an object that will be used amongst the separate Controller + * Service classes, so that they do not have to pass the raw map around by + * reference. + * The accessors below take the internal mutex before touching the containers. + */ +class ControllerServiceMap { + public: + + ControllerServiceMap() { + } + + virtual ~ControllerServiceMap() { + } + + /** + * Gets the controller service node using the <code>id</code> + * @param id identifier for controller service. + * @return the controller service node shared pointer stored under id, or + * nullptr if there is no entry for id. + */ + virtual std::shared_ptr<ControllerServiceNode> getControllerServiceNode( + const std::string &id) { + std::lock_guard<std::mutex> lock(mutex_); + auto exists = controller_services_.find(id); + if (exists != controller_services_.end()) + return exists->second; + else + return nullptr; + } + + /** + * Removes the controller service. The map entry keyed by the node's name is + * assigned nullptr (the key itself is not erased) and the node is erased from + * the service list. + * NOTE(review): put() keys the map by the caller-supplied id while this uses + * serviceNode->getName(); confirm that the two always match. + * @param serviceNode service node to remove + * @return false if IsNullOrEmpty reports serviceNode as null, true otherwise. + * + */ + virtual bool removeControllerService( + const std::shared_ptr<ControllerServiceNode> &serviceNode) { + if (IsNullOrEmpty(serviceNode.get())) + return false; + std::lock_guard<std::mutex> lock(mutex_); + controller_services_[serviceNode->getName()] = nullptr; + controller_services_list_.erase(serviceNode); + return true; + } + + /** + * Puts the service node into the mapping using <code>id</code> as the identifier + * @param id service identifier + * @param serviceNode controller service node shared pointer. + * @return false if IsNullOrEmpty reports id or serviceNode as null or empty; + * true once the node has been stored in the map and the service list. 
+ * + */ + virtual bool put(const std::string &id, + const std::shared_ptr<ControllerServiceNode> &serviceNode) { + if (IsNullOrEmpty(id) || IsNullOrEmpty(serviceNode.get())) + return false; + std::lock_guard<std::mutex> lock(mutex_); + controller_services_[id] = serviceNode; + controller_services_list_.insert(serviceNode); + return true; + } + + /** + * Gets all controller services. + * @return vector copied from the set of controller service node shared pointers. + */ + std::vector<std::shared_ptr<ControllerServiceNode>> getAllControllerServices() { + std::lock_guard<std::mutex> lock(mutex_); + return std::vector<std::shared_ptr<ControllerServiceNode>>( + controller_services_list_.begin(), controller_services_list_.end()); + } + + ControllerServiceMap(const ControllerServiceMap &other) = delete; + + protected: + std::mutex mutex_; + std::set<std::shared_ptr<ControllerServiceNode>> controller_services_list_; + std::map<std::string, std::shared_ptr<ControllerServiceNode>> controller_services_; +}; + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEMAP_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/ControllerServiceNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceNode.h b/libminifi/include/core/controller/ControllerServiceNode.h new file mode 100644 index 0000000..55913cb --- /dev/null +++ b/libminifi/include/core/controller/ControllerServiceNode.h @@ -0,0 +1,133 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICENODE_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICENODE_H_ + +#include "core/Core.h" +#include "core/ConfigurableComponent.h" +#include "core/logging/Logger.h" +#include "properties/Configure.h" +#include "ControllerService.h" +#include "io/validation.h" +#include "Exception.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +/** + * Node that holds a ControllerService together with its configuration, an + * atomic active flag, and the controller services and components linked to it. + * canEnable(), enable() and disable() are pure virtual. + */ +class ControllerServiceNode : public CoreComponent, public ConfigurableComponent { + public: + + /** + * Constructor for the controller service node. + * The configuration is also handed to the service via setConfiguration. + * @param service controller service reference + * @param id identifier for this node. + * @param configuration shared pointer configuration. + * @throws Exception (GENERAL_EXCEPTION) if service is null or IsNullOrEmpty + * reports the configuration as null or empty. 
+ */ + explicit ControllerServiceNode(std::shared_ptr<ControllerService> service, + const std::string &id, std::shared_ptr<Configure> configuration) + : CoreComponent(id), + ConfigurableComponent(logging::Logger::getLogger()), + controller_service_(service), + configuration_(configuration), + active(false) { + if (service == nullptr || IsNullOrEmpty(service.get())) { + throw Exception(GENERAL_EXCEPTION, "Service must be properly configured"); + } + if (IsNullOrEmpty(configuration)) { + throw Exception(GENERAL_EXCEPTION, + "Configuration must be properly configured"); + } + service->setConfiguration(configuration); + } + + /** + * Initializes the held controller service, then sets this node's supported + * properties to the single base property "Linked Services". + */ + virtual void initialize() { + controller_service_->initialize(); + // set base supported properties + Property property("Linked Services", "Referenced Controller Services"); + std::set<Property> supportedProperties; + supportedProperties.insert(property); + setSupportedProperties(supportedProperties); + } + /** + * Sets the name on this node and on the held controller service. + * @param name new name. + */ + void setName(const std::string name) { + CoreComponent::setName(name); + controller_service_->setName(name); + } + + /** + * Sets the UUID on this node and on the held controller service. + * @param uuid new UUID. + */ + void setUUID(uuid_t uuid) { + CoreComponent::setUUID(uuid); + controller_service_->setUUID(uuid); + } + + /** + * Returns the implementation of the Controller Service that this ControllerServiceNode + * maintains + * @return the implementation of the Controller Service + */ + std::shared_ptr<ControllerService> &getControllerServiceImplementation(); + /** + * NOTE(review): defined outside this header; presumably returns the linked + * controller service nodes held by this node -- confirm against the definition. + */ + std::vector<std::shared_ptr<ControllerServiceNode> > &getLinkedControllerServices(); + /** + * NOTE(review): defined outside this header; presumably returns the linked + * components held by this node -- confirm against the definition. + */ + std::vector<std::shared_ptr<ConfigurableComponent> > &getLinkedComponents(); + + /** + * Returns true if we can be enabled. + * Returns false if this ControllerServiceNode cannot be enabled. + */ + virtual bool canEnable()=0; + + /** + * @return the value loaded from the atomic active flag. + */ + virtual bool enabled() { + return active.load(); + } + + /** + * Function to enable the controller service node. + */ + virtual bool enable() = 0; + + /** + * Function to disable the controller service node. 
+ */ + virtual bool disable() = 0; + + ControllerServiceNode(const ControllerServiceNode &other) = delete; + ControllerServiceNode &operator=(const ControllerServiceNode &parent) = delete; + protected: + + /** + * @return always true in this implementation. + */ + bool canEdit() { + return true; + } + + std::atomic<bool> active; + std::shared_ptr<Configure> configuration_; + // controller service. + std::shared_ptr<ControllerService> controller_service_; + // linked controller services. + std::vector<std::shared_ptr<ControllerServiceNode> > linked_controller_services_; + std::vector<std::shared_ptr<ConfigurableComponent> > linked_components_; +}; + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICENODE_H_ */
