Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master b20da80eb -> 20622f6d1


MINIFI-70: enhance site2site port negotiation

This closes #119.

Signed-off-by: Marc Parisi <phroc...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/20622f6d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/20622f6d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/20622f6d

Branch: refs/heads/master
Commit: 20622f6d13250b9562e3e30949639ae3402aa09d
Parents: b20da80
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Tue Jul 18 10:24:35 2017 -0700
Committer: Marc Parisi <phroc...@apache.org>
Committed: Thu Jul 20 11:39:46 2017 -0400

----------------------------------------------------------------------
 README.md                                       |  28 +-
 cmake/BuildTests.cmake                          |   2 +
 conf/config.yml                                 |  53 +++-
 conf/minifi-log.properties                      |   2 +-
 conf/minifi.properties                          |  26 +-
 libminifi/include/ConfigurationListener.h       |   9 +-
 libminifi/include/HttpConfigurationListener.h   |  12 +-
 libminifi/include/RemoteProcessorGroupPort.h    |  49 +++-
 libminifi/include/Site2SiteClientProtocol.h     |  11 +
 libminifi/include/core/FlowConfiguration.h      |   2 +
 .../SiteToSiteProvenanceReportingTask.h         |   4 +-
 libminifi/include/properties/Configure.h        |  15 +-
 libminifi/include/utils/HTTPUtils.h             | 208 ++++++++++++++
 libminifi/src/ConfigurationListener.cpp         |  40 ---
 libminifi/src/Configure.cpp                     |  24 +-
 libminifi/src/HttpConfigurationListener.cpp     |  60 +---
 libminifi/src/RemoteProcessorGroupPort.cpp      | 198 ++++++++++++--
 libminifi/src/Site2SiteClientProtocol.cpp       | 273 ++++++++++---------
 libminifi/src/core/FlowConfiguration.cpp        |   2 +-
 .../SiteToSiteProvenanceReportingTask.cpp       |   1 +
 libminifi/src/core/yaml/YamlConfiguration.cpp   |  47 ++--
 .../test/integration/Site2SiteRestTest.cpp      | 145 ++++++++++
 libminifi/test/resources/TestSite2SiteRest.yml  |  58 ++++
 libminifi/test/unit/ProcessorTests.cpp          |   3 +-
 libminifi/test/unit/YamlConfigurationTests.cpp  |   2 +-
 25 files changed, 939 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b7f532c..ac2f97a 100644
