Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 6220b37c1 -> e2e473267
MINIFICPP-404: http proxy support for s2s This closes #280. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e2e47326 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e2e47326 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e2e47326 Branch: refs/heads/master Commit: e2e473267ca378462c166399718e85a22c7a3123 Parents: 6220b37 Author: Bin Qiu <[email protected]> Authored: Thu Mar 8 08:05:35 2018 -0800 Committer: Marc Parisi <[email protected]> Committed: Mon Mar 19 12:33:50 2018 -0400 ---------------------------------------------------------------------- README.md | 18 +++++++++- extensions/http-curl/client/HTTPClient.h | 12 +++++++ extensions/http-curl/sitetosite/HTTPProtocol.h | 6 +++- libminifi/include/RemoteProcessorGroupPort.h | 14 ++++++-- libminifi/include/core/ProcessGroup.h | 37 ++++++++++++++++++++ libminifi/include/sitetosite/Peer.h | 10 ++++++ libminifi/include/sitetosite/SiteToSite.h | 9 +++++ .../include/sitetosite/SiteToSiteFactory.h | 1 + libminifi/include/utils/HTTPClient.h | 10 ++++++ libminifi/src/RemoteProcessorGroupPort.cpp | 8 ++++- libminifi/src/core/ProcessGroup.cpp | 1 + libminifi/src/core/yaml/YamlConfiguration.cpp | 36 +++++++++++++++++++ 12 files changed, 157 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index c01de51..2ad57b2 100644 --- a/README.md +++ b/README.md @@ -526,10 +526,26 @@ If during testing you have a need to disable host or peer verification, you may nifi.security.client.disable.peer.verification=true ### HTTP SiteToSite Configuration -To enable HTTPSiteToSite you must set the following flag to true. +To enable HTTPSiteToSite globally you must set the following flag to true. nifi.remote.input.http.enabled=true + +To enable HTTPSiteToSite for a remote process group. + Remote Processing Groups: + - name: NiFi Flow + transport protocol: HTTP +### HTTP SiteToSite Proxy Configuration +To enable HTTP Proxy for a remote process group. + + Remote Processing Groups: + - name: NiFi Flow + transport protocol: HTTP + proxy host: localhost + proxy port: 8888 + proxy user: + proxy password: + ### Command and Control Configuration For more more insight into the API used within the C2 agent, please visit: https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/extensions/http-curl/client/HTTPClient.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h index eabd7c2..5b04723 100644 --- a/extensions/http-curl/client/HTTPClient.h +++ b/extensions/http-curl/client/HTTPClient.h @@ -172,6 +172,18 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { void setPostSize(size_t size); + void setHTTPProxy(const utils::HTTPProxy &proxy) override { + if (!proxy.host.empty()) { + curl_easy_setopt(http_session_, CURLOPT_PROXY, proxy.host.c_str()); + curl_easy_setopt(http_session_, CURLOPT_PROXYPORT, proxy.port); + if (!proxy.username.empty()) { + curl_easy_setopt(http_session_, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + std::string value = proxy.username + ":" + proxy.password; + curl_easy_setopt(http_session_, CURLOPT_PROXYUSERPWD, value.c_str()); + } + } + } + protected: inline bool matches(const std::string &value, const std::string &sregex) override; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/extensions/http-curl/sitetosite/HTTPProtocol.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h b/extensions/http-curl/sitetosite/HTTPProtocol.h index 7ac4893..4e0147e 100644 --- a/extensions/http-curl/sitetosite/HTTPProtocol.h +++ b/extensions/http-curl/sitetosite/HTTPProtocol.h @@ -183,9 +183,13 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient { } } if (!this->peer_->getInterface().empty()) { - logger_->log_info("HTTP Site2Site bind local network interface", this->peer_->getInterface()); + logger_->log_info("HTTP Site2Site bind local network interface %s", this->peer_->getInterface()); http_client_->setInterface(this->peer_->getInterface()); } + if (!this->peer_->getHTTPProxy().host.empty()) { + logger_->log_info("HTTP Site2Site setup http proxy host %s", this->peer_->getHTTPProxy().host); + http_client_->setHTTPProxy(this->peer_->getHTTPProxy()); + } return http_client_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 14200ee..aece744 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -148,7 +148,12 @@ class RemoteProcessorGroupPort : public core::Processor { } } } - + void setHTTPProxy(const utils::HTTPProxy &proxy) { + this->proxy_ = proxy; + } + utils::HTTPProxy getHTTPProxy() { + return this->proxy_; + } // refresh remoteSite2SiteInfo via nifi rest api void refreshRemoteSite2SiteInfo(); @@ -157,6 +162,10 @@ class RemoteProcessorGroupPort : public core::Processor { virtual void notifyStop(); + void enableHTTP() { + client_type_ = sitetosite::HTTP; + } + protected: std::shared_ptr<io::StreamFactory> stream_factory_; @@ -183,10 +192,11 @@ class RemoteProcessorGroupPort : public core::Processor { std::string protocol_; std::string url_; bool http_enabled_; + // http proxy + utils::HTTPProxy proxy_; sitetosite::CLIENT_TYPE client_type_; - // Remote Site2Site Info int site2site_port_; bool site2site_secure_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index d6b7510..737d6de 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -35,6 +35,7 @@ #include "controller/ControllerServiceNode.h" #include "controller/ControllerServiceMap.h" #include "utils/Id.h" +#include "utils/HTTPClient.h" namespace org { namespace apache { @@ -97,6 +98,39 @@ class ProcessGroup { std::string getInterface() { return local_network_interface_; } + void setTransportProtocol(std::string &protocol) { + transport_protocol_ = protocol; + } + std::string getTransportProtocol() { + return transport_protocol_; + } + void setHttpProxyHost(std::string &host) { + proxy_.host = host; + } + std::string getHttpProxyHost() { + return proxy_.host; + } + void setHttpProxyUserName(std::string &username) { + proxy_.username = username; + } + std::string getHttpProxyUserName() { + return proxy_.username; + } + void setHttpProxyPassWord(std::string &password) { + proxy_.password = password; + } + std::string getHttpProxyPassWord() { + return proxy_.password; + } + void setHttpProxyPort(int port) { + proxy_.port = port; + } + int getHttpProxyPort() { + return proxy_.port; + } + utils::HTTPProxy getHTTPProxy() { + return proxy_; + } // Set Processor yield period in MilliSecond void setYieldPeriodMsec(uint64_t period) { yield_period_msec_ = period; @@ -201,6 +235,9 @@ class ProcessGroup { std::string local_network_interface_; // Transmitting std::atomic<bool> transmitting_; + // http proxy + utils::HTTPProxy proxy_; + std::string transport_protocol_; // controller services http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/sitetosite/Peer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h index cc097b6..1f9ec01 100644 --- a/libminifi/include/sitetosite/Peer.h +++ b/libminifi/include/sitetosite/Peer.h @@ -37,6 +37,7 @@ #include "io/ClientSocket.h" #include "io/BaseStream.h" #include "utils/TimeUtil.h" +#include "utils/HTTPClient.h" namespace org { namespace apache { @@ -172,6 +173,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { host_(std::move(ss.host_)), port_(std::move(ss.port_)), local_network_interface_(std::move(ss.local_network_interface_)), + proxy_(std::move(ss.proxy_)), logger_(std::move(ss.logger_)) { yield_expiration_.store(ss.yield_expiration_); timeout_.store(ss.timeout_); @@ -276,6 +278,12 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { uint64_t getTimeOut() { return timeout_; } + void setHTTPProxy(const utils::HTTPProxy &proxy) { + this->proxy_ = proxy; + } + utils::HTTPProxy getHTTPProxy() { + return this->proxy_; + } void setStream(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream) { stream_ = nullptr; @@ -368,6 +376,8 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { std::string local_network_interface_; + utils::HTTPProxy proxy_; + // Mutex for protection std::mutex mutex_; // URL http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/sitetosite/SiteToSite.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h index ec4cf44..484a277 100644 --- a/libminifi/include/sitetosite/SiteToSite.h +++ b/libminifi/include/sitetosite/SiteToSite.h @@ -25,6 +25,7 @@ #include "io/CRCStream.h" #include "io/StreamFactory.h" #include "utils/Id.h" +#include "utils/HTTPClient.h" namespace org { namespace apache { @@ -368,6 +369,12 @@ class SiteToSiteClientConfiguration { std::string getInterface() const { return local_network_interface_; } + void setHTTPProxy(const utils::HTTPProxy &proxy) { + proxy_ = proxy; + } + utils::HTTPProxy getHTTPProxy() const { + return this->proxy_; + } protected: @@ -382,6 +389,8 @@ class SiteToSiteClientConfiguration { // secore comms std::shared_ptr<controllers::SSLContextService> ssl_service_; + + utils::HTTPProxy proxy_; }; #if defined(__GNUC__) || defined(__GNUG__) #pragma GCC diagnostic pop http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/sitetosite/SiteToSiteFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h index 73959bb..848a152 100644 --- a/libminifi/include/sitetosite/SiteToSiteFactory.h +++ b/libminifi/include/sitetosite/SiteToSiteFactory.h @@ -82,6 +82,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf ptr->setSSLContextService(client_configuration.getSecurityContext()); auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort(), client_configuration.getInterface())); + peer->setHTTPProxy(client_configuration.getHTTPProxy()); char idStr[37]; uuid_unparse_lower(uuid, idStr); ptr->setPortId(uuid); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/utils/HTTPClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h index 3d1383c..69674be 100644 --- a/libminifi/include/utils/HTTPClient.h +++ b/libminifi/include/utils/HTTPClient.h @@ -25,6 +25,13 @@ namespace nifi { namespace minifi { namespace utils { +struct HTTPProxy { + std::string host; + std::string username; + std::string password; + int port; +}; + struct HTTPUploadCallback { HTTPUploadCallback() { stop = false; @@ -257,6 +264,9 @@ class BaseHTTPClient { virtual void setDisablePeerVerification() { } + virtual void setHTTPProxy(const utils::HTTPProxy &proxy) { + } + virtual void setDisableHostVerification() { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index bbf697f..68f6831 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -68,6 +68,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP // create if (url_.empty()) { sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, port_, ssl_service != nullptr), this->getInterface(), client_type_); + config.setHTTPProxy(this->proxy_); nextProtocol = sitetosite::createClient(config); } else if (peer_index_ >= 0) { std::lock_guard<std::mutex> lock(peer_mutex_); @@ -78,7 +79,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP if (peer_index_ >= static_cast<int>(peers_.size())) { peer_index_ = 0; } - + config.setHTTPProxy(this->proxy_); nextProtocol = sitetosite::createClient(config); } else { logger_->log_debug("Refreshing the peer list since there are none configured."); @@ -177,6 +178,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon peer_index_ = 0; } logger_->log_trace("Creating client"); + config.setHTTPProxy(this->proxy_); nextProtocol = sitetosite::createClient(config); logger_->log_trace("Created client, moving into available protocols"); returnProtocol(std::move(nextProtocol)); @@ -301,6 +303,9 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { client->setDisablePeerVerification(); } } + if (!proxy_.host.empty()) { + client->setHTTPProxy(proxy_); + } if (!token.empty()) { std::string header = "Authorization: " + token; client->appendHeader(header); @@ -352,6 +357,7 @@ void RemoteProcessorGroupPort::refreshPeerList() { sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, site2site_port_, ssl_service != nullptr), this->getInterface(), client_type_); config.setSecurityContext(ssl_service); + config.setHTTPProxy(this->proxy_); protocol = sitetosite::createClient(config); protocol->getPeerList(peers_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 023ca9d..626c4e4 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -53,6 +53,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, yield_period_msec_ = 0; transmitting_ = false; + transport_protocol_ = "RAW"; logger_->log_debug("ProcessGroup %s created", name_); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 8f4820f..b065eff 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -305,6 +305,37 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P group->setInterface(interface); } + if (currRpgNode["transport protocol"]) { + std::string transport_protocol = currRpgNode["transport protocol"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol); + if (transport_protocol == "HTTP") { + group->setTransportProtocol(transport_protocol); + if (currRpgNode["proxy host"]) { + std::string http_proxy_host = currRpgNode["proxy host"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host); + group->setHttpProxyHost(http_proxy_host); + if (currRpgNode["proxy user"]) { + std::string http_proxy_username = currRpgNode["proxy user"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username); + group->setHttpProxyUserName(http_proxy_username); + } + if (currRpgNode["proxy password"]) { + std::string http_proxy_password = currRpgNode["proxy password"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password); + group->setHttpProxyPassWord(http_proxy_password); + } + if (currRpgNode["proxy port"]) { + std::string http_proxy_port = currRpgNode["proxy port"].as<std::string>(); + int32_t port; + if (core::Property::StringToInt(http_proxy_port, port)) { + logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port); + group->setHttpProxyPort(port); + } + } + } + } + } + group->setTransmitting(true); group->setURL(url); @@ -657,6 +688,11 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, processor->initialize(); if (!parent->getInterface().empty()) port->setInterface(parent->getInterface()); + if (parent->getTransportProtocol() == "HTTP") { + port->enableHTTP(); + if (!parent->getHttpProxyHost().empty()) + port->setHTTPProxy(parent->getHTTPProxy()); + } // handle port properties YAML::Node nodeVal = portNode->as<YAML::Node>();
