Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 8eb8cf704 -> 6a672dae8


MINIFICPP-457: Add prioritizer service for Network communications.

MINIFICPP-504: Tie in estimated size for max throughput to RPG

MINIFICPP-457: Resolve new test

MINIFICPP-457: Resolve issue with trusty

This closes #337.

Approved on GH by @achristianson

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6a672dae
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6a672dae
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6a672dae

Branch: refs/heads/master
Commit: 6a672dae8f9e1d9eafe6ee7ec403127f3c1382d3
Parents: 8eb8cf7
Author: Marc Parisi <[email protected]>
Authored: Tue May 22 07:56:24 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Fri May 25 14:52:56 2018 -0400

----------------------------------------------------------------------
 README.md                                       |  36 ++++
 controller/Controller.h                         |   4 +-
 controller/MiNiFiController.cpp                 |   2 +-
 .../http-curl/tests/C2FailedUpdateTest.cpp      |   2 +-
 .../http-curl/tests/C2UpdateAgentTest.cpp       |   2 +-
 extensions/http-curl/tests/C2UpdateTest.cpp     |   2 +-
 .../tests/ControllerServiceIntegrationTests.cpp |   2 +-
 extensions/http-curl/tests/GetFileNoData.cpp    |   3 +-
 .../http-curl/tests/HttpGetIntegrationTest.cpp  |   2 +-
 libminifi/include/capi/Instance.h               |   2 +-
 .../controllers/NetworkPrioritizerService.h     | 147 +++++++++++++
 libminifi/include/io/ClientSocket.h             |   9 +-
 libminifi/include/io/DataStream.h               |   3 -
 libminifi/include/io/NetworkPrioritizer.h       | 118 +++++++++++
 libminifi/include/io/StreamFactory.h            |  39 +++-
 libminifi/include/sitetosite/Peer.h             |  11 +-
 .../include/sitetosite/SiteToSiteFactory.h      |   9 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      |   7 +-
 libminifi/src/c2/ControllerSocketProtocol.cpp   |   2 +-
 libminifi/src/capi/Plan.cpp                     |  13 +-
 .../controllers/NetworkPrioritizerService.cpp   | 211 +++++++++++++++++++
 libminifi/src/io/ClientSocket.cpp               |  25 ++-
 libminifi/src/sitetosite/Peer.cpp               |  13 +-
 libminifi/test/TestBase.cpp                     |  20 +-
 libminifi/test/integration/IntegrationBase.h    |   3 +-
 .../integration/ProvenanceReportingTest.cpp     |   2 +-
 .../test/integration/SecureSocketGetTCPTest.cpp |   2 +-
 libminifi/test/unit/ControllerTests.cpp         |   6 +-
 libminifi/test/unit/GetTCPTests.cpp             |   6 +-
 .../unit/NetworkPrioritizerServiceTests.cpp     | 168 +++++++++++++++
 libminifi/test/unit/ProcessorTests.cpp          |   2 +-
 libminifi/test/unit/SocketTests.cpp             |   8 +-
 libminifi/test/unit/YamlConfigurationTests.cpp  |  10 +-
 main/MiNiFiMain.cpp                             |   2 +-
 34 files changed, 808 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 32eb897..5a5e639 100644
--- a/README.md
+++ b/README.md
@@ -702,6 +702,7 @@ Additionally, a unique hexadecimal 
uid.minifi.device.segment should be assigned
           Trigger Threshold: 90
           Low Battery Threshold: 50
           Wait Period: 500 ms
+          
 ### MQTT Controller service
 The MQTTController Service can be configured for MQTT connectivity and provide 
that capability to your processors when MQTT is built.
     
@@ -713,6 +714,41 @@ The MQTTController Service can be configured for MQTT 
connectivity and provide t
           Broker URI: localhost:1883
            Client ID: client ID
           Quality of Service: 2
+          
+### Network Prioritizer Controller Service
+  The network prioritizer controller service can be configured to manage 
prioritizing and binding to specific network interfaces. Linked Services can 
be used
+  as a prioritized list to create a disjunction among multiple networking 
prioritizers. This allows you to create classes with different configurations 
that
+  create multiple prioritizations. Max Throughput is the maximum throughput in 
bytes per second. Max Payload is the maximum number of bytes supported by that
+  prioritizer. If a prioritizer is configured with the option "Default 
Prioritizer: true", then all socket communications will use that default 
prioritizer.
+  
+  In the configuration below there are two classes defined under 
"NetworkPrioritizerService". One class, "NetworkPrioritizerService2", defines 
en0 and en1.
+  If en0 is down at any point, then en1 will be given priority before 
resorting to en2 and en3 of "NetworkPrioritizerService3". If the throughput 
for 
+  "NetworkPrioritizerService2" exceeds the defined throughput or the max 
payload of 1024, then "NetworkPrioritizerService3" will be used. If Max Payload 
and 
+  Max Throughput are not defined, then they will not be limiting factors. For 
this release, 0.5.0, Max Payload will only be used for processors that custom 
+  implement that feature. RPGs will not support max payloads until 0.6.0. 
Additionally, since connection queues aren't prioritized, you must have a live 
connection
+  to send your data. Since connection queues can't be re-prioritized, 
this can create a starvation problem. The configuration is required to account 
for this.
+    
+   Controller Services:
+   - name: NetworkPrioritizerService
+     id: 2438e3c8-015a-1000-79ca-83af40ec1883
+     class: NetworkPrioritizerService
+     Properties:
+         Linked Services: NetworkPrioritizerService2,NetworkPrioritizerService3
+   - name: NetworkPrioritizerService2
+     id: 2438e3c8-015a-1000-79ca-83af40ec1884
+     class: NetworkPrioritizerService
+     Properties:
+         Network Controllers: en0,en1
+         Max Throughput: 1,024,1024
+         Max Payload: 1024
+   - name: NetworkPrioritizerService3
+     id: 2438e3c8-015a-1000-79ca-83af40ec1884
+     class: NetworkPrioritizerService
+     Properties:
+         Network Controllers: en2,en3
+         Max Throughput: 1,024,1024
+         Max Payload: 1,024,1024
+         
 ### Running
 After completing a [build](#building), the application can be run by issuing 
the following from :
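
The fallback order that the README change describes for the Network Prioritizer Controller Service (try en0, then en1, then the interfaces of the next linked service) can be sketched roughly as follows. This is only an illustration under stated assumptions: `PrioritizerSketch`, `pickInterface`, and the `online` callback are hypothetical names, not types or functions from this commit, and throughput limits are left out to keep the sketch short.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for one configured prioritizer; not a type from this commit.
struct PrioritizerSketch {
  std::vector<std::string> interfaces;  // e.g. "Network Controllers: en0,en1"
  uint64_t max_payload;                 // e.g. "Max Payload: 1024"
};

// Walks the linked prioritizers in order and returns the first online
// interface whose prioritizer accepts a payload of `size` bytes.
// An empty string means that no interface is currently usable.
std::string pickInterface(const std::vector<PrioritizerSketch> &linked,
                          uint64_t size,
                          const std::function<bool(const std::string &)> &online) {
  for (const auto &prioritizer : linked) {
    if (size > prioritizer.max_payload) {
      continue;  // payload too large for this prioritizer, try the next one
    }
    for (const auto &ifc : prioritizer.interfaces) {
      if (online(ifc)) {
        return ifc;
      }
    }
  }
  return "";
}
```

Returning an empty string for "nothing usable" mirrors the empty-interface check in the `StreamFactory::createSocket` change of this commit, where an empty interface name leads to a null socket.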
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/controller/Controller.h
----------------------------------------------------------------------
diff --git a/controller/Controller.h b/controller/Controller.h
index 0a2b292..312b922 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -231,7 +231,7 @@ std::shared_ptr<core::controller::ControllerService> 
getControllerService(const
 
   configuration->get(minifi::Configure::nifi_configuration_class_name, 
nifi_configuration_class_name);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
 
   std::unique_ptr<core::FlowConfiguration> flow_configuration = 
core::createFlowConfiguration(prov_repo, flow_repo, content_repo, 
configuration, stream_factory, nifi_configuration_class_name);
 
@@ -288,7 +288,7 @@ std::shared_ptr<core::controller::ControllerService> 
getControllerService(const
 
   configuration->get(minifi::Configure::nifi_configuration_class_name, 
nifi_configuration_class_name);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
 
   std::unique_ptr<core::FlowConfiguration> flow_configuration = 
core::createFlowConfiguration(prov_repo, flow_repo, content_repo, 
configuration, stream_factory, nifi_configuration_class_name);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/controller/MiNiFiController.cpp
----------------------------------------------------------------------
diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp
index dc12306..5e2b886 100644
--- a/controller/MiNiFiController.cpp
+++ b/controller/MiNiFiController.cpp
@@ -119,7 +119,7 @@ int main(int argc, char **argv) {
     secure_context->setDisablePeerVerification();
   }
 
-  auto stream_factory_ = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
 
   std::string host = "localhost", portStr, caCert;
   int port = -1;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/C2FailedUpdateTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp 
b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
index 2aed1f4..fc3e79a 100644
--- a/extensions/http-curl/tests/C2FailedUpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
@@ -151,7 +151,7 @@ int main(int argc, char **argv) {
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/C2UpdateAgentTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp 
b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index 26a3e86..00b761b 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -150,7 +150,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
   configuration->set("c2.agent.update.command", "echo \"verification 
command\"");
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp 
b/extensions/http-curl/tests/C2UpdateTest.cpp
index 52e60f8..c51376d 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -149,7 +149,7 @@ int main(int argc, char **argv) {
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp 
b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 612603a..8fb1daf 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_security_client_pass_phrase, 
passphrase);
   configuration->set(minifi::Configure::nifi_default_directory, key_dir);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/GetFileNoData.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/GetFileNoData.cpp 
b/extensions/http-curl/tests/GetFileNoData.cpp
index 299d994..a0a5dfd 100644
--- a/extensions/http-curl/tests/GetFileNoData.cpp
+++ b/extensions/http-curl/tests/GetFileNoData.cpp
@@ -146,8 +146,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file,
                      test_file_location);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
-      <minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
       <core::YamlConfiguration

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp 
b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index 9e6e99f..8e6042f 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -93,7 +93,7 @@ int main(int argc, char **argv) {
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
 
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/capi/Instance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Instance.h 
b/libminifi/include/capi/Instance.h
index bbe0f4c..29e0fcb 100644
--- a/libminifi/include/capi/Instance.h
+++ b/libminifi/include/capi/Instance.h
@@ -71,8 +71,8 @@ class Instance {
         listener_thread_pool_(1),
         
content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
         no_op_repo_(std::make_shared<minifi::core::Repository>()) {
-    stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configure_);
     running_ = false;
+    stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
     uuid_t uuid;
     uuid_parse(port.c_str(), uuid);
     rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, 
url, url, configure_, uuid);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/controllers/NetworkPrioritizerService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/NetworkPrioritizerService.h 
b/libminifi/include/controllers/NetworkPrioritizerService.h
new file mode 100644
index 0000000..bf7916f
--- /dev/null
+++ b/libminifi/include/controllers/NetworkPrioritizerService.h
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_NETWORKPRIORITIZERSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_NETWORKPRIORITIZERSERVICE_H_
+
+#include <iostream>
+#include <memory>
+#include <limits>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "ThreadManagementService.h"
+#include "io/NetworkPrioritizer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Purpose: Network prioritizer for selecting network interfaces through the 
flow configuration.
+ */
+class NetworkPrioritizerService : public core::controller::ControllerService, 
public minifi::io::NetworkPrioritizer, public 
std::enable_shared_from_this<NetworkPrioritizerService> {
+ public:
+  explicit NetworkPrioritizerService(const std::string &name, const 
std::string &id)
+      : ControllerService(name, id),
+        enabled_(false),
+        max_throughput_(std::numeric_limits<uint64_t>::max()),
+        max_payload_(std::numeric_limits<uint64_t>::max()),
+        tokens_per_ms(2),
+        tokens_(1000),
+        timestamp_(0),
+        bytes_per_token_(0),
+        verify_interfaces_(true),
+        
logger_(logging::LoggerFactory<NetworkPrioritizerService>::getLogger()) {
+  }
+
+  explicit NetworkPrioritizerService(const std::string &name, uuid_t uuid = 0)
+      : ControllerService(name, uuid),
+        enabled_(false),
+        max_throughput_(std::numeric_limits<uint64_t>::max()),
+        max_payload_(std::numeric_limits<uint64_t>::max()),
+        tokens_per_ms(2),
+        tokens_(1000),
+        timestamp_(0),
+        bytes_per_token_(0),
+        verify_interfaces_(true),
+        
logger_(logging::LoggerFactory<NetworkPrioritizerService>::getLogger()) {
+  }
+
+  explicit NetworkPrioritizerService(const std::string &name, const 
std::shared_ptr<Configure> &configuration)
+      : NetworkPrioritizerService(name, nullptr) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  static core::Property NetworkControllers;
+  static core::Property MaxThroughput;
+  static core::Property MaxPayload;
+  static core::Property VerifyInterfaces;
+  static core::Property DefaultPrioritizer;
+
+  void initialize();
+
+  void yield();
+
+  bool isRunning();
+
+  bool isWorkAvailable();
+
+  virtual void onEnable();
+
+  virtual io::NetworkInterface &&getInterface(uint32_t size);
+
+ protected:
+
+  std::string get_nearest_interface(const std::vector<std::string> &ifcs);
+
+  bool interface_online(const std::string &ifc);
+
+  std::vector<std::string> getInterfaces(uint32_t size);
+
+  bool sufficient_tokens(uint32_t size);
+
+  virtual void reduce_tokens(uint32_t size);
+
+  bool enabled_;
+
+  uint64_t max_throughput_;
+
+  uint64_t max_payload_;
+
+  std::vector<std::string> network_controllers_;
+
+  int tokens_per_ms;
+
+  /**
+   * Using a variation of the token bucket algorithm.
+   * every millisecond 1 token will be added to the bucket. max throughput 
will define a maximum rate per second.
+   *
+   * When a request for data arrives to send and not enough tokens exist, we 
will restrict sending through the interfaces defined here.
+   *
+   * When a request arrives tokens will be decremented. We will compute the 
amount of data that can be sent per token from the configuration
+   * of max_throughput_
+   */
+  uint32_t tokens_;
+
+  std::mutex token_mutex_;
+
+  uint64_t timestamp_;
+
+  uint32_t bytes_per_token_;
+
+  bool verify_interfaces_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(NetworkPrioritizerService);
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_NETWORKPRIORITIZERSERVICE_H_ */
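
The header comment above describes "a variation of the token bucket algorithm". A minimal, single-threaded sketch of that idea is shown below. It assumes one token is added per elapsed millisecond, a bucket capacity of 1000 tokens, and a bytes-per-token value derived as max throughput divided by 1000; `TokenBucketSketch` and its members are hypothetical, and the actual `NetworkPrioritizerService.cpp` may compute these values differently. Time is passed in explicitly so the behaviour is deterministic.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical, simplified token bucket; not the class added by this commit.
class TokenBucketSketch {
 public:
  // max_bytes_per_second plays the role of "Max Throughput".
  explicit TokenBucketSketch(uint64_t max_bytes_per_second, uint64_t start_ms = 0)
      : capacity_(1000),
        tokens_(1000),
        last_refill_ms_(start_ms),
        bytes_per_token_(std::max<uint64_t>(1, max_bytes_per_second / 1000)) {
  }

  // Adds one token per elapsed millisecond, capped at the bucket capacity.
  void refill(uint64_t now_ms) {
    if (now_ms <= last_refill_ms_) {
      return;
    }
    const uint64_t elapsed = now_ms - last_refill_ms_;
    const uint64_t room = capacity_ - tokens_;
    tokens_ += std::min(elapsed, room);
    last_refill_ms_ = now_ms;
  }

  // Number of tokens that a payload of `size` bytes costs, rounded up.
  uint64_t cost(uint64_t size) const {
    return size / bytes_per_token_ + (size % bytes_per_token_ != 0 ? 1 : 0);
  }

  // Refills, then consumes tokens for `size` bytes if enough are available.
  // Returns false and leaves the token count unchanged otherwise.
  bool try_consume(uint64_t size, uint64_t now_ms) {
    refill(now_ms);
    const uint64_t needed = cost(size);
    if (needed > tokens_) {
      return false;
    }
    tokens_ -= needed;
    return true;
  }

  uint64_t tokens() const {
    return tokens_;
  }

 private:
  uint64_t capacity_;
  uint64_t tokens_;
  uint64_t last_refill_ms_;
  uint64_t bytes_per_token_;
};
```

With a limit of 1,000,000 bytes per second, each token is worth 1000 bytes in this sketch, so a full bucket allows one 1,000,000-byte burst and then admits roughly 1000 further bytes per millisecond. A shared instance would also need a lock, as the `token_mutex_` member in the header suggests.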

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h 
b/libminifi/include/io/ClientSocket.h
index 32f04a5..e4b7604 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -30,6 +30,7 @@
 #include "core/logging/Logger.h"
 #include "io/validation.h"
 #include "properties/Configure.h"
+#include "io/NetworkPrioritizer.h"
 
 namespace org {
 namespace apache {
@@ -91,8 +92,8 @@ class Socket : public BaseStream {
    */
   virtual int16_t initialize();
 
-  virtual void setInterface(std::string &interface) {
-    local_network_interface_ = interface;
+  virtual void setInterface(io::NetworkInterface &&interface) {
+    local_network_interface_ = std::move(interface);
   }
 
   /**
@@ -260,7 +261,7 @@ class Socket : public BaseStream {
   uint16_t port_;
 
   bool is_loopback_only_;
-  std::string local_network_interface_;
+  io::NetworkInterface local_network_interface_;
 
   // connection information
   int32_t socket_file_descriptor_;
@@ -268,6 +269,8 @@ class Socket : public BaseStream {
   fd_set total_list_;
   fd_set read_fds_;
   std::atomic<uint16_t> socket_max_;
+  std::atomic<uint64_t> total_written_;
+  std::atomic<uint64_t> total_read_;
   uint16_t listeners_;
 
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/DataStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DataStream.h 
b/libminifi/include/io/DataStream.h
index a61ff5a..825cd89 100644
--- a/libminifi/include/io/DataStream.h
+++ b/libminifi/include/io/DataStream.h
@@ -69,9 +69,6 @@ class DataStream {
 
   }
 
-  virtual void setInterface(std::string &interface) {
-  }
-
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/NetworkPrioritizer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/NetworkPrioritizer.h 
b/libminifi/include/io/NetworkPrioritizer.h
new file mode 100644
index 0000000..7265f49
--- /dev/null
+++ b/libminifi/include/io/NetworkPrioritizer.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_NETWORKPRIORITIZER_H_
+#define LIBMINIFI_INCLUDE_IO_NETWORKPRIORITIZER_H_
+
+#include <iostream>
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+class NetworkInterface;
+
+class NetworkPrioritizer {
+ public:
+
+  virtual ~NetworkPrioritizer() {
+  }
+
+  virtual NetworkInterface &&getInterface(uint32_t size) = 0;
+
+ protected:
+  friend class NetworkInterface;
+  virtual void reduce_tokens(uint32_t size) = 0;
+
+};
+
+class NetworkInterface {
+ public:
+
+  NetworkInterface() : prioritizer_(nullptr){
+  }
+
+  virtual ~NetworkInterface(){
+  }
+
+  explicit NetworkInterface(const std::string &ifc, const 
std::shared_ptr<NetworkPrioritizer> &prioritizer)
+      : ifc_(ifc),
+        prioritizer_(prioritizer) {
+  }
+
+  explicit NetworkInterface(const NetworkInterface &&other)
+      : ifc_(std::move(other.ifc_)),
+        prioritizer_(std::move(other.prioritizer_)) {
+  }
+
+  std::string getInterface() const {
+    return ifc_;
+  }
+  void log_write(uint32_t size) {
+    if (nullptr != prioritizer_) {
+      prioritizer_->reduce_tokens(size);
+    }
+  }
+
+  void log_read(uint32_t size) {
+    if (nullptr != prioritizer_) {
+      prioritizer_->reduce_tokens(size);
+    }
+  }
+
+  NetworkInterface &operator=(const NetworkInterface &&other) {
+    ifc_ = std::move(other.ifc_);
+    prioritizer_ = std::move(other.prioritizer_);
+    return *this;
+  }
+ private:
+  friend class NetworkPrioritizer;
+  std::string ifc_;
+  std::shared_ptr<NetworkPrioritizer> prioritizer_;
+};
+
+class NetworkPrioritizerFactory {
+ public:
+  static std::shared_ptr<NetworkPrioritizerFactory> getInstance() {
+    static std::shared_ptr<NetworkPrioritizerFactory> fa = 
std::make_shared<NetworkPrioritizerFactory>();
+    return fa;
+  }
+
+  int setPrioritizer(const std::shared_ptr<NetworkPrioritizer> &prioritizer) {
+    if (np_ != nullptr)
+      return -1;
+    np_ = prioritizer;
+    return 0;
+  }
+
+  std::shared_ptr<NetworkPrioritizer> getPrioritizer() {
+    return np_;
+  }
+ private:
+  std::shared_ptr<NetworkPrioritizer> np_;
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_NETWORKPRIORITIZER_H_ */
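
One C++ detail is worth keeping in mind when reading `NetworkInterface` above: its move constructor and move assignment take a `const NetworkInterface &&`. Applying `std::move` to a member of a `const` object yields a const rvalue, and for `std::string` that selects the copy constructor, so the source object keeps its value. The sketch below illustrates the difference with two hypothetical types (`ConstRvalueSketch` and `MoveSketch`); neither is part of this commit.

```cpp
#include <string>
#include <utility>

// Hypothetical type whose "move" constructor takes a const rvalue reference.
struct ConstRvalueSketch {
  std::string name;
  explicit ConstRvalueSketch(std::string n) : name(std::move(n)) {}
  // `other` is const, so std::move(other.name) is a const rvalue and
  // std::string's copy constructor is selected: this copies.
  ConstRvalueSketch(const ConstRvalueSketch &&other) : name(std::move(other.name)) {}
};

// Hypothetical type with a conventional (non-const) move constructor.
struct MoveSketch {
  std::string name;
  explicit MoveSketch(std::string n) : name(std::move(n)) {}
  // `other` is non-const, so the string's buffer can actually be taken over.
  MoveSketch(MoveSketch &&other) noexcept : name(std::move(other.name)) {}
};
```

For short interface names such as "en0" the extra copy is cheap, so this is a matter of clarity more than performance.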

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/StreamFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/StreamFactory.h 
b/libminifi/include/io/StreamFactory.h
index bbb09b9..f3fc95e 100644
--- a/libminifi/include/io/StreamFactory.h
+++ b/libminifi/include/io/StreamFactory.h
@@ -22,6 +22,7 @@
 #include "utils/StringUtils.h"
 #include "validation.h"
 #include "controllers/SSLContextService.h"
+#include "NetworkPrioritizer.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -50,21 +51,49 @@ class StreamFactory {
    * Creates a socket and returns a unique ptr
    *
    */
-  std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t 
port) {
-    return delegate_->createSocket(host, port);
+  std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t 
port, uint32_t estimated_size = 0) {
+    auto socket = delegate_->createSocket(host, port);
+    auto prioritizer_ = 
NetworkPrioritizerFactory::getInstance()->getPrioritizer();
+    if (nullptr != prioritizer_) {
+      std::cout << "prioritizer" << std::endl;
+      auto &&ifc = prioritizer_->getInterface(estimated_size);
+      if (ifc.getInterface().empty()) {
+        return nullptr;
+      } else {
+        socket->setInterface(std::move(ifc));
+      }
+    }
+    return socket;
   }
 
   /**
    * Creates a socket and returns a unique ptr
    *
    */
-  std::unique_ptr<Socket> createSecureSocket(const std::string &host, const 
uint16_t port, const std::shared_ptr<minifi::controllers::SSLContextService> 
&ssl_service) {
-    return delegate_->createSecureSocket(host, port, ssl_service);
+  std::unique_ptr<Socket> createSecureSocket(const std::string &host, const 
uint16_t port, const std::shared_ptr<minifi::controllers::SSLContextService> 
&ssl_service, uint32_t estimated_size = 0) {
+    auto socket = delegate_->createSecureSocket(host, port, ssl_service);
+    auto prioritizer_ = 
NetworkPrioritizerFactory::getInstance()->getPrioritizer();
+    if (nullptr != prioritizer_) {
+      auto &&ifc = prioritizer_->getInterface(estimated_size);
+      if (ifc.getInterface().empty()) {
+        return nullptr;
+      } else {
+        socket->setInterface(std::move(ifc));
+      }
+    }
+    return socket;
   }
 
-  StreamFactory(const std::shared_ptr<Configure> &configure);
+  static std::shared_ptr<StreamFactory> getInstance(const 
std::shared_ptr<Configure> &configuration) {
+    // avoid invalid access
+    static std::shared_ptr<StreamFactory> factory = 
std::shared_ptr<StreamFactory>(new StreamFactory(configuration));
+    return factory;
+  }
 
  protected:
+
+  StreamFactory(const std::shared_ptr<Configure> &configure);
+
   std::shared_ptr<AbstractStreamFactory> delegate_;
 };
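
The `getInstance` change above relies on a function-local static. In C++11 and later such a static is initialized exactly once, in a thread-safe way, which also means that the argument passed on any later call is ignored. The sketch below shows that behaviour with a hypothetical `FactorySketch` class that stores a plain string instead of a `Configure` object.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for a factory singleton; not the StreamFactory class.
class FactorySketch {
 public:
  static std::shared_ptr<FactorySketch> getInstance(const std::string &config) {
    // Initialized on the first call only; later `config` values are ignored.
    static std::shared_ptr<FactorySketch> factory(new FactorySketch(config));
    return factory;
  }

  const std::string &config() const {
    return config_;
  }

 private:
  // Private constructor, so instances can only be obtained via getInstance.
  explicit FactorySketch(const std::string &config)
      : config_(config) {
  }

  std::string config_;
};
```

If the same pattern applies to `StreamFactory`, tests that call `getInstance` with different configurations within one process would all share the instance built from the first configuration.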
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/sitetosite/Peer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/Peer.h 
b/libminifi/include/sitetosite/Peer.h
index 1f9ec01..3775c7c 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -143,7 +143,6 @@ class SiteToSitePeer : public 
org::apache::nifi::minifi::io::BaseStream {
       : stream_(nullptr),
         host_(""),
         port_(-1),
-        local_network_interface_(""),
         logger_(logging::LoggerFactory<SiteToSitePeer>::getLogger()) {
 
   }
@@ -159,13 +158,13 @@ class SiteToSitePeer : public 
org::apache::nifi::minifi::io::BaseStream {
       : stream_(nullptr),
         host_(host),
         port_(port),
-        local_network_interface_(interface),
         timeout_(30000),
         yield_expiration_(0),
         logger_(logging::LoggerFactory<SiteToSitePeer>::getLogger()) {
     url_ = "nifi://" + host_ + ":" + std::to_string(port_);
     yield_expiration_ = 0;
     timeout_ = 30000;  // 30 seconds
+    local_network_interface_= std::move(io::NetworkInterface(interface, 
nullptr));
   }
 
   explicit SiteToSitePeer(SiteToSitePeer &&ss)
@@ -193,10 +192,10 @@ class SiteToSitePeer : public 
org::apache::nifi::minifi::io::BaseStream {
   }
   // setInterface
   void setInterface(std::string &interface) {
-    local_network_interface_ = interface;
+    local_network_interface_ = 
std::move(io::NetworkInterface(interface,nullptr));
   }
   std::string getInterface() {
-    return local_network_interface_;
+    return local_network_interface_.getInterface();
   }
   // Get Processor yield period in MilliSecond
   uint64_t getYieldPeriodMsec(void) {
@@ -288,7 +287,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
   void setStream(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream) {
     stream_ = nullptr;
     if (stream)
-    stream_ = std::move(stream);
+      stream_ = std::move(stream);
   }
 
   org::apache::nifi::minifi::io::DataStream *getStream() {
@@ -374,7 +373,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
 
   uint16_t port_;
 
-  std::string local_network_interface_;
+  io::NetworkInterface local_network_interface_;
 
   utils::HTTPProxy proxy_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 848a152..4f0f36c 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -44,6 +44,9 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien
     str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
         client_configuration.getStreamFactory()->createSocket(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort()));
   }
+
+  if (nullptr == str)
+    return nullptr;
   auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(std::move(str), client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort(),
       client_configuration.getInterface()));
   return peer;
@@ -57,7 +60,11 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien
 static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientConfiguration &client_configuration) {
   uuid_t uuid;
   client_configuration.getPeer()->getPortId(uuid);
-  auto ptr = std::unique_ptr<SiteToSiteClient>(new RawSiteToSiteClient(createStreamingPeer(client_configuration)));
+  auto rsptr = createStreamingPeer(client_configuration);
+  if (nullptr == rsptr){
+    return nullptr;
+  }
+  auto ptr = std::unique_ptr<SiteToSiteClient>(new RawSiteToSiteClient(std::move(rsptr)));
   ptr->setPortId(uuid);
   ptr->setSSLContextService(client_configuration.getSecurityContext());
   return ptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 68f6831..629075c 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -354,13 +354,14 @@ void RemoteProcessorGroupPort::refreshPeerList() {
   this->peers_.clear();
 
   std::unique_ptr<sitetosite::SiteToSiteClient> protocol;
-  sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_,
-    site2site_port_, ssl_service != nullptr), this->getInterface(), client_type_);
+  sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, site2site_port_, ssl_service != nullptr), this->getInterface(),
+                                                   client_type_);
   config.setSecurityContext(ssl_service);
   config.setHTTPProxy(this->proxy_);
   protocol = sitetosite::createClient(config);
 
-  protocol->getPeerList(peers_);
+  if (protocol)
+    protocol->getPeerList(peers_);
 
   logging::LOG_INFO(logger_) << "Have " << peers_.size() << " peers";
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/c2/ControllerSocketProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index ea9cf9b..58b835b 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -32,7 +32,7 @@ namespace c2 {
 void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
                                           const std::shared_ptr<Configure> &configuration) {
   HeartBeatReporter::initialize(controller, updateSink, configuration);
-  stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
+  stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
 
   std::string host = "localhost", port, limitStr, context_name;
   bool anyInterface = false;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 6e5c7f6..6181382 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -23,24 +23,21 @@
 #include <string>
 
 ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
-    :
-      content_repo_(content_repo),
+    : content_repo_(content_repo),
       flow_repo_(flow_repo),
       prov_repo_(prov_repo),
       finalized(false),
       location(-1),
       current_flowfile_(nullptr),
       logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
-  stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
 }
 
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
 
-
   uuid_t uuid;
   uuid_generate(uuid);
 
@@ -93,8 +90,7 @@ bool linkToPrevious) {
   return processor;
 }
 
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -141,7 +137,6 @@ void ExecutionPlan::reset() {
   }
 }
 
-
 bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
   if (!finalized) {
     finalize();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/controllers/NetworkPrioritizerService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/NetworkPrioritizerService.cpp b/libminifi/src/controllers/NetworkPrioritizerService.cpp
new file mode 100644
index 0000000..d88921b
--- /dev/null
+++ b/libminifi/src/controllers/NetworkPrioritizerService.cpp
@@ -0,0 +1,211 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "controllers/NetworkPrioritizerService.h"
+#include <cstdio>
+#include <utility>
+#include <limits>
+#include <string>
+#include <vector>
+#include <sys/ioctl.h>
+#include <ifaddrs.h>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <set>
+#include "utils/StringUtils.h"
+#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD))
+#include <net/if_dl.h>
+#include <net/if_types.h>
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property NetworkPrioritizerService::NetworkControllers("Network Controllers", "Network controllers in order of priority for this prioritizer");
+core::Property NetworkPrioritizerService::MaxThroughput("Max Throughput", "Max throughput for these network controllers");
+core::Property NetworkPrioritizerService::MaxPayload("Max Payload", "Maximum payload for these network controllers");
+core::Property NetworkPrioritizerService::VerifyInterfaces("Verify Interfaces", "Verify that interfaces are operational", "true");
+core::Property NetworkPrioritizerService::DefaultPrioritizer("Default Prioritizer", "Sets this controller service as the default prioritizer for all comms");
+
+void NetworkPrioritizerService::initialize() {
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(NetworkControllers);
+  supportedProperties.insert(MaxThroughput);
+  supportedProperties.insert(MaxPayload);
+  supportedProperties.insert(VerifyInterfaces);
+  supportedProperties.insert(DefaultPrioritizer);
+  setSupportedProperties(supportedProperties);
+}
+
+void NetworkPrioritizerService::yield() {
+}
+
+/**
+ * If not an intersecting operation we will attempt to locate the highest priority interface available.
+ */
+io::NetworkInterface &&NetworkPrioritizerService::getInterface(uint32_t size = 0) {
+  std::vector<std::string> controllers;
+  if (!network_controllers_.empty()) {
+    if (sufficient_tokens(size) && size < max_payload_) {
+      controllers.insert(std::end(controllers), std::begin(network_controllers_), std::end(network_controllers_));
+    }
+  }
+
+  if (!controllers.empty()) {
+    auto ifc = get_nearest_interface(controllers);
+    if (!ifc.empty()) {
+      reduce_tokens(size);
+      return std::move(io::NetworkInterface(ifc, shared_from_this()));
+    }
+  }
+  for (size_t i = 0; i < linked_services_.size(); i++) {
+    auto np = std::dynamic_pointer_cast<NetworkPrioritizerService>(linked_services_.at(i));
+    if (np != nullptr) {
+      auto ifcs = np->getInterfaces(size);
+      auto ifc = get_nearest_interface(ifcs);
+      if (!ifc.empty()) {
+        np->reduce_tokens(size);
+        return std::move(io::NetworkInterface(ifc, np));
+      }
+    }
+  }
+  return std::move(io::NetworkInterface("", nullptr));
+}
+
+std::string NetworkPrioritizerService::get_nearest_interface(const std::vector<std::string> &ifcs) {
+  for (auto ifc : ifcs) {
+    if (!verify_interfaces_ || interface_online(ifc)) {
+      logger_->log_debug("%s is online", ifc);
+      return ifc;
+    } else {
+      logger_->log_debug("%s is not online", ifc);
+    }
+  }
+  return "";
+}
+
+bool NetworkPrioritizerService::interface_online(const std::string &ifc) {
+  struct ifreq ifr;
+  auto sockid = socket(PF_INET6, SOCK_DGRAM, IPPROTO_IP);
+  memset(&ifr, 0, sizeof(ifr));
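+  snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifc.c_str());
+  // query the flags first, then close the socket on every path so the
+  // descriptor is not leaked when the ioctl fails
+  const bool queried = ioctl(sockid, SIOCGIFFLAGS, &ifr) >= 0;
+  close(sockid);
+  return queried && (ifr.ifr_flags & IFF_UP) && (ifr.ifr_flags & IFF_RUNNING);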
+}
+
+std::vector<std::string> NetworkPrioritizerService::getInterfaces(uint32_t size = 0) {
+  std::vector<std::string> interfaces;
+  if (!network_controllers_.empty()) {
+    if (sufficient_tokens(size) && size < max_payload_) {
+      return network_controllers_;
+    }
+  }
+  return interfaces;
+}
+
+bool NetworkPrioritizerService::sufficient_tokens(uint32_t size) {
+  std::lock_guard<std::mutex> lock(token_mutex_);
+  auto ms = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+  auto diff = ms - timestamp_;
+  timestamp_ = ms;
+  if (diff > 0) {
+    tokens_ += diff * tokens_per_ms;
+  }
+  if (bytes_per_token_ > 0 && size > 0) {
+    if (tokens_ * bytes_per_token_ >= size) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+  return true;
+}
+
+void NetworkPrioritizerService::reduce_tokens(uint32_t size) {
+  std::lock_guard<std::mutex> lock(token_mutex_);
+  if (bytes_per_token_ > 0 && size > 0) {
+    uint32_t tokens = size / bytes_per_token_;
+    tokens_ -= tokens;
+  }
+}
+
+bool NetworkPrioritizerService::isRunning() {
+  return getState() == core::controller::ControllerServiceState::ENABLED;
+}
+
+bool NetworkPrioritizerService::isWorkAvailable() {
+  return false;
+}
+
+void NetworkPrioritizerService::onEnable() {
+  std::string controllers, max_throughput, max_payload, df_prioritizer, intersect, verify_interfaces, roundrobin_interfaces;
+  // if we have defined controller services or we have linked services
+  if (getProperty(NetworkControllers.getName(), controllers) || !linked_services_.empty()) {
+    // if this controller service is defined, it will be an intersection of this config with linked services.
+    if (getProperty(MaxThroughput.getName(), max_throughput) && !max_throughput.empty()) {
+      max_throughput_ = std::stoi(max_throughput);
+      if (max_throughput_ < 1000) {
+        bytes_per_token_ = 1;
+        tokens_ = max_throughput_;
+      } else {
+        bytes_per_token_ = max_throughput_ / 1000;
+      }
+    }
+
+    if (getProperty(MaxPayload.getName(), max_payload) && !max_payload.empty()) {
+      max_payload_ = std::stoi(max_payload);
+    }
+
+    if (!controllers.empty()) {
+      network_controllers_ = utils::StringUtils::split(controllers, ",");
+    }
+    if (getProperty(DefaultPrioritizer.getName(), df_prioritizer)) {
+      bool is_default = false;
+      if (utils::StringUtils::StringToBool(df_prioritizer, is_default)) {
+        if (is_default) {
+          if (io::NetworkPrioritizerFactory::getInstance()->setPrioritizer(shared_from_this()) < 0) {
+            throw std::runtime_error("Can only have one prioritizer");
+          }
+        }
+      }
+    }
+    if (getProperty(VerifyInterfaces.getName(), verify_interfaces)) {
+      utils::StringUtils::StringToBool(verify_interfaces, verify_interfaces_);
+    }
+    timestamp_ = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+    enabled_ = true;
+    logger_->log_trace("Enabled network prioritizer service");
+  } else {
+    logger_->log_trace("Could not enable network prioritizer service");
+  }
+}
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 12dfd98..4846b41 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -48,8 +48,9 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string
       addr_info_(0),
       socket_file_descriptor_(-1),
       socket_max_(0),
+      total_written_(0),
+      total_read_(0),
       is_loopback_only_(false),
-      local_network_interface_(""),
       listeners_(listeners),
       canonical_hostname_(""),
       nonBlocking_(false),
@@ -66,7 +67,6 @@ Socket::Socket(const Socket &&other)
     : requested_hostname_(std::move(other.requested_hostname_)),
       port_(std::move(other.port_)),
       is_loopback_only_(false),
-      local_network_interface_(""),
       addr_info_(std::move(other.addr_info_)),
       socket_file_descriptor_(other.socket_file_descriptor_),
       socket_max_(other.socket_max_.load()),
@@ -76,6 +76,8 @@ Socket::Socket(const Socket &&other)
       canonical_hostname_(std::move(other.canonical_hostname_)),
       nonBlocking_(false),
       logger_(std::move(other.logger_)) {
+  total_written_ = other.total_written_.load();
+  total_read_ = other.total_read_.load();
 }
 
 Socket::~Socket() {
@@ -92,6 +94,14 @@ void Socket::closeStream() {
     close(socket_file_descriptor_);
     socket_file_descriptor_ = -1;
   }
+  if (total_written_ > 0) {
+    local_network_interface_.log_write(total_written_);
+    total_written_ = 0;
+  }
+  if (total_read_ > 0) {
+    local_network_interface_.log_read(total_read_);
+    total_read_ = 0;
+  }
 }
 
 void Socket::setNonBlocking() {
@@ -108,7 +118,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
 
   setSocketOptions(socket_file_descriptor_);
 
-  if (listeners_ <= 0 && !local_network_interface_.empty()) {
+  if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) {
     // bind to local network interface
     ifaddrs* list = NULL;
     ifaddrs* item = NULL;
@@ -118,7 +128,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       item = list;
       while (item) {
         if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET == item->ifa_addr->sa_family)) {
-          if (strcmp(item->ifa_name, local_network_interface_.c_str()) == 0) {
+          if (strcmp(item->ifa_name, local_network_interface_.getInterface().c_str()) == 0) {
             itemFound = item;
             break;
           }
@@ -129,11 +139,10 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       if (itemFound != NULL) {
         result = bind(socket_file_descriptor_, itemFound->ifa_addr, sizeof(struct sockaddr_in));
         if (result < 0)
-          logger_->log_info("Bind to interface %s failed %s", local_network_interface_, strerror(errno));
+          logger_->log_info("Bind to interface %s failed %s", local_network_interface_.getInterface(), strerror(errno));
         else
-          logger_->log_info("Bind to interface %s", local_network_interface_);
+          logger_->log_info("Bind to interface %s", local_network_interface_.getInterface());
       }
-
       freeifaddrs(list);
     }
   }
@@ -366,6 +375,7 @@ int Socket::writeData(uint8_t *value, int size) {
 
   if (ret)
     logger_->log_trace("Send data size %d over socket %d", size, fd);
+  total_written_+=bytes;
   return bytes;
 }
 
@@ -463,6 +473,7 @@ int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
       break;
     }
   }
+  total_read_+=total_read;
   return total_read;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/sitetosite/Peer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/sitetosite/Peer.cpp b/libminifi/src/sitetosite/Peer.cpp
index b0bfca2..385f991 100644
--- a/libminifi/src/sitetosite/Peer.cpp
+++ b/libminifi/src/sitetosite/Peer.cpp
@@ -41,8 +41,17 @@ bool SiteToSitePeer::Open() {
   if (IsNullOrEmpty(host_))
     return false;
 
-  if (!this->local_network_interface_.empty())
-      stream_->setInterface(local_network_interface_);
+  /**
+   * We may override the interface provided to us within the socket in this step; however, this is a
+   * known configuration path, and thus we will allow the RPG configuration to override anything provided to us
+   * previously by the socket preference.
+   */
+  if (!this->local_network_interface_.getInterface().empty()) {
+    auto socket = static_cast<io::Socket*>(stream_.get());
+    if (nullptr != socket) {
+      socket->setInterface(io::NetworkInterface(local_network_interface_.getInterface(), nullptr));
+    }
+  }
 
   if (stream_->initialize() < 0)
     return false;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 53a1bf6..a97bc99 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -18,9 +18,9 @@
 
 #include "./TestBase.h"
 
-TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo, const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version)
-    :
-      content_repo_(content_repo),
+TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo,
+                   const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version)
+    : content_repo_(content_repo),
       flow_repo_(flow_repo),
       prov_repo_(prov_repo),
       finalized(false),
@@ -28,11 +28,10 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s
       current_flowfile_(nullptr),
       flow_version_(flow_version),
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
-  stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
 }
 
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -92,8 +91,7 @@ bool linkToPrevious) {
   return processor;
 }
 
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -113,10 +111,7 @@ bool linkToPrevious) {
   return addProcessor(processor, name, relationship, linkToPrevious);
 }
 
-bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc,
-                           const std::string &prop,
-                           const std::string &value,
-                           bool dynamic) {
+bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   int32_t i = 0;
   logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName());
@@ -149,7 +144,6 @@ void TestPlan::reset() {
   }
 }
 
-
 bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
   if (!finalized) {
     finalize();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/integration/IntegrationBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 887b1a9..cb86b7e 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -97,8 +97,7 @@ void IntegrationBase::run(std::string test_file_location) {
 
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
-      <minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
       <core::YamlConfiguration
       >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 0e8d52f..7fe3c3e 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -58,7 +58,7 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/integration/SecureSocketGetTCPTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/SecureSocketGetTCPTest.cpp b/libminifi/test/integration/SecureSocketGetTCPTest.cpp
index f9d4261..cc0fc26 100644
--- a/libminifi/test/integration/SecureSocketGetTCPTest.cpp
+++ b/libminifi/test/integration/SecureSocketGetTCPTest.cpp
@@ -128,7 +128,7 @@ class SecureSocketTest : public IntegrationBase {
 
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(configuration);
-    std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
     std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
         new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/ControllerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index 49cb759..c5268ab 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -172,7 +172,7 @@ TEST_CASE("TestGet", "[test1]") {
   minifi::c2::ControllerSocketProtocol protocol("testprotocol");
   protocol.initialize(nullptr, ptr, configuration);
 
-  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
 
   auto socket = stream_factory->createSocket("localhost", 9997);
 
@@ -205,7 +205,7 @@ TEST_CASE("TestClear", "[test1]") {
   minifi::c2::ControllerSocketProtocol protocol("testprotocol");
   protocol.initialize(nullptr, ptr, configuration);
 
-  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
 
   auto socket = stream_factory->createSocket("localhost", 9997);
 
@@ -241,7 +241,7 @@ TEST_CASE("TestUpdate", "[test1]") {
   minifi::c2::ControllerSocketProtocol protocol("testprotocol");
   protocol.initialize(nullptr, ptr, configuration);
 
-  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
 
   auto socket = stream_factory->createSocket("localhost", 9997);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/GetTCPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/GetTCPTests.cpp b/libminifi/test/unit/GetTCPTests.cpp
index 6e28bf1..34765e7 100644
--- a/libminifi/test/unit/GetTCPTests.cpp
+++ b/libminifi/test/unit/GetTCPTests.cpp
@@ -46,7 +46,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
   content_repo->initialize(std::make_shared<minifi::Configure>());
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
   org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9184, 1);
 
   REQUIRE(-1 != server.initialize());
@@ -154,7 +154,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
   content_repo->initialize(std::make_shared<minifi::Configure>());
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
 
   TestController testController;
 
@@ -278,7 +278,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
   content_repo->initialize(std::make_shared<minifi::Configure>());
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
 
   TestController testController;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp b/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp
new file mode 100644
index 0000000..765487e
--- /dev/null
+++ b/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp
@@ -0,0 +1,168 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "../../controller/Controller.h"
+#include "core/controller/ControllerService.h"
+#include "c2/ControllerSocketProtocol.h"
+#include "controllers/NetworkPrioritizerService.h"
+#include "state/UpdateController.h"
+
+TEST_CASE("TestPrioritizerOneInterface", "[test1]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0,eth1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxPayload, "10");
+  controller->onEnable();
+  REQUIRE("eth0" == controller->getInterface(0).getInterface());
+}
+
+TEST_CASE("TestPrioritizerOneInterfaceMaxPayload", "[test2]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0,eth1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxPayload, "1");
+  controller->onEnable();
+  // can't because we've triggered the max payload
+  REQUIRE("" == controller->getInterface(5).getInterface());
+}
+
+TEST_CASE("TestPrioritizerOneInterfaceMaxThroughput", "[test3]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0,eth1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller->onEnable();
+  // two requests of size 5 fit within the throughput limit of 10; the third is refused until the limit replenishes
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+  REQUIRE("" == controller->getInterface(5).getInterface());
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+}
+
+TEST_CASE("TestPriorotizerMultipleInterfaces", "[test4]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  auto controller2 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService2");
+  auto controller3 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService3");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+
+  controller3->initialize();
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller3->onEnable();
+
+  controller2->initialize();
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth1");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller2->onEnable();
+  std::vector<std::shared_ptr<core::controller::ControllerService> > services;
+  services.push_back(controller2);
+  services.push_back(controller3);
+  controller->setLinkedControllerServices(services);
+  controller->onEnable();
+  // eth1 is linked first and serves requests until its throughput limit is reached; eth0 then takes over
+  REQUIRE("eth1" == controller->getInterface(5).getInterface());
+  REQUIRE("eth1" == controller->getInterface(5).getInterface());
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+}
+
+TEST_CASE("TestPriorotizerMultipleInterfacesNeverSwitch", "[test5]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  auto controller2 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService2");
+  auto controller3 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService3");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+
+  controller3->initialize();
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "1000");
+  controller3->onEnable();
+
+  controller2->initialize();
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth1");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller2->onEnable();
+  std::vector<std::shared_ptr<core::controller::ControllerService> > services;
+  services.push_back(controller3);
+  services.push_back(controller2);
+  controller->setLinkedControllerServices(services);
+  controller->onEnable();
+  // eth0 is linked first and its throughput limit of 1000 is never reached, so it is always chosen
+  for (int i = 0; i < 50; i++) {
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  }
+}
+
+
+TEST_CASE("TestPriorotizerMultipleInterfacesMaxPayload", "[test4]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  auto controller2 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService2");
+  auto controller3 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService3");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+
+  controller3->initialize();
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "1000");
+
+  controller3->onEnable();
+
+  controller2->initialize();
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth1");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxPayload, "10");
+  controller2->onEnable();
+  std::vector<std::shared_ptr<core::controller::ControllerService> > services;
+  services.push_back(controller2);
+  services.push_back(controller3);
+  controller->setLinkedControllerServices(services);
+  controller->onEnable();
+  // eth1 is linked first but cannot take a request of size 50, so eth0 is selected
+  REQUIRE("eth0" == controller->getInterface(50).getInterface());
+  REQUIRE("eth0" == controller->getInterface(50).getInterface());
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 6e9ae83..8810763 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -257,7 +257,7 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
   std::shared_ptr<TestPlan> plan = testController.createPlan();
   std::shared_ptr<core::Processor> processor = plan->addProcessor("GetFile", "getfileCreate2");
   std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
-      std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>());
+      minifi::io::StreamFactory::getInstance(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>());
   plan->addProcessor(processorReport, "reporter", core::Relationship("success", "description"), false);
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 7e78846..352e6fd 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -220,9 +220,9 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") {
 TEST_CASE("TestTLSContextCreation2", "[TestSocket7]") {
   std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
   configure->set("nifi.remote.input.secure", "false");
-  minifi::io::StreamFactory factory(configure);
+  auto factory = minifi::io::StreamFactory::getInstance(configure);
   std::string host = "localhost";
-  minifi::io::Socket *socket = factory.createSocket(host, 10001).release();
+  minifi::io::Socket *socket = factory->createSocket(host, 10001).release();
   minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }
@@ -234,9 +234,9 @@ TEST_CASE("TestTLSContextCreation2", "[TestSocket7]") {
 TEST_CASE("TestTLSContextCreationNullptr", "[TestSocket7]") {
   std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
   configure->set("nifi.remote.input.secure", "false");
-  minifi::io::StreamFactory factory(configure);
+  auto factory = minifi::io::StreamFactory::getInstance(configure);
   std::string host = "localhost";
-  minifi::io::Socket *socket = factory.createSecureSocket(host, 10001, nullptr).release();
+  minifi::io::Socket *socket = factory->createSecureSocket(host, 10001, nullptr).release();
   minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/YamlConfigurationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp b/libminifi/test/unit/YamlConfigurationTests.cpp
index 724fc64..b05c402 100644
--- a/libminifi/test/unit/YamlConfigurationTests.cpp
+++ b/libminifi/test/unit/YamlConfigurationTests.cpp
@@ -30,7 +30,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository>
       content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -197,7 +197,7 @@ TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository>
       content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -350,7 +350,7 @@ TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository>
       content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -388,7 +388,7 @@ TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository>
       content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -433,7 +433,7 @@ TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository>
       content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 868b65f..65d79ed 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -183,7 +183,7 @@ int main(int argc, char **argv) {
 
   configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configure);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configure);
 
   std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name);
 