--- a/README.md
+++ b/README.md
@@ -278,8 +278,6 @@ Additionally, users can utilize the MiNiFi Toolkit 
Converter (version 0.0.1 - sc
                 name: From Node A
                 max concurrent tasks: 1
                 Properties:
-                    Port: 10001
-                    Host Name: localhost
 
 ### Site2Site Security Configuration
 
@@ -319,8 +317,7 @@ Additionally, users can utilize the MiNiFi Toolkit 
Converter (version 0.0.1 - sc
     Provenance Reporting:
       scheduling strategy: TIMER_DRIVEN
       scheduling period: 1 sec
-      port: 10001
-      host: localhost
+      url: http://localhost:8080/nifi
       port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204
       batch size: 100
 
@@ -334,14 +331,27 @@ Additionally, users can utilize the MiNiFi Toolkit 
Converter (version 0.0.1 - sc
     nifi.configuration.listener.type=http
     nifi.configuration.listener.http.url=https://localhost:8080
     nifi.configuration.listener.pull.interval=1 sec
-    nifi.configuration.listener.client.ca.certificate=./conf/nifi-cert.pem
 
     if you want to enable client certificate
-    nifi.configuration.listener.need.ClientAuth=true
-    nifi.configuration.listener.client.certificate=./conf/client.pem
-    nifi.configuration.listener.client.private.key=./conf/client.key
-    nifi.configuration.listener.client.pass.phrase=./conf/password
+    nifi.https.need.ClientAuth=true
+    nifi.https.client.certificate=./conf/client.pem
+    nifi.https.client.private.key=./conf/client.key
+    nifi.https.client.pass.phrase=./conf/password
+    nifi.https.client.ca.certificate=./conf/nifi-cert.pem
 
+### REST API access
+
+    Configure REST API user name and password
+    nifi.rest.api.user.name=admin
+    nifi.rest.api.password=password
+
+    if you want to enable client certificate
+    nifi.https.need.ClientAuth=true
+    nifi.https.client.certificate=./conf/client.pem
+    nifi.https.client.private.key=./conf/client.key
+    nifi.https.client.pass.phrase=./conf/password
+    nifi.https.client.ca.certificate=./conf/nifi-cert.pem
+      
 ### UID generation
 
 MiNiFi needs to generate many unique identifiers in the course of operations.  
There are a few different uid implementations available that can be configured 
in minifi-uid.properties.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index aedae10..59f1d59 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -94,4 +94,6 @@ add_test(NAME HttpGetIntegrationTestSecure COMMAND 
HttpGetIntegrationTest "${TES
 
 add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest 
"${TEST_RESOURCES}/TestHTTPPost.yml" )
 
+add_test(NAME Site2SiteRestTest COMMAND Site2SiteRestTest 
"${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/")
+
 add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess )

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/conf/config.yml
----------------------------------------------------------------------
diff --git a/conf/config.yml b/conf/config.yml
index b50c609..b714197 100644
--- a/conf/config.yml
+++ b/conf/config.yml
@@ -14,7 +14,52 @@
 # limitations under the License.
 
 Flow Controller:
-  name: MiNiFi Flow
-Processors: []
-Connections: []
-Remote Processing Groups: []
+    id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+    name: MiNiFi Flow
+
+Processors:
+    - name: GetFile
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+      class: org.apache.nifi.processors.standard.GetFile
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Input Directory: /tmp/getfile
+          Keep Source File: true
+
+Connections:
+    - name: GenerateFlowFileS2S
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 
+      source relationship name: success
+      destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+      queue prioritizer class: 
org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Groups:
+    - name: NiFi Flow
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
+      url: https://localhost:9443/nifi
+      timeout: 30 secs
+      yield period: 10 sec
+      Input Ports:
+          - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+            name: From Node A
+            max concurrent tasks: 1
+            use compression: false
+            Properties: # Deviates from spec and will later be removed when 
this is autonegotiated
+                Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+
+Provenance Reporting:
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      url: https://localhost:9443/nifi
+      port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+      batch size: 100

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/conf/minifi-log.properties
----------------------------------------------------------------------
diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties
index 870bb7a..99caa1b 100644
--- a/conf/minifi-log.properties
+++ b/conf/minifi-log.properties
@@ -36,7 +36,7 @@ appender.rolling.max_file_size=5242880
 logger.root=INFO,rolling
 
 #Logging configurable by namespace
-#logger.org::apache::nifi::minifi=DEBUG,rolling
+logger.org::apache::nifi::minifi=DEBUG,rolling
 
 #Logging configurable by class fully qualified name
 #logger.org::apache::nifi::minifi::core::logging::LoggerConfiguration=DEBUG

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index cfa2858..8e71818 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 # Core Properties #
-nifi.version=0.2.0
+nifi.version=0.1.0
 nifi.flow.configuration.file=./conf/config.yml
 nifi.administrative.yield.duration=30 sec
 # If a component has no work to do (is "bored"), how long should we wait 
before checking again for work?
@@ -23,21 +23,11 @@ nifi.bored.yield.duration=10 millis
 nifi.provenance.repository.directory.default=./provenance_repository
 nifi.provenance.repository.max.storage.time=1 MIN
 nifi.provenance.repository.max.storage.size=1 MB
-# FlowFileRepository #
-nifi.flowfile.repository.enable=true
-nifi.flowfile.repository.directory.default=./flowfile_repository
-nifi.flowfile.repository.max.storage.time=10 MIN
-nifi.flowfile.repository.max.storage.size=1 MB
-
-# Security Related Properties #
-# Enable tls ssl
 #nifi.remote.input.secure=true
-# Enable client certificate base authorization
-#nifi.security.need.ClientAuth=true
-# Client certificate and private key PEM files
-#nifi.security.client.certificate=./conf/client.pem
-#nifi.security.client.private.key=./conf/client.pem
-# Client private key passphrase file
-#nifi.security.client.pass.phrase=./conf/password
-# Setup the client CA certificate file
-#nifi.security.client.ca.certificate=./conf/nifi-cert.pem
+nifi.https.need.ClientAuth=true
+nifi.https.client.certificate=./conf/client.pem
+nifi.https.client.private.key=./conf/client.key
+nifi.https.client.pass.phrase=./conf/password
+nifi.https.client.ca.certificate=./conf/nifi-cert.pem
+#nifi.rest.api.user.name=admin
+#nifi.rest.api.password=password

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/ConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ConfigurationListener.h 
b/libminifi/include/ConfigurationListener.h
index 5574226..856ea95 100644
--- a/libminifi/include/ConfigurationListener.h
+++ b/libminifi/include/ConfigurationListener.h
@@ -51,9 +51,10 @@ public:
   ConfigurationListener(std::shared_ptr<FlowController> controller,
       std::shared_ptr<Configure> configure, std::string type) :
       connect_timeout_(20000), read_timeout_(20000), type_(type), configure_(
-          configure), controller_(controller), need_client_certificate_(false) 
{
+          configure), controller_(controller) {
     logger_ = logging::LoggerFactory<ConfigurationListener>::getLogger();
     running_ = false;
+
   }
   // Destructor
   virtual ~ConfigurationListener() {
@@ -106,12 +107,6 @@ protected:
   std::shared_ptr<Configure> configure_;
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<FlowController> controller_;
-
-  bool need_client_certificate_;
-  std::string certificate_;
-  std::string private_key_;
-  std::string passphrase_;
-  std::string ca_certificate_;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/HttpConfigurationListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/HttpConfigurationListener.h 
b/libminifi/include/HttpConfigurationListener.h
index 72d4728..7e3291e 100644
--- a/libminifi/include/HttpConfigurationListener.h
+++ b/libminifi/include/HttpConfigurationListener.h
@@ -40,7 +40,8 @@ public:
    */
   HttpConfigurationListener(std::shared_ptr<FlowController> controller,
       std::shared_ptr<Configure> configure) :
-      minifi::ConfigurationListener(controller, configure, "http") {
+      minifi::ConfigurationListener(controller, configure, "http"),
+      securityConfig_(configure) {
       std::string value;
 
       if (configure->get(Configure::nifi_configuration_listener_http_url, 
value)) {
@@ -56,14 +57,6 @@ public:
 
   bool pullConfiguration(std::string &configuration);
 
-  /**
-    * Configures a secure connection
-    */
-  void configureSecureConnection(CURL *http_session);
-
-  static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param);
-  static int pemPassWordCb(char *buf, int size, int rwflag, void *param);
-
   // Destructor
   virtual ~HttpConfigurationListener() {
     this->stop();
@@ -71,6 +64,7 @@ public:
   }
 
 protected:
+  minifi::utils::HTTPSecurityConfiguration securityConfig_;
 
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h 
b/libminifi/include/RemoteProcessorGroupPort.h
index 9f89b07..d484fb9 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -23,6 +23,7 @@
 #include <mutex>
 #include <memory>
 #include <stack>
+#include "utils/HTTPUtils.h"
 #include "concurrentqueue.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -42,20 +43,30 @@ class RemoteProcessorGroupPort : public core::Processor {
   /*!
    * Create a new processor
    */
-  RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> 
&stream_factory, std::string name, uuid_t uuid = nullptr)
+  RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> 
&stream_factory, std::string name, std::string url, std::shared_ptr<Configure> 
configure, uuid_t uuid = nullptr)
       : core::Processor(name, uuid),
+        configure_(configure),
         direction_(SEND),
         transmitting_(false),
-        logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()) 
{
+        logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()),
+        url_(url),
+        securityConfig_(configure) {
     stream_factory_ = stream_factory;
     if (uuid != nullptr) {
       uuid_copy(protocol_uuid_, uuid);
     }
+    site2site_port_ = -1;
+    site2site_secure_ = false;
+    site2site_peer_index_ = -1;
+    // REST API port and host
+    port_ = -1;
+    utils::parse_url(url_, host_, port_, protocol_);
   }
   // Destructor
   virtual ~RemoteProcessorGroupPort() {
 
   }
+
   // Processor Name
   static const char *ProcessorName;
   // Supported Properties
@@ -84,6 +95,25 @@ class RemoteProcessorGroupPort : public core::Processor {
   void setTransmitting(bool val) {
     transmitting_ = val;
   }
+  // setURL
+  void setURL(std::string val) {
+    url_ = val;
+    utils::parse_url(url_, host_, port_, protocol_);
+    if (port_ == -1) {
+      if (protocol_.find("https") != std::string::npos) {
+        port_ = 443;
+      }
+      else if (protocol_.find("http") != std::string::npos) {
+        port_ = 80;
+      }
+    }
+  }
+
+  // refresh remoteSite2SiteInfo via nifi rest api
+  void refreshRemoteSite2SiteInfo();
+
+  // refresh site2site peer list
+  void refreshPeerList();
 
  protected:
 
@@ -93,6 +123,7 @@ class RemoteProcessorGroupPort : public core::Processor {
 
   moodycamel::ConcurrentQueue<std::unique_ptr<Site2SiteClientProtocol>> 
available_protocols_;
 
+  std::shared_ptr<Configure> configure_;
   // Logger
   std::shared_ptr<logging::Logger> logger_;
   // Transaction Direction
@@ -104,9 +135,21 @@ class RemoteProcessorGroupPort : public core::Processor {
 
   uuid_t protocol_uuid_;
 
+  // rest API end point info
   std::string host_;
+  int port_;
+  std::string protocol_;
+  std::string url_;
 
-  uint16_t port_;
+  // Remote Site2Site Info
+  int site2site_port_;
+  bool site2site_secure_;
+  std::vector<minifi::Site2SitePeerStatus> site2site_peer_status_list_;
+  std::atomic<int> site2site_peer_index_;
+  std::mutex site2site_peer_mutex_;
+  std::string rest_user_name_;
+  std::string rest_password_;
+  minifi::utils::HTTPSecurityConfiguration securityConfig_;
 
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h 
b/libminifi/include/Site2SiteClientProtocol.h
index a987459..8d89004 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -388,6 +388,15 @@ class DataPacket {
 
 };
 
+/**
+  * Site2Site Peer
+  */
+ typedef struct Site2SitePeerStatus {
+   std::string host_;
+   int port_;
+   bool isSecure_;
+ } Site2SitePeerStatus;
+
 // Site2SiteClientProtocol Class
 class Site2SiteClientProtocol {
  public:
@@ -476,6 +485,8 @@ class Site2SiteClientProtocol {
   }
   // bootstrap the protocol to the ready for transaction state by going 
through the state machine
   bool bootstrap();
+  // get peerList
+  bool getPeerList(std::vector<minifi::Site2SitePeerStatus> &peer);
   // establish
   bool establish();
   // handShake

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index 6e2b700..3429166 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -65,6 +65,7 @@ class FlowConfiguration : public CoreComponent {
         flow_file_repo_(flow_file_repo),
         config_path_(path),
         stream_factory_(stream_factory),
+        configuration_(configuration),
         logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
     controller_services_ = 
std::make_shared<core::controller::ControllerServiceMap>();
     service_provider_ = 
std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_,
 nullptr, configuration);
@@ -128,6 +129,7 @@ class FlowConfiguration : public CoreComponent {
   std::shared_ptr<core::Repository> flow_file_repo_;
   // stream factory
   std::shared_ptr<io::StreamFactory> stream_factory_;
+  std::shared_ptr<Configure> configuration_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
----------------------------------------------------------------------
diff --git 
a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h 
b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
index 9e3f567..e1d80e8 100644
--- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
+++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
@@ -45,8 +45,8 @@ class SiteToSiteProvenanceReportingTask : public 
minifi::RemoteProcessorGroupPor
   /*!
    * Create a new processor
    */
-  SiteToSiteProvenanceReportingTask(const std::shared_ptr<io::StreamFactory> 
&stream_factory)
-      : minifi::RemoteProcessorGroupPort(stream_factory, ReportTaskName),
+  SiteToSiteProvenanceReportingTask(const std::shared_ptr<io::StreamFactory> 
&stream_factory, std::shared_ptr<Configure> configure)
+      : minifi::RemoteProcessorGroupPort(stream_factory, ReportTaskName, "", 
configure, NULL),
         
logger_(logging::LoggerFactory<SiteToSiteProvenanceReportingTask>::getLogger()) 
{
     this->setTriggerWhenEmpty(true);
     port_ = 0;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index fa19a18..13da55a 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -63,12 +63,15 @@ class Configure : public Properties {
   static const char *nifi_configuration_listener_http_url;
   static const char *nifi_configuration_listener_rest_url;
   static const char *nifi_configuration_listener_type; // http or rest
-  // configuration listener security config
-  static const char *nifi_configuration_listener_need_ClientAuth;
-  static const char *nifi_configuration_listener_client_certificate;
-  static const char *nifi_configuration_listener_private_key;
-  static const char *nifi_configuration_listener_client_pass_phrase;
-  static const char *nifi_configuration_listener_client_ca_certificate;
+  // security config for all https service
+  static const char *nifi_https_need_ClientAuth;
+  static const char *nifi_https_client_certificate;
+  static const char *nifi_https_client_private_key;
+  static const char *nifi_https_client_pass_phrase;
+  static const char *nifi_https_client_ca_certificate;
+  // nifi rest api user name and password
+  static const char *nifi_rest_api_user_name;
+  static const char *nifi_rest_api_password;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/include/utils/HTTPUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPUtils.h 
b/libminifi/include/utils/HTTPUtils.h
index 3f20f5e..46aa67a 100644
--- a/libminifi/include/utils/HTTPUtils.h
+++ b/libminifi/include/utils/HTTPUtils.h
@@ -21,7 +21,16 @@
 
 #include <curl/curl.h>
 #include <vector>
+#include <iostream>
+#include <string>
+#include <curl/curlbuild.h>
+#include <curl/easy.h>
+#include <openssl/ssl.h>
 #include "ByteInputCallBack.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "properties/Configure.h"
+#include "io/validation.h"
 
 namespace org {
 namespace apache {
@@ -88,6 +97,205 @@ struct HTTPRequestResponse {
 
 };
 
+static void parse_url(std::string &url, std::string &host, int &port, 
std::string &protocol) {
+
+  std::string http("http://";);
+  std::string https("https://";);
+
+  if (url.compare(0, http.size(), http) == 0)
+    protocol = http;
+
+  if (url.compare(0, https.size(), https) == 0)
+    protocol = https;
+
+  if (!protocol.empty()) {
+    size_t pos = url.find_first_of(":", protocol.size());
+
+    if (pos == std::string::npos) {
+      pos = url.size();
+    }
+
+    host = url.substr(protocol.size(), pos - protocol.size());
+
+    if (pos < url.size() && url[pos] == ':') {
+      size_t ppos = url.find_first_of("/", pos);
+      if (ppos == std::string::npos) {
+        ppos = url.size();
+      }
+      std::string portStr(url.substr(pos + 1, ppos - pos - 1));
+      if (portStr.size() > 0) {
+        port = std::stoi(portStr);
+      }
+    }
+  }
+}
+
+// HTTPSecurityConfiguration
+class HTTPSecurityConfiguration {
+public:
+
+  // Constructor
+  /*!
+   * Create a new HTTPSecurityConfiguration
+   */
+  HTTPSecurityConfiguration(bool need_client_certificate, std::string 
certificate,
+      std::string private_key, std::string passphrase, std::string 
ca_certificate) :
+        need_client_certificate_(need_client_certificate), 
certificate_(certificate),
+        private_key_(private_key), passphrase_(passphrase), 
ca_certificate_(ca_certificate) {
+    logger_ = logging::LoggerFactory<HTTPSecurityConfiguration>::getLogger();
+  }
+  // Destructor
+  virtual ~HTTPSecurityConfiguration() {
+  }
+
+  HTTPSecurityConfiguration(std::shared_ptr<Configure> configure) {
+    logger_ = logging::LoggerFactory<HTTPSecurityConfiguration>::getLogger();
+    need_client_certificate_ = false;
+    std::string clientAuthStr;
+    if (configure->get(Configure::nifi_https_need_ClientAuth, clientAuthStr)) {
+      
org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, 
this->need_client_certificate_);
+    }
+
+    if (configure->get(Configure::nifi_https_client_ca_certificate, 
this->ca_certificate_)) {
+      logger_->log_info("HTTPSecurityConfiguration CA certificates: [%s]", 
this->ca_certificate_);
+    }
+
+    if (this->need_client_certificate_) {
+      std::string passphrase_file;
+
+      if (!(configure->get(Configure::nifi_https_client_certificate, 
this->certificate_) && configure->get(Configure::nifi_https_client_private_key, 
this->private_key_))) {
+        logger_->log_error("Certificate and Private Key PEM file not 
configured for HTTPSecurityConfiguration, error: %s.", std::strerror(errno));
+      }
+
+      if (configure->get(Configure::nifi_https_client_pass_phrase, 
passphrase_file)) {
+        // load the passphase from file
+        std::ifstream file(passphrase_file.c_str(), std::ifstream::in);
+        if (file.good()) {
+          this->passphrase_.assign((std::istreambuf_iterator<char>(file)), 
std::istreambuf_iterator<char>());
+          file.close();
+        }
+      }
+
+      logger_->log_info("HTTPSecurityConfiguration certificate: [%s], private 
key: [%s], passphrase file: [%s]", this->certificate_, this->private_key_, 
passphrase_file);
+    }
+  }
+
+  /**
+    * Configures a secure connection
+    */
+  void configureSecureConnection(CURL *http_session) {
+    curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
+    curl_easy_setopt(http_session, CURLOPT_CAINFO, 
this->ca_certificate_.c_str());
+    curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM");
+    curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L);
+    if (this->need_client_certificate_) {
+      CURLcode ret;
+      ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION,
+          &HTTPSecurityConfiguration::configureSSLContext);
+      if (ret != CURLE_OK)
+        logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret);
+      curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
+          static_cast<void*>(this));
+      curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM");
+    }
+  }
+
+  static CURLcode configureSSLContext(CURL *curl, void *ctx, void *param) {
+    minifi::utils::HTTPSecurityConfiguration *config =
+        static_cast<minifi::utils::HTTPSecurityConfiguration *>(param);
+    SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx);
+
+    SSL_CTX_load_verify_locations(sslCtx, config->ca_certificate_.c_str(), 0);
+    SSL_CTX_use_certificate_file(sslCtx, config->certificate_.c_str(),
+        SSL_FILETYPE_PEM);
+    SSL_CTX_set_default_passwd_cb(sslCtx,
+        HTTPSecurityConfiguration::pemPassWordCb);
+    SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param);
+    SSL_CTX_use_PrivateKey_file(sslCtx, config->private_key_.c_str(),
+        SSL_FILETYPE_PEM);
+    // verify private key
+    if (!SSL_CTX_check_private_key(sslCtx)) {
+      config->logger_->log_error(
+          "Private key does not match the public certificate, error : %s",
+          std::strerror(errno));
+      return CURLE_FAILED_INIT;
+    }
+
+    config->logger_->log_debug(
+        "HTTPSecurityConfiguration load Client Certificates OK");
+    return CURLE_OK;
+  }
+
+  static int pemPassWordCb(char *buf, int size, int rwflag, void *param) {
+    minifi::utils::HTTPSecurityConfiguration *config =
+        static_cast<minifi::utils::HTTPSecurityConfiguration *>(param);
+
+    if (config->passphrase_.length() > 0) {
+      memset(buf, 0x00, size);
+      memcpy(buf, config->passphrase_.c_str(),
+          config->passphrase_.length() - 1);
+      return config->passphrase_.length() - 1;
+    }
+    return 0;
+  }
+
+protected:
+  bool need_client_certificate_;
+  std::string certificate_;
+  std::string private_key_;
+  std::string passphrase_;
+  std::string ca_certificate_;
+
+private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+static std::string get_token(std::string loginUrl, std::string username, 
std::string password, HTTPSecurityConfiguration &securityConfig) {
+  utils::HTTPRequestResponse content;
+  std::string token;
+  CURL *login_session = curl_easy_init();
+  if (loginUrl.find("https") != std::string::npos) {
+     securityConfig.configureSecureConnection(login_session);
+   }
+  curl_easy_setopt(login_session, CURLOPT_URL, loginUrl.c_str());
+  struct curl_slist *list = NULL;
+  list = curl_slist_append(list, "Content-Type: 
application/x-www-form-urlencoded");
+  list = curl_slist_append(list, "Accept: text/plain");
+  curl_easy_setopt(login_session, CURLOPT_HTTPHEADER, list);
+  std::string payload = "username=" + username + "&" + "password=" + password;
+  char *output = curl_easy_escape(login_session, payload.c_str(), 
payload.length());
+  curl_easy_setopt(login_session, CURLOPT_WRITEFUNCTION,
+      &utils::HTTPRequestResponse::recieve_write);
+  curl_easy_setopt(login_session, CURLOPT_WRITEDATA,
+      static_cast<void*>(&content));
+  curl_easy_setopt(login_session, CURLOPT_POSTFIELDSIZE, strlen(output));
+  curl_easy_setopt(login_session, CURLOPT_POSTFIELDS, output);
+  curl_easy_setopt(login_session, CURLOPT_POST, 1);
+  CURLcode res = curl_easy_perform(login_session);
+  curl_slist_free_all(list);
+  curl_free(output);
+  if (res == CURLE_OK) {
+    std::string response_body(content.data.begin(), content.data.end());
+    int64_t http_code = 0;
+    curl_easy_getinfo(login_session, CURLINFO_RESPONSE_CODE, &http_code);
+    char *content_type;
+    /* ask for the content-type */
+    curl_easy_getinfo(login_session, CURLINFO_CONTENT_TYPE, &content_type);
+
+    bool isSuccess = ((int32_t) (http_code / 100)) == 2
+        && res != CURLE_ABORTED_BY_CALLBACK;
+    bool body_empty = IsNullOrEmpty(content.data);
+
+    if (isSuccess && !body_empty) {
+      token = "Bearer " + response_body;
+    }
+  }
+  curl_easy_cleanup(login_session);
+
+  return token;
+}
+
+
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/ConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ConfigurationListener.cpp 
b/libminifi/src/ConfigurationListener.cpp
index d52a088..aaf50ce 100644
--- a/libminifi/src/ConfigurationListener.cpp
+++ b/libminifi/src/ConfigurationListener.cpp
@@ -46,46 +46,6 @@ void ConfigurationListener::start() {
     }
   }
 
-  std::string clientAuthStr;
-  if (configure_->get(Configure::nifi_configuration_listener_need_ClientAuth, 
clientAuthStr)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, 
this->need_client_certificate_);
-  }
-
-  if (configure_->get(
-          Configure::nifi_configuration_listener_client_ca_certificate,
-      this->ca_certificate_)) {
-    logger_->log_info("Configuration Listener CA certificates: [%s]",
-        this->ca_certificate_);
-  }
-
-  if (this->need_client_certificate_) {
-    std::string passphrase_file;
-
-    if (!(configure_->get(
-        Configure::nifi_configuration_listener_client_certificate, 
this->certificate_)
-        && configure_->get(Configure::nifi_configuration_listener_private_key,
-            this->private_key_))) {
-      logger_->log_error(
-          "Certificate and Private Key PEM file not configured for 
configuration listener, error: %s.",
-          std::strerror(errno));
-    }
-
-    if (configure_->get(
-        Configure::nifi_configuration_listener_client_pass_phrase,
-        passphrase_file)) {
-      // load the passphase from file
-      std::ifstream file(passphrase_file.c_str(), std::ifstream::in);
-      if (file.good()) {
-        this->passphrase_.assign((std::istreambuf_iterator<char>(file)),
-            std::istreambuf_iterator<char>());
-        file.close();
-      }
-    }
-
-    logger_->log_info("Configuration Listener certificate: [%s], private key: 
[%s], passphrase file: [%s]",
-            this->certificate_, this->private_key_, passphrase_file);
-  }
-
   thread_ = std::thread(&ConfigurationListener::threadExecutor, this);
   thread_.detach();
   running_ = true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index e1bc225..8bbc5fc 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -61,16 +61,20 @@ const char *Configure::nifi_configuration_listener_rest_url 
=
     "nifi.configuration.listener.rest.url";
 const char *Configure::nifi_configuration_listener_type =
     "nifi.configuration.listener.type";
-const char *Configure::nifi_configuration_listener_need_ClientAuth =
-    "nifi.configuration.listener.need.ClientAuth";
-const char *Configure::nifi_configuration_listener_client_certificate =
-    "nifi.configuration.listener.client.certificate";
-const char *Configure::nifi_configuration_listener_private_key =
-    "nifi.configuration.listener.client.private.key";
-const char *Configure::nifi_configuration_listener_client_pass_phrase =
-    "nifi.configuration.listener.client.pass.phrase";
-const char *Configure::nifi_configuration_listener_client_ca_certificate =
-    "nifi.configuration.listener.client.ca.certificate";
+const char *Configure::nifi_https_need_ClientAuth =
+    "nifi.https.need.ClientAuth";
+const char *Configure::nifi_https_client_certificate =
+    "nifi.https.client.certificate";
+const char *Configure::nifi_https_client_private_key =
+    "nifi.https.client.private.key";
+const char *Configure::nifi_https_client_pass_phrase =
+    "nifi.https.client.pass.phrase";
+const char *Configure::nifi_https_client_ca_certificate =
+    "nifi.https.client.ca.certificate";
+const char *Configure::nifi_rest_api_user_name =
+    "nifi.rest.api.user.name";
+const char *Configure::nifi_rest_api_password =
+    "nifi.rest.api.password";
 
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/HttpConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/HttpConfigurationListener.cpp 
b/libminifi/src/HttpConfigurationListener.cpp
index 70d5793..39da67b 100644
--- a/libminifi/src/HttpConfigurationListener.cpp
+++ b/libminifi/src/HttpConfigurationListener.cpp
@@ -38,64 +38,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-int HttpConfigurationListener::pemPassWordCb(char *buf, int size, int rwflag,
-    void *param) {
-  minifi::HttpConfigurationListener *listener =
-      static_cast<minifi::HttpConfigurationListener*>(param);
-
-  if (listener->passphrase_.length() > 0) {
-    memset(buf, 0x00, size);
-    memcpy(buf, listener->passphrase_.c_str(),
-        listener->passphrase_.length() - 1);
-    return listener->passphrase_.length() - 1;
-  }
-  return 0;
-}
-
-CURLcode HttpConfigurationListener::configureSSLContext(CURL *curl, void *ctx,
-    void *param) {
-  minifi::HttpConfigurationListener *listener =
-      static_cast<minifi::HttpConfigurationListener*>(param);
-  SSL_CTX* sslCtx = static_cast<SSL_CTX*>(ctx);
-
-  SSL_CTX_load_verify_locations(sslCtx, listener->ca_certificate_.c_str(), 0);
-  SSL_CTX_use_certificate_file(sslCtx, listener->certificate_.c_str(),
-      SSL_FILETYPE_PEM);
-  SSL_CTX_set_default_passwd_cb(sslCtx,
-      HttpConfigurationListener::pemPassWordCb);
-  SSL_CTX_set_default_passwd_cb_userdata(sslCtx, param);
-  SSL_CTX_use_PrivateKey_file(sslCtx, listener->private_key_.c_str(),
-      SSL_FILETYPE_PEM);
-  // verify private key
-  if (!SSL_CTX_check_private_key(sslCtx)) {
-    listener->logger_->log_error(
-        "Private key does not match the public certificate, error : %s",
-        std::strerror(errno));
-    return CURLE_FAILED_INIT;
-  }
-
-  listener->logger_->log_debug(
-      "HttpConfigurationListener load Client Certificates OK");
-  return CURLE_OK;
-}
-
-void HttpConfigurationListener::configureSecureConnection(CURL *http_session) {
-  curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
-  curl_easy_setopt(http_session, CURLOPT_CAINFO, 
this->ca_certificate_.c_str());
-  curl_easy_setopt(http_session, CURLOPT_SSLCERTTYPE, "PEM");
-  curl_easy_setopt(http_session, CURLOPT_SSL_VERIFYPEER, 1L);
-  if (this->need_client_certificate_) {
-    CURLcode ret;
-    ret = curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION,
-        &HttpConfigurationListener::configureSSLContext);
-    if (ret != CURLE_OK)
-      logger_->log_error("CURLOPT_SSL_CTX_FUNCTION not supported %d", ret);
-    curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
-        static_cast<void*>(this));
-    curl_easy_setopt(http_session, CURLOPT_SSLKEYTYPE, "PEM");
-  }
-}
-
 bool HttpConfigurationListener::pullConfiguration(std::string &configuration) {
   if (url_.empty())
     return false;
@@ -117,7 +59,7 @@ bool 
HttpConfigurationListener::pullConfiguration(std::string &configuration) {
   }
 
   if (fullUrl.find("https") != std::string::npos) {
-    configureSecureConnection(http_session);
+    securityConfig_.configureSecureConnection(http_session);
   }
 
   utils::HTTPRequestResponse content;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index ca8d3be..d1862cd 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -20,6 +20,9 @@
 
 #include "../include/RemoteProcessorGroupPort.h"
 
+#include <curl/curl.h>
+#include <curl/curlbuild.h>
+#include <curl/easy.h>
 #include <uuid/uuid.h>
 #include <algorithm>
 #include <cstdint>
@@ -30,6 +33,8 @@
 #include <string>
 #include <type_traits>
 #include <utility>
+#include "json/json.h"
+#include "json/writer.h"
 
 #include "../include/core/logging/Logger.h"
 #include "../include/core/ProcessContext.h"
@@ -44,8 +49,8 @@ namespace nifi {
 namespace minifi {
 
 const char 
*RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
-core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host 
Name.", "localhost");
-core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host 
Name.", "");
+core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "");
 core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies 
remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
 
@@ -55,18 +60,38 @@ bool create = true) {
   if (!available_protocols_.try_dequeue(nextProtocol)) {
     if (create) {
       // create
-      nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new 
Site2SiteClientProtocol(nullptr));
-      nextProtocol->setPortId(protocol_uuid_);
-      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = 
std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_,
 port_));
-      std::unique_ptr<Site2SitePeer> peer_ = 
std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, port_));
-      nextProtocol->setPeer(std::move(peer_));
+      if (url_.empty()) {
+        nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new 
Site2SiteClientProtocol(nullptr));
+        nextProtocol->setPortId(protocol_uuid_);
+        std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = 
std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_,
 port_));
