Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 6220b37c1 -> e2e473267


MINIFICPP-404: http proxy support for s2s

This closes #280.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e2e47326
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e2e47326
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e2e47326

Branch: refs/heads/master
Commit: e2e473267ca378462c166399718e85a22c7a3123
Parents: 6220b37
Author: Bin Qiu <[email protected]>
Authored: Thu Mar 8 08:05:35 2018 -0800
Committer: Marc Parisi <[email protected]>
Committed: Mon Mar 19 12:33:50 2018 -0400

----------------------------------------------------------------------
 README.md                                       | 18 +++++++++-
 extensions/http-curl/client/HTTPClient.h        | 12 +++++++
 extensions/http-curl/sitetosite/HTTPProtocol.h  |  6 +++-
 libminifi/include/RemoteProcessorGroupPort.h    | 14 ++++++--
 libminifi/include/core/ProcessGroup.h           | 37 ++++++++++++++++++++
 libminifi/include/sitetosite/Peer.h             | 10 ++++++
 libminifi/include/sitetosite/SiteToSite.h       |  9 +++++
 .../include/sitetosite/SiteToSiteFactory.h      |  1 +
 libminifi/include/utils/HTTPClient.h            | 10 ++++++
 libminifi/src/RemoteProcessorGroupPort.cpp      |  8 ++++-
 libminifi/src/core/ProcessGroup.cpp             |  1 +
 libminifi/src/core/yaml/YamlConfiguration.cpp   | 36 +++++++++++++++++++
 12 files changed, 157 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c01de51..2ad57b2 100644
--- a/README.md
+++ b/README.md
@@ -526,10 +526,26 @@ If during testing you have a need to disable host or peer 
verification, you may
        nifi.security.client.disable.peer.verification=true
     
 ### HTTP SiteToSite Configuration
-To enable HTTPSiteToSite you must set the following flag to true. 
+To enable HTTPSiteToSite globally you must set the following flag to true.
        
     nifi.remote.input.http.enabled=true
+
+To enable HTTPSiteToSite for a single remote process group, set its transport protocol to HTTP in the YAML flow configuration:
+
+    Remote Processing Groups:
+    - name: NiFi Flow
+      transport protocol: HTTP
     
+### HTTP SiteToSite Proxy Configuration
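+To send HTTP SiteToSite traffic for a remote process group through an HTTP proxy, add the proxy settings shown below. They are only read when the transport protocol is HTTP; the proxy user and proxy password are optional.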
+
+    Remote Processing Groups:
+    - name: NiFi Flow
+      transport protocol: HTTP
+      proxy host: localhost
+      proxy port: 8888
+      proxy user:
+      proxy password:
+
 ### Command and Control Configuration
 For more insight into the API used within the C2 agent, please visit:
 https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/extensions/http-curl/client/HTTPClient.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.h 
b/extensions/http-curl/client/HTTPClient.h
index eabd7c2..5b04723 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -172,6 +172,18 @@ class HTTPClient : public BaseHTTPClient, public 
core::Connectable {
 
   void setPostSize(size_t size);
 
+  /**
+   * Routes the requests of this client through the given HTTP proxy.
+   * Does nothing when proxy.host is empty. Credentials are only applied
+   * when a user name is configured.
+   * @param proxy proxy host, port and optional user name / password.
+   */
+  void setHTTPProxy(const utils::HTTPProxy &proxy) override {
+    if (proxy.host.empty()) {
+      return;
+    }
+    curl_easy_setopt(http_session_, CURLOPT_PROXY, proxy.host.c_str());
+    // curl_easy_setopt is variadic and reads CURLOPT_PROXYPORT as a long, so the int is widened explicitly.
+    curl_easy_setopt(http_session_, CURLOPT_PROXYPORT, static_cast<long>(proxy.port));
+    if (!proxy.username.empty()) {
+      // CURLOPT_PROXYAUTH also takes a long bitmask.
+      curl_easy_setopt(http_session_, CURLOPT_PROXYAUTH, static_cast<long>(CURLAUTH_ANY));
+      const std::string credentials = proxy.username + ":" + proxy.password;
+      curl_easy_setopt(http_session_, CURLOPT_PROXYUSERPWD, credentials.c_str());
+    }
+  }
+
  protected:
 
   inline bool matches(const std::string &value, const std::string &sregex) 
override;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/extensions/http-curl/sitetosite/HTTPProtocol.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h 
b/extensions/http-curl/sitetosite/HTTPProtocol.h
index 7ac4893..4e0147e 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.h
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.h
@@ -183,9 +183,13 @@ class HttpSiteToSiteClient : public 
sitetosite::SiteToSiteClient {
       }
     }
     if (!this->peer_->getInterface().empty()) {
-      logger_->log_info("HTTP Site2Site bind local network interface", 
this->peer_->getInterface());
+      logger_->log_info("HTTP Site2Site bind local network interface %s", 
this->peer_->getInterface());
       http_client_->setInterface(this->peer_->getInterface());
     }
+    if (!this->peer_->getHTTPProxy().host.empty()) {
+      logger_->log_info("HTTP Site2Site setup http proxy host %s", 
this->peer_->getHTTPProxy().host);
+      http_client_->setHTTPProxy(this->peer_->getHTTPProxy());
+    }
     return http_client_;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h 
b/libminifi/include/RemoteProcessorGroupPort.h
index 14200ee..aece744 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -148,7 +148,12 @@ class RemoteProcessorGroupPort : public core::Processor {
       }
     }
   }
-
+  // Stores the HTTP proxy settings that this port passes to the site-to-site client configurations it builds.
+  void setHTTPProxy(const utils::HTTPProxy &proxy) {
+    this->proxy_ = proxy;
+  }
+  // Returns a copy of the HTTP proxy settings stored on this port; const because it only reads the member.
+  utils::HTTPProxy getHTTPProxy() const {
+    return this->proxy_;
+  }
   // refresh remoteSite2SiteInfo via nifi rest api
   void refreshRemoteSite2SiteInfo();
 
@@ -157,6 +162,10 @@ class RemoteProcessorGroupPort : public core::Processor {
 
   virtual void notifyStop();
 
+  // Switches the site-to-site client type of this port to HTTP.
+  void enableHTTP() {
+    this->client_type_ = sitetosite::HTTP;
+  }
+
  protected:
 
   std::shared_ptr<io::StreamFactory> stream_factory_;
@@ -183,10 +192,11 @@ class RemoteProcessorGroupPort : public core::Processor {
   std::string protocol_;
   std::string url_;
   bool http_enabled_;
+  // http proxy
+  utils::HTTPProxy proxy_;
 
   sitetosite::CLIENT_TYPE client_type_;
 
-
   // Remote Site2Site Info
   int site2site_port_;
   bool site2site_secure_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h 
b/libminifi/include/core/ProcessGroup.h
index d6b7510..737d6de 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -35,6 +35,7 @@
 #include "controller/ControllerServiceNode.h"
 #include "controller/ControllerServiceMap.h"
 #include "utils/Id.h"
+#include "utils/HTTPClient.h"
 
 namespace org {
 namespace apache {
@@ -97,6 +98,39 @@ class ProcessGroup {
   std::string getInterface() {
     return local_network_interface_;
   }
+  // Transport protocol for this group's remote ports; "HTTP" selects HTTP site-to-site.
+  void setTransportProtocol(const std::string &protocol) {
+    transport_protocol_ = protocol;
+  }
+  std::string getTransportProtocol() const {
+    return transport_protocol_;
+  }
+  // HTTP proxy host; an empty host means that no proxy is configured.
+  void setHttpProxyHost(const std::string &host) {
+    proxy_.host = host;
+  }
+  std::string getHttpProxyHost() const {
+    return proxy_.host;
+  }
+  // User name for proxy authentication.
+  void setHttpProxyUserName(const std::string &username) {
+    proxy_.username = username;
+  }
+  std::string getHttpProxyUserName() const {
+    return proxy_.username;
+  }
+  // Password for proxy authentication.
+  void setHttpProxyPassWord(const std::string &password) {
+    proxy_.password = password;
+  }
+  std::string getHttpProxyPassWord() const {
+    return proxy_.password;
+  }
+  // HTTP proxy port.
+  void setHttpProxyPort(int port) {
+    proxy_.port = port;
+  }
+  int getHttpProxyPort() const {
+    return proxy_.port;
+  }
+  // Returns a copy of all HTTP proxy settings of this group.
+  utils::HTTPProxy getHTTPProxy() const {
+    return proxy_;
+  }
   // Set Processor yield period in MilliSecond
   void setYieldPeriodMsec(uint64_t period) {
     yield_period_msec_ = period;
@@ -201,6 +235,9 @@ class ProcessGroup {
   std::string local_network_interface_;
   // Transmitting
   std::atomic<bool> transmitting_;
+  // http proxy
+  utils::HTTPProxy proxy_;
+  std::string transport_protocol_;
 
   // controller services
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/sitetosite/Peer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/Peer.h 
b/libminifi/include/sitetosite/Peer.h
index cc097b6..1f9ec01 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -37,6 +37,7 @@
 #include "io/ClientSocket.h"
 #include "io/BaseStream.h"
 #include "utils/TimeUtil.h"
+#include "utils/HTTPClient.h"
 
 namespace org {
 namespace apache {
@@ -172,6 +173,7 @@ class SiteToSitePeer : public 
org::apache::nifi::minifi::io::BaseStream {
         host_(std::move(ss.host_)),
         port_(std::move(ss.port_)),
         local_network_interface_(std::move(ss.local_network_interface_)),
+        proxy_(std::move(ss.proxy_)),
         logger_(std::move(ss.logger_)) {
     yield_expiration_.store(ss.yield_expiration_);
     timeout_.store(ss.timeout_);
@@ -276,6 +278,12 @@ class SiteToSitePeer : public 
org::apache::nifi::minifi::io::BaseStream {
   uint64_t getTimeOut() {
     return timeout_;
   }
+  // Stores the HTTP proxy settings associated with this peer.
+  void setHTTPProxy(const utils::HTTPProxy &proxy) {
+    this->proxy_ = proxy;
+  }
+  // Returns a copy of the HTTP proxy settings of this peer; const because it only reads the member.
+  utils::HTTPProxy getHTTPProxy() const {
+    return this->proxy_;
+  }
 
   void setStream(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> 
stream) {
     stream_ = nullptr;
@@ -368,6 +376,8 @@ class SiteToSitePeer : public 
org::apache::nifi::minifi::io::BaseStream {
 
   std::string local_network_interface_;
 
+  utils::HTTPProxy proxy_;
+
   // Mutex for protection
   std::mutex mutex_;
   // URL

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/sitetosite/SiteToSite.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSite.h 
b/libminifi/include/sitetosite/SiteToSite.h
index ec4cf44..484a277 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -25,6 +25,7 @@
 #include "io/CRCStream.h"
 #include "io/StreamFactory.h"
 #include "utils/Id.h"
+#include "utils/HTTPClient.h"
 
 namespace org {
 namespace apache {
@@ -368,6 +369,12 @@ class SiteToSiteClientConfiguration {
   std::string getInterface() const {
     return local_network_interface_;
   }
+  // Records the HTTP proxy settings carried by this client configuration.
+  void setHTTPProxy(const utils::HTTPProxy &proxy) {
+    this->proxy_ = proxy;
+  }
+  // Returns a copy of the HTTP proxy settings carried by this client configuration.
+  utils::HTTPProxy getHTTPProxy() const {
+    return proxy_;
+  }
 
  protected:
 
@@ -382,6 +389,8 @@ class SiteToSiteClientConfiguration {
   // secore comms
 
   std::shared_ptr<controllers::SSLContextService> ssl_service_;
+
+  utils::HTTPProxy proxy_;
 };
 #if defined(__GNUC__) || defined(__GNUG__)
 #pragma GCC diagnostic pop

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h 
b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 73959bb..848a152 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -82,6 +82,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const 
SiteToSiteClientConf
         ptr->setSSLContextService(client_configuration.getSecurityContext());
         auto peer = std::unique_ptr<SiteToSitePeer>(new 
SiteToSitePeer(client_configuration.getPeer()->getHost(), 
client_configuration.getPeer()->getPort(),
             client_configuration.getInterface()));
+        peer->setHTTPProxy(client_configuration.getHTTPProxy());
         char idStr[37];
         uuid_unparse_lower(uuid, idStr);
         ptr->setPortId(uuid);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/include/utils/HTTPClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPClient.h 
b/libminifi/include/utils/HTTPClient.h
index 3d1383c..69674be 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -25,6 +25,13 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+/**
+ * Connection settings for an HTTP proxy.
+ * An empty host means that no proxy is configured.
+ */
+struct HTTPProxy {
+  std::string host;
+  std::string username;
+  std::string password;
+  // Zero-initialised so a proxy configured without a port never carries an indeterminate value.
+  int port = 0;
+};
+
 struct HTTPUploadCallback {
   HTTPUploadCallback() {
     stop = false;
@@ -257,6 +264,9 @@ class BaseHTTPClient {
   virtual void setDisablePeerVerification() {
   }
 
+  // No-op by default; HTTP client implementations override this to apply the proxy settings.
+  virtual void setHTTPProxy(const utils::HTTPProxy &proxy) {
+  }
+
   virtual void setDisableHostVerification() {
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index bbf697f..68f6831 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -68,6 +68,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> 
RemoteProcessorGroupPort::getNextP
       // create
       if (url_.empty()) {
         sitetosite::SiteToSiteClientConfiguration config(stream_factory_, 
std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, port_, ssl_service != 
nullptr), this->getInterface(), client_type_);
+        config.setHTTPProxy(this->proxy_);
         nextProtocol = sitetosite::createClient(config);
       } else if (peer_index_ >= 0) {
         std::lock_guard<std::mutex> lock(peer_mutex_);
@@ -78,7 +79,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> 
RemoteProcessorGroupPort::getNextP
         if (peer_index_ >= static_cast<int>(peers_.size())) {
           peer_index_ = 0;
         }
-
+        config.setHTTPProxy(this->proxy_);
         nextProtocol = sitetosite::createClient(config);
       } else {
         logger_->log_debug("Refreshing the peer list since there are none 
configured.");
@@ -177,6 +178,7 @@ void RemoteProcessorGroupPort::onSchedule(const 
std::shared_ptr<core::ProcessCon
         peer_index_ = 0;
       }
       logger_->log_trace("Creating client");
+      config.setHTTPProxy(this->proxy_);
       nextProtocol = sitetosite::createClient(config);
       logger_->log_trace("Created client, moving into available protocols");
       returnProtocol(std::move(nextProtocol));
@@ -301,6 +303,9 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() 
{
       client->setDisablePeerVerification();
     }
   }
+  if (!proxy_.host.empty()) {
+    client->setHTTPProxy(proxy_);
+  }
   if (!token.empty()) {
     std::string header = "Authorization: " + token;
     client->appendHeader(header);
@@ -352,6 +357,7 @@ void RemoteProcessorGroupPort::refreshPeerList() {
   sitetosite::SiteToSiteClientConfiguration config(stream_factory_, 
std::make_shared<sitetosite::Peer>(protocol_uuid_, host_,
     site2site_port_, ssl_service != nullptr), this->getInterface(), 
client_type_);
   config.setSecurityContext(ssl_service);
+  config.setHTTPProxy(this->proxy_);
   protocol = sitetosite::createClient(config);
 
   protocol->getPeerList(peers_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 023ca9d..626c4e4 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -53,6 +53,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string 
name, uuid_t uuid,
 
   yield_period_msec_ = 0;
   transmitting_ = false;
+  transport_protocol_ = "RAW";
 
   logger_->log_debug("ProcessGroup %s created", name_);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e2e47326/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 8f4820f..b065eff 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -305,6 +305,37 @@ void 
YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
           group->setInterface(interface);
         }
 
+        if (currRpgNode["transport protocol"]) {
+          std::string transport_protocol = currRpgNode["transport 
protocol"].as<std::string>();
+          logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol 
=> [%s]", transport_protocol);
+          if (transport_protocol == "HTTP") {
+            group->setTransportProtocol(transport_protocol);
+            if (currRpgNode["proxy host"]) {
+              std::string http_proxy_host = currRpgNode["proxy 
host"].as<std::string>();
+              logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => 
[%s]", http_proxy_host);
+              group->setHttpProxyHost(http_proxy_host);
+              if (currRpgNode["proxy user"]) {
+                std::string http_proxy_username = currRpgNode["proxy 
user"].as<std::string>();
+                logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => 
[%s]", http_proxy_username);
+                group->setHttpProxyUserName(http_proxy_username);
+              }
+              if (currRpgNode["proxy password"]) {
+                std::string http_proxy_password = currRpgNode["proxy password"].as<std::string>();
+                // Never write the secret itself to the log; only record that a password was supplied.
+                logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [********]");
+                group->setHttpProxyPassWord(http_proxy_password);
+              }
+              if (currRpgNode["proxy port"]) {
+                std::string http_proxy_port = currRpgNode["proxy 
port"].as<std::string>();
+                int32_t port;
+                if (core::Property::StringToInt(http_proxy_port, port)) {
+                  logger_->log_debug("parseRemoteProcessGroupYaml: proxy port 
=> [%d]", port);
+                  group->setHttpProxyPort(port);
+                }
+              }
+            }
+          }
+        }
+
         group->setTransmitting(true);
         group->setURL(url);
 
@@ -657,6 +688,11 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
   processor->initialize();
   if (!parent->getInterface().empty())
     port->setInterface(parent->getInterface());
+  if (parent->getTransportProtocol() == "HTTP") {
+    port->enableHTTP();
+    if (!parent->getHttpProxyHost().empty())
+      port->setHTTPProxy(parent->getHTTPProxy());
+  }
 
   // handle port properties
   YAML::Node nodeVal = portNode->as<YAML::Node>();

Reply via email to