Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 8eb8cf704 -> 6a672dae8


MINIFICPP-457: Add prioritizer service for Network comunications.
MINIFICPP-504: Tie in estimated size for max throughput to RPG
MINIFICPP-457: Resolve new test
MINIFICPP-457: Resolve issue with trusty

This closes #337.

Approved on GH by @achristianson

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6a672dae
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6a672dae
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6a672dae

Branch: refs/heads/master
Commit: 6a672dae8f9e1d9eafe6ee7ec403127f3c1382d3
Parents: 8eb8cf7
Author: Marc Parisi <[email protected]>
Authored: Tue May 22 07:56:24 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Fri May 25 14:52:56 2018 -0400

----------------------------------------------------------------------
 README.md                                       |  36 ++++
 controller/Controller.h                         |   4 +-
 controller/MiNiFiController.cpp                 |   2 +-
 .../http-curl/tests/C2FailedUpdateTest.cpp      |   2 +-
 .../http-curl/tests/C2UpdateAgentTest.cpp       |   2 +-
 extensions/http-curl/tests/C2UpdateTest.cpp     |   2 +-
 .../tests/ControllerServiceIntegrationTests.cpp |   2 +-
 extensions/http-curl/tests/GetFileNoData.cpp    |   3 +-
 .../http-curl/tests/HttpGetIntegrationTest.cpp  |   2 +-
 libminifi/include/capi/Instance.h               |   2 +-
 .../controllers/NetworkPrioritizerService.h     | 147 +++++++++++++
 libminifi/include/io/ClientSocket.h             |   9 +-
 libminifi/include/io/DataStream.h               |   3 -
 libminifi/include/io/NetworkPrioritizer.h       | 118 +++++++++++
 libminifi/include/io/StreamFactory.h            |  39 +++-
 libminifi/include/sitetosite/Peer.h             |  11 +-
 .../include/sitetosite/SiteToSiteFactory.h      |   9 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      |   7 +-
 libminifi/src/c2/ControllerSocketProtocol.cpp   |   2 +-
 libminifi/src/capi/Plan.cpp                     |  13 +-
 .../controllers/NetworkPrioritizerService.cpp   | 211 +++++++++++++++++++
 libminifi/src/io/ClientSocket.cpp               |  25 ++-
 libminifi/src/sitetosite/Peer.cpp               |  13 +-
 libminifi/test/TestBase.cpp                     |  20 +-
 libminifi/test/integration/IntegrationBase.h    |   3 +-
 .../integration/ProvenanceReportingTest.cpp     |   2 +-
 .../test/integration/SecureSocketGetTCPTest.cpp |   2 +-
 libminifi/test/unit/ControllerTests.cpp         |   6 +-
 libminifi/test/unit/GetTCPTests.cpp             |   6 +-
 .../unit/NetworkPrioritizerServiceTests.cpp     | 168 +++++++++++++++
 libminifi/test/unit/ProcessorTests.cpp          |   2 +-
 libminifi/test/unit/SocketTests.cpp             |   8 +-
 libminifi/test/unit/YamlConfigurationTests.cpp  |  10 +-
 main/MiNiFiMain.cpp                             |   2 +-
 34 files changed, 808 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 32eb897..5a5e639 100644
--- a/README.md
+++ b/README.md
@@ -702,6 +702,7 @@ Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned
             Trigger Threshold: 90
             Low Battery Threshold: 50
             Wait Period: 500 ms
+
 ### MQTT Controller service
 The MQTTController Service can be configured for MQTT connectivity and provide that capability to your processors when MQTT is built.
@@ -713,6 +714,41 @@ The MQTTController Service can be configured for MQTT connectivity and provide t
             Broker URI: localhost:1883
             Client ID: client ID
             Quality of Service: 2
+
+### Network Prioritizer Controller Service
+  The network prioritizer controller service can be configured to manage prioritizing and binding to specific network interfaces. Linked Services, can be used
+  as a prioritized list to create a disjunction among multiple networking prioritizers. This allows you to create classes with different configurations that
+  create multiple prioritizations. Max Throughput is the maximum throughput in bytes per second. Max Payload is the maximum number of bytes supported by that
+  prioritizer. If a prioritizer is configured with the option "Default Prioritizer: true," then all socket communications will use that default prioritizer.
+
+  In the configuration below there are two classes defined under "NetworkPrioritizerService", one class "NetworkPrioritizerService2" defines en0, and en1.
+  If en0 is down at any point, then en1 will be given priority before resorting to en2 and en3 of "NetworkPrioritizerService3". If the throughput for
+  "NetworkPrioritizerService2" exceeds the defined throughput or the max payload of 1024, then "NetworkPrioritizerService3" will be used. If Max Payload and
+  Max Throughput are not defined, then they will not be limiting factors. For this release, 0.5.0, Max Payload will only be used for processors that custom
+  implement that feature. RPGs will not support max payloads until 0.6.0. Additionally, since connection queues aren't prioritized, you must have a live connection
+  for your data to send it. Since connection queues can't be re-prioritized, this can create a starvation problem. The configuration is required to account for this.
+
+    Controller Services:
+    - name: NetworkPrioritizerService
+      id: 2438e3c8-015a-1000-79ca-83af40ec1883
+      class: NetworkPrioritizerService
+      Properties:
+          Linked Services: NetworkPrioritizerService2,NetworkPrioritizerService3
+    - name: NetworkPrioritizerService2
+      id: 2438e3c8-015a-1000-79ca-83af40ec1884
+      class: NetworkPrioritizerService
+      Properties:
+          Network Controllers: en0,en1
+          Max Throughput: 1,024,1024
+          Max Payload: 1024
+    - name: NetworkPrioritizerService3
+      id: 2438e3c8-015a-1000-79ca-83af40ec1884
+      class: NetworkPrioritizerService
+      Properties:
+          Network Controllers: en2,en3
+          Max Throughput: 1,024,1024
+          Max Payload: 1,024,1024
+
 ### Running
 After completing a [build](#building), the application can be run by issuing the following from :

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/controller/Controller.h
----------------------------------------------------------------------
diff --git a/controller/Controller.h b/controller/Controller.h
index 0a2b292..312b922 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -231,7 +231,7 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const
   configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name);
@@ -288,7 +288,7 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const
   configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/controller/MiNiFiController.cpp
----------------------------------------------------------------------
diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp
index dc12306..5e2b886 100644
--- a/controller/MiNiFiController.cpp
+++ b/controller/MiNiFiController.cpp
@@ -119,7 +119,7 @@ int main(int argc, char **argv) {
     secure_context->setDisablePeerVerification();
   }
-  auto stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
   std::string host = "localhost", portStr, caCert;
   int port = -1;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/C2FailedUpdateTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
index 2aed1f4..fc3e79a 100644
--- a/extensions/http-curl/tests/C2FailedUpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
@@ -151,7 +151,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/C2UpdateAgentTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index 26a3e86..00b761b 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -150,7 +150,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
   configuration->set("c2.agent.update.command", "echo \"verification command\"");
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp
index 52e60f8..c51376d 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -149,7 +149,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 612603a..8fb1daf 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase);
   configuration->set(minifi::Configure::nifi_default_directory, key_dir);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/GetFileNoData.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/GetFileNoData.cpp b/extensions/http-curl/tests/GetFileNoData.cpp
index 299d994..a0a5dfd 100644
--- a/extensions/http-curl/tests/GetFileNoData.cpp
+++ b/extensions/http-curl/tests/GetFileNoData.cpp
@@ -146,8 +146,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
-  <minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
   <core::YamlConfiguration

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index 9e6e99f..8e6042f 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -93,7 +93,7 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/capi/Instance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Instance.h b/libminifi/include/capi/Instance.h
index bbe0f4c..29e0fcb 100644
--- a/libminifi/include/capi/Instance.h
+++ b/libminifi/include/capi/Instance.h
@@ -71,8 +71,8 @@ class Instance {
         listener_thread_pool_(1),
         content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
         no_op_repo_(std::make_shared<minifi::core::Repository>()) {
-    stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configure_);
     running_ = false;
+    stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
     uuid_t uuid;
     uuid_parse(port.c_str(), uuid);
     rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/controllers/NetworkPrioritizerService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/NetworkPrioritizerService.h b/libminifi/include/controllers/NetworkPrioritizerService.h
new file mode 100644
index 0000000..bf7916f
--- /dev/null
+++ b/libminifi/include/controllers/NetworkPrioritizerService.h
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_NETWORKPRIORITIZERSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_NETWORKPRIORITIZERSERVICE_H_
+
+#include <iostream>
+#include <memory>
+#include <limits>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "ThreadManagementService.h"
+#include "io/NetworkPrioritizer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Purpose: Network prioritizer for selecting network interfaces through the flow configuration.
+ */
+class NetworkPrioritizerService : public core::controller::ControllerService, public minifi::io::NetworkPrioritizer, public std::enable_shared_from_this<NetworkPrioritizerService> {
+ public:
+  explicit NetworkPrioritizerService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        enabled_(false),
+        max_throughput_(std::numeric_limits<uint64_t>::max()),
+        max_payload_(std::numeric_limits<uint64_t>::max()),
+        tokens_per_ms(2),
+        tokens_(1000),
+        timestamp_(0),
+        bytes_per_token_(0),
+        verify_interfaces_(true),
+        logger_(logging::LoggerFactory<NetworkPrioritizerService>::getLogger()) {
+  }
+
+  explicit NetworkPrioritizerService(const std::string &name, uuid_t uuid = 0)
+      : ControllerService(name, uuid),
+        enabled_(false),
+        max_throughput_(std::numeric_limits<uint64_t>::max()),
+        max_payload_(std::numeric_limits<uint64_t>::max()),
+        tokens_per_ms(2),
+        tokens_(1000),
+        timestamp_(0),
+        bytes_per_token_(0),
+        verify_interfaces_(true),
+        logger_(logging::LoggerFactory<NetworkPrioritizerService>::getLogger()) {
+  }
+
+  explicit NetworkPrioritizerService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : NetworkPrioritizerService(name, nullptr) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  static core::Property NetworkControllers;
+  static core::Property MaxThroughput;
+  static core::Property MaxPayload;
+  static core::Property VerifyInterfaces;
+  static core::Property DefaultPrioritizer;
+
+  void initialize();
+
+  void yield();
+
+  bool isRunning();
+
+  bool isWorkAvailable();
+
+  virtual void onEnable();
+
+  virtual io::NetworkInterface &&getInterface(uint32_t size);
+
+ protected:
+
+  std::string get_nearest_interface(const std::vector<std::string> &ifcs);
+
+  bool interface_online(const std::string &ifc);
+
+  std::vector<std::string> getInterfaces(uint32_t size);
+
+  bool sufficient_tokens(uint32_t size);
+
+  virtual void reduce_tokens(uint32_t size);
+
+  bool enabled_;
+
+  uint64_t max_throughput_;
+
+  uint64_t max_payload_;
+
+  std::vector<std::string> network_controllers_;
+
+  int tokens_per_ms;
+
+  /**
+   * Using a variation of the token bucket algorithm.
+   * every millisecond 1 token will be added to the bucket. max throughput will define a maximum rate per second.
+   *
+   * When a request for data arrives to send and not enough tokens exist, we will restrict sending through the interfaces defined here.
+   *
+   * When a request arrives tokens will be decremented. We will compute the amount of data that can be sent per token from the configuration
+   * of max_throughput_
+   */
+  uint32_t tokens_;
+
+  std::mutex token_mutex_;
+
+  uint64_t timestamp_;
+
+  uint32_t bytes_per_token_;
+
+  bool verify_interfaces_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(NetworkPrioritizerService);
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_NETWORKPRIORITIZERSERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 32f04a5..e4b7604 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -30,6 +30,7 @@
 #include "core/logging/Logger.h"
 #include "io/validation.h"
 #include "properties/Configure.h"
+#include "io/NetworkPrioritizer.h"
 namespace org {
 namespace apache {
@@ -91,8 +92,8 @@ class Socket : public BaseStream {
    */
   virtual int16_t initialize();
-  virtual void setInterface(std::string &interface) {
-    local_network_interface_ = interface;
+  virtual void setInterface(io::NetworkInterface &&interface) {
+    local_network_interface_ = std::move(interface);
   }
   /**
@@ -260,7 +261,7 @@ class Socket : public BaseStream {
   uint16_t port_;
   bool is_loopback_only_;
-  std::string local_network_interface_;
+  io::NetworkInterface local_network_interface_;
   // connection information
   int32_t socket_file_descriptor_;
@@ -268,6 +269,8 @@ class Socket : public BaseStream {
   fd_set total_list_;
   fd_set read_fds_;
   std::atomic<uint16_t> socket_max_;
+  std::atomic<uint64_t> total_written_;
+  std::atomic<uint64_t> total_read_;
   uint16_t listeners_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/DataStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h
index a61ff5a..825cd89 100644
--- a/libminifi/include/io/DataStream.h
+++ b/libminifi/include/io/DataStream.h
@@ -69,9 +69,6 @@ class DataStream {
   }
-  virtual void setInterface(std::string &interface) {
-  }
-
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/NetworkPrioritizer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/NetworkPrioritizer.h b/libminifi/include/io/NetworkPrioritizer.h
new file mode 100644
index 0000000..7265f49
--- /dev/null
+++ b/libminifi/include/io/NetworkPrioritizer.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_NETWORKPRIORITIZER_H_
+#define LIBMINIFI_INCLUDE_IO_NETWORKPRIORITIZER_H_
+
+#include <iostream>
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+class NetworkInterface;
+
+class NetworkPrioritizer {
+ public:
+
+  virtual ~NetworkPrioritizer() {
+  }
+
+  virtual NetworkInterface &&getInterface(uint32_t size) = 0;
+
+ protected:
+  friend class NetworkInterface;
+  virtual void reduce_tokens(uint32_t size) = 0;
+
+};
+
+class NetworkInterface {
+ public:
+
+  NetworkInterface() : prioritizer_(nullptr){
+  }
+
+  virtual ~NetworkInterface(){
+  }
+
+  explicit NetworkInterface(const std::string &ifc, const std::shared_ptr<NetworkPrioritizer> &prioritizer)
+      : ifc_(ifc),
+        prioritizer_(prioritizer) {
+  }
+
+  explicit NetworkInterface(const NetworkInterface &&other)
+      : ifc_(std::move(other.ifc_)),
+        prioritizer_(std::move(other.prioritizer_)) {
+  }
+
+  std::string getInterface() const {
+    return ifc_;
+  }
+  void log_write(uint32_t size) {
+    if (nullptr != prioritizer_) {
+      prioritizer_->reduce_tokens(size);
+    }
+  }
+
+  void log_read(uint32_t size) {
+    if (nullptr != prioritizer_) {
+      prioritizer_->reduce_tokens(size);
+    }
+  }
+
+  NetworkInterface &operator=(const NetworkInterface &&other) {
+    ifc_ = std::move(other.ifc_);
+    prioritizer_ = std::move(other.prioritizer_);
+    return *this;
+  }
+ private:
+  friend class NetworkPrioritizer;
+  std::string ifc_;
+  std::shared_ptr<NetworkPrioritizer> prioritizer_;
+};
+
+class NetworkPrioritizerFactory {
+ public:
+  static std::shared_ptr<NetworkPrioritizerFactory> getInstance() {
+    static std::shared_ptr<NetworkPrioritizerFactory> fa = std::make_shared<NetworkPrioritizerFactory>();
+    return fa;
+  }
+
+  int setPrioritizer(const std::shared_ptr<NetworkPrioritizer> &prioritizer) {
+    if (np_ != nullptr)
+      return -1;
+    np_ = prioritizer;
+    return 0;
+  }
+
+  std::shared_ptr<NetworkPrioritizer> getPrioritizer() {
+    return np_;
+  }
+ private:
+ std::shared_ptr<NetworkPrioritizer> np_; +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_IO_NETWORKPRIORITIZER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/io/StreamFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h index bbb09b9..f3fc95e 100644 --- a/libminifi/include/io/StreamFactory.h +++ b/libminifi/include/io/StreamFactory.h @@ -22,6 +22,7 @@ #include "utils/StringUtils.h" #include "validation.h" #include "controllers/SSLContextService.h" +#include "NetworkPrioritizer.h" namespace org { namespace apache { namespace nifi { @@ -50,21 +51,49 @@ class StreamFactory { * Creates a socket and returns a unique ptr * */ - std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) { - return delegate_->createSocket(host, port); + std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port, uint32_t estimated_size = 0) { + auto socket = delegate_->createSocket(host, port); + auto prioritizer_ = NetworkPrioritizerFactory::getInstance()->getPrioritizer(); + if (nullptr != prioritizer_) { + std::cout << "prioritizer" << std::endl; + auto &&ifc = prioritizer_->getInterface(estimated_size); + if (ifc.getInterface().empty()) { + return nullptr; + } else { + socket->setInterface(std::move(ifc)); + } + } + return socket; } /** * Creates a socket and returns a unique ptr * */ - std::unique_ptr<Socket> createSecureSocket(const std::string &host, const uint16_t port, const std::shared_ptr<minifi::controllers::SSLContextService> &ssl_service) { - return delegate_->createSecureSocket(host, port, ssl_service); + std::unique_ptr<Socket> createSecureSocket(const std::string &host, const uint16_t port, const std::shared_ptr<minifi::controllers::SSLContextService> 
&ssl_service, uint32_t estimated_size = 0) { + auto socket = delegate_->createSecureSocket(host, port, ssl_service); + auto prioritizer_ = NetworkPrioritizerFactory::getInstance()->getPrioritizer(); + if (nullptr != prioritizer_) { + auto &&ifc = prioritizer_->getInterface(estimated_size); + if (ifc.getInterface().empty()) { + return nullptr; + } else { + socket->setInterface(std::move(ifc)); + } + } + return socket; } - StreamFactory(const std::shared_ptr<Configure> &configure); + static std::shared_ptr<StreamFactory> getInstance(const std::shared_ptr<Configure> &configuration) { + // avoid invalid access + static std::shared_ptr<StreamFactory> factory = std::shared_ptr<StreamFactory>(new StreamFactory(configuration)); + return factory; + } protected: + + StreamFactory(const std::shared_ptr<Configure> &configure); + std::shared_ptr<AbstractStreamFactory> delegate_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/sitetosite/Peer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h index 1f9ec01..3775c7c 100644 --- a/libminifi/include/sitetosite/Peer.h +++ b/libminifi/include/sitetosite/Peer.h @@ -143,7 +143,6 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { : stream_(nullptr), host_(""), port_(-1), - local_network_interface_(""), logger_(logging::LoggerFactory<SiteToSitePeer>::getLogger()) { } @@ -159,13 +158,13 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { : stream_(nullptr), host_(host), port_(port), - local_network_interface_(interface), timeout_(30000), yield_expiration_(0), logger_(logging::LoggerFactory<SiteToSitePeer>::getLogger()) { url_ = "nifi://" + host_ + ":" + std::to_string(port_); yield_expiration_ = 0; timeout_ = 30000; // 30 seconds + local_network_interface_= std::move(io::NetworkInterface(interface, nullptr)); } explicit 
SiteToSitePeer(SiteToSitePeer &&ss) @@ -193,10 +192,10 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { } // setInterface void setInterface(std::string &interface) { - local_network_interface_ = interface; + local_network_interface_ = std::move(io::NetworkInterface(interface,nullptr)); } std::string getInterface() { - return local_network_interface_; + return local_network_interface_.getInterface(); } // Get Processor yield period in MilliSecond uint64_t getYieldPeriodMsec(void) { @@ -288,7 +287,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { void setStream(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream) { stream_ = nullptr; if (stream) - stream_ = std::move(stream); + stream_ = std::move(stream); } org::apache::nifi::minifi::io::DataStream *getStream() { @@ -374,7 +373,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { uint16_t port_; - std::string local_network_interface_; + io::NetworkInterface local_network_interface_; utils::HTTPProxy proxy_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/include/sitetosite/SiteToSiteFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h index 848a152..4f0f36c 100644 --- a/libminifi/include/sitetosite/SiteToSiteFactory.h +++ b/libminifi/include/sitetosite/SiteToSiteFactory.h @@ -44,6 +44,9 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( client_configuration.getStreamFactory()->createSocket(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort())); } + + if (nullptr == str) + return nullptr; auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(std::move(str), client_configuration.getPeer()->getHost(), 
client_configuration.getPeer()->getPort(), client_configuration.getInterface())); return peer; @@ -57,7 +60,11 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientConfiguration &client_configuration) { uuid_t uuid; client_configuration.getPeer()->getPortId(uuid); - auto ptr = std::unique_ptr<SiteToSiteClient>(new RawSiteToSiteClient(createStreamingPeer(client_configuration))); + auto rsptr = createStreamingPeer(client_configuration); + if (nullptr == rsptr){ + return nullptr; + } + auto ptr = std::unique_ptr<SiteToSiteClient>(new RawSiteToSiteClient(std::move(rsptr))); ptr->setPortId(uuid); ptr->setSSLContextService(client_configuration.getSecurityContext()); return ptr; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 68f6831..629075c 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -354,13 +354,14 @@ void RemoteProcessorGroupPort::refreshPeerList() { this->peers_.clear(); std::unique_ptr<sitetosite::SiteToSiteClient> protocol; - sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, - site2site_port_, ssl_service != nullptr), this->getInterface(), client_type_); + sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, site2site_port_, ssl_service != nullptr), this->getInterface(), + client_type_); config.setSecurityContext(ssl_service); config.setHTTPProxy(this->proxy_); protocol = sitetosite::createClient(config); - protocol->getPeerList(peers_); + if (protocol) + protocol->getPeerList(peers_); logging::LOG_INFO(logger_) << 
"Have " << peers_.size() << " peers"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/c2/ControllerSocketProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index ea9cf9b..58b835b 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -32,7 +32,7 @@ namespace c2 { void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configuration) { HeartBeatReporter::initialize(controller, updateSink, configuration); - stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration); + stream_factory_ = minifi::io::StreamFactory::getInstance(configuration); std::string host = "localhost", port, limitStr, context_name; bool anyInterface = false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index 6e5c7f6..6181382 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -23,24 +23,21 @@ #include <string> ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) - : - content_repo_(content_repo), + : content_repo_(content_repo), flow_repo_(flow_repo), prov_repo_(prov_repo), finalized(false), location(-1), current_flowfile_(nullptr), logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) { - stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()); + stream_factory = 
org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
 }
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
-
   uuid_t uuid;
   uuid_generate(uuid);
@@ -93,8 +90,7 @@ bool linkToPrevious) {
   return processor;
 }
-std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -141,7 +137,6 @@ void ExecutionPlan::reset() {
   }
 }
-
 bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
   if (!finalized) {
     finalize();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/controllers/NetworkPrioritizerService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/NetworkPrioritizerService.cpp b/libminifi/src/controllers/NetworkPrioritizerService.cpp
new file mode 100644
index 0000000..d88921b
--- /dev/null
+++ b/libminifi/src/controllers/NetworkPrioritizerService.cpp
@@ -0,0 +1,211 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "controllers/NetworkPrioritizerService.h"
+#include <cstdio>
+#include <utility>
+#include <limits>
+#include <string>
+#include <vector>
+#include <sys/ioctl.h>
+#include <ifaddrs.h>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <set>
+#include "utils/StringUtils.h"
+#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD))
+#include <net/if_dl.h>
+#include <net/if_types.h>
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property NetworkPrioritizerService::NetworkControllers("Network Controllers", "Network controllers in order of priority for this prioritizer");
+core::Property NetworkPrioritizerService::MaxThroughput("Max Throughput", "Max throughput for these network controllers");
+core::Property NetworkPrioritizerService::MaxPayload("Max Payload", "Maximum payload for these network controllers");
+core::Property NetworkPrioritizerService::VerifyInterfaces("Verify Interfaces", "Verify that interfaces are operational", "true");
+core::Property NetworkPrioritizerService::DefaultPrioritizer("Default Prioritizer", "Sets this controller service as the default prioritizer for all comms");
+
+void NetworkPrioritizerService::initialize() {
+  std::set<core::Property> supportedProperties;
+
supportedProperties.insert(NetworkControllers);
+  supportedProperties.insert(MaxThroughput);
+  supportedProperties.insert(MaxPayload);
+  supportedProperties.insert(VerifyInterfaces);
+  supportedProperties.insert(DefaultPrioritizer);
+  setSupportedProperties(supportedProperties);
+}
+
+void NetworkPrioritizerService::yield() {
+}
+
+/**
+ * If not an intersecting operation we will attempt to locate the highest priority interface available.
+ */
+io::NetworkInterface &&NetworkPrioritizerService::getInterface(uint32_t size = 0) {
+  std::vector<std::string> controllers;
+  if (!network_controllers_.empty()) {
+    if (sufficient_tokens(size) && size < max_payload_) {
+      controllers.insert(std::end(controllers), std::begin(network_controllers_), std::end(network_controllers_));
+    }
+  }
+
+  if (!controllers.empty()) {
+    auto ifc = get_nearest_interface(controllers);
+    if (!ifc.empty()) {
+      reduce_tokens(size);
+      return std::move(io::NetworkInterface(ifc, shared_from_this()));
+    }
+  }
+  for (size_t i = 0; i < linked_services_.size(); i++) {
+    auto np = std::dynamic_pointer_cast<NetworkPrioritizerService>(linked_services_.at(i));
+    if (np != nullptr) {
+      auto ifcs = np->getInterfaces(size);
+      auto ifc = get_nearest_interface(ifcs);
+      if (!ifc.empty()) {
+        np->reduce_tokens(size);
+        return std::move(io::NetworkInterface(ifc, np));
+      }
+    }
+  }
+  return std::move(io::NetworkInterface("", nullptr));
+}
+
+std::string NetworkPrioritizerService::get_nearest_interface(const std::vector<std::string> &ifcs) {
+  for (auto ifc : ifcs) {
+    if (!verify_interfaces_ || interface_online(ifc)) {
+      logger_->log_debug("%s is online", ifc);
+      return ifc;
+    } else {
+      logger_->log_debug("%s is not online", ifc);
+    }
+  }
+  return "";
+}
+
+bool NetworkPrioritizerService::interface_online(const std::string &ifc) {
+  struct ifreq ifr;
+  auto sockid = socket(PF_INET6, SOCK_DGRAM, IPPROTO_IP);
+  memset(&ifr, 0, sizeof(ifr));
+  snprintf(ifr.ifr_name, ifc.length(), "%s", ifc.c_str());
+  if (ioctl(sockid,
SIOCGIFFLAGS, &ifr) < 0) {
+    return false;
+  }
+  close(sockid);
+  return (ifr.ifr_flags & IFF_UP) && (ifr.ifr_flags & IFF_RUNNING);
+}
+
+std::vector<std::string> NetworkPrioritizerService::getInterfaces(uint32_t size = 0) {
+  std::vector<std::string> interfaces;
+  if (!network_controllers_.empty()) {
+    if (sufficient_tokens(size) && size < max_payload_) {
+      return network_controllers_;
+    }
+  }
+  return interfaces;
+}
+
+bool NetworkPrioritizerService::sufficient_tokens(uint32_t size) {
+  std::lock_guard<std::mutex> lock(token_mutex_);
+  auto ms = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+  auto diff = ms - timestamp_;
+  timestamp_ = ms;
+  if (diff > 0) {
+    tokens_ += diff * tokens_per_ms;
+  }
+  if (bytes_per_token_ > 0 && size > 0) {
+    if (tokens_ * bytes_per_token_ >= size) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+  return true;
+}
+
+void NetworkPrioritizerService::reduce_tokens(uint32_t size) {
+  std::lock_guard<std::mutex> lock(token_mutex_);
+  if (bytes_per_token_ > 0 && size > 0) {
+    uint32_t tokens = size / bytes_per_token_;
+    tokens_ -= tokens;
+  }
+}
+
+bool NetworkPrioritizerService::isRunning() {
+  return getState() == core::controller::ControllerServiceState::ENABLED;
+}
+
+bool NetworkPrioritizerService::isWorkAvailable() {
+  return false;
+}
+
+void NetworkPrioritizerService::onEnable() {
+  std::string controllers, max_throughput, max_payload, df_prioritizer, intersect, verify_interfaces, roundrobin_interfaces;
+// if we have defined controller services or we have linked services
+  if (getProperty(NetworkControllers.getName(), controllers) || !linked_services_.empty()) {
+    // if this controller service is defined, it will be an intersection of this config with linked services.
+    if (getProperty(MaxThroughput.getName(), max_throughput) && !max_throughput.empty()) {
+      max_throughput_ = std::stoi(max_throughput);
+      if (max_throughput_ < 1000) {
+        bytes_per_token_ = 1;
+        tokens_ = max_throughput_;
+      } else {
+        bytes_per_token_ = max_throughput_ / 1000;
+      }
+    }
+
+    if (getProperty(MaxPayload.getName(), max_payload) && !max_payload.empty()) {
+      max_payload_ = std::stoi(max_payload);
+    }
+
+    if (!controllers.empty()) {
+      network_controllers_ = utils::StringUtils::split(controllers, ",");
+    }
+    if (getProperty(DefaultPrioritizer.getName(), df_prioritizer)) {
+      bool is_default = false;
+      if (utils::StringUtils::StringToBool(df_prioritizer, is_default)) {
+        if (is_default) {
+          if (io::NetworkPrioritizerFactory::getInstance()->setPrioritizer(shared_from_this()) < 0) {
+            std::runtime_error("Can only have one prioritizer");
+          }
+        }
+      }
+    }
+    if (getProperty(VerifyInterfaces.getName(), verify_interfaces)) {
+      utils::StringUtils::StringToBool(verify_interfaces, verify_interfaces_);
+    }
+    timestamp_ = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+    enabled_ = true;
+    logger_->log_trace("Enabled enable ");
+  } else {
+    logger_->log_trace("Could not enable ");
+  }
+}
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 12dfd98..4846b41 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -48,8 +48,9 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string
       addr_info_(0),
       socket_file_descriptor_(-1),
       socket_max_(0),
+      total_written_(0),
+      total_read_(0),
       is_loopback_only_(false),
-      local_network_interface_(""),
listeners_(listeners),
       canonical_hostname_(""),
       nonBlocking_(false),
@@ -66,7 +67,6 @@ Socket::Socket(const Socket &&other)
     : requested_hostname_(std::move(other.requested_hostname_)),
       port_(std::move(other.port_)),
       is_loopback_only_(false),
-      local_network_interface_(""),
       addr_info_(std::move(other.addr_info_)),
       socket_file_descriptor_(other.socket_file_descriptor_),
       socket_max_(other.socket_max_.load()),
@@ -76,6 +76,8 @@ Socket::Socket(const Socket &&other)
       canonical_hostname_(std::move(other.canonical_hostname_)),
       nonBlocking_(false),
       logger_(std::move(other.logger_)) {
+  total_written_ = other.total_written_.load();
+  total_read_ = other.total_read_.load();
 }
 Socket::~Socket() {
@@ -92,6 +94,14 @@ void Socket::closeStream() {
     close(socket_file_descriptor_);
     socket_file_descriptor_ = -1;
   }
+  if (total_written_ > 0) {
+    local_network_interface_.log_write(total_written_);
+    total_written_ = 0;
+  }
+  if (total_read_ > 0) {
+    local_network_interface_.log_read(total_read_);
+    total_read_ = 0;
+  }
 }
 void Socket::setNonBlocking() {
@@ -108,7 +118,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
   setSocketOptions(socket_file_descriptor_);
-  if (listeners_ <= 0 && !local_network_interface_.empty()) {
+  if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) {
     // bind to local network interface
     ifaddrs* list = NULL;
     ifaddrs* item = NULL;
@@ -118,7 +128,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       item = list;
       while (item) {
         if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET == item->ifa_addr->sa_family)) {
-          if (strcmp(item->ifa_name, local_network_interface_.c_str()) == 0) {
+          if (strcmp(item->ifa_name, local_network_interface_.getInterface().c_str()) == 0) {
             itemFound = item;
             break;
           }
@@ -129,11 +139,10 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       if (itemFound != NULL) {
         result = bind(socket_file_descriptor_, itemFound->ifa_addr, sizeof(struct
sockaddr_in));
         if (result < 0)
-          logger_->log_info("Bind to interface %s failed %s", local_network_interface_, strerror(errno));
+          logger_->log_info("Bind to interface %s failed %s", local_network_interface_.getInterface(), strerror(errno));
         else
-          logger_->log_info("Bind to interface %s", local_network_interface_);
+          logger_->log_info("Bind to interface %s", local_network_interface_.getInterface());
       }
-
       freeifaddrs(list);
     }
   }
@@ -366,6 +375,7 @@ int Socket::writeData(uint8_t *value, int size) {
   if (ret)
     logger_->log_trace("Send data size %d over socket %d", size, fd);
+  total_written_+=bytes;
   return bytes;
 }
@@ -463,6 +473,7 @@ int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
       break;
     }
   }
+  total_read_+=total_read;
   return total_read;
 }
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/src/sitetosite/Peer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/sitetosite/Peer.cpp b/libminifi/src/sitetosite/Peer.cpp
index b0bfca2..385f991 100644
--- a/libminifi/src/sitetosite/Peer.cpp
+++ b/libminifi/src/sitetosite/Peer.cpp
@@ -41,8 +41,17 @@ bool SiteToSitePeer::Open() {
   if (IsNullOrEmpty(host_))
     return false;
-  if (!this->local_network_interface_.empty())
-    stream_->setInterface(local_network_interface_);
+  /**
+   * We may override the interface provided to us within the socket in this step; however, this is a
+   * known configuration path, and thus we will allow the RPG configuration to override anything provided to us
+   * previously by the socket preference.
+   */
+  if (!this->local_network_interface_.getInterface().empty()) {
+    auto socket = static_cast<io::Socket*>(stream_.get());
+    if (nullptr != socket) {
+      socket->setInterface(io::NetworkInterface(local_network_interface_.getInterface(), nullptr));
+    }
+  }
   if (stream_->initialize() < 0)
     return false;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 53a1bf6..a97bc99 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -18,9 +18,9 @@
 #include "./TestBase.h"
-TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo, const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version)
-    :
-      content_repo_(content_repo),
+TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo,
+                   const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version)
+    : content_repo_(content_repo),
       flow_repo_(flow_repo),
       prov_repo_(prov_repo),
       finalized(false),
@@ -28,11 +28,10 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s
       current_flowfile_(nullptr),
       flow_version_(flow_version),
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
-  stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
 }
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor>
TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -92,8 +91,7 @@ bool linkToPrevious) {
   return processor;
 }
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship,
-bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -113,10 +111,7 @@ bool linkToPrevious) {
   return addProcessor(processor, name, relationship, linkToPrevious);
 }
-bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc,
-                           const std::string &prop,
-                           const std::string &value,
-                           bool dynamic) {
+bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   int32_t i = 0;
   logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName());
@@ -149,7 +144,6 @@ void TestPlan::reset() {
   }
 }
-
 bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
   if (!finalized) {
     finalize();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/integration/IntegrationBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 887b1a9..cb86b7e 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -97,8 +97,7 @@ void IntegrationBase::run(std::string test_file_location) {
   std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
-      <minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
       <core::YamlConfiguration
       >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory,
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 0e8d52f..7fe3c3e 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -58,7 +58,7 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
       new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/integration/SecureSocketGetTCPTest.cpp
----------------------------------------------------------------------
diff
--git a/libminifi/test/integration/SecureSocketGetTCPTest.cpp b/libminifi/test/integration/SecureSocketGetTCPTest.cpp
index f9d4261..cc0fc26 100644
--- a/libminifi/test/integration/SecureSocketGetTCPTest.cpp
+++ b/libminifi/test/integration/SecureSocketGetTCPTest.cpp
@@ -128,7 +128,7 @@ class SecureSocketTest : public IntegrationBase {
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(configuration);
-    std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
     std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
         new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/ControllerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index 49cb759..c5268ab 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -172,7 +172,7 @@ TEST_CASE("TestGet", "[test1]") {
   minifi::c2::ControllerSocketProtocol protocol("testprotocol");
   protocol.initialize(nullptr, ptr, configuration);
-  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   auto socket = stream_factory->createSocket("localhost", 9997);
@@ -205,7 +205,7 @@ TEST_CASE("TestClear", "[test1]") {
   minifi::c2::ControllerSocketProtocol protocol("testprotocol");
   protocol.initialize(nullptr, ptr, configuration);
-  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto
stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   auto socket = stream_factory->createSocket("localhost", 9997);
@@ -241,7 +241,7 @@ TEST_CASE("TestUpdate", "[test1]") {
   minifi::c2::ControllerSocketProtocol protocol("testprotocol");
   protocol.initialize(nullptr, ptr, configuration);
-  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   auto socket = stream_factory->createSocket("localhost", 9997);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/GetTCPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/GetTCPTests.cpp b/libminifi/test/unit/GetTCPTests.cpp
index 6e28bf1..34765e7 100644
--- a/libminifi/test/unit/GetTCPTests.cpp
+++ b/libminifi/test/unit/GetTCPTests.cpp
@@ -46,7 +46,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
   content_repo->initialize(std::make_shared<minifi::Configure>());
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
   org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9184, 1);
   REQUIRE(-1 != server.initialize());
@@ -154,7 +154,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
   content_repo->initialize(std::make_shared<minifi::Configure>());
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-
std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
   TestController testController;
@@ -278,7 +278,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
   content_repo->initialize(std::make_shared<minifi::Configure>());
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
   TestController testController;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp b/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp
new file mode 100644
index 0000000..765487e
--- /dev/null
+++ b/libminifi/test/unit/NetworkPrioritizerServiceTests.cpp
@@ -0,0 +1,168 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "../../controller/Controller.h"
+#include "core/controller/ControllerService.h"
+#include "c2/ControllerSocketProtocol.h"
+#include "controllers/NetworkPrioritizerService.h"
+#include "state/UpdateController.h"
+
+TEST_CASE("TestPrioritizerOneInterface", "[test1]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0,eth1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxPayload, "10");
+  controller->onEnable();
+  REQUIRE("eth0" == controller->getInterface(0).getInterface());
+}
+
+TEST_CASE("TestPrioritizerOneInterfaceMaxPayload", "[test2]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0,eth1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxPayload, "1");
+  controller->onEnable();
+  // can't because we've triggered the max payload
+  REQUIRE("" == controller->getInterface(5).getInterface());
+}
+
+TEST_CASE("TestPrioritizerOneInterfaceMaxThroughput", "[test3]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0,eth1");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller->onEnable();
+  // can't because we've triggered the max payload
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+  REQUIRE("" == controller->getInterface(5).getInterface());
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+}
+
+TEST_CASE("TestPriorotizerMultipleInterfaces", "[test4]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  auto controller2 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService2");
+  auto controller3 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService3");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+
controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+
+  controller3->initialize();
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller3->onEnable();
+
+  controller2->initialize();
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth1");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller2->onEnable();
+  std::vector<std::shared_ptr<core::controller::ControllerService> > services;
+  services.push_back(controller2);
+  services.push_back(controller3);
+  controller->setLinkedControllerServices(services);
+  controller->onEnable();
+  // can't because we've triggered the max payload
+  REQUIRE("eth1" == controller->getInterface(5).getInterface());
+  REQUIRE("eth1" == controller->getInterface(5).getInterface());
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+  REQUIRE("eth0" == controller->getInterface(5).getInterface());
+}
+
+TEST_CASE("TestPriorotizerMultipleInterfacesNeverSwitch", "[test5]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  auto controller2 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService2");
+  auto controller3 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService3");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+
+  controller3->initialize();
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "1000");
+  controller3->onEnable();
+
+  controller2->initialize();
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth1");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller2->onEnable();
+  std::vector<std::shared_ptr<core::controller::ControllerService> > services;
+  services.push_back(controller3);
+  services.push_back(controller2);
+  controller->setLinkedControllerServices(services);
+  controller->onEnable();
+  // can't because we've triggered the max payload
+  for (int i = 0; i < 50; i++) {
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    REQUIRE("eth0" == controller->getInterface(5).getInterface());
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  }
+}
+
+
+TEST_CASE("TestPriorotizerMultipleInterfacesMaxPayload", "[test4]") {
+  auto controller = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService");
+  auto controller2 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService2");
+  auto controller3 = std::make_shared<minifi::controllers::NetworkPrioritizerService>("TestService3");
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  controller->initialize();
+  controller->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+
+  controller3->initialize();
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth0");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "1000");
+
+  controller3->onEnable();
+
+  controller2->initialize();
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::NetworkControllers, "eth1");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::VerifyInterfaces, "false");
+  controller2->setProperty(minifi::controllers::NetworkPrioritizerService::MaxThroughput, "10");
+  controller3->setProperty(minifi::controllers::NetworkPrioritizerService::MaxPayload, "10");
+  controller2->onEnable();
+  std::vector<std::shared_ptr<core::controller::ControllerService> > services;
+  services.push_back(controller2);
+  services.push_back(controller3);
+  controller->setLinkedControllerServices(services);
+  controller->onEnable();
+  // can't because we've triggered the max payload
+  REQUIRE("eth0" == controller->getInterface(50).getInterface());
+  REQUIRE("eth0" == controller->getInterface(50).getInterface());
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 6e9ae83..8810763 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -257,7 +257,7 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
   std::shared_ptr<TestPlan> plan = testController.createPlan();
   std::shared_ptr<core::Processor> processor = plan->addProcessor("GetFile", "getfileCreate2");
   std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
-      std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>());
+      minifi::io::StreamFactory::getInstance(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>());
   plan->addProcessor(processorReport, "reporter", core::Relationship("success", "description"), false);
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 7e78846..352e6fd 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -220,9 +220,9 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") {
 TEST_CASE("TestTLSContextCreation2", "[TestSocket7]") {
   std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
   configure->set("nifi.remote.input.secure", "false");
-  minifi::io::StreamFactory factory(configure);
+  auto factory = minifi::io::StreamFactory::getInstance(configure);
   std::string host = "localhost";
-  minifi::io::Socket *socket = factory.createSocket(host, 10001).release();
+  minifi::io::Socket *socket = factory->createSocket(host, 10001).release();
   minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }
@@ -234,9 +234,9 @@ TEST_CASE("TestTLSContextCreation2", "[TestSocket7]") {
 TEST_CASE("TestTLSContextCreationNullptr", "[TestSocket7]") {
   std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
   configure->set("nifi.remote.input.secure", "false");
-  minifi::io::StreamFactory factory(configure);
+  auto factory = minifi::io::StreamFactory::getInstance(configure);
   std::string host = "localhost";
-  minifi::io::Socket *socket = factory.createSecureSocket(host, 10001, nullptr).release();
+  minifi::io::Socket *socket = factory->createSecureSocket(host, 10001, nullptr).release();
   minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/libminifi/test/unit/YamlConfigurationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp b/libminifi/test/unit/YamlConfigurationTests.cpp
index 724fc64..b05c402 100644
--- a/libminifi/test/unit/YamlConfigurationTests.cpp
+++ b/libminifi/test/unit/YamlConfigurationTests.cpp
@@ -30,7 +30,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -197,7 +197,7 @@ TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -350,7 +350,7 @@ TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -388,7 +388,7 @@ TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =
@@ -433,7 +433,7 @@ TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
   std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
   std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   core::YamlConfiguration *yamlConfig =

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a672dae/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 868b65f..65d79ed 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -183,7 +183,7 @@ int main(int argc, char **argv) {
   configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configure);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configure);
   std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name);
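
Note on the recurring change above: every call site swaps `std::make_shared<minifi::io::StreamFactory>(...)` (or a stack-allocated factory) for `minifi::io::StreamFactory::getInstance(...)`. The definition of `getInstance` is not part of this excerpt, so the following is only a minimal sketch of one common way such an accessor is written, assuming a single shared instance created on first use and a non-public constructor. `Config` and `Factory` are hypothetical stand-ins, not the MiNiFi classes.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for minifi::Configure.
struct Config {
  std::string secure = "false";
};

// Hypothetical stand-in for minifi::io::StreamFactory.
class Factory {
 public:
  // Static accessor used in place of std::make_shared<Factory>(config).
  // Assumption: the instance is created once, on the first call, and
  // the same shared_ptr is handed back on every later call.
  static std::shared_ptr<Factory> getInstance(const std::shared_ptr<Config> &config) {
    static std::shared_ptr<Factory> instance(new Factory(config));
    return instance;
  }

  const std::shared_ptr<Config> &config() const { return config_; }

 private:
  // Private constructor: callers cannot build their own copy.
  explicit Factory(std::shared_ptr<Config> config) : config_(std::move(config)) {}

  std::shared_ptr<Config> config_;
};

// Returns true when two separate calls yield the same object.
inline bool sameInstance() {
  auto a = Factory::getInstance(std::make_shared<Config>());
  auto b = Factory::getInstance(std::make_shared<Config>());
  return a == b;
}
```

In this sketch the configuration passed to any call after the first is ignored, because the function-local static is initialized only once. Whether the real `StreamFactory` behaves the same way would need to be checked against `libminifi/include/io/StreamFactory.h` in this commit.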