+        std::unique_ptr<Site2SitePeer> peer_ = 
std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, port_));
+        nextProtocol->setPeer(std::move(peer_));
+      } else if (site2site_peer_index_ >= 0) {
+        nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new 
Site2SiteClientProtocol(nullptr));
+        minifi::Site2SitePeerStatus peer;
+        nextProtocol->setPortId(protocol_uuid_);
+        {
+          std::lock_guard < std::mutex > lock(site2site_peer_mutex_);
+          peer = site2site_peer_status_list_[this->site2site_peer_index_];
+          site2site_peer_index_++;
+          if (site2site_peer_index_ >= site2site_peer_status_list_.size()) {
+            site2site_peer_index_ = 0;
+          }
+        }
+        std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = 
std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(peer.host_,
 peer.port_));
+        std::unique_ptr<Site2SitePeer> peer_ = 
std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), peer.host_, 
peer.port_));
+        nextProtocol->setPeer(std::move(peer_));
+      }
     }
   }
   return std::move(nextProtocol);
 }
 
 void 
RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<Site2SiteClientProtocol>
 return_protocol) {
-  if (available_protocols_.size_approx() >= max_concurrent_tasks_) {
+  int count = site2site_peer_status_list_.size();
+  if (max_concurrent_tasks_ > count)
+    count = max_concurrent_tasks_;
+  if (available_protocols_.size_approx() >= count) {
     // let the memory be freed
     return;
   }
@@ -84,6 +109,35 @@ void RemoteProcessorGroupPort::initialize() {
   std::set<core::Relationship> relationships;
   relationships.insert(relation);
   setSupportedRelationships(relationships);
+  curl_global_init(CURL_GLOBAL_DEFAULT);
+  {
+    std::lock_guard < std::mutex > lock(site2site_peer_mutex_);
+    if (!url_.empty()) {
+      refreshPeerList();
+      if (site2site_peer_status_list_.size() > 0)
+        site2site_peer_index_ = 0;
+    }
+    // populate the site2site protocol for load balancing between them
+    if (site2site_peer_status_list_.size() > 0) {
+      int count = site2site_peer_status_list_.size();
+      if (max_concurrent_tasks_ > count)
+        count = max_concurrent_tasks_;
+      for (int i = 0; i < count; i++) {
+        std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr;
+        nextProtocol = std::unique_ptr < Site2SiteClientProtocol > (new 
Site2SiteClientProtocol(nullptr));
+        nextProtocol->setPortId(protocol_uuid_);
+        minifi::Site2SitePeerStatus peer = 
site2site_peer_status_list_[this->site2site_peer_index_];
+        site2site_peer_index_++;
+        if (site2site_peer_index_ >= site2site_peer_status_list_.size()) {
+          site2site_peer_index_ = 0;
+        }
+        std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = 
std::unique_ptr < org::apache::nifi::minifi::io::DataStream > 
(stream_factory_->createSocket(peer.host_, peer.port_));
+        std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer 
> (new Site2SitePeer(std::move(str), peer.host_, peer.port_));
+        nextProtocol->setPeer(std::move(peer_));
+        returnProtocol(std::move(nextProtocol));
+      }
+    }
+  }
 }
 
 void RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
@@ -91,12 +145,6 @@ void 
RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, core::P
 
   int64_t lvalue;
 
-  if (context->getProperty(hostName.getName(), value)) {
-    host_ = value;
-  }
-  if (context->getProperty(port.getName(), value) && 
core::Property::StringToInt(value, lvalue)) {
-    port_ = (uint16_t) lvalue;
-  }
   if (context->getProperty(portUUID.getName(), value)) {
     uuid_parse(value.c_str(), protocol_uuid_);
   }
@@ -110,16 +158,15 @@ void 
RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr
 
   int64_t lvalue;
 
-  std::string host = "";
-  uint16_t sport = 0;
-
-  if (context->getProperty(hostName.getName(), value)) {
-    host = value;
+  if (context->getProperty(hostName.getName(), value) && !value.empty()) {
+    host_ = value;
   }
-  if (context->getProperty(port.getName(), value) && 
core::Property::StringToInt(value, lvalue)) {
-    sport = (uint16_t) lvalue;
+
+  if (context->getProperty(port.getName(), value) && !value.empty() && 
core::Property::StringToInt(value, lvalue)) {
+    port_ = static_cast<int> (lvalue);
   }
-  if (context->getProperty(portUUID.getName(), value)) {
+
+  if (context->getProperty(portUUID.getName(), value) && !value.empty()) {
     uuid_parse(value.c_str(), protocol_uuid_);
   }
 
@@ -150,6 +197,113 @@ void 
RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr
   return;
 }
 
+void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
+  if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty())
+      return;
+
+  std::string fullUrl = this->protocol_ + this->host_ + ":" + 
std::to_string(this->port_) + "/nifi-api/controller/";
+
+  this->site2site_port_ = -1;
+  configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_);
+  configure_->get(Configure::nifi_rest_api_password, this->rest_password_);
+
+  std::string token;
+
+  if (!rest_user_name_.empty()) {
+    std::string loginUrl = this->protocol_ + this->host_ + ":" + 
std::to_string(this->port_) + "/nifi-api/access/token/";
+    token = utils::get_token(loginUrl, this->rest_user_name_, 
this->rest_password_, this->securityConfig_);
+    logger_->log_debug("Token from NiFi REST Api endpoint %s", token);
+    if (token.empty())
+        return;
+  }
+
+  CURL *http_session = curl_easy_init();
+
+  if (fullUrl.find("https") != std::string::npos) {
+    this->securityConfig_.configureSecureConnection(http_session);
+  }
+
+  struct curl_slist *list = NULL;
+  if (!token.empty()) {
+    std::string header = "Authorization: " + token;
+    list = curl_slist_append(list, header.c_str());
+    curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, list);
+  }
+
+  curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str());
+
+  utils::HTTPRequestResponse content;
+  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
+      &utils::HTTPRequestResponse::recieve_write);
+
+  curl_easy_setopt(http_session, CURLOPT_WRITEDATA,
+      static_cast<void*>(&content));
+
+  CURLcode res = curl_easy_perform(http_session);
+  if (list)
+    curl_slist_free_all(list);
+
+  if (res == CURLE_OK) {
+    std::string response_body(content.data.begin(), content.data.end());
+    int64_t http_code = 0;
+    curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code);
+    char *content_type;
+    /* ask for the content-type */
+    curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type);
+
+    bool isSuccess = ((int32_t) (http_code / 100)) == 2
+        && res != CURLE_ABORTED_BY_CALLBACK;
+    bool body_empty = IsNullOrEmpty(content.data);
+
+    if (isSuccess && !body_empty) {
+      std::string controller = std::move(response_body);
+      logger_->log_debug("controller config %s", controller.c_str());
+      Json::Value value;
+      Json::Reader reader;
+      bool parsingSuccessful = reader.parse(controller, value);
+      if (parsingSuccessful && !value.empty()) {
+        Json::Value controllerValue = value["controller"];
+        if (!controllerValue.empty()) {
+          Json::Value port = controllerValue["remoteSiteListeningPort"];
+          if (!port.empty())
+            this->site2site_port_ = port.asInt();
+          Json::Value secure = controllerValue["siteToSiteSecure"];
+          if (!secure.empty())
+            this->site2site_secure_ = secure.asBool();
+        }
+        logger_->log_info("process group remote site2site port %d, is secure 
%d", site2site_port_, site2site_secure_);
+      }
+    } else {
+      logger_->log_error("Cannot output body to content for 
ProcessGroup::refreshRemoteSite2SiteInfo");
+    }
+  } else {
+    logger_->log_error(
+        "ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() 
failed %s\n",
+        curl_easy_strerror(res));
+  }
+  curl_easy_cleanup(http_session);
+}
+
+void RemoteProcessorGroupPort::refreshPeerList() {
+  refreshRemoteSite2SiteInfo();
+  if (site2site_port_ == -1)
+    return;
+
+  this->site2site_peer_status_list_.clear();
+
+  std::unique_ptr < Site2SiteClientProtocol> protocol;
+  protocol = std::unique_ptr < Site2SiteClientProtocol
+      > (new Site2SiteClientProtocol(nullptr));
+  protocol->setPortId(protocol_uuid_);
+  std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+      std::unique_ptr < org::apache::nifi::minifi::io::DataStream
+          > (stream_factory_->createSocket(host_, site2site_port_));
+  std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer
+      > (new Site2SitePeer(std::move(str), host_, site2site_port_));
+  protocol->setPeer(std::move(peer_));
+  protocol->getPeerList(site2site_peer_status_list_);
+}
+
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp 
b/libminifi/src/Site2SiteClientProtocol.cpp
index 5e0637d..7d6e3f3 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -27,6 +27,7 @@
 #include <thread>
 #include <random>
 #include <iostream>
