MINIFI-339: Add C2 base allowing for 1 protocol and n heartbeat reporters MINIFI-339: Add GetTCP Processor MINIFI-339: Add listener server MINIFI-339: Update to listener MINIFI-339: Resolve Issue with stack based processor nodes losing scope MINIFI-369: Update ListenHTTP processor to allow transfer encoding MINIFI-339: Update rest receiver instantiation and fix issue found in GetFile MINIFI-339: Rename content to operational arguments MINIFI-371: remove virtual destructors when not needed MINIFI-339: Fixing issues with GetTCP MINIFI-378: Resolve issues with shutdown. Took the approach to call notifyStop at the destructor to avoid larger changes MINIFI-339: Allow C2 to be disabled
This closes #134. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9f161a27 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9f161a27 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9f161a27 Branch: refs/heads/master Commit: 9f161a27e5fa0ea9aac4d59d1c23edbad3d77858 Parents: f5832fa Author: Marc Parisi <[email protected]> Authored: Wed Jun 21 10:47:35 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Mon Oct 2 10:56:07 2017 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 2 + README.md | 24 + cmake/BuildTests.cmake | 23 +- libminifi/CMakeLists.txt | 4 +- libminifi/include/ConfigurationListener.h | 117 ----- libminifi/include/Connection.h | 6 +- libminifi/include/EventDrivenSchedulingAgent.h | 2 +- libminifi/include/FlowController.h | 92 +++- libminifi/include/FlowFileRecord.h | 1 - libminifi/include/HttpConfigurationListener.h | 76 --- libminifi/include/RemoteProcessorGroupPort.h | 22 +- libminifi/include/ResourceClaim.h | 29 +- libminifi/include/SchedulingAgent.h | 17 +- libminifi/include/Site2SiteClientProtocol.h | 115 ++--- libminifi/include/Site2SitePeer.h | 2 +- libminifi/include/ThreadedSchedulingAgent.h | 14 +- libminifi/include/TimerDrivenSchedulingAgent.h | 10 +- libminifi/include/c2/C2Agent.h | 200 ++++++++ libminifi/include/c2/C2Payload.h | 192 ++++++++ libminifi/include/c2/C2Protocol.h | 119 +++++ libminifi/include/c2/HeartBeatReporter.h | 101 ++++ libminifi/include/c2/protocols/Protocols.h | 24 + libminifi/include/c2/protocols/RESTProtocol.h | 74 +++ libminifi/include/c2/protocols/RESTReceiver.h | 110 +++++ libminifi/include/c2/protocols/RESTSender.h | 80 +++ .../include/controllers/SSLContextService.h | 38 +- libminifi/include/core/ClassLoader.h | 72 +++ libminifi/include/core/ConfigurationFactory.h | 15 +- 
libminifi/include/core/ContentRepository.h | 50 ++ libminifi/include/core/FlowConfiguration.h | 6 +- libminifi/include/core/ProcessContext.h | 34 +- libminifi/include/core/ProcessGroup.h | 2 + libminifi/include/core/ProcessSession.h | 13 +- libminifi/include/core/ProcessSessionFactory.h | 6 +- libminifi/include/core/Processor.h | 25 +- libminifi/include/core/ProcessorNode.h | 10 +- libminifi/include/core/Property.h | 5 + libminifi/include/core/Relationship.h | 2 +- libminifi/include/core/Repository.h | 9 +- libminifi/include/core/StreamManager.h | 13 + .../core/controller/ControllerServiceProvider.h | 5 +- .../StandardControllerServiceProvider.h | 20 +- libminifi/include/core/logging/Logger.h | 2 +- .../include/core/logging/LoggerConfiguration.h | 7 +- .../SiteToSiteProvenanceReportingTask.h | 2 +- .../include/core/repository/AtomicRepoEntries.h | 97 ++-- .../core/repository/FileSystemRepository.h | 2 + .../core/repository/FlowFileRepository.h | 24 +- .../core/repository/VolatileContentRepository.h | 8 +- .../repository/VolatileFlowFileRepository.h | 9 +- .../repository/VolatileProvenanceRepository.h | 3 +- .../core/repository/VolatileRepository.h | 7 +- .../include/core/state/ProcessorController.h | 73 +++ libminifi/include/core/state/StateManager.h | 126 +++++ libminifi/include/core/state/UpdateController.h | 252 ++++++++++ .../core/state/metrics/DeviceInformation.h | 319 ++++++++++++ .../include/core/state/metrics/MetricsBase.h | 161 ++++++ .../core/state/metrics/MetricsListener.h | 128 +++++ .../include/core/state/metrics/ProcessMetrics.h | 102 ++++ .../include/core/state/metrics/QueueMetrics.h | 106 ++++ .../core/state/metrics/RepositoryMetrics.h | 101 ++++ .../include/core/state/metrics/SystemMetrics.h | 109 +++++ libminifi/include/core/yaml/YamlConfiguration.h | 4 +- libminifi/include/io/AtomicEntryStream.h | 18 +- libminifi/include/io/BaseStream.h | 2 +- libminifi/include/io/CRCStream.h | 2 +- libminifi/include/io/ClientSocket.h | 30 +- 
libminifi/include/io/EndianCheck.h | 2 +- libminifi/include/io/FileStream.h | 2 +- libminifi/include/processors/GetFile.h | 64 ++- libminifi/include/processors/GetTCP.h | 288 +++++++++++ libminifi/include/processors/InvokeHTTP.h | 55 +-- libminifi/include/processors/LoadProcessors.h | 1 + libminifi/include/processors/PutFile.h | 3 +- libminifi/include/properties/Configure.h | 3 +- libminifi/include/properties/Properties.h | 8 + libminifi/include/provenance/Provenance.h | 19 +- .../include/provenance/ProvenanceRepository.h | 21 +- libminifi/include/utils/ByteInputCallBack.h | 11 +- libminifi/include/utils/HTTPClient.h | 301 ++++++++++++ libminifi/include/utils/HTTPUtils.h | 304 ------------ libminifi/include/utils/Id.h | 2 +- libminifi/include/utils/StringUtils.h | 4 +- libminifi/include/utils/ThreadPool.h | 53 +- libminifi/src/CPPLINT.cfg | 3 + libminifi/src/ConfigurationListener.cpp | 87 ---- libminifi/src/Configure.cpp | 1 + libminifi/src/Connection.cpp | 22 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 3 +- libminifi/src/FlowController.cpp | 290 +++++++++-- libminifi/src/FlowFileRecord.cpp | 10 +- libminifi/src/HttpConfigurationListener.cpp | 102 ---- libminifi/src/Properties.cpp | 4 +- libminifi/src/RemoteProcessorGroupPort.cpp | 169 +++---- libminifi/src/ResourceClaim.cpp | 9 +- libminifi/src/SchedulingAgent.cpp | 22 +- libminifi/src/Site2SiteClientProtocol.cpp | 256 +++++----- libminifi/src/ThreadedSchedulingAgent.cpp | 20 +- libminifi/src/TimerDrivenSchedulingAgent.cpp | 7 +- libminifi/src/c2/C2Agent.cpp | 485 +++++++++++++++++++ libminifi/src/c2/C2Payload.cpp | 219 +++++++++ libminifi/src/c2/protocols/RESTProtocol.cpp | 177 +++++++ libminifi/src/c2/protocols/RESTReceiver.cpp | 148 ++++++ libminifi/src/c2/protocols/RESTSender.cpp | 144 ++++++ libminifi/src/controllers/SSLContextService.cpp | 19 +- libminifi/src/core/ClassLoader.cpp | 6 +- libminifi/src/core/ConfigurableComponent.cpp | 14 +- libminifi/src/core/ConfigurationFactory.cpp | 8 +- 
libminifi/src/core/Connectable.cpp | 12 +- libminifi/src/core/FlowConfiguration.cpp | 9 +- libminifi/src/core/FlowFile.cpp | 1 + libminifi/src/core/ProcessGroup.cpp | 38 +- libminifi/src/core/ProcessSession.cpp | 71 ++- libminifi/src/core/ProcessSessionFactory.cpp | 4 +- libminifi/src/core/Processor.cpp | 39 +- libminifi/src/core/ProcessorNode.cpp | 8 +- libminifi/src/core/Repository.cpp | 5 +- libminifi/src/core/RepositoryFactory.cpp | 8 +- .../StandardControllerServiceNode.cpp | 6 +- .../src/core/logging/LoggerConfiguration.cpp | 16 +- .../SiteToSiteProvenanceReportingTask.cpp | 6 +- .../core/repository/FileSystemRepository.cpp | 5 + .../src/core/repository/FlowFileRepository.cpp | 90 ++-- .../repository/VolatileContentRepository.cpp | 17 + .../src/core/state/ProcessorController.cpp | 64 +++ libminifi/src/core/state/StateManager.cpp | 137 ++++++ libminifi/src/core/state/UpdateController.cpp | 76 +++ libminifi/src/io/ClientSocket.cpp | 49 +- libminifi/src/io/FileStream.cpp | 1 + libminifi/src/io/StreamFactory.cpp | 6 +- libminifi/src/processors/ExecuteProcess.cpp | 6 +- libminifi/src/processors/GenerateFlowFile.cpp | 4 +- libminifi/src/processors/GetFile.cpp | 21 +- libminifi/src/processors/GetTCP.cpp | 289 +++++++++++ libminifi/src/processors/InvokeHTTP.cpp | 190 +++----- libminifi/src/processors/ListenHTTP.cpp | 10 +- libminifi/src/processors/ListenSyslog.cpp | 2 +- libminifi/src/processors/LogAttribute.cpp | 1 + libminifi/src/processors/TailFile.cpp | 4 +- libminifi/src/provenance/Provenance.cpp | 6 +- .../src/provenance/ProvenanceRepository.cpp | 36 +- libminifi/src/utils/HttpClient.cpp | 214 ++++++++ libminifi/src/utils/Id.cpp | 19 +- libminifi/test/.device_id | 1 + libminifi/test/TestBase.cpp | 16 +- libminifi/test/TestBase.h | 14 +- libminifi/test/TestServer.h | 54 +-- .../test/integration/C2NullConfiguration.cpp | 135 ++++++ libminifi/test/integration/C2UpdateTest.cpp | 185 +++++++ .../integration/C2VerifyHeartbeatAndStop.cpp | 153 ++++++ 
.../test/integration/C2VerifyServeResults.cpp | 131 +++++ .../ControllerServiceIntegrationTests.cpp | 33 +- libminifi/test/integration/GetFileNoData.cpp | 184 +++++++ .../HttpConfigurationListenerTest.cpp | 131 ----- .../test/integration/HttpGetIntegrationTest.cpp | 45 +- .../integration/HttpPostIntegrationTest.cpp | 127 +++-- libminifi/test/integration/IntegrationBase.h | 177 +++++++ .../integration/ProvenanceReportingTest.cpp | 1 + .../test/integration/Site2SiteRestTest.cpp | 148 ------ .../test/integration/SiteToSiteRestTest.cpp | 144 ++++++ .../test/integration/TestExecuteProcess.cpp | 9 +- libminifi/test/integration/ThreadPoolAdjust.cpp | 109 +++++ libminifi/test/resources/TestHTTPGet.yml | 10 +- libminifi/test/resources/TestHTTPGetSecure.yml | 13 +- libminifi/test/resources/TestHTTPPost.yml | 59 ++- .../resources/TestHTTPPostChunkedEncoding.yml | 97 ++++ .../test/resources/TestSite2SiteRestSecure.yml | 58 +++ libminifi/test/resources/cn.crt.key.pem | 52 ++ libminifi/test/resources/nifi-cert-key.pem | 47 ++ libminifi/test/unit/C2MetricsTests.cpp | 230 +++++++++ libminifi/test/unit/GetTCPTests.cpp | 419 ++++++++++++++++ libminifi/test/unit/InvokeHTTPTests.cpp | 99 ++-- libminifi/test/unit/LoggerTests.cpp | 6 + libminifi/test/unit/ProcessorTests.cpp | 40 +- libminifi/test/unit/PropertyTests.cpp | 13 + libminifi/test/unit/ProvenanceTestHelper.h | 27 +- libminifi/test/unit/RepoTests.cpp | 54 +++ libminifi/test/unit/SchedulingAgentTests.cpp | 36 ++ libminifi/test/unit/Site2SiteTests.cpp | 2 +- libminifi/test/unit/TailFileTests.cpp | 80 ++- libminifi/test/unit/ThreadPoolTests.cpp | 55 +++ libminifi/test/unit/YamlConfigurationTests.cpp | 3 +- main/CMakeLists.txt | 7 +- 183 files changed, 9290 insertions(+), 2225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/CMakeLists.txt ---------------------------------------------------------------------- diff --git 
a/CMakeLists.txt b/CMakeLists.txt index 292bc8d..a81127c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,6 +35,8 @@ ENDIF(POLICY CMP0048) include(CheckCXXCompilerFlag) CHECK_CXX_COMPILER_FLAG("-std=c++11 " COMPILER_SUPPORTS_CXX11) CHECK_CXX_COMPILER_FLAG("-std=c++0x " COMPILER_SUPPORTS_CXX0X) +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ") +SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}") if(COMPILER_SUPPORTS_CXX11) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") elseif(COMPILER_SUPPORTS_CXX0X) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index db10ab4..1c879c5 100644 --- a/README.md +++ b/README.md @@ -317,6 +317,30 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc if you do not want to enable client certificate base authorization nifi.security.need.ClientAuth=false + +### Command and Control Configuration +For more insight into the API used within the C2 agent, please visit: +https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal + + in minifi.properties + + #Disable/Enable C2 + nifi.c2.enable=true + + #specify metrics classes + nifi.flow.metrics.classes=DeviceInformation,SystemInformation,ProcessMetrics + + #specify C2 protocol + c2.agent.protocol.class=RESTSender + + #control c2 heartbeat interval in milliseconds + c2.agent.heartbeat.period=3000 + + # enable reporter classes + c2.agent.heartbeat.reporter.class=RESTReceiver + + + ### Configuring Volatile and NO-OP Repositories Each of the repositories can be configured to be volatile ( state kept in memory and flushed http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index 29603bf..69c5e7d 100644 --- a/cmake/BuildTests.cmake +++ 
b/cmake/BuildTests.cmake @@ -40,10 +40,14 @@ function(createTests testName) target_include_directories(${testName} PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) target_include_directories(${testName} PRIVATE BEFORE "include") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/") + target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2/protocols") + target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/controller") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/repository") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/yaml") + target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/statemanagement") + target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/core/statemanagement/metrics") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/io") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/utils") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/processors") @@ -88,12 +92,25 @@ add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegra add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpConfigurationListenerTest COMMAND HttpConfigurationListenerTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") +add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" ) +add_test(NAME 
HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/") -add_test(NAME Site2SiteRestTest COMMAND Site2SiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/") +add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") + +add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") + +add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") + +add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") + +add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") + +add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8082/nifi-api/controller") + +## removed due to travis issues with our certs +#add_test(NAME SiteToSiteRestTestSecure COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRestSecure.yml" "${TEST_RESOURCES}/" "https://localhost:8082/nifi-api/controller") add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess ) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 5e63a30..539256b 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -62,7 +62,7 @@ include_directories(../thirdparty/jsoncpp/include) include_directories(../thirdparty/concurrentqueue/) include_directories(include) -file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" 
"src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp") +file(GLOB SOURCES "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp") file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*") @@ -110,3 +110,5 @@ endif (OPENSSL_FOUND) endif () add_subdirectory(src/utils) + +set_property(TARGET minifi PROPERTY INTERPROCEDURAL_OPTIMIZATION True) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ConfigurationListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ConfigurationListener.h b/libminifi/include/ConfigurationListener.h deleted file mode 100644 index 856ea95..0000000 --- a/libminifi/include/ConfigurationListener.h +++ /dev/null @@ -1,117 +0,0 @@ -/** - * ConfigurationListener class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __CONFIGURATION_LISTENER__ -#define __CONFIGURATION_LISTENER__ - -#include <memory> -#include <atomic> -#include <cstdint> -#include <cstring> -#include <iostream> -#include <string> -#include <thread> - -#include "yaml-cpp/yaml.h" -#include "core/Core.h" -#include "core/Property.h" -#include "properties/Configure.h" -#include "core/logging/Logger.h" -#include "core/logging/LoggerConfiguration.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -// Forwarder declaration -class FlowController; -// ConfigurationListener Class -class ConfigurationListener { -public: - - // Constructor - /*! - * Create a new processor - */ - ConfigurationListener(std::shared_ptr<FlowController> controller, - std::shared_ptr<Configure> configure, std::string type) : - connect_timeout_(20000), read_timeout_(20000), type_(type), configure_( - configure), controller_(controller) { - logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger(); - running_ = false; - - } - // Destructor - virtual ~ConfigurationListener() { - stop(); - } - - // Start the thread - void start(); - // Stop the thread - void stop(); - // whether the thread is enable - bool isRunning() { - return running_; - } - // pull the new configuration from the remote host - virtual bool pullConfiguration(std::string &configuration) { - return false; - } - -protected: - - // Run function for the thread - void run(); - - // Run function for the thread - void threadExecutor() { - run(); - } - - // Mutex for protection - std::mutex mutex_; - // thread - std::thread thread_; - // whether the thread is running - std::atomic<bool> running_; - - // url - std::string url_; - // connection timeout - int64_t connect_timeout_; - // read timeout. 
- int64_t read_timeout_; - // pull interval - int64_t pull_interval_; - // type (http/rest) - std::string type_; - // last applied configuration - std::string lastAppliedConfiguration; - - std::shared_ptr<Configure> configure_; - std::shared_ptr<logging::Logger> logger_; - std::shared_ptr<FlowController> controller_; -}; - -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Connection.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index ff32baf..c92a626 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -47,8 +47,8 @@ class Connection : public core::Connectable, public std::enable_shared_from_this /* * Create a new processor */ - explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL, - uuid_t srcUUID = NULL, + explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL, uuid_t srcUUID = + NULL, uuid_t destUUID = NULL); // Destructor virtual ~Connection() { @@ -130,7 +130,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this } // Get queue data size uint64_t getQueueDataSize() { - return max_data_queue_size_; + return queued_data_size_; } // Put the flow file into queue void put(std::shared_ptr<core::FlowFile> flow); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/EventDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index 
ca9f021..b434de5 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -46,7 +46,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { virtual ~EventDrivenSchedulingAgent() { } // Run function for the thread - uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); + uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory); private: // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index d9a0452..e79999f 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -44,11 +44,10 @@ #include "TimerDrivenSchedulingAgent.h" #include "EventDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" -#include "ConfigurationListener.h" -#include "HttpConfigurationListener.h" - #include "core/Property.h" #include "utils/Id.h" +#include "core/state/metrics/MetricsBase.h" +#include "core/state/StateManager.h" namespace org { namespace apache { @@ -62,7 +61,7 @@ namespace minifi { * Flow Controller class. Generally used by FlowController factory * as a singleton. 
*/ -class FlowController : public core::controller::ControllerServiceProvider, public std::enable_shared_from_this<FlowController> { +class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager { public: static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; @@ -71,20 +70,16 @@ class FlowController : public core::controller::ControllerServiceProvider, publi * Flow controller constructor */ explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, - std::unique_ptr<core::FlowConfiguration> flow_configuration, - std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode); + std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode); explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, - std::unique_ptr<core::FlowConfiguration> flow_configuration, - std::shared_ptr<core::ContentRepository> content_repo) - : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false) - { + std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo) + : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false) { } explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration) - : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), 
std::make_shared<core::repository::FileSystemRepository>(), DEFAULT_ROOT_GROUP_NAME, false) - { + : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), std::make_shared<core::repository::FileSystemRepository>(), DEFAULT_ROOT_GROUP_NAME, false) { content_repo_->initialize(configure); } @@ -124,14 +119,33 @@ class FlowController : public core::controller::ControllerServiceProvider, publi virtual bool isRunning() { return running_.load(); } + // Whether the Flow Controller has already been initialized (loaded flow XML) virtual bool isInitialized() { return initialized_.load(); } // Start to run the Flow Controller which internally start the root process group and all its children - virtual bool start(); + virtual int16_t start(); + virtual int16_t pause() { + return -1; + } // Unload the current flow YAML, clean the root process group and all its children - virtual void stop(bool force); + virtual int16_t stop(bool force, uint64_t timeToWait = 0); + virtual int16_t applyUpdate(const std::string &configuration); + virtual int16_t drainRepositories() { + + return -1; + } + + virtual std::vector<std::shared_ptr<state::StateController>> getComponents(const std::string &name); + + virtual std::vector<std::shared_ptr<state::StateController>> getAllComponents(); + + virtual int16_t clearConnection(const std::string &connection); + + virtual int16_t applyUpdate(const std::shared_ptr<state::Update> &updateController) { + return -1; + } // Asynchronous function trigger unloading and wait for a period of time virtual void waitUnload(const uint64_t timeToWaitMs); // Unload the current flow xml, clean the root process group and all its children @@ -158,7 +172,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // first it will validate the payload with the current root node config for flowController // like FlowController id/name is the same and new version is greater than the current version // after that, it 
will apply the configuration - bool applyConfiguration(std::string &configurePayload); + bool applyConfiguration(const std::string &configurePayload); // get name std::string getName() { @@ -168,6 +182,10 @@ class FlowController : public core::controller::ControllerServiceProvider, publi return ""; } + virtual std::string getComponentName() { + return "FlowController"; + } + // get version int getVersion() { if (root_ != nullptr) @@ -199,7 +217,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi * Enables the controller service services * @param serviceNode service node which will be disabled, along with linked services. */ - virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Enables controller services @@ -211,7 +229,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi * Disables controller services * @param serviceNode service node which will be disabled, along with linked services. */ - virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Gets all controller services. @@ -278,11 +296,25 @@ class FlowController : public core::controller::ControllerServiceProvider, publi */ virtual void enableAllControllerServices(); + /** + * Retrieves all metrics from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. 
+ * -1 failure + */ + virtual int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint8_t metricsClass); + + virtual uint64_t getUptime(); + protected: // function to load the flow file repo. void loadFlowRepo(); + void initializeC2(); + /** * Initializes flow controller paths. */ @@ -304,8 +336,12 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // FlowFile Repo // Whether it is running std::atomic<bool> running_; + // conifiguration filename std::string configuration_filename_; + + std::atomic<bool> c2_initialized_; + std::atomic<bool> c2_enabled_; // Whether it has already been initialized (load the flow XML already) std::atomic<bool> initialized_; // Provenance Repo @@ -336,15 +372,29 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // flow configuration object. std::unique_ptr<core::FlowConfiguration> flow_configuration_; - private: + // metrics information + + std::chrono::steady_clock::time_point start_time_; + + std::mutex metrics_mutex_; + // metrics cache + std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_; + + // metrics cache + std::map<std::string, std::shared_ptr<state::metrics::Metrics>> component_metrics_; + + std::map<uint8_t, std::vector<std::shared_ptr<state::metrics::Metrics>>>component_metrics_by_id_; + // metrics last run + std::chrono::steady_clock::time_point last_metrics_capture_; + +private: std::shared_ptr<logging::Logger> logger_; - // http configuration listener object. 
- std::unique_ptr<HttpConfigurationListener> http_configuration_listener_; std::string serial_number_; static std::shared_ptr<utils::IdGenerator> id_generator_; }; -} /* namespace minifi */ +} +/* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ } /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/FlowFileRecord.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index d6e5f2e..9299b8a 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -94,7 +94,6 @@ class OutputStreamCallback { virtual ~OutputStreamCallback() { } - //virtual void process(std::ofstream *stream) = 0; virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/HttpConfigurationListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/HttpConfigurationListener.h b/libminifi/include/HttpConfigurationListener.h deleted file mode 100644 index 7e3291e..0000000 --- a/libminifi/include/HttpConfigurationListener.h +++ /dev/null @@ -1,76 +0,0 @@ -/** - * HttpConfigurationListener class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __HTTP_CONFIGURATION_LISTENER__ -#define __HTTP_CONFIGURATION_LISTENER__ - -#include <curl/curl.h> -#include "core/Core.h" -#include "core/Property.h" -#include "ConfigurationListener.h" -#include "utils/HTTPUtils.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { - -// HttpConfigurationListener Class -class HttpConfigurationListener: public ConfigurationListener { -public: - - // Constructor - /*! - * Create a new processor - */ - HttpConfigurationListener(std::shared_ptr<FlowController> controller, - std::shared_ptr<Configure> configure) : - minifi::ConfigurationListener(controller, configure, "http"), - securityConfig_(configure) { - std::string value; - - if (configure->get(Configure::nifi_configuration_listener_http_url, value)) { - url_ = value; - logger_->log_info("Http configuration listener URL %s", url_.c_str()); - } else { - url_ = ""; - } - - curl_global_init(CURL_GLOBAL_DEFAULT); - this->start(); - } - - bool pullConfiguration(std::string &configuration); - - // Destructor - virtual ~HttpConfigurationListener() { - this->stop(); - curl_global_cleanup(); - } - -protected: - minifi::utils::HTTPSecurityConfiguration securityConfig_; - -}; - -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h 
b/libminifi/include/RemoteProcessorGroupPort.h index d484fb9..cefce45 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -23,13 +23,14 @@ #include <mutex> #include <memory> #include <stack> -#include "utils/HTTPUtils.h" +#include "utils/HTTPClient.h" #include "concurrentqueue.h" #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" #include "Site2SiteClientProtocol.h" #include "io/StreamFactory.h" +#include "controllers/SSLContextService.h" #include "core/logging/LoggerConfiguration.h" namespace org { @@ -43,14 +44,15 @@ class RemoteProcessorGroupPort : public core::Processor { /*! * Create a new processor */ - RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, std::string url, std::shared_ptr<Configure> configure, uuid_t uuid = nullptr) + RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, std::string url, const std::shared_ptr<Configure> &configure, uuid_t uuid = nullptr) : core::Processor(name, uuid), configure_(configure), direction_(SEND), transmitting_(false), + timeout_(0), logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()), url_(url), - securityConfig_(configure) { + ssl_service(nullptr) { stream_factory_ = stream_factory; if (uuid != nullptr) { uuid_copy(protocol_uuid_, uuid); @@ -71,11 +73,12 @@ class RemoteProcessorGroupPort : public core::Processor { static const char *ProcessorName; // Supported Properties static core::Property hostName; + static core::Property SSLContext; static core::Property port; static core::Property portUUID; // Supported Relationships static core::Relationship relation; - public: + public: void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); // OnTrigger method, implemented by NiFi RemoteProcessorGroupPort virtual void onTrigger(core::ProcessContext *context, 
core::ProcessSession *session); @@ -102,8 +105,7 @@ class RemoteProcessorGroupPort : public core::Processor { if (port_ == -1) { if (protocol_.find("https") != std::string::npos) { port_ = 443; - } - else if (protocol_.find("http") != std::string::npos) { + } else if (protocol_.find("http") != std::string::npos) { port_ = 80; } } @@ -142,15 +144,17 @@ class RemoteProcessorGroupPort : public core::Processor { std::string url_; // Remote Site2Site Info - int site2site_port_; - bool site2site_secure_; + int site2site_port_;bool site2site_secure_; std::vector<minifi::Site2SitePeerStatus> site2site_peer_status_list_; std::atomic<int> site2site_peer_index_; std::mutex site2site_peer_mutex_; std::string rest_user_name_; std::string rest_password_; - minifi::utils::HTTPSecurityConfiguration securityConfig_; + std::shared_ptr<controllers::SSLContextService> ssl_service; + + private: + static const char* RPG_SSL_CONTEXT_SERVICE_NAME; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ResourceClaim.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h index 19a67fa..b788416 100644 --- a/libminifi/include/ResourceClaim.h +++ b/libminifi/include/ResourceClaim.h @@ -41,12 +41,15 @@ namespace minifi { // Default content directory #define DEFAULT_CONTENT_DIRECTORY "./content_repository" +extern std::string default_directory_path; + +extern void setDefaultDirectory(std::string); + // ResourceClaim Class class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> { public: - static char *default_directory_path; // Constructor /*! 
* Create a new resource claim @@ -55,23 +58,20 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> { ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false); // Destructor - virtual ~ResourceClaim() { + ~ResourceClaim() { } // increaseFlowFileRecordOwnedCount void increaseFlowFileRecordOwnedCount() { - ++_flowFileRecordOwnedCount; + claim_manager_->incrementStreamCount(shared_from_this()); } // decreaseFlowFileRecordOwenedCount void decreaseFlowFileRecordOwnedCount() { - - if (_flowFileRecordOwnedCount > 0) { - _flowFileRecordOwnedCount--; - } + claim_manager_->decrementStreamCount(shared_from_this()); } // getFlowFileRecordOwenedCount uint64_t getFlowFileRecordOwnedCount() { - return _flowFileRecordOwnedCount; + return claim_manager_->getStreamCount(shared_from_this()); } // Get the content full path std::string getContentFullPath() { @@ -83,13 +83,19 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> { } void deleteClaim() { - if (!deleted_) - { + if (!deleted_) { deleted_ = true; } } + bool exists() { + if (claim_manager_ == nullptr) { + return false; + } + return claim_manager_->exists(shared_from_this()); + } + friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) { stream << claim._contentFullPath; return stream; @@ -104,9 +110,6 @@ class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> { // Full path to the content std::string _contentFullPath; - // How many FlowFileRecord Own this cliam - std::atomic<uint64_t> _flowFileRecordOwnedCount; - std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager_; private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 130c088..569c4ee 
100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -52,10 +52,8 @@ class SchedulingAgent { /*! * Create a new scheduling agent. */ - SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_repo, - std::shared_ptr<core::ContentRepository> content_repo, - std::shared_ptr<Configure> configuration) + SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, + std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) : configure_(configuration), admin_yield_duration_(0), bored_yield_duration_(0), @@ -65,7 +63,7 @@ class SchedulingAgent { running_ = false; repo_ = repo; flow_repo_ = flow_repo; - utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true); + utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true); component_lifecycle_thread_pool_ = std::move(pool); component_lifecycle_thread_pool_.start(); } @@ -74,7 +72,7 @@ class SchedulingAgent { } // onTrigger, return whether the yield is need - bool onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); + bool onTrigger(std::shared_ptr<core::Processor> processor, std::shared_ptr<core::ProcessContext> processContext, std::shared_ptr<core::ProcessSessionFactory> sessionFactory); // Whether agent has work to do bool hasWorkToDo(std::shared_ptr<core::Processor> processor); // Whether the outgoing need to be backpressure @@ -82,6 +80,7 @@ class SchedulingAgent { // start void start() { running_ = true; + component_lifecycle_thread_pool_.start(); } // stop virtual void stop() { 
@@ -90,8 +89,8 @@ class SchedulingAgent { } public: - virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); - virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); // schedule, overwritten by different DrivenSchedulingAgent virtual void schedule(std::shared_ptr<core::Processor> processor) = 0; // unschedule, overwritten by different DrivenSchedulingAgent @@ -112,7 +111,7 @@ class SchedulingAgent { std::shared_ptr<Configure> configure_; std::shared_ptr<core::Repository> repo_; - + std::shared_ptr<core::Repository> flow_repo_; std::shared_ptr<core::ContentRepository> content_repo_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index dcb551a..df974fb 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -174,29 +174,13 @@ typedef struct { } RespondCodeContext; // Respond Code Context -static RespondCodeContext respondCodeContext[] = { - { RESERVED, "Reserved for Future Use", false }, - { PROPERTIES_OK, "Properties OK", false }, - { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true }, - { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, - { MISSING_PROPERTY, "Missing Property", true }, - { CONTINUE_TRANSACTION, "Continue Transaction", false }, - { FINISH_TRANSACTION, "Finish Transaction", false }, - { CONFIRM_TRANSACTION, "Confirm Transaction", true }, - { TRANSACTION_FINISHED, "Transaction Finished", false }, - 
{ TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false }, - { CANCEL_TRANSACTION, "Cancel Transaction", true }, - { BAD_CHECKSUM, "Bad Checksum", false }, - { MORE_DATA, "More Data Exists", false }, - { NO_MORE_DATA, "No More Data Exists", false }, - { UNKNOWN_PORT, "Unknown Port", false }, - { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true }, - { PORTS_DESTINATION_FULL, "Port's Destination is Full", false }, - { UNAUTHORIZED, "User Not Authorized", true }, - { ABORT, "Abort", true }, - { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, - { END_OF_STREAM, "End of Stream", false } -}; +static RespondCodeContext respondCodeContext[] = { { RESERVED, "Reserved for Future Use", false }, { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true }, + { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, { MISSING_PROPERTY, "Missing Property", true }, { CONTINUE_TRANSACTION, "Continue Transaction", false }, { FINISH_TRANSACTION, + "Finish Transaction", false }, { CONFIRM_TRANSACTION, "Confirm Transaction", true }, { TRANSACTION_FINISHED, "Transaction Finished", false }, { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, + "Transaction Finished But Destination is Full", false }, { CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM, "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false }, + { NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT, "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL, + "Port's Destination is Full", false }, { UNAUTHORIZED, "User Not Authorized", true }, { ABORT, "Abort", true }, { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, { + END_OF_STREAM, "End of Stream", false } }; // Respond Code Sequence Pattern static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R'; @@ -246,47 +230,47 @@ typedef enum { // HandShakeProperty 
Str static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = { - /** - * Boolean value indicating whether or not the contents of a FlowFile should - * be GZipped when transferred. - */ - "GZIP", - /** - * The unique identifier of the port to communicate with - */ - "PORT_IDENTIFIER", - /** - * Indicates the number of milliseconds after the request was made that the - * client will wait for a response. If no response has been received by the - * time this value expires, the server can move on without attempting to - * service the request because the client will have already disconnected. - */ - "REQUEST_EXPIRATION_MILLIS", - /** - * The preferred number of FlowFiles that the server should send to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. - */ - "BATCH_COUNT", - /** - * The preferred number of bytes that the server should send to the client - * when pulling data. This property was introduced in version 5 of the - * protocol. - */ - "BATCH_SIZE", - /** - * The preferred amount of time that the server should send data to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. Value is in milliseconds. - */ - "BATCH_DURATION" }; +/** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ +"GZIP", +/** + * The unique identifier of the port to communicate with + */ +"PORT_IDENTIFIER", +/** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. + */ +"REQUEST_EXPIRATION_MILLIS", +/** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. 
+ */ +"BATCH_COUNT", +/** + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. + */ +"BATCH_SIZE", +/** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ +"BATCH_DURATION" }; class Site2SiteClientProtocol; // Transaction Class class Transaction { friend class Site2SiteClientProtocol; - public: + public: // Constructor /*! * Create a new transaction @@ -389,13 +373,12 @@ class DataPacket { }; /** - * Site2Site Peer - */ - typedef struct Site2SitePeerStatus { - std::string host_; - int port_; - bool isSecure_; - } Site2SitePeerStatus; + * Site2Site Peer + */ +typedef struct Site2SitePeerStatus { + std::string host_; + int port_;bool isSecure_; +} Site2SitePeerStatus; // Site2SiteClientProtocol Class class Site2SiteClientProtocol { @@ -525,7 +508,7 @@ class Site2SiteClientProtocol { bool receive(std::string transactionID, DataPacket *packet, bool &eof); // Send the data packet from the transaction // Return false when any error occurs - bool send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session); + int16_t send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session); // Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received. 
bool confirm(std::string transactionID); // Cancel the transaction http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/Site2SitePeer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h index 65a5479..59f5cfe 100644 --- a/libminifi/include/Site2SitePeer.h +++ b/libminifi/include/Site2SitePeer.h @@ -78,7 +78,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { _url = std::move(ss._url); } // Destructor - virtual ~Site2SitePeer() { + ~Site2SitePeer() { Close(); } // Set Processor yield period in MilliSecond http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index 27b8b3a..d9fc7be 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -84,14 +84,12 @@ class ThreadedSchedulingAgent : public SchedulingAgent { /*! * Create a new threaded scheduling agent. 
*/ - ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_repo, - std::shared_ptr<core::ContentRepository> content_repo, - std::shared_ptr<Configure> configuration) + ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, + std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration), logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) { - utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true); + utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true); thread_pool_ = std::move(pool); thread_pool_.start(); @@ -101,7 +99,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent { } // Run function for the thread - virtual uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0; + virtual uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, + const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0; public: // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent @@ -110,12 +109,11 @@ class ThreadedSchedulingAgent : public SchedulingAgent { virtual void unschedule(std::shared_ptr<core::Processor> processor); virtual void stop(); - protected: + protected: utils::ThreadPool<uint64_t> thread_pool_; protected: - private: // Prevent default copy constructor and assignment operation // Only support pass by reference 
or pointer http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 1502c47..8398b3a 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -38,10 +38,9 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { * Create a new processor */ TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_repo, - std::shared_ptr<core::ContentRepository> content_repo, - std::shared_ptr<Configure> configure) - : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure) { + std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure) + : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure), + logger_(logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger()) { } // Destructor virtual ~TimerDrivenSchedulingAgent() { @@ -49,7 +48,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { /** * Run function that accepts the processor, context and session factory. 
*/ - uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); + uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory); private: // Prevent default copy constructor and assignment operation @@ -57,6 +56,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent); TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent); + std::shared_ptr<logging::Logger> logger_; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Agent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h new file mode 100644 index 0000000..8c2e45d --- /dev/null +++ b/libminifi/include/c2/C2Agent.h @@ -0,0 +1,200 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_C2_C2AGENT_H_ +#define LIBMINIFI_INCLUDE_C2_C2AGENT_H_ + +#include <utility> +#include <functional> +#include <future> +#include <memory> +#include <mutex> +#include <thread> +#include "core/state/UpdateController.h" +#include "core/state/metrics/MetricsBase.h" +#include "C2Payload.h" +#include "C2Protocol.h" +#include "io/validation.h" +#include "protocols/Protocols.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose and Justification: C2 agent will be the mechanism that will abstract the protocol to do the work. + * + * The protocol represents a transformation layer into the objects seen in C2Payload. That transformation may + * be minimal or extreme, depending on the protocol itself. + * + * Metrics Classes defined here: + * + * 0 HeartBeat -- RESERVED + * 1-255 Defined by the configuration file. + */ +class C2Agent : public state::UpdateController, public state::metrics::MetricsSink, public std::enable_shared_from_this<C2Agent> { + public: + + C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure); + + virtual ~C2Agent() { + + } + + /** + * Sends the heartbeat to ths server. Will include metrics + * in the payload if they exist. + */ + void performHeartBeat(); + + virtual std::vector<std::function<state::Update()>> getFunctions() { + return functions_; + } + + /** + * Sets the metric within this sink + * @param metric metric to set + * @param return 0 on success, -1 on failure. + */ + virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric); + + protected: + + /** + * Configure the C2 agent + */ + void configure(const std::shared_ptr<Configure> &configure, bool reconfigure = true); + + /** + * Serializes metrics into a payload. 
+ * @parem parent_paylaod parent payload into which we insert the newly generated payload. + * @param name name of this metric + * @param metrics metrics to include. + */ + void serializeMetrics(C2Payload &parent_payload, const std::string &name, const std::vector<state::metrics::MetricResponse> &metrics); + + /** + * Extract the payload + * @param resp payload to be moved into the function. + */ + void extractPayload(const C2Payload &&resp); + + /** + * Extract the payload + * @param payload reference. + */ + void extractPayload(const C2Payload &resp); + + /** + * Enqueues a C2 server response for us to evaluate and parse. + */ + void enqueue_c2_server_response(C2Payload &&resp) { + std::lock_guard<std::timed_mutex> lock(queue_mutex); + responses.push_back(std::move(resp)); + } + + /** + * Enqueues a c2 payload for a response to the C2 server. + */ + void enqueue_c2_response(C2Payload &&resp) { + std::lock_guard<std::timed_mutex> lock(request_mutex); + requests.push_back(std::move(resp)); + } + + /** + * Handles a C2 event requested by the server. + * @param resp c2 server response. + */ + void handle_c2_server_response(const C2ContentResponse &resp); + + /** + * Handles an update request + * @param C2ContentResponse response + */ + void handle_update(const C2ContentResponse &resp); + + /** + * Handles a description request + */ + void handle_describe(const C2ContentResponse &resp); + + std::timed_mutex metrics_mutex_; + std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_map_; + + /** + * Device information stored in the metrics format + */ + std::map<std::string, std::shared_ptr<state::metrics::Metrics>> device_information_; + // queue mutex + std::timed_mutex queue_mutex; + + // queue mutex + std::timed_mutex request_mutex; + + // responses for the the C2 agent. + std::vector<C2Payload> responses; + + // requests that originate from the C2 server. + std::vector<C2Payload> requests; + + // heart beat period. 
+ int64_t heart_beat_period_; + + // maximum number of queued messages to send to the c2 server + int16_t max_c2_responses; + + // time point the last time we performed a heartbeat. + std::chrono::steady_clock::time_point last_run_; + + // function that performs the heartbeat + std::function<state::Update()> c2_producer_; + + // function that acts upon the + std::function<state::Update()> c2_consumer_; + + // reference to the update sink, against which we will execute updates. + std::shared_ptr<state::StateMonitor> update_sink_; + + // functions that will be used for the udpate controller. + std::vector<std::function<state::Update()>> functions_; + + // controller service provider refernece. + std::shared_ptr<core::controller::ControllerServiceProvider> controller_; + + std::shared_ptr<Configure> configuration_; + + std::shared_ptr<Configure> running_configuration; + + std::mutex heartbeat_mutex; + + std::vector<std::shared_ptr<HeartBeatReporter>> heartbeat_protocols_; + + std::atomic<C2Protocol*> protocol_; + + std::shared_ptr<logging::Logger> logger_; +} +; + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_C2AGENT_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Payload.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h new file mode 100644 index 0000000..ca14584 --- /dev/null +++ b/libminifi/include/c2/C2Payload.h @@ -0,0 +1,192 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ +#define LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ + +#include <memory> +#include <string> +#include <map> +#include "core/state/UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +enum Operation { + ACKNOWLEDGE, + START, + STOP, + RESTART, + DESCRIBE, + HEARTBEAT, + UPDATE, + VALIDATE, + CLEAR +}; + +enum Direction { + TRANSMIT, + RECEIVE +}; + +class C2ContentResponse { + public: + C2ContentResponse(Operation op); + + C2ContentResponse(const C2ContentResponse &other); + + C2ContentResponse(const C2ContentResponse &&other); + + C2ContentResponse & operator=(const C2ContentResponse &&other); + + C2ContentResponse & operator=(const C2ContentResponse &other); + + Operation op; + // determines if the operation is required + bool required; + // identifier + std::string ident; + // delay before running + uint32_t delay; + // max time before this response will no longer be honored. + uint64_t ttl; + // name applied to commands + std::string name; + // commands that correspond with the operation. + std::map<std::string, std::string> operation_arguments; +// std::vector<std::string> content; +}; + +/** + * C2Payload is an update for the state manager. 
+ * Note that the payload can either consist of other payloads or + * have content directly within it, represented by C2ContentResponse objects, above. + * + * Payloads can also contain raw data, which can be binary data. + */ +class C2Payload : public state::Update { + public: + virtual ~C2Payload() { + + } + /* Overloads take an optional identifier or state::UpdateState; resp and isRaw default to false. */ + C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false); + + C2Payload(Operation op, bool resp = false, bool isRaw = false); + + C2Payload(Operation op, state::UpdateState state, bool resp = false, bool isRaw = false); + + C2Payload(const C2Payload &other); + /* NOTE(review): this overload takes a const rvalue reference, so the source cannot be modified through it. */ + C2Payload(const C2Payload &&other); + + void setIdentifier(const std::string &ident); + + std::string getIdentifier() const; + /* Sets the label; the argument is taken by const reference, as in setIdentifier, so it is not copied on the way in. */ + void setLabel(const std::string &label) { + label_ = label; + } + /* Returns a copy of the label. */ + std::string getLabel() const { + return label_; + } + + /** + * Gets the operation for this payload. May be nested or a single operation. + */ + Operation getOperation() const; + + /** + * Validate the payload, if necessary and/or possible. + */ + virtual bool validate(); + + /** + * Get content responses from this payload. + */ + const std::vector<C2ContentResponse> &getContent() const; + + /** + * Add a content response to this payload. + */ + void addContent(const C2ContentResponse &&content); + + /** + * Determines if this object contains raw data. + */ + bool isRaw() const; + + /** + * Sets raw data within this object. + */ + void setRawData(const std::string &data); + + /** + * Sets raw data from a vector within this object. + */ + void setRawData(const std::vector<char> &data); + + /** + * Returns raw data. + */ + std::string getRawData() const; + + /** + * Add a nested payload. + * @param payload payload to add; NOTE(review): passed as a const rvalue reference, so it cannot be moved from. + */ + void addPayload(const C2Payload &&payload); + /** + * Get nested payloads. 
+ */ + const std::vector<C2Payload> &getNestedPayloads() const; + + C2Payload &operator=(const C2Payload &&other); + C2Payload &operator=(const C2Payload &other); + + protected: + + // identifier for this payload. + std::string ident_; + + std::string label_; + + std::vector<C2Payload> payloads_; + + std::vector<C2ContentResponse> content_; + + Operation op_; + + bool raw_; + + std::string raw_data_; + + bool isResponse; + +}; + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_C2PAYLOAD_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/C2Protocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h new file mode 100644 index 0000000..683a486 --- /dev/null +++ b/libminifi/include/c2/C2Protocol.h @@ -0,0 +1,119 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_ +#define LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_ + +#include "C2Payload.h" +#include "core/controller/ControllerServiceProvider.h" +#include "properties/Configure.h" +#include "core/Connectable.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Defines a protocol to perform state management of the minifi agent. + */ +class C2Protocol : public core::Connectable { + public: + /* Forwards name and uuid to core::Connectable and sets running_ to true. */ + C2Protocol(std::string name, uuid_t uuid) + : core::Connectable(name, uuid), + running_(true) { + + } + /* Stores the given controller service provider and configuration in controller_ and configuration_. */ + virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + controller_ = controller; + configuration_ = configure; + } + virtual ~C2Protocol() { + + } + + /** + * Update the configuration. + */ + virtual void update(const std::shared_ptr<Configure> &configure)=0; + + /** + * Send a C2 payload to the provided URI. The direction indicates to the protocol whether this is a transmit or receive operation. + * Depending on the protocol this may mean different things. + * + * @param url url. + * @param operation payload to perform and/or send + * @param direction direction of the C2 operation; defaults to TRANSMIT. + * @param async whether or not this is an asynchronous operation; defaults to false. + * @return payload from the response or a response to come back in the face of an asynchronous operation. + */ + virtual C2Payload consumePayload(const std::string &url, const C2Payload &operation, Direction direction = TRANSMIT, bool async = false) = 0; + + /** + * Send a C2 payload without an explicit URL. The direction indicates to the protocol whether this is a transmit or receive operation. + * Depending on the protocol this may mean different things. + * + * @param operation payload to perform and/or send + * @param direction direction of the C2 operation; defaults to TRANSMIT. 
+ * @param async whether or not this is an asynchronous operation; defaults to false. + * @return payload from the response or a response to come back in the face of an asynchronous operation. + */ + virtual C2Payload consumePayload(const C2Payload &operation, Direction direction = TRANSMIT, bool async = false) = 0; + + /** + * Returns the current value of running_, which the constructor initializes to true. + */ + virtual bool isRunning() { + return running_.load(); + } + + /** + * Block until work is available on any input connection, or the given duration elapses (declared only; the definition is outside this header) + * @param timeoutMs timeout in milliseconds + */ + void waitForWork(uint64_t timeoutMs); + /* No-op in this class. */ + virtual void yield() { + + } + + /** + * Reports whether work is available for this connectable. + * @return always true in this class. + */ + virtual bool isWorkAvailable() { + return true; + } + + protected: + + std::atomic<bool> running_; + + std::shared_ptr<core::controller::ControllerServiceProvider> controller_; + + std::shared_ptr<Configure> configuration_; +}; + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_C2PROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/HeartBeatReporter.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/HeartBeatReporter.h b/libminifi/include/c2/HeartBeatReporter.h new file mode 100644 index 0000000..3d0fd49 --- /dev/null +++ b/libminifi/include/c2/HeartBeatReporter.h @@ -0,0 +1,101 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_ +#define LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_ + +#include "C2Protocol.h" +#include "C2Payload.h" +#include "core/controller/ControllerServiceProvider.h" +#include "properties/Configure.h" +#include "core/Connectable.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Defines a heart beat reporting interface. Note that this differs from + * C2Protocol as heartbeats can be any interface which provides only one way communication. + */ +class HeartBeatReporter : public core::Connectable { + public: + /* Forwards name and uuid to core::Connectable; controller_ and configuration_ start out null. */ + HeartBeatReporter(std::string name, uuid_t uuid) + : core::Connectable(name, uuid), + controller_(nullptr), + configuration_(nullptr) { + } + /* Stores the given controller service provider and configuration in controller_ and configuration_. */ + virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + controller_ = controller; + configuration_ = configure; + } + virtual ~HeartBeatReporter() { + } + /** + * Send a heartbeat payload through this reporter. + * How the payload is delivered depends on the implementing reporter. + * + * @param heartbeat C2 payload carrying the heartbeat. + * Unlike C2Protocol::consumePayload, this method takes no url, + * direction or async argument. 
+ * + * @return result of the heartbeat operation; its meaning is defined by the implementing reporter. + */ + virtual int16_t heartbeat(const C2Payload &heartbeat) = 0; + + /** + * Reports whether the reporter is running; this base implementation always returns true. + */ + virtual bool isRunning() { + return true; + } + + /** + * Block until work is available on any input connection, or the given duration elapses (declared only; the definition is outside this header) + * @param timeoutMs timeout in milliseconds + */ + void waitForWork(uint64_t timeoutMs); + /* No-op in this class. */ + virtual void yield() { + + } + + /** + * Reports whether work is available for this connectable. + * @return always true in this class. + */ + virtual bool isWorkAvailable() { + return true; + } + + protected: + + std::shared_ptr<core::controller::ControllerServiceProvider> controller_; + + std::shared_ptr<Configure> configuration_; +}; + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_HEARTBEATREPORTER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/Protocols.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/protocols/Protocols.h b/libminifi/include/c2/protocols/Protocols.h new file mode 100644 index 0000000..c4c314f --- /dev/null +++ b/libminifi/include/c2/protocols/Protocols.h @@ -0,0 +1,24 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_ +#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_ + +#include "RESTReceiver.h" +#include "RESTSender.h" + +#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h new file mode 100644 index 0000000..2978a03 --- /dev/null +++ b/libminifi/include/c2/protocols/RESTProtocol.h @@ -0,0 +1,74 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ +#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ + +#include "json/json.h" +#include "json/writer.h" +#include <string> +#include <mutex> +#include "CivetServer.h" +#include "../C2Protocol.h" +#include "../HeartBeatReporter.h" +#include "controllers/SSLContextService.h" +#include "utils/ByteInputCallBack.h" +#include "utils/HTTPClient.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose and Justification: Declares the protected helpers that REST-based C2 classes use to convert + * between C2Payload objects and JSON. The class has no base class of its own. + * + * Per the original design note, Direction maps onto HTTP verbs for REST: TRANSMIT will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * is shared with other protocols, here it simply translates into POST and GET respectively. The send path is not declared in this class. + */ +class RESTProtocol { + public: + RESTProtocol() { + + } + + virtual ~RESTProtocol() { + + } + + protected: + /* Helpers are declared here only; their definitions are outside this header. */ + virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload); + + virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response); + + virtual std::string getOperation(const C2Payload &payload); + + virtual Operation stringToOperation(const std::string str); + +}; + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/include/c2/protocols/RESTReceiver.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/protocols/RESTReceiver.h b/libminifi/include/c2/protocols/RESTReceiver.h new file mode 100644 index 0000000..17b5028 --- /dev/null +++ b/libminifi/include/c2/protocols/RESTReceiver.h @@ -0,0 
+1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ +#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ + +#include "RESTSender.h" +#include "json/json.h" +#include "json/writer.h" +#include <string> +#include <mutex> +#include "core/Resource.h" +#include "RESTProtocol.h" +#include "CivetServer.h" +#include "../C2Protocol.h" +#include "controllers/SSLContextService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { +/* Declared here and defined elsewhere; NOTE(review): presumably civetweb callbacks - confirm against the implementation. */ +int log_message(const struct mg_connection *conn, const char *message); + +int ssl_protocol_en(void *ssl_context, void *user_data); + +/** + * Purpose and Justification: REST heartbeat reporter. Combines the RESTProtocol helpers with the HeartBeatReporter interface. + * + * The nested ListeningProtocol handler answers HTTP GET requests with the most recently stored response text, + * returned as text/plain. start_webserver() returns a CivetServer; one overload additionally takes a ca_cert string. + * How heartbeat() and initialize() use these pieces is defined outside this header. 
+ * + */ +class RESTReceiver : public RESTProtocol, public HeartBeatReporter { + public: + RESTReceiver(std::string name, uuid_t uuid = nullptr); + + void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure); + virtual int16_t heartbeat(const C2Payload &heartbeat); + + protected: + /* CivetHandler that serves the most recently stored response text to GET requests. */ + class ListeningProtocol : public CivetHandler { + + public: + ListeningProtocol() { + + } + /* Copies the stored response under the mutex, then writes a 200 text/plain reply; always returns true. */ + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string currentvalue; + { + std::lock_guard<std::mutex> lock(reponse_mutex_); + currentvalue = resp_; + } + /* length() is size_t, so cast it to match %lu; the body goes out with mg_write so exactly Content-Length bytes are sent. */ + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + static_cast<unsigned long>(currentvalue.length())); + mg_write(conn, currentvalue.data(), currentvalue.length()); + return true; + } + /* Replaces the stored response text while holding the mutex. */ + void setResponse(std::string response) { + std::lock_guard<std::mutex> lock(reponse_mutex_); + resp_ = response; + } + + protected: + std::mutex reponse_mutex_; + std::string resp_; + + }; + + std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert); + + std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler); + + std::unique_ptr<CivetServer> listener; + std::unique_ptr<ListeningProtocol> handler; + + private: + std::shared_ptr<logging::Logger> logger_; +}; + +REGISTER_RESOURCE(RESTReceiver); + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */
