Repository: nifi-minifi-cpp Updated Branches: refs/heads/master b20da80eb -> 20622f6d1
MINIFI-70: enhance site2site port negotiation This closes #119. Signed-off-by: Marc Parisi <phroc...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/20622f6d Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/20622f6d Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/20622f6d Branch: refs/heads/master Commit: 20622f6d13250b9562e3e30949639ae3402aa09d Parents: b20da80 Author: Bin Qiu <benqiu2...@gmail.com> Authored: Tue Jul 18 10:24:35 2017 -0700 Committer: Marc Parisi <phroc...@apache.org> Committed: Thu Jul 20 11:39:46 2017 -0400 ---------------------------------------------------------------------- README.md | 28 +- cmake/BuildTests.cmake | 2 + conf/config.yml | 53 +++- conf/minifi-log.properties | 2 +- conf/minifi.properties | 26 +- libminifi/include/ConfigurationListener.h | 9 +- libminifi/include/HttpConfigurationListener.h | 12 +- libminifi/include/RemoteProcessorGroupPort.h | 49 +++- libminifi/include/Site2SiteClientProtocol.h | 11 + libminifi/include/core/FlowConfiguration.h | 2 + .../SiteToSiteProvenanceReportingTask.h | 4 +- libminifi/include/properties/Configure.h | 15 +- libminifi/include/utils/HTTPUtils.h | 208 ++++++++++++++ libminifi/src/ConfigurationListener.cpp | 40 --- libminifi/src/Configure.cpp | 24 +- libminifi/src/HttpConfigurationListener.cpp | 60 +--- libminifi/src/RemoteProcessorGroupPort.cpp | 198 ++++++++++++-- libminifi/src/Site2SiteClientProtocol.cpp | 273 ++++++++++--------- libminifi/src/core/FlowConfiguration.cpp | 2 +- .../SiteToSiteProvenanceReportingTask.cpp | 1 + libminifi/src/core/yaml/YamlConfiguration.cpp | 47 ++-- .../test/integration/Site2SiteRestTest.cpp | 145 ++++++++++ libminifi/test/resources/TestSite2SiteRest.yml | 58 ++++ libminifi/test/unit/ProcessorTests.cpp | 3 +- libminifi/test/unit/YamlConfigurationTests.cpp | 2 +- 25 files changed, 939 insertions(+), 335 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index b7f532c..ac2f97a 100644 --- a/README.md +++ b/README.md @@ -278,8 +278,6 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc name: From Node A max concurrent tasks: 1 Properties: - Port: 10001 - Host Name: localhost ### Site2Site Security Configuration @@ -319,8 +317,7 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc Provenance Reporting: scheduling strategy: TIMER_DRIVEN scheduling period: 1 sec - port: 10001 - host: localhost + url: http://localhost:8080/nifi port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204 batch size: 100 @@ -334,14 +331,27 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc nifi.configuration.listener.type=http nifi.configuration.listener.http.url=https://localhost:8080 nifi.configuration.listener.pull.interval=1 sec - nifi.configuration.listener.client.ca.certificate=./conf/nifi-cert.pem if you want to enable client certificate - nifi.configuration.listener.need.ClientAuth=true - nifi.configuration.listener.client.certificate=./conf/client.pem - nifi.configuration.listener.client.private.key=./conf/client.key - nifi.configuration.listener.client.pass.phrase=./conf/password + nifi.https.need.ClientAuth=true + nifi.https.client.certificate=./conf/client.pem + nifi.https.client.private.key=./conf/client.key + nifi.https.client.pass.phrase=./conf/password + nifi.https.client.ca.certificate=./conf/nifi-cert.pem +### REST API access + + Configure REST API user name and password + nifi.rest.api.user.name=admin + nifi.rest.api.password=password + + if you want to enable client certificate + nifi.https.need.ClientAuth=true + nifi.https.client.certificate=./conf/client.pem + 
nifi.https.client.private.key=./conf/client.key + nifi.https.client.pass.phrase=./conf/password + nifi.https.client.ca.certificate=./conf/nifi-cert.pem + ### UID generation MiNiFi needs to generate many unique identifiers in the course of operations. There are a few different uid implementations available that can be configured in minifi-uid.properties. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index aedae10..59f1d59 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -94,4 +94,6 @@ add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TES add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" ) +add_test(NAME Site2SiteRestTest COMMAND Site2SiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/") + add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess ) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/conf/config.yml ---------------------------------------------------------------------- diff --git a/conf/config.yml b/conf/config.yml index b50c609..b714197 100644 --- a/conf/config.yml +++ b/conf/config.yml @@ -14,7 +14,52 @@ # limitations under the License. 
Flow Controller: - name: MiNiFi Flow -Processors: [] -Connections: [] -Remote Processing Groups: [] + id: 471deef6-2a6e-4a7d-912a-81cc17e3a205 + name: MiNiFi Flow + +Processors: + - name: GetFile + id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 + class: org.apache.nifi.processors.standard.GetFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 10 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Input Directory: /tmp/getfile + Keep Source File: true + +Connections: + - name: GenerateFlowFileS2S + id: 471deef6-2a6e-4a7d-912a-81cc17e3a207 + source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 + source relationship name: success + destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer + +Remote Processing Groups: + - name: NiFi Flow + id: 471deef6-2a6e-4a7d-912a-81cc17e3a208 + url: https://localhost:9443/nifi + timeout: 30 secs + yield period: 10 sec + Input Ports: + - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + name: From Node A + max concurrent tasks: 1 + use compression: false + Properties: # Deviates from spec and will later be removed when this is autonegotiated + Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + +Provenance Reporting: + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + url: https://localhost:9443/nifi + port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + batch size: 100 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/conf/minifi-log.properties ---------------------------------------------------------------------- diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties index 870bb7a..99caa1b 100644 --- a/conf/minifi-log.properties +++ b/conf/minifi-log.properties @@ -36,7 +36,7 @@ appender.rolling.max_file_size=5242880 
logger.root=INFO,rolling #Logging configurable by namespace -#logger.org::apache::nifi::minifi=DEBUG,rolling +logger.org::apache::nifi::minifi=DEBUG,rolling #Logging configurable by class fully qualified name #logger.org::apache::nifi::minifi::core::logging::LoggerConfiguration=DEBUG http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/conf/minifi.properties ---------------------------------------------------------------------- diff --git a/conf/minifi.properties b/conf/minifi.properties index cfa2858..8e71818 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -14,7 +14,7 @@ # limitations under the License. # Core Properties # -nifi.version=0.2.0 +nifi.version=0.1.0 nifi.flow.configuration.file=./conf/config.yml nifi.administrative.yield.duration=30 sec # If a component has no work to do (is "bored"), how long should we wait before checking again for work? @@ -23,21 +23,11 @@ nifi.bored.yield.duration=10 millis nifi.provenance.repository.directory.default=./provenance_repository nifi.provenance.repository.max.storage.time=1 MIN nifi.provenance.repository.max.storage.size=1 MB -# FlowFileRepository # -nifi.flowfile.repository.enable=true -nifi.flowfile.repository.directory.default=./flowfile_repository -nifi.flowfile.repository.max.storage.time=10 MIN -nifi.flowfile.repository.max.storage.size=1 MB - -# Security Related Properties # -# Enable tls ssl #nifi.remote.input.secure=true -# Enable client certificate base authorization -#nifi.security.need.ClientAuth=true -# Client certificate and private key PEM files -#nifi.security.client.certificate=./conf/client.pem -#nifi.security.client.private.key=./conf/client.pem -# Client private key passphrase file -#nifi.security.client.pass.phrase=./conf/password -# Setup the client CA certificate file -#nifi.security.client.ca.certificate=./conf/nifi-cert.pem +nifi.https.need.ClientAuth=true +nifi.https.client.certificate=./conf/client.pem +nifi.https.client.private.key=./conf/client.key 
+nifi.https.client.pass.phrase=./conf/password +nifi.https.client.ca.certificate=./conf/nifi-cert.pem +#nifi.rest.api.user.name=admin +#nifi.rest.api.password=password http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/ConfigurationListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ConfigurationListener.h b/libminifi/include/ConfigurationListener.h index 5574226..856ea95 100644 --- a/libminifi/include/ConfigurationListener.h +++ b/libminifi/include/ConfigurationListener.h @@ -51,9 +51,10 @@ public: ConfigurationListener(std::shared_ptr<FlowController> controller, std::shared_ptr<Configure> configure, std::string type) : connect_timeout_(20000), read_timeout_(20000), type_(type), configure_( - configure), controller_(controller), need_client_certificate_(false) { + configure), controller_(controller) { logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger(); running_ = false; + } // Destructor virtual ~ConfigurationListener() { @@ -106,12 +107,6 @@ protected: std::shared_ptr<Configure> configure_; std::shared_ptr<logging::Logger> logger_; std::shared_ptr<FlowController> controller_; - - bool need_client_certificate_; - std::string certificate_; - std::string private_key_; - std::string passphrase_; - std::string ca_certificate_; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/HttpConfigurationListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/HttpConfigurationListener.h b/libminifi/include/HttpConfigurationListener.h index 72d4728..7e3291e 100644 --- a/libminifi/include/HttpConfigurationListener.h +++ b/libminifi/include/HttpConfigurationListener.h @@ -40,7 +40,8 @@ public: */ HttpConfigurationListener(std::shared_ptr<FlowController> controller, std::shared_ptr<Configure> configure) : - minifi::ConfigurationListener(controller, 
configure, "http") { + minifi::ConfigurationListener(controller, configure, "http"), + securityConfig_(configure) { std::string value; if (configure->get(Configure::nifi_configuration_listener_http_url, value)) { @@ -56,14 +57,6 @@ public: bool pullConfiguration(std::string &configuration); - /** - * Configures a secure connection - */ - void configureSecureConnection(CURL *http_session); - - static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param); - static int pemPassWordCb(char *buf, int size, int rwflag, void *param); - // Destructor virtual ~HttpConfigurationListener() { this->stop(); @@ -71,6 +64,7 @@ public: } protected: + minifi::utils::HTTPSecurityConfiguration securityConfig_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 9f89b07..d484fb9 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -23,6 +23,7 @@ #include <mutex> #include <memory> #include <stack> +#include "utils/HTTPUtils.h" #include "concurrentqueue.h" #include "FlowFileRecord.h" #include "core/Processor.h" @@ -42,20 +43,30 @@ class RemoteProcessorGroupPort : public core::Processor { /*! 
* Create a new processor */ - RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, uuid_t uuid = nullptr) + RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, std::string url, std::shared_ptr<Configure> configure, uuid_t uuid = nullptr) : core::Processor(name, uuid), + configure_(configure), direction_(SEND), transmitting_(false), - logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()) { + logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()), + url_(url), + securityConfig_(configure) { stream_factory_ = stream_factory; if (uuid != nullptr) { uuid_copy(protocol_uuid_, uuid); } + site2site_port_ = -1; + site2site_secure_ = false; + site2site_peer_index_ = -1; + // REST API port and host + port_ = -1; + utils::parse_url(url_, host_, port_, protocol_); } // Destructor virtual ~RemoteProcessorGroupPort() { } + // Processor Name static const char *ProcessorName; // Supported Properties @@ -84,6 +95,25 @@ class RemoteProcessorGroupPort : public core::Processor { void setTransmitting(bool val) { transmitting_ = val; } + // setURL + void setURL(std::string val) { + url_ = val; + utils::parse_url(url_, host_, port_, protocol_); + if (port_ == -1) { + if (protocol_.find("https") != std::string::npos) { + port_ = 443; + } + else if (protocol_.find("http") != std::string::npos) { + port_ = 80; + } + } + } + + // refresh remoteSite2SiteInfo via nifi rest api + void refreshRemoteSite2SiteInfo(); + + // refresh site2site peer list + void refreshPeerList(); protected: @@ -93,6 +123,7 @@ class RemoteProcessorGroupPort : public core::Processor { moodycamel::ConcurrentQueue<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_; + std::shared_ptr<Configure> configure_; // Logger std::shared_ptr<logging::Logger> logger_; // Transaction Direction @@ -104,9 +135,21 @@ class RemoteProcessorGroupPort : public core::Processor { uuid_t protocol_uuid_; + 
// rest API end point info std::string host_; + int port_; + std::string protocol_; + std::string url_; - uint16_t port_; + // Remote Site2Site Info + int site2site_port_; + bool site2site_secure_; + std::vector<minifi::Site2SitePeerStatus> site2site_peer_status_list_; + std::atomic<int> site2site_peer_index_; + std::mutex site2site_peer_mutex_; + std::string rest_user_name_; + std::string rest_password_; + minifi::utils::HTTPSecurityConfiguration securityConfig_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index a987459..8d89004 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -388,6 +388,15 @@ class DataPacket { }; +/** + * Site2Site Peer + */ + typedef struct Site2SitePeerStatus { + std::string host_; + int port_; + bool isSecure_; + } Site2SitePeerStatus; + // Site2SiteClientProtocol Class class Site2SiteClientProtocol { public: @@ -476,6 +485,8 @@ class Site2SiteClientProtocol { } // bootstrap the protocol to the ready for transaction state by going through the state machine bool bootstrap(); + // get peerList + bool getPeerList(std::vector<minifi::Site2SitePeerStatus> &peer); // establish bool establish(); // handShake http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 6e2b700..3429166 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -65,6 +65,7 @@ class FlowConfiguration : public CoreComponent { flow_file_repo_(flow_file_repo), config_path_(path), 
stream_factory_(stream_factory), + configuration_(configuration), logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { controller_services_ = std::make_shared<core::controller::ControllerServiceMap>(); service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration); @@ -128,6 +129,7 @@ class FlowConfiguration : public CoreComponent { std::shared_ptr<core::Repository> flow_file_repo_; // stream factory std::shared_ptr<io::StreamFactory> stream_factory_; + std::shared_ptr<Configure> configuration_; private: std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h index 9e3f567..e1d80e8 100644 --- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h +++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h @@ -45,8 +45,8 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessorGroupPor /*! 
* Create a new processor */ - SiteToSiteProvenanceReportingTask(const std::shared_ptr<io::StreamFactory> &stream_factory) - : minifi::RemoteProcessorGroupPort(stream_factory, ReportTaskName), + SiteToSiteProvenanceReportingTask(const std::shared_ptr<io::StreamFactory> &stream_factory, std::shared_ptr<Configure> configure) + : minifi::RemoteProcessorGroupPort(stream_factory, ReportTaskName, "", configure, NULL), logger_(logging::LoggerFactory<SiteToSiteProvenanceReportingTask>::getLogger()) { this->setTriggerWhenEmpty(true); port_ = 0; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index fa19a18..13da55a 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -63,12 +63,15 @@ class Configure : public Properties { static const char *nifi_configuration_listener_http_url; static const char *nifi_configuration_listener_rest_url; static const char *nifi_configuration_listener_type; // http or rest - // configuration listener security config - static const char *nifi_configuration_listener_need_ClientAuth; - static const char *nifi_configuration_listener_client_certificate; - static const char *nifi_configuration_listener_private_key; - static const char *nifi_configuration_listener_client_pass_phrase; - static const char *nifi_configuration_listener_client_ca_certificate; + // security config for all https service + static const char *nifi_https_need_ClientAuth; + static const char *nifi_https_client_certificate; + static const char *nifi_https_client_private_key; + static const char *nifi_https_client_pass_phrase; + static const char *nifi_https_client_ca_certificate; + // nifi rest api user name and password + static const char *nifi_rest_api_user_name; + static const char 
*nifi_rest_api_password; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/utils/HTTPUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPUtils.h b/libminifi/include/utils/HTTPUtils.h index 3f20f5e..46aa67a 100644 --- a/libminifi/include/utils/HTTPUtils.h +++ b/libminifi/include/utils/HTTPUtils.h @@ -21,7 +21,16 @@ #include <curl/curl.h> #include <vector> +#include <iostream> +#include <string> +#include <curl/curlbuild.h> +#include <curl/easy.h> +#include <openssl/ssl.h> #include "ByteInputCallBack.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "properties/Configure.h" +#include "io/validation.h" namespace org { namespace apache { @@ -88,6 +97,205 @@ struct HTTPRequestResponse { }; +static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) { + + std::string http("http://"); + std::string https("https://"); + + if (url.compare(0, http.size(), http) == 0) + protocol = http; + + if (url.compare(0, https.size(), https) == 0) + protocol = https; + + if (!protocol.empty()) { + size_t pos = url.find_first_of(":", protocol.size()); + + if (pos == std::string::npos) { + pos = url.size(); + } + + host = url.substr(protocol.size(), pos - protocol.size()); + + if (pos < url.size() && url[pos] == ':') { + size_t ppos = url.find_first_of("/", pos); + if (ppos == std::string::npos) { + ppos = url.size(); + } + std::string portStr(url.substr(pos + 1, ppos - pos - 1)); + if (portStr.size() > 0) { + port = std::stoi(portStr); + } + } + } +} + +// HTTPSecurityConfiguration +class HTTPSecurityConfiguration { +public: + + // Constructor + /*! 
+ * Create a new HTTPSecurityConfiguration + */ + HTTPSecurityConfiguration(bool need_client_certificate, std::string certificate, + std::string private_key, std::string passphrase, std::string ca_certificate) : + need_client_certificate_(need_client_certificate), certificate_(certificate), + private_key_(private_key), passphrase_(passphrase), ca_certificate_(ca_certificate) { + logger_ = logging::LoggerFactory<HTTPSecurityConfiguration>::getLogger(); + } + // Destructor + virtual ~HTTPSecurityConfiguration() { + } + + HTTPSecurityConfiguration(std::shared_ptr<Configure> configure) { + logger_ = logging::LoggerFactory<HTTPSecurityConfiguration>::getLogger(); + need_client_certificate_ = false; + std::string clientAuthStr; + if (configure->get(Configure::nifi_https_need_ClientAuth, clientAuthStr)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_); + } + + if (configure->get(Configure::nifi_https_client_ca_certificate, this->ca_certificate_)) { + logger_->log_info("HTTPSecurityConfiguration CA certificates: [%s]", this->ca_certificate_); + } + + if (this->need_client_certificate_) { + std::string passphrase_file; + + if (!(configure->get(Configure::nifi_https_client_certificate, this->certificate_) && configure->get(Configure::nifi_https_client_private_key, this->private_key_))) { + logger_->log_error("Certificate and Private Key PEM file not configured for HTTPSecurityConfiguration, error: %s.", std::strerror(errno)); + } + + if (configure->get(Configure::nifi_https_client_pass_phrase, passphrase_file)) { + // load the passphase from file + std::ifstream file(passphrase_file.c_str(), std::ifstream::in); + if (file.good()) { + this->passphrase_.assign((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>()); + file.close(); + } + } + + logger_->log_info("HTTPSecurityConfiguration certificate: [%s], private key: [%s], passphrase file: [%s]", this->certificate_, this->private_key_, 
passphrase_file); + } + } + + /** + * Configures a secure connection + */ + void configureSecureConnection(CURL *http_session) { + curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L); + curl_easy_setopt(http_session, CURLOPT_CAINFO, this->ca_certificate_.c_str()); + curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM"); + curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L); + if (this->need_client_certificate_) { + CURLcode ret; + ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, + &HTTPSecurityConfiguration::configureSSLContext); + if (ret != CURLE_OK) + logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, + static_cast<void*>(this)); + curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM"); + } + } + + static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param) { + minifi::utils::HTTPSecurityConfiguration *config = + static_cast<minifi::utils::HTTPSecurityConfiguration *>(param); + SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx); + + SSL_CTX_load_verify_locations(sslCtx, config->ca_certificate_.c_str(), 0); + SSL_CTX_use_certificate_file(sslCtx, config->certificate_.c_str(), + SSL_FILETYPE_PEM); + SSL_CTX_set_default_passwd_cb(sslCtx, + HTTPSecurityConfiguration::pemPassWordCb); + SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param); + SSL_CTX_use_PrivateKey_file(sslCtx, config->private_key_.c_str(), + SSL_FILETYPE_PEM); + // verify private key + if (!SSL_CTX_check_private_key(sslCtx)) { + config->logger_->log_error( + "Private key does not match the public certificate, error : %s", + std::strerror(errno)); + return CURLE_FAILED_INIT; + } + + config->logger_->log_debug( + "HTTPSecurityConfiguration load Client Certificates OK"); + return CURLE_OK; + } + + static int pemPassWordCb(char *buf, int size, int rwflag, void *param) { + minifi::utils::HTTPSecurityConfiguration *config = + static_cast<minifi::utils::HTTPSecurityConfiguration *>(param); + + if 
(config->passphrase_.length() > 0) { + memset(buf, 0x00, size); + memcpy(buf, config->passphrase_.c_str(), + config->passphrase_.length() - 1); + return config->passphrase_.length() - 1; + } + return 0; + } + +protected: + bool need_client_certificate_; + std::string certificate_; + std::string private_key_; + std::string passphrase_; + std::string ca_certificate_; + +private: + std::shared_ptr<logging::Logger> logger_; +}; + +static std::string get_token(std::string loginUrl, std::string username, std::string password, HTTPSecurityConfiguration &securityConfig) { + utils::HTTPRequestResponse content; + std::string token; + CURL *login_session = curl_easy_init(); + if (loginUrl.find("https") != std::string::npos) { + securityConfig.configureSecureConnection(login_session); + } + curl_easy_setopt(login_session, CURLOPT_URL, loginUrl.c_str()); + struct curl_slist *list = NULL; + list = curl_slist_append(list, "Content-Type: application/x-www-form-urlencoded"); + list = curl_slist_append(list, "Accept: text/plain"); + curl_easy_setopt(login_session, CURLOPT_HTTPHEADER, list); + std::string payload = "username=" + username + "&" + "password=" + password; + char *output = curl_easy_escape(login_session, payload.c_str(), payload.length()); + curl_easy_setopt(login_session, CURLOPT_WRITEFUNCTION, + &utils::HTTPRequestResponse::recieve_write); + curl_easy_setopt(login_session, CURLOPT_WRITEDATA, + static_cast<void*>(&content)); + curl_easy_setopt(login_session, CURLOPT_POSTFIELDSIZE, strlen(output)); + curl_easy_setopt(login_session, CURLOPT_POSTFIELDS, output); + curl_easy_setopt(login_session, CURLOPT_POST, 1); + CURLcode res = curl_easy_perform(login_session); + curl_slist_free_all(list); + curl_free(output); + if (res == CURLE_OK) { + std::string response_body(content.data.begin(), content.data.end()); + int64_t http_code = 0; + curl_easy_getinfo(login_session, CURLINFO_RESPONSE_CODE, &http_code); + char *content_type; + /* ask for the content-type */ + 
curl_easy_getinfo(login_session, CURLINFO_CONTENT_TYPE, &content_type); + + bool isSuccess = ((int32_t) (http_code / 100)) == 2 + && res != CURLE_ABORTED_BY_CALLBACK; + bool body_empty = IsNullOrEmpty(content.data); + + if (isSuccess && !body_empty) { + token = "Bearer " + response_body; + } + } + curl_easy_cleanup(login_session); + + return token; +} + + } /* namespace utils */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/ConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ConfigurationListener.cpp b/libminifi/src/ConfigurationListener.cpp index d52a088..aaf50ce 100644 --- a/libminifi/src/ConfigurationListener.cpp +++ b/libminifi/src/ConfigurationListener.cpp @@ -46,46 +46,6 @@ void ConfigurationListener::start() { } } - std::string clientAuthStr; - if (configure_->get(Configure::nifi_configuration_listener_need_ClientAuth, clientAuthStr)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, this->need_client_certificate_); - } - - if (configure_->get( - Configure::nifi_configuration_listener_client_ca_certificate, - this->ca_certificate_)) { - logger_->log_info("Configuration Listener CA certificates: [%s]", - this->ca_certificate_); - } - - if (this->need_client_certificate_) { - std::string passphrase_file; - - if (!(configure_->get( - Configure::nifi_configuration_listener_client_certificate, this->certificate_) - && configure_->get(Configure::nifi_configuration_listener_private_key, - this->private_key_))) { - logger_->log_error( - "Certificate and Private Key PEM file not configured for configuration listener, error: %s.", - std::strerror(errno)); - } - - if (configure_->get( - Configure::nifi_configuration_listener_client_pass_phrase, - passphrase_file)) { - // load the passphase from file - std::ifstream file(passphrase_file.c_str(), std::ifstream::in); - if (file.good()) { - 
this->passphrase_.assign((std::istreambuf_iterator<char>(file)), - std::istreambuf_iterator<char>()); - file.close(); - } - } - - logger_->log_info("Configuration Listener certificate: [%s], private key: [%s], passphrase file: [%s]", - this->certificate_, this->private_key_, passphrase_file); - } - thread_ = std::thread(&ConfigurationListener::threadExecutor, this); thread_.detach(); running_ = true; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index e1bc225..8bbc5fc 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -61,16 +61,20 @@ const char *Configure::nifi_configuration_listener_rest_url = "nifi.configuration.listener.rest.url"; const char *Configure::nifi_configuration_listener_type = "nifi.configuration.listener.type"; -const char *Configure::nifi_configuration_listener_need_ClientAuth = - "nifi.configuration.listener.need.ClientAuth"; -const char *Configure::nifi_configuration_listener_client_certificate = - "nifi.configuration.listener.client.certificate"; -const char *Configure::nifi_configuration_listener_private_key = - "nifi.configuration.listener.client.private.key"; -const char *Configure::nifi_configuration_listener_client_pass_phrase = - "nifi.configuration.listener.client.pass.phrase"; -const char *Configure::nifi_configuration_listener_client_ca_certificate = - "nifi.configuration.listener.client.ca.certificate"; +const char *Configure::nifi_https_need_ClientAuth = + "nifi.https.need.ClientAuth"; +const char *Configure::nifi_https_client_certificate = + "nifi.https.client.certificate"; +const char *Configure::nifi_https_client_private_key = + "nifi.https.client.private.key"; +const char *Configure::nifi_https_client_pass_phrase = + "nifi.https.client.pass.phrase"; +const char *Configure::nifi_https_client_ca_certificate = + 
"nifi.https.client.ca.certificate"; +const char *Configure::nifi_rest_api_user_name = + "nifi.rest.api.user.name"; +const char *Configure::nifi_rest_api_password = + "nifi.rest.api.password"; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/HttpConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp index 70d5793..39da67b 100644 --- a/libminifi/src/HttpConfigurationListener.cpp +++ b/libminifi/src/HttpConfigurationListener.cpp @@ -38,64 +38,6 @@ namespace apache { namespace nifi { namespace minifi { -int HttpConfigurationListener::pemPassWordCb(char *buf, int size, int rwflag, - void *param) { - minifi::HttpConfigurationListener *listener = - static_cast<minifi::HttpConfigurationListener*>(param); - - if (listener->passphrase_.length() > 0) { - memset(buf, 0x00, size); - memcpy(buf, listener->passphrase_.c_str(), - listener->passphrase_.length() - 1); - return listener->passphrase_.length() - 1; - } - return 0; -} - -CURLcode HttpConfigurationListener::configureSSLContext(CURL *curl, void *ctx, - void *param) { - minifi::HttpConfigurationListener *listener = - static_cast<minifi::HttpConfigurationListener*>(param); - SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx); - - SSL_CTX_load_verify_locations(sslCtx, listener->ca_certificate_.c_str(), 0); - SSL_CTX_use_certificate_file(sslCtx, listener->certificate_.c_str(), - SSL_FILETYPE_PEM); - SSL_CTX_set_default_passwd_cb(sslCtx, - HttpConfigurationListener::pemPassWordCb); - SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param); - SSL_CTX_use_PrivateKey_file(sslCtx, listener->private_key_.c_str(), - SSL_FILETYPE_PEM); - // verify private key - if (!SSL_CTX_check_private_key(sslCtx)) { - listener->logger_->log_error( - "Private key does not match the public certificate, error : %s", - std::strerror(errno)); - return 
CURLE_FAILED_INIT; - } - - listener->logger_->log_debug( - "HttpConfigurationListener load Client Certificates OK"); - return CURLE_OK; -} - -void HttpConfigurationListener::configureSecureConnection(CURL *http_session) { - curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L); - curl_easy_setopt(http_session, CURLOPT_CAINFO, this->ca_certificate_.c_str()); - curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM"); - curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L); - if (this->need_client_certificate_) { - CURLcode ret; - ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, - &HttpConfigurationListener::configureSSLContext); - if (ret != CURLE_OK) - logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret); - curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, - static_cast<void*>(this)); - curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM"); - } -} - bool HttpConfigurationListener::pullConfiguration(std::string &configuration) { if (url_.empty()) return false; @@ -117,7 +59,7 @@ bool HttpConfigurationListener::pullConfiguration(std::string &configuration) { } if (fullUrl.find("https") != std::string::npos) { - configureSecureConnection(http_session); + securityConfig_.configureSecureConnection(http_session); } utils::HTTPRequestResponse content; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index ca8d3be..d1862cd 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -20,6 +20,9 @@ #include "../include/RemoteProcessorGroupPort.h" +#include <curl/curl.h> +#include <curl/curlbuild.h> +#include <curl/easy.h> #include <uuid/uuid.h> #include <algorithm> #include <cstdint> @@ -30,6 +33,8 @@ #include <string> #include <type_traits> #include 
<utility> +#include "json/json.h" +#include "json/writer.h" #include "../include/core/logging/Logger.h" #include "../include/core/ProcessContext.h" @@ -44,8 +49,8 @@ namespace nifi { namespace minifi { const char *RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort"); -core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost"); -core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); +core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", ""); +core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", ""); core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", ""); core::Relationship RemoteProcessorGroupPort::relation; @@ -55,18 +60,38 @@ bool create = true) { if (!available_protocols_.try_dequeue(nextProtocol)) { if (create) { // create - nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new Site2SiteClientProtocol(nullptr)); - nextProtocol->setPortId(protocol_uuid_); - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_, port_)); - std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, port_)); - nextProtocol->setPeer(std::move(peer_)); + if (url_.empty()) { + nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new Site2SiteClientProtocol(nullptr)); + nextProtocol->setPortId(protocol_uuid_); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_, port_)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, port_)); + nextProtocol->setPeer(std::move(peer_)); + } else if (site2site_peer_index_ >= 0) { + nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new 
Site2SiteClientProtocol(nullptr)); + minifi::Site2SitePeerStatus peer; + nextProtocol->setPortId(protocol_uuid_); + { + std::lock_guard < std::mutex > lock(site2site_peer_mutex_); + peer = site2site_peer_status_list_[this->site2site_peer_index_]; + site2site_peer_index_++; + if (site2site_peer_index_ >= site2site_peer_status_list_.size()) { + site2site_peer_index_ = 0; + } + } + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(peer.host_, peer.port_)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), peer.host_, peer.port_)); + nextProtocol->setPeer(std::move(peer_)); + } } } return std::move(nextProtocol); } void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<Site2SiteClientProtocol> return_protocol) { - if (available_protocols_.size_approx() >= max_concurrent_tasks_) { + int count = site2site_peer_status_list_.size(); + if (max_concurrent_tasks_ > count) + count = max_concurrent_tasks_; + if (available_protocols_.size_approx() >= count) { // let the memory be freed return; } @@ -84,6 +109,35 @@ void RemoteProcessorGroupPort::initialize() { std::set<core::Relationship> relationships; relationships.insert(relation); setSupportedRelationships(relationships); + curl_global_init(CURL_GLOBAL_DEFAULT); + { + std::lock_guard < std::mutex > lock(site2site_peer_mutex_); + if (!url_.empty()) { + refreshPeerList(); + if (site2site_peer_status_list_.size() > 0) + site2site_peer_index_ = 0; + } + // populate the site2site protocol for load balancing between them + if (site2site_peer_status_list_.size() > 0) { + int count = site2site_peer_status_list_.size(); + if (max_concurrent_tasks_ > count) + count = max_concurrent_tasks_; + for (int i = 0; i < count; i++) { + std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr; + nextProtocol = std::unique_ptr < Site2SiteClientProtocol > (new 
Site2SiteClientProtocol(nullptr)); + nextProtocol->setPortId(protocol_uuid_); + minifi::Site2SitePeerStatus peer = site2site_peer_status_list_[this->site2site_peer_index_]; + site2site_peer_index_++; + if (site2site_peer_index_ >= site2site_peer_status_list_.size()) { + site2site_peer_index_ = 0; + } + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr < org::apache::nifi::minifi::io::DataStream > (stream_factory_->createSocket(peer.host_, peer.port_)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer > (new Site2SitePeer(std::move(str), peer.host_, peer.port_)); + nextProtocol->setPeer(std::move(peer_)); + returnProtocol(std::move(nextProtocol)); + } + } + } } void RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { @@ -91,12 +145,6 @@ void RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, core::P int64_t lvalue; - if (context->getProperty(hostName.getName(), value)) { - host_ = value; - } - if (context->getProperty(port.getName(), value) && core::Property::StringToInt(value, lvalue)) { - port_ = (uint16_t) lvalue; - } if (context->getProperty(portUUID.getName(), value)) { uuid_parse(value.c_str(), protocol_uuid_); } @@ -110,16 +158,15 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr int64_t lvalue; - std::string host = ""; - uint16_t sport = 0; - - if (context->getProperty(hostName.getName(), value)) { - host = value; + if (context->getProperty(hostName.getName(), value) && !value.empty()) { + host_ = value; } - if (context->getProperty(port.getName(), value) && core::Property::StringToInt(value, lvalue)) { - sport = (uint16_t) lvalue; + + if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) { + port_ = static_cast<int> (lvalue); } - if (context->getProperty(portUUID.getName(), value)) { + + if (context->getProperty(portUUID.getName(), 
value) && !value.empty()) { uuid_parse(value.c_str(), protocol_uuid_); } @@ -150,6 +197,113 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr return; } +void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { + if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty()) + return; + + std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller/"; + + this->site2site_port_ = -1; + configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_); + configure_->get(Configure::nifi_rest_api_password, this->rest_password_); + + std::string token; + + if (!rest_user_name_.empty()) { + std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token/"; + token = utils::get_token(loginUrl, this->rest_user_name_, this->rest_password_, this->securityConfig_); + logger_->log_debug("Token from NiFi REST Api endpoint %s", token); + if (token.empty()) + return; + } + + CURL *http_session = curl_easy_init(); + + if (fullUrl.find("https") != std::string::npos) { + this->securityConfig_.configureSecureConnection(http_session); + } + + struct curl_slist *list = NULL; + if (!token.empty()) { + std::string header = "Authorization: " + token; + list = curl_slist_append(list, header.c_str()); + curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, list); + } + + curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str()); + + utils::HTTPRequestResponse content; + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, + &utils::HTTPRequestResponse::recieve_write); + + curl_easy_setopt(http_session, CURLOPT_WRITEDATA, + static_cast<void*>(&content)); + + CURLcode res = curl_easy_perform(http_session); + if (list) + curl_slist_free_all(list); + + if (res == CURLE_OK) { + std::string response_body(content.data.begin(), content.data.end()); + int64_t http_code = 0; + curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, 
&http_code); + char *content_type; + /* ask for the content-type */ + curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type); + + bool isSuccess = ((int32_t) (http_code / 100)) == 2 + && res != CURLE_ABORTED_BY_CALLBACK; + bool body_empty = IsNullOrEmpty(content.data); + + if (isSuccess && !body_empty) { + std::string controller = std::move(response_body); + logger_->log_debug("controller config %s", controller.c_str()); + Json::Value value; + Json::Reader reader; + bool parsingSuccessful = reader.parse(controller, value); + if (parsingSuccessful && !value.empty()) { + Json::Value controllerValue = value["controller"]; + if (!controllerValue.empty()) { + Json::Value port = controllerValue["remoteSiteListeningPort"]; + if (!port.empty()) + this->site2site_port_ = port.asInt(); + Json::Value secure = controllerValue["siteToSiteSecure"]; + if (!secure.empty()) + this->site2site_secure_ = secure.asBool(); + } + logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_); + } + } else { + logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo"); + } + } else { + logger_->log_error( + "ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed %s\n", + curl_easy_strerror(res)); + } + curl_easy_cleanup(http_session); +} + +void RemoteProcessorGroupPort::refreshPeerList() { + refreshRemoteSite2SiteInfo(); + if (site2site_port_ == -1) + return; + + this->site2site_peer_status_list_.clear(); + + std::unique_ptr < Site2SiteClientProtocol> protocol; + protocol = std::unique_ptr < Site2SiteClientProtocol + > (new Site2SiteClientProtocol(nullptr)); + protocol->setPortId(protocol_uuid_); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = + std::unique_ptr < org::apache::nifi::minifi::io::DataStream + > (stream_factory_->createSocket(host_, site2site_port_)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer + > (new 
Site2SitePeer(std::move(str), host_, site2site_port_)); + protocol->setPeer(std::move(peer_)); + protocol->getPeerList(site2site_peer_status_list_); +} + } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index 5e0637d..7d6e3f3 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -27,6 +27,7 @@ #include <thread> #include <random> #include <iostream> +#include <vector> #include "io/CRCStream.h" #include "Site2SitePeer.h" #include "Site2SiteClientProtocol.h" @@ -57,9 +58,6 @@ bool Site2SiteClientProtocol::establish() { if (!ret) { logger_->log_error("Site2Site Protocol Version Negotiation failed"); - /* - peer_->yield(); - tearDown(); */ return false; } @@ -91,7 +89,6 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() { if (ret <= 0) { logger_->log_info("result of writing version is %i", ret); - // tearDown(); return false; } @@ -100,40 +97,36 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() { if (ret <= 0) { logger_->log_info("result of writing version status code %i", ret); - // tearDown(); return false; } logger_->log_info("status code is %i", statusCode); switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - // tearDown(); - return false; - } - logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); - for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedVersion[i]) 
{ - _currentVersion = _supportedVersion[i]; - _currentVersionIndex = i; - return initiateResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Negotiate protocol response ABORT"); - ret = -1; - // tearDown(); + case RESOURCE_OK: + logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { return false; - default: - logger_->log_info("Negotiate protocol response unknown code %d", statusCode); - return true; + } + logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); + for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedVersion[i]) { + _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + return false; + default: + logger_->log_info("Negotiate protocol response unknown code %d", statusCode); + return true; } return true; @@ -152,7 +145,6 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { if (ret <= 0) { logger_->log_debug("result of getCodecResourceName is %i", ret); - // tearDown(); return false; } @@ -160,7 +152,6 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { if (ret <= 0) { logger_->log_debug("result of _currentCodecVersion is %i", ret); - // tearDown(); return false; } @@ -168,40 +159,36 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { ret = peer_->read(statusCode); if (ret <= 0) { - // tearDown(); return false; } switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Codec Negotiate version OK"); - return true; - case 
DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - // tearDown(); - return false; - } - logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); - for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedCodecVersion[i]) { - _currentCodecVersion = _supportedCodecVersion[i]; - _currentCodecVersionIndex = i; - return initiateCodecResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Codec Negotiate response ABORT"); - ret = -1; - // tearDown(); + case RESOURCE_OK: + logger_->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { return false; - default: - logger_->log_info("Negotiate Codec response unknown code %d", statusCode); - return true; + } + logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedCodecVersion[i]) { + _currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = -1; + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + return false; + default: + logger_->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; } return true; @@ -223,11 +210,10 @@ bool Site2SiteClientProtocol::handShake() { int ret = peer_->writeUTF(_commsIdentifier); if (ret <= 0) { - // tearDown(); return false; } - std::map<std::string, std::string> properties; + std::map < std::string, std::string > properties; 
properties[HandShakePropertyStr[GZIP]] = "false"; properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); @@ -243,7 +229,6 @@ bool Site2SiteClientProtocol::handShake() { if (_currentVersion >= 3) { ret = peer_->writeUTF(peer_->getURL()); if (ret <= 0) { - // tearDown(); return false; } } @@ -251,7 +236,6 @@ bool Site2SiteClientProtocol::handShake() { uint32_t size = properties.size(); ret = peer_->write(size); if (ret <= 0) { - // tearDown(); return false; } @@ -259,12 +243,10 @@ bool Site2SiteClientProtocol::handShake() { for (it = properties.begin(); it != properties.end(); it++) { ret = peer_->writeUTF(it->first); if (ret <= 0) { - // tearDown(); return false; } ret = peer_->writeUTF(it->second); if (ret <= 0) { - // tearDown(); return false; } logger_->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str()); @@ -276,31 +258,24 @@ bool Site2SiteClientProtocol::handShake() { ret = this->readRespond(code, message); if (ret <= 0) { - // tearDown(); return false; } switch (code) { - case PROPERTIES_OK: - logger_->log_info("Site2Site HandShake Completed"); - _peerState = HANDSHAKED; - return true; - case PORT_NOT_IN_VALID_STATE: - case UNKNOWN_PORT: - case PORTS_DESTINATION_FULL: - logger_->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; - default: - logger_->log_info("HandShake Failed because of unknown respond code %d", code); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; + case PROPERTIES_OK: + logger_->log_info("Site2Site HandShake Completed"); + _peerState = HANDSHAKED; + return true; + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + logger_->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); + ret = -1; + 
return false; + default: + logger_->log_info("HandShake Failed because of unknown respond code %d", code); + ret = -1; + return false; } return false; @@ -322,6 +297,64 @@ void Site2SiteClientProtocol::tearDown() { _peerState = IDLE; } +bool Site2SiteClientProtocol::getPeerList(std::vector<minifi::Site2SitePeerStatus> &peer) { + if (establish() && handShake()) { + int status = this->writeRequestType(REQUEST_PEER_LIST); + + if (status <= 0) { + tearDown(); + return false; + } + + uint32_t number; + status = peer_->read(number); + + if (status <= 0) { + tearDown(); + return false; + } + + for (int i = 0; i < number; i++) { + std::string host; + status = peer_->readUTF(host); + if (status <= 0) { + tearDown(); + return false; + } + uint32_t port; + status = peer_->read(port); + if (status <= 0) { + tearDown(); + return false; + } + uint8_t secure; + status = peer_->read(secure); + if (status <= 0) { + tearDown(); + return false; + } + uint32_t count; + status = peer_->read(count); + if (status <= 0) { + tearDown(); + return false; + } + minifi::Site2SitePeerStatus status; + status.host_ = host; + status.isSecure_ = secure; + status.port_ = port; + peer.push_back(status); + logger_->log_info("Site2Site Peer host %s, port %d, Secure %d", host, port, secure); + } + + tearDown(); + return true; + } else { + tearDown(); + return false; + } +} + int Site2SiteClientProtocol::writeRequestType(RequestType type) { if (type >= MAX_REQUEST_TYPE) return -1; @@ -426,7 +459,6 @@ bool Site2SiteClientProtocol::negotiateCodec() { int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); if (status <= 0) { - // tearDown(); return false; } @@ -435,9 +467,6 @@ bool Site2SiteClientProtocol::negotiateCodec() { if (!ret) { logger_->log_error("Site2Site Codec Version Negotiation failed"); - /* - peer_->yield(); - tearDown(); */ return false; } @@ -480,7 +509,6 @@ Transaction* Site2SiteClientProtocol::createTransaction(std::string &transaction ret = writeRequestType(RECEIVE_FLOWFILES); if 
(ret <= 0) { - // tearDown(); return NULL; } @@ -490,40 +518,37 @@ Transaction* Site2SiteClientProtocol::createTransaction(std::string &transaction ret = readRespond(code, message); if (ret <= 0) { - // tearDown(); return NULL; } org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); switch (code) { - case MORE_DATA: - dataAvailable = true; - logger_->log_info("Site2Site peer indicates that data is available"); - transaction = new Transaction(direction, crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - case NO_MORE_DATA: - dataAvailable = false; - logger_->log_info("Site2Site peer indicates that no data is available"); - transaction = new Transaction(direction, crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - default: - logger_->log_info("Site2Site got unexpected response %d when asking for data", code); - // tearDown(); - return NULL; + case MORE_DATA: + dataAvailable = true; + logger_->log_info("Site2Site peer indicates that data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + case NO_MORE_DATA: + dataAvailable = false; + logger_->log_info("Site2Site peer indicates that no data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = 
transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + default: + logger_->log_info("Site2Site got unexpected response %d when asking for data", code); + return NULL; } } else { ret = writeRequestType(SEND_FLOWFILES); if (ret <= 0) { - // tearDown(); return NULL; } else { org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); @@ -760,7 +785,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co try { while (true) { - std::map<std::string, std::string> empty; + std::map < std::string, std::string > empty; uint64_t startTime = getTimeMillis(); std::string payload; DataPacket packet(this, transaction, empty, payload); @@ -774,7 +799,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co // transaction done break; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) { throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); @@ -1072,7 +1097,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { } void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get()); + std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < FlowFileRecord > (session->get()); Transaction *transaction = NULL; @@ -1125,7 +1150,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c if (transferNanos > _batchSendNanos) break; - flow = std::static_pointer_cast<FlowFileRecord>(session->get()); + flow = std::static_pointer_cast < FlowFileRecord > (session->get()); 
if (!flow) { continueTransaction = false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index cc6e0e5..c32add6 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -46,7 +46,7 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() { std::shared_ptr<core::Processor> processor = nullptr; - processor = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_); + processor = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_, this->configuration_); // initialize the processor processor->initialize(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index e46f740..02ddb52 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -48,6 +48,7 @@ namespace reporting { const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi Flow"; void SiteToSiteProvenanceReportingTask::initialize() { + RemoteProcessorGroupPort::initialize(); } void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index c28ec70..4ce944e 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -316,18 +316,6 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>(); checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY); auto schedulingPeriodStr = node["scheduling period"].as<std::string>(); - checkRequiredField(&node, "host", CONFIG_YAML_PROVENANCE_REPORT_KEY); - auto hostStr = node["host"].as<std::string>(); - checkRequiredField(&node, "port", CONFIG_YAML_PROVENANCE_REPORT_KEY); - auto portStr = node["port"].as<std::string>(); - checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY); - auto portUUIDStr = node["port uuid"].as<std::string>(); - checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY); - auto batchSizeStr = node["batch size"].as<std::string>(); - - // add processor to parent - parentGroup->addProcessor(processor); - processor->setScheduledState(core::RUNNING); core::TimeUnit unit; if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { @@ -342,20 +330,43 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr); } - reportTask->setHost(hostStr); - logger_->log_debug("ProvenanceReportingTask host %s", hostStr); int64_t lvalue; - if (core::Property::StringToInt(portStr, lvalue)) { - logger_->log_debug("ProvenanceReportingTask port %d", 
(uint16_t) lvalue); - reportTask->setPort((uint16_t) lvalue); + if (node["host"]) { + auto hostStr = node["host"].as<std::string>(); + reportTask->setHost(hostStr); + } + if (node["port"]) { + auto portStr = node["port"].as<std::string>(); + if (core::Property::StringToInt(portStr, lvalue)) { + logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue); + reportTask->setPort((uint16_t) lvalue); + } + } + if (node["url"]) { + auto urlStr = node["url"].as<std::string>(); + if (!urlStr.empty()) { + reportTask->setURL(urlStr); + logger_->log_debug("ProvenanceReportingTask URL %s", urlStr); + } } + checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY); + auto portUUIDStr = node["port uuid"].as<std::string>(); + checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY); + auto batchSizeStr = node["batch size"].as<std::string>(); logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr); uuid_parse(portUUIDStr.c_str(), port_uuid); reportTask->setPortUUID(port_uuid); + if (core::Property::StringToInt(batchSizeStr, lvalue)) { reportTask->setBatchSize(lvalue); } + + reportTask->initialize(); + + // add processor to parent + parentGroup->addProcessor(processor); + processor->setScheduledState(core::RUNNING); } void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNode) { @@ -535,7 +546,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup * auto portId = inputPortsObj["id"].as<std::string>(); uuid_parse(portId.c_str(), uuid); - port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, nameStr, uuid); + port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, nameStr, parent->getURL(), this->configuration_, uuid); processor = std::static_pointer_cast<core::Processor>(port); port->setDirection(direction); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/integration/Site2SiteRestTest.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/test/integration/Site2SiteRestTest.cpp b/libminifi/test/integration/Site2SiteRestTest.cpp new file mode 100644 index 0000000..01aa7a8 --- /dev/null +++ b/libminifi/test/integration/Site2SiteRestTest.cpp @@ -0,0 +1,145 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include <sys/stat.h>
+#include <unistd.h>
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "../TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "../include/core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+
+/**
+ * Blocks the calling thread for ten seconds so the started flow has time
+ * to run before the captured log output is inspected.
+ */
+void waitToVerifyProcessor() {
+  std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+/**
+ * CivetServer handler that answers a GET with a fixed JSON document
+ * describing a controller whose remoteSiteListeningPort is 10001 and whose
+ * siteToSiteSecure flag is false.
+ */
+class ConfigHandler: public CivetHandler {
+ public:
+  /**
+   * Writes the canned HTTP response to the connection.
+   * @param server not used by this handler
+   * @param conn connection the response is written to
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    static const std::string site2site_rest_resp = "{"
+        "\"revision\": {"
+        "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+        "},"
+        "\"controller\": {"
+        "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+        "\"name\": \"NiFi Flow\","
+        "\"remoteSiteListeningPort\": 10001,"
+        "\"siteToSiteSecure\": false"
+        "}}";
+    // length() yields a size_t; convert explicitly so the argument matches
+    // %lu even on platforms where size_t is not unsigned long.
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(site2site_rest_resp.length()));
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+};
+
+/**
+ * Integration test entry point. Serves the canned controller document on
+ * port 8082 under /nifi-api/controller/, loads and starts a FlowController
+ * built from the given flow configuration file, and asserts that the
+ * captured log output reports site2site port 10001 with secure flag 0.
+ *
+ * argv[1] (optional): path of the flow configuration file.
+ * argv[2] (optional): value stored under Configure::nifi_default_directory.
+ */
+int main(int argc, char **argv) {
+  LogTestController::getInstance().setInfo<minifi::RemoteProcessorGroupPort>();
+  LogTestController::getInstance().setInfo<minifi::FlowController>();
+
+  // The trailing 0 terminates the option list and is not copied.
+  const char *options[] = { "document_root", ".", "listening_ports", "8082", 0 };
+  std::vector<std::string> cpp_options;
+  for (size_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
+  }
+
+  // Input directory and file referenced by the GetFile processor in the
+  // test flow.
+  mkdir("/tmp/site2siteGetFile/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+  std::fstream file;
+  std::stringstream ss;
+  ss << "/tmp/site2siteGetFile/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  CivetServer server(cpp_options);
+  ConfigHandler h_ex;
+  server.addHandler("/nifi-api/controller/", h_ex);
+  LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>();
+
+  std::string key_dir, test_file_location;
+  // Read each optional argument only when it was actually supplied:
+  // argv[2] is a null pointer when just one argument is given, and
+  // assigning a null pointer to a std::string is undefined behaviour.
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+      minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
+      TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file,
+                     test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory =
+      std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr =
+      std::unique_ptr<core::YamlConfiguration>(
+          new core::YamlConfiguration(test_repo, test_repo, stream_factory,
+                                      configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller =
+      std::make_shared<minifi::FlowController>(
+          test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+          DEFAULT_ROOT_GROUP_NAME, true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
+                                      configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
+      test_file_location);
+  // Hand ownership over in one step. Building the shared_ptr from ptr.get()
+  // and calling release() afterwards would delete the group twice if the
+  // shared_ptr constructor threw.
+  std::shared_ptr<core::ProcessGroup> pg(std::move(ptr));
+
+  controller->load();
+  controller->start();
+  waitToVerifyProcessor();
+
+  controller->waitUnload(60000);
+  std::string logs = LogTestController::getInstance().log_output.str();
+  assert(logs.find("process group remote site2site port 10001, is secure 0") != std::string::npos);
+  LogTestController::getInstance().reset();
+  // Remove the files and directories created above.
+  unlink(ss.str().c_str());
+  rmdir("/tmp/site2siteGetFile/");
+  rmdir("./content_repository");
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/resources/TestSite2SiteRest.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestSite2SiteRest.yml b/libminifi/test/resources/TestSite2SiteRest.yml
new file mode 100644
index 0000000..ca751b5
--- /dev/null
+++ b/libminifi/test/resources/TestSite2SiteRest.yml
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +Flow Controller: + id: 471deef6-2a6e-4a7d-912a-81cc17e3a205 + name: MiNiFi Flow + +Processors: + - name: GetFile + id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 + class: org.apache.nifi.processors.standard.GetFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 10 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Input Directory: /tmp/site2siteGetFile + Keep Source File: true + +Connections: + - name: GenerateFlowFileS2S + id: 471deef6-2a6e-4a7d-912a-81cc17e3a207 + source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 + source relationship name: success + destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer + +Remote Processing Groups: + - name: NiFi Flow + id: 471deef6-2a6e-4a7d-912a-81cc17e3a208 + url: http://localhost:8082/nifi + timeout: 30 secs + yield period: 10 sec + Input Ports: + - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + name: From Node A + max concurrent tasks: 1 + use compression: false + Properties: # Deviates from spec and will later be removed when this is autonegotiated + Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index ac2b54e..9e2d50c 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -46,9 +46,10 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { TestController testController; std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + 
std::shared_ptr<org::apache::nifi::minifi::Configure> configure = std::make_shared<org::apache::nifi::minifi::Configure>(); std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>( - std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>())); + std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(configure), configure); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/unit/YamlConfigurationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp b/libminifi/test/unit/YamlConfigurationTests.cpp index 660ff53..ba73a34 100644 --- a/libminifi/test/unit/YamlConfigurationTests.cpp +++ b/libminifi/test/unit/YamlConfigurationTests.cpp @@ -129,7 +129,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") { " port name: provenance\n" " port: 8090\n" " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n" - " destination url: https://localhost:8090/\n" + " url: https://localhost:8090/\n" " originating url: http://${hostname(true)}:8081/nifi\n" " use compression: true\n" " timeout: 30 secs\n"