+#include <vector>
 #include "io/CRCStream.h"
 #include "Site2SitePeer.h"
 #include "Site2SiteClientProtocol.h"
@@ -57,9 +58,6 @@ bool Site2SiteClientProtocol::establish() {
 
   if (!ret) {
     logger_->log_error("Site2Site Protocol Version Negotiation failed");
-    /*
-     peer_->yield();
-     tearDown(); */
     return false;
   }
 
@@ -91,7 +89,6 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() {
 
   if (ret <= 0) {
     logger_->log_info("result of writing version is %i", ret);
-    // tearDown();
     return false;
   }
 
@@ -100,40 +97,36 @@ bool 
Site2SiteClientProtocol::initiateResourceNegotiation() {
 
   if (ret <= 0) {
     logger_->log_info("result of writing version status code  %i", ret);
-    // tearDown();
     return false;
   }
   logger_->log_info("status code is %i", statusCode);
   switch (statusCode) {
-    case RESOURCE_OK:
-      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
-      return true;
-    case DIFFERENT_RESOURCE_VERSION:
-      uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        // tearDown();
-        return false;
-      }
-      logger_->log_info("Site2Site Server Response asked for a different 
protocol version %d", serverVersion);
-      for (unsigned int i = (_currentVersionIndex + 1); i < 
sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
-        if (serverVersion >= _supportedVersion[i]) {
-          _currentVersion = _supportedVersion[i];
-          _currentVersionIndex = i;
-          return initiateResourceNegotiation();
-        }
-      }
-      ret = -1;
-      // tearDown();
-      return false;
-    case NEGOTIATED_ABORT:
-      logger_->log_info("Site2Site Negotiate protocol response ABORT");
-      ret = -1;
-      // tearDown();
+  case RESOURCE_OK:
+    logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+    return true;
+  case DIFFERENT_RESOURCE_VERSION:
+    uint32_t serverVersion;
+    ret = peer_->read(serverVersion);
+    if (ret <= 0) {
       return false;
-    default:
-      logger_->log_info("Negotiate protocol response unknown code %d", 
statusCode);
-      return true;
+    }
+    logger_->log_info("Site2Site Server Response asked for a different 
protocol version %d", serverVersion);
+    for (unsigned int i = (_currentVersionIndex + 1); i < 
sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+      if (serverVersion >= _supportedVersion[i]) {
+        _currentVersion = _supportedVersion[i];
+        _currentVersionIndex = i;
+        return initiateResourceNegotiation();
+      }
+    }
+    ret = -1;
+    return false;
+  case NEGOTIATED_ABORT:
+    logger_->log_info("Site2Site Negotiate protocol response ABORT");
+    ret = -1;
+    return false;
+  default:
+    logger_->log_info("Negotiate protocol response unknown code %d", 
statusCode);
+    return true;
   }
 
   return true;
@@ -152,7 +145,6 @@ bool 
Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
 
   if (ret <= 0) {
     logger_->log_debug("result of getCodecResourceName is %i", ret);
-    // tearDown();
     return false;
   }
 
@@ -160,7 +152,6 @@ bool 
Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
 
   if (ret <= 0) {
     logger_->log_debug("result of _currentCodecVersion is %i", ret);
-    // tearDown();
     return false;
   }
 
@@ -168,40 +159,36 @@ bool 
Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
   ret = peer_->read(statusCode);
 
   if (ret <= 0) {
-    // tearDown();
     return false;
   }
 
   switch (statusCode) {
-    case RESOURCE_OK:
-      logger_->log_info("Site2Site Codec Negotiate version OK");
-      return true;
-    case DIFFERENT_RESOURCE_VERSION:
-      uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        // tearDown();
-        return false;
-      }
-      logger_->log_info("Site2Site Server Response asked for a different codec 
version %d", serverVersion);
-      for (unsigned int i = (_currentCodecVersionIndex + 1); i < 
sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
-        if (serverVersion >= _supportedCodecVersion[i]) {
-          _currentCodecVersion = _supportedCodecVersion[i];
-          _currentCodecVersionIndex = i;
-          return initiateCodecResourceNegotiation();
-        }
-      }
-      ret = -1;
-      // tearDown();
-      return false;
-    case NEGOTIATED_ABORT:
-      logger_->log_info("Site2Site Codec Negotiate response ABORT");
-      ret = -1;
-      // tearDown();
+  case RESOURCE_OK:
+    logger_->log_info("Site2Site Codec Negotiate version OK");
+    return true;
+  case DIFFERENT_RESOURCE_VERSION:
+    uint32_t serverVersion;
+    ret = peer_->read(serverVersion);
+    if (ret <= 0) {
       return false;
-    default:
-      logger_->log_info("Negotiate Codec response unknown code %d", 
statusCode);
-      return true;
+    }
+    logger_->log_info("Site2Site Server Response asked for a different codec 
version %d", serverVersion);
+    for (unsigned int i = (_currentCodecVersionIndex + 1); i < 
sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+      if (serverVersion >= _supportedCodecVersion[i]) {
+        _currentCodecVersion = _supportedCodecVersion[i];
+        _currentCodecVersionIndex = i;
+        return initiateCodecResourceNegotiation();
+      }
+    }
+    ret = -1;
+    return false;
+  case NEGOTIATED_ABORT:
+    logger_->log_info("Site2Site Codec Negotiate response ABORT");
+    ret = -1;
+    return false;
+  default:
+    logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
+    return true;
   }
 
   return true;
@@ -223,11 +210,10 @@ bool Site2SiteClientProtocol::handShake() {
   int ret = peer_->writeUTF(_commsIdentifier);
 
   if (ret <= 0) {
-    // tearDown();
     return false;
   }
 
-  std::map<std::string, std::string> properties;
+  std::map < std::string, std::string > properties;
   properties[HandShakePropertyStr[GZIP]] = "false";
   properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
   properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = 
std::to_string(this->_timeOut);
@@ -243,7 +229,6 @@ bool Site2SiteClientProtocol::handShake() {
   if (_currentVersion >= 3) {
     ret = peer_->writeUTF(peer_->getURL());
     if (ret <= 0) {
-      // tearDown();
       return false;
     }
   }
@@ -251,7 +236,6 @@ bool Site2SiteClientProtocol::handShake() {
   uint32_t size = properties.size();
   ret = peer_->write(size);
   if (ret <= 0) {
-    // tearDown();
     return false;
   }
 
@@ -259,12 +243,10 @@ bool Site2SiteClientProtocol::handShake() {
   for (it = properties.begin(); it != properties.end(); it++) {
     ret = peer_->writeUTF(it->first);
     if (ret <= 0) {
-      // tearDown();
       return false;
     }
     ret = peer_->writeUTF(it->second);
     if (ret <= 0) {
-      // tearDown();
       return false;
     }
     logger_->log_info("Site2Site Protocol Send handshake properties %s %s", 
it->first.c_str(), it->second.c_str());
@@ -276,31 +258,24 @@ bool Site2SiteClientProtocol::handShake() {
   ret = this->readRespond(code, message);
 
   if (ret <= 0) {
-    // tearDown();
     return false;
   }
 
   switch (code) {
-    case PROPERTIES_OK:
-      logger_->log_info("Site2Site HandShake Completed");
-      _peerState = HANDSHAKED;
-      return true;
-    case PORT_NOT_IN_VALID_STATE:
-    case UNKNOWN_PORT:
-    case PORTS_DESTINATION_FULL:
-      logger_->log_error("Site2Site HandShake Failed because destination port 
is either invalid or full");
-      ret = -1;
-      /*
-       peer_->yield();
-       tearDown(); */
-      return false;
-    default:
-      logger_->log_info("HandShake Failed because of unknown respond code %d", 
code);
-      ret = -1;
-      /*
-       peer_->yield();
-       tearDown(); */
-      return false;
+  case PROPERTIES_OK:
+    logger_->log_info("Site2Site HandShake Completed");
+    _peerState = HANDSHAKED;
+    return true;
+  case PORT_NOT_IN_VALID_STATE:
+  case UNKNOWN_PORT:
+  case PORTS_DESTINATION_FULL:
+    logger_->log_error("Site2Site HandShake Failed because destination port is 
either invalid or full");
+    ret = -1;
+    return false;
+  default:
+    logger_->log_info("HandShake Failed because of unknown respond code %d", 
code);
+    ret = -1;
+    return false;
   }
 
   return false;
@@ -322,6 +297,64 @@ void Site2SiteClientProtocol::tearDown() {
   _peerState = IDLE;
 }
 
+bool 
Site2SiteClientProtocol::getPeerList(std::vector<minifi::Site2SitePeerStatus> 
&peer) {
+  if (establish() && handShake()) {
+    int status = this->writeRequestType(REQUEST_PEER_LIST);
+
+    if (status <= 0) {
+      tearDown();
+      return false;
+    }
+
+    uint32_t number;
+    status = peer_->read(number);
+
+    if (status <= 0) {
+      tearDown();
+      return false;
+    }
+
+    for (int i = 0; i < number; i++) {
+      std::string host;
+      status = peer_->readUTF(host);
+      if (status <= 0) {
+        tearDown();
+        return false;
+      }
+      uint32_t port;
+      status = peer_->read(port);
+      if (status <= 0) {
+        tearDown();
+        return false;
+      }
+      uint8_t secure;
+      status = peer_->read(secure);
+      if (status <= 0) {
+        tearDown();
+        return false;
+      }
+      uint32_t count;
+      status = peer_->read(count);
+      if (status <= 0) {
+        tearDown();
+        return false;
+      }
+      minifi::Site2SitePeerStatus status;
+      status.host_ = host;
+      status.isSecure_ = secure;
+      status.port_ = port;
+      peer.push_back(status);
+      logger_->log_info("Site2Site Peer host %s, port %d, Secure %d", host, 
port, secure);
+    }
+
+    tearDown();
+    return true;
+  } else {
+    tearDown();
+    return false;
+  }
+}
+
 int Site2SiteClientProtocol::writeRequestType(RequestType type) {
   if (type >= MAX_REQUEST_TYPE)
     return -1;
@@ -426,7 +459,6 @@ bool Site2SiteClientProtocol::negotiateCodec() {
   int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
 
   if (status <= 0) {
-    // tearDown();
     return false;
   }
 
@@ -435,9 +467,6 @@ bool Site2SiteClientProtocol::negotiateCodec() {
 
   if (!ret) {
     logger_->log_error("Site2Site Codec Version Negotiation failed");
-    /*
-     peer_->yield();
-     tearDown(); */
     return false;
   }
 
@@ -480,7 +509,6 @@ Transaction* 
Site2SiteClientProtocol::createTransaction(std::string &transaction
     ret = writeRequestType(RECEIVE_FLOWFILES);
 
     if (ret <= 0) {
-      // tearDown();
       return NULL;
     }
 
@@ -490,40 +518,37 @@ Transaction* 
Site2SiteClientProtocol::createTransaction(std::string &transaction
     ret = readRespond(code, message);
 
     if (ret <= 0) {
-      // tearDown();
       return NULL;
     }
 
     org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> 
crcstream(peer_.get());
     switch (code) {
-      case MORE_DATA:
-        dataAvailable = true;
-        logger_->log_info("Site2Site peer indicates that data is available");
-        transaction = new Transaction(direction, crcstream);
-        _transactionMap[transaction->getUUIDStr()] = transaction;
-        transactionID = transaction->getUUIDStr();
-        transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
-        return transaction;
-      case NO_MORE_DATA:
-        dataAvailable = false;
-        logger_->log_info("Site2Site peer indicates that no data is 
available");
-        transaction = new Transaction(direction, crcstream);
-        _transactionMap[transaction->getUUIDStr()] = transaction;
-        transactionID = transaction->getUUIDStr();
-        transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
-        return transaction;
-      default:
-        logger_->log_info("Site2Site got unexpected response %d when asking 
for data", code);
-        // tearDown();
-        return NULL;
+    case MORE_DATA:
+      dataAvailable = true;
+      logger_->log_info("Site2Site peer indicates that data is available");
+      transaction = new Transaction(direction, crcstream);
+      _transactionMap[transaction->getUUIDStr()] = transaction;
+      transactionID = transaction->getUUIDStr();
+      transaction->setDataAvailable(dataAvailable);
+      logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
+      return transaction;
+    case NO_MORE_DATA:
+      dataAvailable = false;
+      logger_->log_info("Site2Site peer indicates that no data is available");
+      transaction = new Transaction(direction, crcstream);
+      _transactionMap[transaction->getUUIDStr()] = transaction;
+      transactionID = transaction->getUUIDStr();
+      transaction->setDataAvailable(dataAvailable);
+      logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
+      return transaction;
+    default:
+      logger_->log_info("Site2Site got unexpected response %d when asking for 
data", code);
+      return NULL;
     }
   } else {
     ret = writeRequestType(SEND_FLOWFILES);
 
     if (ret <= 0) {
-      // tearDown();
       return NULL;
     } else {
       org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> 
crcstream(peer_.get());
@@ -760,7 +785,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co
 
   try {
     while (true) {
-      std::map<std::string, std::string> empty;
+      std::map < std::string, std::string > empty;
       uint64_t startTime = getTimeMillis();
       std::string payload;
       DataPacket packet(this, transaction, empty, payload);
@@ -774,7 +799,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co
         // transaction done
         break;
       }
-      std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
 
       if (!flowFile) {
         throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
@@ -1072,7 +1097,7 @@ bool Site2SiteClientProtocol::complete(std::string 
transactionID) {
 }
 
 void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, 
core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flow = 
std::static_pointer_cast<FlowFileRecord>(session->get());
+  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < 
FlowFileRecord > (session->get());
 
   Transaction *transaction = NULL;
 
@@ -1125,7 +1150,7 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c
       if (transferNanos > _batchSendNanos)
         break;
 
-      flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+      flow = std::static_pointer_cast < FlowFileRecord > (session->get());
 
       if (!flow) {
         continueTransaction = false;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index cc6e0e5..c32add6 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -46,7 +46,7 @@ std::shared_ptr<core::Processor> 
FlowConfiguration::createProcessor(std::string
 std::shared_ptr<core::Processor> 
FlowConfiguration::createProvenanceReportTask() {
   std::shared_ptr<core::Processor> processor = nullptr;
 
-  processor = 
std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_);
+  processor = 
std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_,
 this->configuration_);
   // initialize the processor
   processor->initialize();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp 
b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index e46f740..02ddb52 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -48,6 +48,7 @@ namespace reporting {
 const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi 
Flow";
 
 void SiteToSiteProvenanceReportingTask::initialize() {
+  RemoteProcessorGroupPort::initialize();
 }
 
 void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext 
*context, core::ProcessSession *session, 
std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index c28ec70..4ce944e 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -316,18 +316,6 @@ void 
YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
   auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
   checkRequiredField(&node, "scheduling period", 
CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
-  checkRequiredField(&node, "host", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto hostStr = node["host"].as<std::string>();
-  checkRequiredField(&node, "port", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto portStr = node["port"].as<std::string>();
-  checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto portUUIDStr = node["port uuid"].as<std::string>();
-  checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto batchSizeStr = node["batch size"].as<std::string>();
-
-  // add processor to parent
-  parentGroup->addProcessor(processor);
-  processor->setScheduledState(core::RUNNING);
 
   core::TimeUnit unit;
   if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, 
unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, 
schedulingPeriod)) {
@@ -342,20 +330,43 @@ void 
YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
     throw std::invalid_argument("Invalid scheduling strategy " + 
schedulingStrategyStr);
   }
 
-  reportTask->setHost(hostStr);
-  logger_->log_debug("ProvenanceReportingTask host %s", hostStr);
   int64_t lvalue;
-  if (core::Property::StringToInt(portStr, lvalue)) {
-    logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue);
-    reportTask->setPort((uint16_t) lvalue);
+  if (node["host"]) {
+    auto hostStr = node["host"].as<std::string>();
+    reportTask->setHost(hostStr);
+  }
+  if (node["port"]) {
+    auto portStr = node["port"].as<std::string>();
+    if (core::Property::StringToInt(portStr, lvalue)) {
+      logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue);
+      reportTask->setPort((uint16_t) lvalue);
+    }
+  }
+  if (node["url"]) {
+    auto urlStr = node["url"].as<std::string>();
+    if (!urlStr.empty()) {
+      reportTask->setURL(urlStr);
+      logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
+    }
   }
+  checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  auto portUUIDStr = node["port uuid"].as<std::string>();
+  checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  auto batchSizeStr = node["batch size"].as<std::string>();
 
   logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
   uuid_parse(portUUIDStr.c_str(), port_uuid);
   reportTask->setPortUUID(port_uuid);
+
   if (core::Property::StringToInt(batchSizeStr, lvalue)) {
     reportTask->setBatchSize(lvalue);
   }
+
+  reportTask->initialize();
+
+  // add processor to parent
+  parentGroup->addProcessor(processor);
+  processor->setScheduledState(core::RUNNING);
 }
 
 void YamlConfiguration::parseControllerServices(YAML::Node 
*controllerServicesNode) {
@@ -535,7 +546,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, 
core::ProcessGroup *
   auto portId = inputPortsObj["id"].as<std::string>();
   uuid_parse(portId.c_str(), uuid);
 
-  port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, 
nameStr, uuid);
+  port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, 
nameStr, parent->getURL(), this->configuration_, uuid);
 
   processor = std::static_pointer_cast<core::Processor>(port);
   port->setDirection(direction);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/integration/Site2SiteRestTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/Site2SiteRestTest.cpp 
b/libminifi/test/integration/Site2SiteRestTest.cpp
new file mode 100644
index 0000000..01aa7a8
--- /dev/null
+++ b/libminifi/test/integration/Site2SiteRestTest.cpp
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "../TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "../include/core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+
+void waitToVerifyProcessor() {
+  std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+class ConfigHandler: public CivetHandler {
+ public:
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    static const std::string site2site_rest_resp = "{"
+           "\"revision\": {"
+           "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+           "},"
+           "\"controller\": {"
+           "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+           "\"name\": \"NiFi Flow\","
+           "\"remoteSiteListeningPort\": 10001,"
+           "\"siteToSiteSecure\": false"
+           "}}";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+        "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+        site2site_rest_resp.length());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+};
+
+int main(int argc, char **argv) {
+  LogTestController::getInstance().setInfo<minifi::RemoteProcessorGroupPort>();
+  LogTestController::getInstance().setInfo<minifi::FlowController>();
+
+  const char *options[] = { "document_root", ".", "listening_ports", "8082", 0 
};
+  std::vector < std::string > cpp_options;
+  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+    cpp_options.push_back(options[i]);
+  }
+
+  mkdir("/tmp/site2siteGetFile/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+  std::fstream file;
+  std::stringstream ss;
+  ss << "/tmp/site2siteGetFile/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  CivetServer server(cpp_options);
+  ConfigHandler h_ex;
+  server.addHandler("/nifi-api/controller/", h_ex);
+  
LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>();
+
+  std::string key_dir, test_file_location;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+      minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
+      TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file,
+      test_file_location);
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
+      < minifi::io::StreamFactory > (configuration);
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
+      < core::YamlConfiguration
+      > (new core::YamlConfiguration(test_repo, test_repo, stream_factory,
+          configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      < TestRepository > (test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller =
+      std::make_shared < minifi::FlowController
+          > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
DEFAULT_ROOT_GROUP_NAME, true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
+      configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
+      test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup
+      > (ptr.get());
+  ptr.release();
+
+  controller->load();
+  controller->start();
+  waitToVerifyProcessor();
+
+  controller->waitUnload(60000);
+  std::string logs = LogTestController::getInstance().log_output.str();
+  assert(logs.find("process group remote site2site port 10001, is secure 0") 
!= std::string::npos);
+  LogTestController::getInstance().reset();
+  unlink(ss.str().c_str());
+  rmdir("/tmp/site2siteGetFile/");
+  rmdir("./content_repository");
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/resources/TestSite2SiteRest.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestSite2SiteRest.yml 
b/libminifi/test/resources/TestSite2SiteRest.yml
new file mode 100644
index 0000000..ca751b5
--- /dev/null
+++ b/libminifi/test/resources/TestSite2SiteRest.yml
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+    id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+    name: MiNiFi Flow
+
+Processors:
+    - name: GetFile
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+      class: org.apache.nifi.processors.standard.GetFile
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Input Directory: /tmp/site2siteGetFile
+          Keep Source File: true
+
+Connections:
+    - name: GenerateFlowFileS2S
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 
+      source relationship name: success
+      destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+      queue prioritizer class: 
org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Groups:
+    - name: NiFi Flow
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
+      url: http://localhost:8082/nifi
+      timeout: 30 secs
+      yield period: 10 sec
+      Input Ports:
+          - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+            name: From Node A
+            max concurrent tasks: 1
+            use compression: false
+            Properties: # Deviates from spec and will later be removed when 
this is autonegotiated
+                Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp 
b/libminifi/test/unit/ProcessorTests.cpp
index ac2b54e..9e2d50c 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -46,9 +46,10 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   TestController testController;
 
   std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<org::apache::nifi::minifi::Configure> configure = 
std::make_shared<org::apache::nifi::minifi::Configure>();
 
   std::shared_ptr<core::Processor> processorReport = 
std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
-      
std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()));
+      
std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(configure), 
configure);
 
   std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/20622f6d/libminifi/test/unit/YamlConfigurationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp 
b/libminifi/test/unit/YamlConfigurationTests.cpp
index 660ff53..ba73a34 100644
--- a/libminifi/test/unit/YamlConfigurationTests.cpp
+++ b/libminifi/test/unit/YamlConfigurationTests.cpp
@@ -129,7 +129,7 @@ TEST_CASE("Test YAML Config Processing", 
"[YamlConfiguration]") {
   "    port name: provenance\n"
   "    port: 8090\n"
   "    port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
-  "    destination url: https://localhost:8090/\n";
+  "    url: https://localhost:8090/\n";
   "    originating url: http://${hostname(true)}:8081/nifi\n"
   "    use compression: true\n"
   "    timeout: 30 secs\n"

Reply via email to