Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 7647b7c20 -> 372f2d6f8
MINIFI-258 - Removing Configure, StreamFactory, TLSContext singletons This closes #91. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/372f2d6f Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/372f2d6f Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/372f2d6f Branch: refs/heads/master Commit: 372f2d6f80ec040571665cea267de011e12343a2 Parents: 7647b7c Author: Bryan Rosander <[email protected]> Authored: Mon May 1 14:49:44 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Wed May 10 15:38:24 2017 -0400 ---------------------------------------------------------------------- libminifi/include/EventDrivenSchedulingAgent.h | 4 +- libminifi/include/FlowControlProtocol.h | 11 +-- libminifi/include/FlowController.h | 4 +- libminifi/include/RemoteProcessorGroupPort.h | 6 +- libminifi/include/ResourceClaim.h | 2 - libminifi/include/SchedulingAgent.h | 3 - libminifi/include/Site2SiteClientProtocol.h | 3 - libminifi/include/Site2SitePeer.h | 5 -- libminifi/include/ThreadedSchedulingAgent.h | 5 +- libminifi/include/TimerDrivenSchedulingAgent.h | 4 +- libminifi/include/core/ConfigurationFactory.h | 14 ++-- libminifi/include/core/FlowConfiguration.h | 4 +- libminifi/include/core/Processor.h | 3 +- libminifi/include/core/Repository.h | 5 +- libminifi/include/core/logging/LogAppenders.h | 8 +- .../SiteToSiteProvenanceReportingTask.h | 4 +- .../core/repository/FlowFileRepository.h | 8 +- libminifi/include/core/yaml/YamlConfiguration.h | 8 +- libminifi/include/io/ClientSocket.h | 18 ++++- libminifi/include/io/StreamFactory.h | 84 ++------------------ libminifi/include/io/tls/TLSSocket.h | 48 +++-------- libminifi/include/properties/Configure.h | 23 ++---- .../include/provenance/ProvenanceRepository.h | 8 +- libminifi/src/Configure.cpp | 3 +- libminifi/src/FlowController.cpp | 14 ++-- libminifi/src/RemoteProcessorGroupPort.cpp | 2 +- libminifi/src/ResourceClaim.cpp | 2 - libminifi/src/core/ConfigurationFactory.cpp | 11 ++- libminifi/src/core/FlowConfiguration.cpp | 2 +- libminifi/src/core/Processor.cpp | 8 +- .../SiteToSiteProvenanceReportingTask.cpp | 2 +- libminifi/src/core/yaml/YamlConfiguration.cpp | 2 +- libminifi/src/io/ClientSocket.cpp | 7 +- libminifi/src/io/StreamFactory.cpp | 65 ++++++++++++++- libminifi/src/io/tls/TLSSocket.cpp | 39 ++++----- libminifi/test/HttpGetIntegrationTest.cpp | 10 ++- libminifi/test/HttpPostIntegrationTest.cpp | 11 ++- libminifi/test/ProcessorTests.cpp | 2 +- libminifi/test/SocketTests.cpp | 28 ++++--- libminifi/test/TestBase.h | 6 +- libminifi/test/nodefs/NoYamlConfiguration.cpp | 2 +- libminifi/test/unit/LoggerTests.cpp | 2 +- libminifi/test/unit/ProvenanceTestHelper.h | 2 +- libminifi/test/unit/RepoTests.cpp | 7 +- libminifi/test/unit/Site2SiteTests.cpp | 2 +- libminifi/test/unit/YamlCongifurationTests.cpp | 4 +- main/MiNiFiMain.cpp | 11 +-- 47 files changed, 251 insertions(+), 275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/EventDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index 9d53c5c..22a68f3 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -38,8 +38,8 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { /*! * Create a new processor */ - EventDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo) - : ThreadedSchedulingAgent(repo) { + EventDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) + : ThreadedSchedulingAgent(repo, configure) { } // Destructor virtual ~EventDrivenSchedulingAgent() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index 826f999..6454c59 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -157,10 +157,9 @@ class FlowControlProtocol { /*! * Create a new control protocol */ - FlowControlProtocol(FlowController *controller) { + FlowControlProtocol(FlowController *controller, const std::shared_ptr<Configure> &configure) { _controller = controller; logger_ = logging::Logger::getLogger(); - configure_ = Configure::getConfigure(); _socket = 0; _serverName = "localhost"; _serverPort = DEFAULT_NIFI_SERVER_PORT; @@ -173,15 +172,15 @@ class FlowControlProtocol { std::string value; - if (configure_->get(Configure::nifi_server_name, value)) { + if (configure->get(Configure::nifi_server_name, value)) { _serverName = value; logger_->log_info("NiFi Server Name %s", _serverName.c_str()); } - if (configure_->get(Configure::nifi_server_port, value) + if (configure->get(Configure::nifi_server_port, value) && core::Property::StringToInt(value, _serverPort)) { logger_->log_info("NiFi Server Port: [%d]", _serverPort); } - if (configure_->get(Configure::nifi_server_report_interval, value)) { + if (configure->get(Configure::nifi_server_report_interval, value)) { core::TimeUnit unit; if (core::Property::StringToTime(value, _reportInterval, unit) && core::Property::ConvertTimeUnitToMS(_reportInterval, unit, @@ -276,8 +275,6 @@ class FlowControlProtocol { std::mutex mutex_; // Logger std::shared_ptr<logging::Logger> logger_; - // Configure - Configure *configure_ = NULL; // NiFi server Name std::string _serverName; // NiFi server port http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index e007f80..187448b 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -66,6 +66,7 @@ class FlowController : public core::CoreComponent { */ FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, const std::string name = DEFAULT_ROOT_GROUP_NAME, bool headless_mode = false); @@ -145,9 +146,6 @@ class FlowController : public core::CoreComponent { // flow controller mutex std::recursive_mutex mutex_; - // configuration object - Configure *configure_; - // Configuration File Name std::string configuration_file_name_; // NiFi property File Name http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index f8aac38..cc96a0b 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -27,6 +27,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "Site2SiteClientProtocol.h" +#include "io/StreamFactory.h" namespace org { namespace apache { @@ -39,10 +40,11 @@ class RemoteProcessorGroupPort : public core::Processor { /*! * Create a new processor */ - RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL) + RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, uuid_t uuid = NULL) : core::Processor(name, uuid), direction_(SEND), transmitting_(false) { + stream_factory_ = stream_factory; logger_ = logging::Logger::getLogger(); uuid_copy(protocol_uuid_, uuid); } @@ -82,7 +84,7 @@ class RemoteProcessorGroupPort : public core::Processor { protected: private: - + std::shared_ptr<io::StreamFactory> stream_factory_; std::unique_ptr<Site2SiteClientProtocol> getNextProtocol(); void returnProtocol(std::unique_ptr<Site2SiteClientProtocol> protocol); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/ResourceClaim.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h index 1f5d17f..426d0dc 100644 --- a/libminifi/include/ResourceClaim.h +++ b/libminifi/include/ResourceClaim.h @@ -84,8 +84,6 @@ class ResourceClaim { std::atomic<uint64_t> _flowFileRecordOwnedCount; private: - // Configure - Configure *configure_; // Logger std::shared_ptr<logging::Logger> logger_; // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index e7d1e58..30df071 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -50,7 +50,6 @@ class SchedulingAgent { * Create a new processor */ SchedulingAgent(std::shared_ptr<core::Repository> repo) { - configure_ = Configure::getConfigure(); logger_ = logging::Logger::getLogger(); running_ = false; repo_ = repo; @@ -87,8 +86,6 @@ class SchedulingAgent { protected: // Logger std::shared_ptr<logging::Logger> logger_; - // Configure - Configure *configure_; // Mutex for protection std::mutex mutex_; // Whether it is running http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index b59c885..109b422 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -400,7 +400,6 @@ class Site2SiteClientProtocol { */ Site2SiteClientProtocol(std::unique_ptr<Site2SitePeer> peer) { logger_ = logging::Logger::getLogger(); - configure_ = Configure::getConfigure(); peer_ = std::move(peer); _batchSize = 0; _batchCount = 0; @@ -601,8 +600,6 @@ class Site2SiteClientProtocol { std::mutex mutex_; // Logger std::shared_ptr<logging::Logger> logger_; - // Configure - Configure *configure_; // Batch Count std::atomic<uint64_t> _batchCount; // Batch Size http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/Site2SitePeer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h index a315293..a79c240 100644 --- a/libminifi/include/Site2SitePeer.h +++ b/libminifi/include/Site2SitePeer.h @@ -64,7 +64,6 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { port_(port_), stream_(injected_socket.release()) { logger_ = logging::Logger::getLogger(); - configure_ = Configure::getConfigure(); _yieldExpiration = 0; _timeOut = 30000; // 30 seconds _url = "nifi://" + host_ + ":" + std::to_string(port_); @@ -75,7 +74,6 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { host_(std::move(ss.host_)), port_(std::move(ss.port_)) { logger_ = logging::Logger::getLogger(); - configure_ = Configure::getConfigure(); _yieldExpiration.store(ss._yieldExpiration); _timeOut.store(ss._timeOut); _url = std::move(ss._url); @@ -238,7 +236,6 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { host_ = std::move(other.host_); port_ = std::move(other.port_); logger_ = logging::Logger::getLogger(); - configure_ = Configure::getConfigure(); _yieldExpiration = 0; _timeOut = 30000; // 30 seconds _url = "nifi://" + host_ + ":" + std::to_string(port_); @@ -266,8 +263,6 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { std::atomic<uint64_t> _timeOut; // Logger std::shared_ptr<logging::Logger> logger_; - // Configure - Configure *configure_; // Yield Period in Milliseconds std::atomic<uint64_t> _yieldPeriodMsec; // Yield Expiration http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index 4e39da3..044b3c3 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -42,8 +42,9 @@ class ThreadedSchedulingAgent : public SchedulingAgent { /*! * Create a new processor */ - ThreadedSchedulingAgent(std::shared_ptr<core::Repository> repo) + ThreadedSchedulingAgent(std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) : SchedulingAgent(repo) { + configure_ = configure; } // Destructor virtual ~ThreadedSchedulingAgent() { @@ -69,7 +70,7 @@ class ThreadedSchedulingAgent : public SchedulingAgent { // Only support pass by reference or pointer ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent); ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent); - + std::shared_ptr<Configure> configure_; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 7da2abd..31d089b 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -37,8 +37,8 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { /*! * Create a new processor */ - TimerDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo) - : ThreadedSchedulingAgent(repo) { + TimerDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) + : ThreadedSchedulingAgent(repo, configure) { } // Destructor virtual ~TimerDrivenSchedulingAgent() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/ConfigurationFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h index b25faff..bf631d4 100644 --- a/libminifi/include/core/ConfigurationFactory.h +++ b/libminifi/include/core/ConfigurationFactory.h @@ -30,16 +30,18 @@ namespace core { template<typename T> typename std::enable_if<!class_operations<T>::value, T*>::type instantiate( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, const std::string path) { + const std::shared_ptr<core::Repository> &repo, + const std::shared_ptr<core::Repository> &flow_file_repo, const std::string path) { throw std::runtime_error("Cannot instantiate class"); } template<typename T> typename std::enable_if<class_operations<T>::value, T*>::type instantiate( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, const std::string path) { - return new T(repo, flow_file_repo, path); + const std::shared_ptr<core::Repository> &repo, + const std::shared_ptr<core::Repository> &flow_file_repo, + const std::shared_ptr<io::StreamFactory> &stream_factory, + const std::string path) { + return new T(repo, flow_file_repo, stream_factory, path); } /** @@ -49,6 +51,8 @@ typename std::enable_if<class_operations<T>::value, T*>::type instantiate( std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<Configure> configure, + std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path = "", bool fail_safe = false); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 79e400d..96f0b4a 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -38,6 +38,7 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/ProcessGroup.h" +#include "io/StreamFactory.h" namespace org { namespace apache { @@ -57,6 +58,7 @@ class FlowConfiguration : public CoreComponent { */ FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<io::StreamFactory> stream_factory, const std::string path) : CoreComponent(core::getClassName<FlowConfiguration>()), flow_file_repo_(flow_file_repo), @@ -108,7 +110,7 @@ class FlowConfiguration : public CoreComponent { std::string config_path_; // flow file repo std::shared_ptr<core::Repository> flow_file_repo_; - + std::shared_ptr<io::StreamFactory> stream_factory_; }; } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index c00d4ce..9f9b31a 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -33,6 +33,7 @@ #include "Connectable.h" #include "ConfigurableComponent.h" +#include "io/StreamFactory.h" #include "Property.h" #include "utils/TimeUtil.h" #include "Relationship.h" @@ -243,7 +244,7 @@ class Processor : public Connectable, public ConfigurableComponent, std::atomic<bool> _triggerWhenEmpty; //! obtainSite2SiteProtocol for use - std::shared_ptr<Site2SiteClientProtocol> obtainSite2SiteProtocol(std::string host, uint16_t sport, uuid_t portId); + std::shared_ptr<Site2SiteClientProtocol> obtainSite2SiteProtocol(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string host, uint16_t sport, uuid_t portId); //! returnSite2SiteProtocol after use void returnSite2SiteProtocol(std::shared_ptr<Site2SiteClientProtocol> protocol);private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 6209b83..48ccc47 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -61,7 +61,6 @@ class Repository : public CoreComponent { max_partition_millis_ = maxPartitionMillis; max_partition_bytes_ = maxPartitionBytes; purge_period_ = purgePeriod; - configure_ = Configure::getConfigure(); running_ = false; repo_full_ = false; } @@ -72,7 +71,7 @@ class Repository : public CoreComponent { } // initialize - virtual bool initialize() { + virtual bool initialize(const std::shared_ptr<Configure> &configure) { return true; } // Put @@ -119,8 +118,6 @@ class Repository : public CoreComponent { std::mutex mutex_; // repository directory std::string directory_; - // Configure - Configure *configure_; // max db entry life time int64_t max_partition_millis_; // max db size http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/logging/LogAppenders.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/LogAppenders.h b/libminifi/include/core/logging/LogAppenders.h index 7bdc3be..cf11238 100644 --- a/libminifi/include/core/logging/LogAppenders.h +++ b/libminifi/include/core/logging/LogAppenders.h @@ -92,7 +92,7 @@ class OutputStreamAppender : public BaseLogger { * @param stream incoming stream reference. * @param config configuration. */ - explicit OutputStreamAppender(Configure *config) + explicit OutputStreamAppender(const std::shared_ptr<Configure> &config) : BaseLogger("info") { auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>( std::cout); @@ -133,7 +133,7 @@ class OutputStreamAppender : public BaseLogger { * @param stream incoming stream reference. * @param config configuration. */ - OutputStreamAppender(std::ostream &stream, Configure *config) + OutputStreamAppender(std::ostream &stream, const std::shared_ptr<Configure> &config) : BaseLogger("info") { auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>( stream); @@ -179,7 +179,7 @@ class RollingAppender : public BaseLogger { * Base Constructor. * @param config pointer to the configuration for this instance. */ - explicit RollingAppender(Configure * config = 0) + explicit RollingAppender(const std::shared_ptr<Configure> &config = 0) : BaseLogger("info") { std::string file_name = ""; if (NULL != config @@ -260,7 +260,7 @@ class LogInstance { * the configuration within this instance. * @param config configuration for this instance. */ - static std::unique_ptr<BaseLogger> getConfiguredLogger(Configure *config) { + static std::unique_ptr<BaseLogger> getConfiguredLogger(const std::shared_ptr<Configure> &config) { std::string appender = ""; if (config->get(BaseLogger::nifi_log_appender, appender)) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h index 927a8ac..34921eb 100644 --- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h +++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h @@ -27,6 +27,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "Site2SiteClientProtocol.h" +#include "io/StreamFactory.h" namespace org { namespace apache { @@ -42,7 +43,7 @@ public: /*! * Create a new processor */ - SiteToSiteProvenanceReportingTask() : + SiteToSiteProvenanceReportingTask(const std::shared_ptr<io::StreamFactory> &stream_factory) : core::Processor(ReportTaskName) { logger_ = logging::Logger::getLogger(); this->setTriggerWhenEmpty(true); @@ -108,6 +109,7 @@ private: int batch_size_; //! Logger std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<io::StreamFactory> stream_factory_; }; // SiteToSiteProvenanceReportingTask http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h index eed1975..9fc13e0 100644 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -70,22 +70,22 @@ class FlowFileRepository : public core::Repository, } // initialize - virtual bool initialize() { + virtual bool initialize(const std::shared_ptr<Configure> &configure) { std::string value; - if (configure_->get(Configure::nifi_flowfile_repository_directory_default, + if (configure->get(Configure::nifi_flowfile_repository_directory_default, value)) { directory_ = value; } logger_->log_info("NiFi FlowFile Repository Directory %s", directory_.c_str()); - if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, + if (configure->get(Configure::nifi_flowfile_repository_max_storage_size, value)) { Property::StringToInt(value, max_partition_bytes_); } logger_->log_info("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_); - if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, + if (configure->get(Configure::nifi_flowfile_repository_max_storage_time, value)) { TimeUnit unit; if (Property::StringToTime(value, max_partition_millis_, unit) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index fb54869..793cdb9 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -24,6 +24,7 @@ #include "Site2SiteClientProtocol.h" #include <string> #include "io/validation.h" +#include "io/StreamFactory.h" namespace org { namespace apache { @@ -43,8 +44,10 @@ class YamlConfiguration : public FlowConfiguration { public: YamlConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<io::StreamFactory> stream_factory, const std::string path = DEFAULT_FLOW_YAML_FILE_NAME) - : FlowConfiguration(repo, flow_file_repo, path) { + : FlowConfiguration(repo, flow_file_repo, stream_factory, path) { + stream_factory_ = stream_factory; if (IsNullOrEmpty(config_path_)) { config_path_ = DEFAULT_FLOW_YAML_FILE_NAME; } @@ -240,6 +243,9 @@ class YamlConfiguration : public FlowConfiguration { const std::string &fieldName, const std::string &yamlSection = "", const std::string &errorMessage = ""); + + protected: + std::shared_ptr<io::StreamFactory> stream_factory_; }; } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index 6998950..124055d 100644 --- a/libminifi/include/io/ClientSocket.h +++ b/libminifi/include/io/ClientSocket.h @@ -28,8 +28,8 @@ #include "io/BaseStream.h" #include "core/Core.h" #include "core/logging/Logger.h" - #include "io/validation.h" +#include "properties/Configure.h" namespace org { namespace apache { @@ -38,6 +38,14 @@ namespace minifi { namespace io { /** + * Context class for socket. This is currently only used as a parent class for TLSContext. It is necessary so the Socket and TLSSocket constructors + * can be the same. It also gives us a common place to set timeouts, etc from the Configure object in the future. + */ +class SocketContext { + public: + SocketContext(const std::shared_ptr<Configure> &configure) {} +}; +/** * Socket class. * Purpose: Provides a general purpose socket interface that abstracts * connecting information from users @@ -51,19 +59,21 @@ class Socket : public BaseStream { /** * Constructor that accepts host name, port and listeners. With this * contructor we will be creating a server socket + * @param context the SocketContext * @param hostname our host name * @param port connecting port * @param listeners number of listeners in the queue */ - explicit Socket(const std::string &hostname, const uint16_t port, + explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners); /** * Constructor that creates a client socket. + * @param context the SocketContext * @param hostname hostname we are connecting to. * @param port port we are connecting to. */ - explicit Socket(const std::string &hostname, const uint16_t port); + explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port); /** * Move constructor. @@ -81,7 +91,7 @@ class Socket : public BaseStream { else { char hostname[1024]; gethostname(hostname, 1024); - Socket mySock(hostname, 0); + Socket mySock(nullptr, hostname, 0); mySock.initialize(); return mySock.getHostname(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/io/StreamFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h index f942f63..db4625a 100644 --- a/libminifi/include/io/StreamFactory.h +++ b/libminifi/include/io/StreamFactory.h @@ -22,36 +22,15 @@ #include "utils/StringUtils.h" #include "validation.h" -#ifdef OPENSSL_SUPPORT -#include "tls/TLSSocket.h" -#endif - namespace org { namespace apache { namespace nifi { namespace minifi { namespace io { -/** - * Purpose: Socket Creator is a class that will determine if the provided socket type - * exists per the compilation parameters - */ -template<typename T> -class SocketCreator { - - template<bool cond, typename U> - using TypeCheck = typename std::enable_if< cond, U >::type; - +class AbstractStreamFactory { public: - template<typename U = T> - TypeCheck<true, U> *create(const std::string &host, const uint16_t port) { - return new T(host, port); - } - template<typename U = T> - TypeCheck<false, U> *create(const std::string &host, const uint16_t port) { - return new Socket(host, port); - } - + virtual std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) = 0; }; /** @@ -63,69 +42,18 @@ class StreamFactory { public: /** - * Build an instance, creating a memory fence, which - * allows us to avoid locking. This is tantamount to double checked locking. - * @returns new StreamFactory; - */ - static StreamFactory *getInstance() { - StreamFactory* atomic_context = context_instance_.load( - std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); - if (atomic_context == nullptr) { - std::lock_guard<std::mutex> lock(context_mutex_); - atomic_context = context_instance_.load(std::memory_order_relaxed); - if (atomic_context == nullptr) { - atomic_context = new StreamFactory(); - std::atomic_thread_fence(std::memory_order_release); - context_instance_.store(atomic_context, std::memory_order_relaxed); - } - } - return atomic_context; - } - - /** * Creates a socket and returns a unique ptr * */ std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) { - Socket *socket = 0; - - if (is_secure_) { - socket = createSocket<TLSSocket>(host, port); - } else { - socket = createSocket<Socket>(host, port); - } - return std::unique_ptr<Socket>(socket); - } - - protected: - - /** - * Creates a socket and returns a unique ptr - * - */ - template<typename T> - Socket *createSocket(const std::string &host, const uint16_t port) { - SocketCreator<T> creator; - return creator.create(host, port); + return delegate_->createSocket(host, port); } - StreamFactory() - : configure_(Configure::getConfigure()) { - std::string secureStr; - is_secure_ = false; - if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, - is_secure_); - } - } - - bool is_secure_; - static std::atomic<StreamFactory*> context_instance_; - static std::mutex context_mutex_; + StreamFactory(const std::shared_ptr<Configure> &configure); - Configure *configure_; + protected: + std::shared_ptr<AbstractStreamFactory> delegate_; }; } /* namespace io */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/io/tls/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index 2762ba8..c14170b 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -21,8 +21,6 @@ #include <openssl/ssl.h> #include <openssl/err.h> #include <cstdint> -#include <atomic> -#include <mutex> #include "../ClientSocket.h" #include "properties/Configure.h" @@ -39,32 +37,11 @@ namespace io { #define TLS_ERROR_KEY_ERROR 4 #define TLS_ERROR_CERT_ERROR 5 -class TLSContext { +class TLSContext: public SocketContext { public: - - /** - * Build an instance, creating a memory fence, which - * allows us to avoid locking. This is tantamount to double checked locking. - * @returns new TLSContext; - */ - static TLSContext *getInstance() { - TLSContext* atomic_context = context_instance.load( - std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); - if (atomic_context == nullptr) { - std::lock_guard<std::mutex> lock(context_mutex); - atomic_context = context_instance.load(std::memory_order_relaxed); - if (atomic_context == nullptr) { - atomic_context = new TLSContext(); - atomic_context->initialize(); - std::atomic_thread_fence(std::memory_order_release); - context_instance.store(atomic_context, std::memory_order_relaxed); - } - } - return atomic_context; - } - + TLSContext(const std::shared_ptr<Configure> &configure); + virtual ~TLSContext() { if (0 != ctx) SSL_CTX_free(ctx); @@ -82,10 +59,10 @@ class TLSContext { private: - static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) { + static int pemPassWordCb(char *buf, int size, int rwflag, void *configure) { std::string passphrase; - if (Configure::getConfigure()->get( + if (static_cast<Configure*>(configure)->get( Configure::nifi_security_client_pass_phrase, passphrase)) { std::ifstream file(passphrase.c_str(), std::ifstream::in); @@ -106,17 +83,12 @@ class TLSContext { return 0; } - TLSContext(); std::shared_ptr<logging::Logger> logger_; - Configure *configuration; + std::shared_ptr<Configure> configure_; SSL_CTX *ctx; int16_t error_value; - - static std::atomic<TLSContext*> context_instance; - static std::mutex context_mutex; - }; class TLSSocket : public Socket { @@ -125,19 +97,21 @@ class TLSSocket : public Socket { /** * Constructor that accepts host name, port and listeners. With this * contructor we will be creating a server socket + * @param context the TLSContext * @param hostname our host name * @param port connecting port * @param listeners number of listeners in the queue */ - explicit TLSSocket(const std::string &hostname, const uint16_t port, + explicit TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners); /** * Constructor that creates a client socket. + * @param context the TLSContext * @param hostname hostname we are connecting to. * @param port port we are connecting to. */ - explicit TLSSocket(const std::string &hostname, const uint16_t port); + explicit TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port); /** * Move constructor. @@ -183,7 +157,7 @@ class TLSSocket : public Socket { int writeData(uint8_t *value, int size); protected: - + std::shared_ptr<TLSContext> context_; SSL* ssl; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index d1d045c..cb1b412 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -37,13 +37,6 @@ namespace minifi { class Configure { public: - // Get the singleton logger instance - static Configure * getConfigure() { - if (!configure_) { - configure_ = new Configure(); - } - return configure_; - } // nifi.flow.configuration.file static const char *nifi_flow_configuration_file; static const char *nifi_administrative_yield_duration; @@ -71,6 +64,14 @@ class Configure { static const char *nifi_security_client_pass_phrase; static const char *nifi_security_client_ca_certificate; + Configure() { + logger_ = logging::Logger::getLogger(); + } + + virtual ~Configure() { + + } + // Clear the load config void clear() { std::lock_guard<std::mutex> lock(mutex_); @@ -112,14 +113,6 @@ class Configure { // Home location for this executable std::string minifi_home_; - Configure() { - logger_ = logging::Logger::getLogger(); - } - virtual ~Configure() { - - } - static Configure *configure_; - protected: std::map<std::string, std::string> properties_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index b96021c..af613a5 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -71,21 +71,21 @@ class ProvenanceRepository : public core::Repository, } // initialize - virtual bool initialize() { + virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) { std::string value; - if (configure_->get(Configure::nifi_provenance_repository_directory_default, + if (config->get(Configure::nifi_provenance_repository_directory_default, value)) { directory_ = value; } logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str()); - if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, + if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) { core::Property::StringToInt(value, max_partition_bytes_); } logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_); - if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, + if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) { core::TimeUnit unit; if (core::Property::StringToTime(value, max_partition_millis_, unit) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index f70686d..95562c3 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -25,7 +25,6 @@ namespace apache { namespace nifi { namespace minifi { -Configure *Configure::configure_(NULL); const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; const char *Configure::nifi_administrative_yield_duration = @@ -125,7 +124,7 @@ void Configure::loadConfigureFile(const char *fileName) { if (fileName) { // perform a naive determination if this is a relative path if (fileName[0] != '/') { - adjustedFilename = adjustedFilename + configure_->getHome() + "/" + adjustedFilename = adjustedFilename + getHome() + "/" + fileName; } else { adjustedFilename += fileName; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 1a163ea..6785a9d 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -49,6 +49,7 @@ namespace minifi { FlowController::FlowController( std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, const std::string name, bool headless_mode) : CoreComponent(core::getClassName<FlowController>()), @@ -60,8 +61,8 @@ FlowController::FlowController( provenance_repo_(provenance_repo), flow_file_repo_(flow_file_repo), protocol_(0), - _timerScheduler(provenance_repo_), - _eventScheduler(provenance_repo_), + _timerScheduler(provenance_repo_, configure), + _eventScheduler(provenance_repo_, configure), flow_configuration_(std::move(flow_configuration)) { if (provenance_repo == nullptr) throw std::runtime_error("Provenance Repo should not be null"); @@ -81,14 +82,11 @@ FlowController::FlowController( initialized_ = false; root_ = NULL; - protocol_ = new FlowControlProtocol(this); - - // NiFi config properties - configure_ = Configure::getConfigure(); + protocol_ = new FlowControlProtocol(this, configure); if (!headless_mode) { std::string rawConfigFileString; - configure_->get(Configure::nifi_flow_configuration_file, + configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString); if (!rawConfigFileString.empty()) { @@ -99,7 +97,7 @@ FlowController::FlowController( if (!configuration_filename_.empty()) { // perform a naive determination if this is a relative path if (configuration_filename_.c_str()[0] != '/') { - adjustedFilename = adjustedFilename + configure_->getHome() + "/" + adjustedFilename = adjustedFilename + configure->getHome() + "/" + configuration_filename_; } else { adjustedFilename = configuration_filename_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 6263359..70771af 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -106,7 +106,7 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, } std::shared_ptr<Site2SiteClientProtocol> protocol_ = - this->obtainSite2SiteProtocol(host, sport, protocol_uuid_); + this->obtainSite2SiteProtocol(stream_factory_, host, sport, protocol_uuid_); if (!protocol_) { context->yield(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp index cbe7712..b71032b 100644 --- a/libminifi/src/ResourceClaim.cpp +++ b/libminifi/src/ResourceClaim.cpp @@ -47,8 +47,6 @@ ResourceClaim::ResourceClaim(const std::string contentDirectory) // Create the full content path for the content _contentFullPath = contentDirectory + "/" + uuidStr; - configure_ = Configure::getConfigure(); - logger_ = logging::Logger::getLogger(); logger_->log_debug("Resource Claim created %s", _contentFullPath.c_str()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/core/ConfigurationFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp index e009b1b..4ccead2 100644 --- a/libminifi/src/core/ConfigurationFactory.cpp +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -23,6 +23,7 @@ #include <set> #include "core/ConfigurationFactory.h" #include "core/FlowConfiguration.h" +#include "io/StreamFactory.h" #ifdef YAML_SUPPORT #include "core/yaml/YamlConfiguration.h" @@ -40,6 +41,8 @@ class YamlConfiguration; std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<Configure> configure, + std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path, bool fail_safe) { @@ -50,17 +53,17 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( if (class_name_lc == "flowconfiguration") { // load the base configuration. return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, path)); + new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path)); } else if (class_name_lc == "yamlconfiguration") { // only load if the class is defined. return std::unique_ptr<core::FlowConfiguration>( - instantiate<core::YamlConfiguration>(repo, flow_file_repo, path)); + instantiate<core::YamlConfiguration>(repo, flow_file_repo, stream_factory, path)); } else { if (fail_safe) { return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, path)); + new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path)); } else { throw std::runtime_error( "Support for the provided configuration class could not be found"); @@ -69,7 +72,7 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( } catch (const std::runtime_error &r) { if (fail_safe) { return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, path)); + new core::FlowConfiguration(repo, flow_file_repo, stream_factory, path)); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index f2b6f8b..90058d2 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -90,7 +90,7 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() std::shared_ptr<core::Processor> processor = nullptr; processor = std::make_shared< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(); + org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_); // initialize the processor processor->initialize(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 4b14775..e124992 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -195,7 +195,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { } std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol( - std::string host, uint16_t sport, uuid_t portId) { + const std::shared_ptr<io::StreamFactory> &stream_factory, std::string host, uint16_t sport, uuid_t portId) { std::lock_guard < std::mutex > lock(mutex_); if (!protocols_created_) { @@ -206,8 +206,7 @@ std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol( protocol->setPortId(portId); std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr < org::apache::nifi::minifi::io::DataStream - > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket( - host, sport)); + > (stream_factory->createSocket(host, sport)); std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer > (new Site2SitePeer(std::move(str), host, sport)); protocol->setPeer(std::move(peer_)); @@ -225,8 +224,7 @@ std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol( protocol->setPortId(portId); std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr < org::apache::nifi::minifi::io::DataStream - > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket( - host, sport)); + > (stream_factory->createSocket(host, sport)); std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer > (new Site2SitePeer(std::move(str), host, sport)); protocol->setPeer(std::move(peer_)); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index 867e200..adffc88 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -106,7 +106,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::shared_ptr<Site2SiteClientProtocol> protocol_ = - this->obtainSite2SiteProtocol(host_, port_, port_uuid_); + this->obtainSite2SiteProtocol(stream_factory_, host_, port_, port_uuid_); if (!protocol_) { context->yield(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 489bdaa..b7c8246 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -518,7 +518,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, auto portId = inputPortsObj["id"].as<std::string>(); uuid_parse(portId.c_str(), uuid); - port = std::make_shared<minifi::RemoteProcessorGroupPort>(nameStr, uuid); + port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, nameStr, uuid); processor = std::static_pointer_cast<core::Processor>(port); port->setDirection(direction); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index 98e0205..7a4a191 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -24,6 +24,7 @@ #include <arpa/inet.h> #include <unistd.h> #include <cstdio> +#include <memory> #include <utility> #include <vector> #include <cerrno> @@ -39,7 +40,7 @@ namespace io { char *Socket::HOSTNAME = const_cast<char*>(Socket::getMyHostName(0).c_str()); -Socket::Socket(const std::string &hostname, const uint16_t port, +Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1) : requested_hostname_(hostname), port_(port), @@ -53,8 +54,8 @@ Socket::Socket(const std::string &hostname, const uint16_t port, FD_ZERO(&read_fds_); } -Socket::Socket(const std::string &hostname, const uint16_t port) - : Socket(hostname, port, 0) { +Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port) + : Socket(context, hostname, port, 0) { } Socket::Socket(const Socket &&other) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/io/StreamFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp index 1cf419e..220883c 100644 --- a/libminifi/src/io/StreamFactory.cpp +++ b/libminifi/src/io/StreamFactory.cpp @@ -17,7 +17,13 @@ */ #include "io/StreamFactory.h" #include <atomic> +#include <memory> #include <mutex> +#include <string> + +#ifdef OPENSSL_SUPPORT +#include "io/tls/TLSSocket.h" +#endif namespace org { namespace apache { @@ -25,9 +31,64 @@ namespace nifi { namespace minifi { namespace io { -std::atomic<StreamFactory*> StreamFactory::context_instance_; -std::mutex StreamFactory::context_mutex_; +/** + * Purpose: Socket Creator is a class that will determine if the provided socket type + * exists per the compilation parameters + */ + +template<typename T, typename V> +class SocketCreator : public AbstractStreamFactory { + template<bool cond, typename U> + using TypeCheck = typename std::enable_if< cond, U >::type; + + template<bool cond, typename Q> + using ContextTypeCheck = typename std::enable_if< cond, Q >::type; + + public: + template<typename Q = V> + ContextTypeCheck<true, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) { + return std::make_shared<V>(configure); + } + template<typename Q = V> + ContextTypeCheck<false, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) { + return std::make_shared<SocketContext>(configure); + } + + SocketCreator<T, V>(std::shared_ptr<Configure> configure) { + context_ = create(configure); + } + + template<typename U = T> + TypeCheck<true, U> *create(const std::string &host, const uint16_t port) { + return new T(context_, host, port); + } + template<typename U = T> + TypeCheck<false, U> *create(const std::string &host, const uint16_t port) { + return new Socket(context_, host, port); + } + + std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) { + T *socket = create(host, port); + return std::unique_ptr<Socket>(socket); + } + + private: + std::shared_ptr<V> context_; +}; + +// std::atomic<StreamFactory*> StreamFactory::context_instance_; +// std::mutex StreamFactory::context_mutex_; +StreamFactory::StreamFactory(const std::shared_ptr<Configure> &configure) { + std::string secureStr; + bool is_secure = false; + if (configure->get(Configure::nifi_remote_input_secure, secureStr)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, is_secure); + delegate_ = std::make_shared<SocketCreator<TLSSocket, TLSContext>>(configure); + } else { + delegate_ = std::make_shared<SocketCreator<Socket, SocketContext>>(configure); + } +} } /* namespace io */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/src/io/tls/TLSSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index 1499840..f938e0a 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -15,12 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "io/tls/TLSSocket.h" #include <openssl/ssl.h> #include <openssl/err.h> +#include <memory> #include <utility> #include <string> #include <vector> +#include "io/tls/TLSSocket.h" #include "properties/Configure.h" #include "utils/StringUtils.h" #include "core/Property.h" @@ -31,14 +32,11 @@ namespace nifi { namespace minifi { namespace io { -std::atomic<TLSContext*> TLSContext::context_instance; -std::mutex TLSContext::context_mutex; - -TLSContext::TLSContext() - : error_value(0), +TLSContext::TLSContext(const std::shared_ptr<Configure> &configure) + : SocketContext(configure), error_value(0), ctx(0), logger_(logging::Logger::getLogger()), - configuration(Configure::getConfigure()) { + configure_(configure) { } /** * The memory barrier is defined by the singleton @@ -49,7 +47,7 @@ int16_t TLSContext::initialize() { } std::string clientAuthStr; bool needClientCert = true; - if (!(configuration->get(Configure::nifi_security_need_ClientAuth, + if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool( clientAuthStr, needClientCert))) { @@ -75,9 +73,9 @@ int16_t TLSContext::initialize() { std::string passphrase; std::string caCertificate; - if (!(configuration->get(Configure::nifi_security_client_certificate, + if (!(configure_->get(Configure::nifi_security_client_certificate, certificate) - && configuration->get(Configure::nifi_security_client_private_key, + && configure_->get(Configure::nifi_security_client_private_key, privatekey))) { logger_->log_error( "Certificate and Private Key PEM file not configured, error: %s.", @@ -93,10 +91,11 @@ int16_t TLSContext::initialize() { error_value = TLS_ERROR_CERT_MISSING; return error_value; } - if (configuration->get(Configure::nifi_security_client_pass_phrase, + if (configure_->get(Configure::nifi_security_client_pass_phrase, passphrase)) { // if the private key has passphase SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); + SSL_CTX_set_default_passwd_cb_userdata(ctx, static_cast<void*>(configure_.get())); } int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), @@ -117,7 +116,7 @@ int16_t TLSContext::initialize() { return error_value; } // load CA certificates - if (configuration->get(Configure::nifi_security_client_ca_certificate, + if (configure_->get(Configure::nifi_security_client_ca_certificate, caCertificate)) { retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); if (retp == 0) { @@ -144,29 +143,31 @@ TLSSocket::~TLSSocket() { * @param port connecting port * @param listeners number of listeners in the queue */ -TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port, +TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners) - : Socket(hostname, port, listeners), + : Socket(context, hostname, port, listeners), ssl(0) { + context_ = context; } -TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) - : Socket(hostname, port, 0), +TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port) + : Socket(context, hostname, port, 0), ssl(0) { + context_ = context; } TLSSocket::TLSSocket(const TLSSocket &&d) : Socket(std::move(d)), ssl(0) { + context_ = d.context_; } int16_t TLSSocket::initialize() { - TLSContext *context = TLSContext::getInstance(); - int16_t ret = context->initialize(); + int16_t ret = context_->initialize(); Socket::initialize(); if (!ret) { // we have s2s secure config - ssl = SSL_new(context->getContext()); + ssl = SSL_new(context_->getContext()); SSL_set_fd(ssl, socket_file_descriptor_); if (SSL_connect(ssl) == -1) { logger_->log_error("SSL socket connect failed to %s %d", http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/HttpGetIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/HttpGetIntegrationTest.cpp b/libminifi/test/HttpGetIntegrationTest.cpp index e6720f5..90505b4 100644 --- a/libminifi/test/HttpGetIntegrationTest.cpp +++ b/libminifi/test/HttpGetIntegrationTest.cpp @@ -35,6 +35,7 @@ #include "../include/FlowController.h" #include "../include/properties/Configure.h" #include "unit/ProvenanceTestHelper.h" +#include "../include/io/StreamFactory.h" std::string test_file_location; @@ -57,7 +58,7 @@ int main(int argc, char **argv) { logger->updateLogger(std::move(outputLogger)); logger->setLogLevel("debug"); - minifi::Configure *configuration = minifi::Configure::getConfigure(); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); @@ -67,18 +68,19 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr< core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, test_file_location)); + new core::YamlConfiguration(test_repo, test_repo, stream_factory, test_file_location)); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); std::shared_ptr<minifi::FlowController> controller = std::make_shared< - minifi::FlowController>(test_repo, test_flow_repo, std::move(yaml_ptr), + minifi::FlowController>(test_repo, test_flow_repo, std::make_shared<minifi::Configure>(), std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true); - core::YamlConfiguration yaml_config(test_repo, test_repo, test_file_location); + core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, test_file_location); std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( test_file_location); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/HttpPostIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/HttpPostIntegrationTest.cpp b/libminifi/test/HttpPostIntegrationTest.cpp index 2898611..73d21e6 100644 --- a/libminifi/test/HttpPostIntegrationTest.cpp +++ b/libminifi/test/HttpPostIntegrationTest.cpp @@ -35,6 +35,8 @@ #include "../include/FlowController.h" #include "../include/properties/Configure.h" #include "unit/ProvenanceTestHelper.h" +#include "../include/io/StreamFactory.h" +#include "../include/properties/Configure.h" std::string test_file_location; @@ -62,7 +64,7 @@ int main(int argc, char **argv) { logger->updateLogger(std::move(outputLogger)); logger->setLogLevel("debug"); - minifi::Configure *configuration = minifi::Configure::getConfigure(); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); @@ -71,19 +73,20 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr< core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, test_file_location)); + new core::YamlConfiguration(test_repo, test_repo, stream_factory, test_file_location)); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); std::shared_ptr<minifi::FlowController> controller = std::make_shared< - minifi::FlowController>(test_repo, test_flow_repo, std::move(yaml_ptr), + minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true); - core::YamlConfiguration yaml_config(test_repo, test_repo, test_file_location); + core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, test_file_location); std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( test_file_location); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/ProcessorTests.cpp b/libminifi/test/ProcessorTests.cpp index ebc408b..dfdcf47 100644 --- a/libminifi/test/ProcessorTests.cpp +++ b/libminifi/test/ProcessorTests.cpp @@ -50,7 +50,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { std::shared_ptr<core::Processor> processorReport = std::make_shared< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(); + org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>())); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/SocketTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/SocketTests.cpp b/libminifi/test/SocketTests.cpp index 263a8f3..2e5013b 100644 --- a/libminifi/test/SocketTests.cpp +++ b/libminifi/test/SocketTests.cpp @@ -24,7 +24,7 @@ using namespace org::apache::nifi::minifi::io; TEST_CASE("TestSocket", "[TestSocket1]") { - Socket socket("localhost", 8183); + Socket socket(std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()), "localhost", 8183); REQUIRE(-1 == socket.initialize()); REQUIRE("localhost" == socket.getHostname()); socket.closeStream(); @@ -33,7 +33,7 @@ TEST_CASE("TestSocket", "[TestSocket1]") { TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { - Socket socket("localhost", 8183); + Socket socket(std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()), "localhost", 8183); REQUIRE(-1 == socket.initialize()); socket.writeData(0, 0); @@ -51,12 +51,14 @@ TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") { std::vector<uint8_t> buffer; buffer.push_back('a'); + + std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()); - Socket server("localhost", 9183, 1); + Socket server(socket_context, "localhost", 9183, 1); REQUIRE(-1 != server.initialize()); - Socket client("localhost", 9183); + Socket client(socket_context, "localhost", 9183); REQUIRE(-1 != client.initialize()); @@ -85,12 +87,14 @@ TEST_CASE("TestWriteEndian64", "[TestSocket4]") { std::vector<uint8_t> buffer; buffer.push_back('a'); + + std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()); - Socket server("localhost", 9183, 1); + Socket server(socket_context, "localhost", 9183, 1); REQUIRE(-1 != server.initialize()); - Socket client("localhost", 9183); + Socket client(socket_context, "localhost", 9183); REQUIRE(-1 != client.initialize()); @@ -113,11 +117,13 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - Socket server("localhost", 9183, 1); + std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()); + + Socket server(socket_context, "localhost", 9183, 1); REQUIRE(-1 != server.initialize()); - Socket client("localhost", 9183); + Socket client(socket_context, "localhost", 9183); REQUIRE(-1 != client.initialize()); @@ -151,11 +157,13 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - Socket server("localhost", 9183, 1); + std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()); + + Socket server(socket_context, "localhost", 9183, 1); REQUIRE(-1 != server.initialize()); - Socket client("localhost", 9183); + Socket client(socket_context, "localhost", 9183); REQUIRE(-1 != client.initialize()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 585c8cd..0b3ca3e 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -27,6 +27,7 @@ #include "core/logging/LogAppenders.h" #include "core/logging/Logger.h" #include "core/Core.h" +#include "properties/Configure.h" class LogTestController { public: @@ -72,12 +73,11 @@ class TestController { } } - void setDebugToConsole() { + void setDebugToConsole(std::shared_ptr<org::apache::nifi::minifi::Configure> configure) { std::ostringstream oss; std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender( - std::cout, minifi::Configure::getConfigure())); + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(std::cout, configure)); std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); logger->updateLogger(std::move(outputLogger)); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/nodefs/NoYamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/nodefs/NoYamlConfiguration.cpp b/libminifi/test/nodefs/NoYamlConfiguration.cpp index 5f3fce4..c720264 100644 --- a/libminifi/test/nodefs/NoYamlConfiguration.cpp +++ b/libminifi/test/nodefs/NoYamlConfiguration.cpp @@ -28,7 +28,7 @@ TEST_CASE("NoYamlSupport1", "[NoYamlSupport1]") { "provenancerepository", true); REQUIRE(nullptr != prov_repo); std::unique_ptr<core::FlowConfiguration> flow_configuration = std::move( - core::createFlowConfiguration(prov_repo, prov_repo, + core::createFlowConfiguration(prov_repo, prov_repo, std::make_shared<minifi::Configure>(), std::make_shared<minifi::io::StreamFactory>(false), "yamlconfiguration")); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/unit/LoggerTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp index cbf7a36..48942f0 100644 --- a/libminifi/test/unit/LoggerTests.cpp +++ b/libminifi/test/unit/LoggerTests.cpp @@ -158,7 +158,7 @@ TEST_CASE("Test log Levels change", "[ttl5]") { TEST_CASE("Test log LevelsConfigured", "[ttl6]") { std::ostringstream oss; - minifi::Configure *config = minifi::Configure::getConfigure(); + std::shared_ptr<minifi::Configure> config = std::make_shared<minifi::Configure>(); config->set(BaseLogger::nifi_log_appender, "OutputStreamAppender"); config->set( http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 1a76be7..58ae870 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -161,7 +161,7 @@ class TestFlowController : public minifi::FlowController { public: TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo) - : minifi::FlowController(repo, flow_file_repo, nullptr, "",true) { + : minifi::FlowController(repo, flow_file_repo, std::make_shared<minifi::Configure>(), nullptr, "",true) { } ~TestFlowController() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/unit/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp index 21fae45..de51ead 100644 --- a/libminifi/test/unit/RepoTests.cpp +++ b/libminifi/test/unit/RepoTests.cpp @@ -23,6 +23,7 @@ #include "FlowFileRecord.h" #include "core/Core.h" #include "core/repository/FlowFileRepository.h" +#include "properties/Configure.h" TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { @@ -38,7 +39,7 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { 0, 1); - repository->initialize(); + repository->initialize(std::make_shared<minifi::Configure>()); minifi::FlowFileRecord record(repository); @@ -70,7 +71,7 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { 0, 1); - repository->initialize(); + repository->initialize(std::make_shared<minifi::Configure>()); minifi::FlowFileRecord record(repository); @@ -106,7 +107,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { 0, 1); - repository->initialize(); + repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); minifi::FlowFileRecord record(repository); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/unit/Site2SiteTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp index f831efe..d82a3cf 100644 --- a/libminifi/test/unit/Site2SiteTests.cpp +++ b/libminifi/test/unit/Site2SiteTests.cpp @@ -105,7 +105,7 @@ TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") { std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< logging::BaseLogger>( new org::apache::nifi::minifi::core::logging::OutputStreamAppender( - std::cout, minifi::Configure::getConfigure())); + std::cout, std::make_shared<minifi::Configure>())); std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); logger->updateLogger(std::move(outputLogger)); logger->setLogLevel("trace"); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/libminifi/test/unit/YamlCongifurationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/YamlCongifurationTests.cpp b/libminifi/test/unit/YamlCongifurationTests.cpp index f08304c..3f804ab 100644 --- a/libminifi/test/unit/YamlCongifurationTests.cpp +++ b/libminifi/test/unit/YamlCongifurationTests.cpp @@ -127,7 +127,7 @@ TEST_CASE("Test YAML Config 1", "[testyamlconfig1]") { " timeout: 30 secs\n" " batch size: 1000"; - core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO); + core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO, std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>())); std::istringstream yamlstream(TEST_YAML_WITHOUT_IDS); std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getRoot(yamlstream); @@ -178,7 +178,7 @@ TEST_CASE("Test YAML Config Missing Required Fields", "[testyamlconfig2]") { " use compression: false\n" "\n"; - core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO); + core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO, std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>())); std::istringstream yamlstream(TEST_YAML_NO_RPG_PORT_ID); REQUIRE_THROWS_AS(yamlConfig->getRoot(yamlstream), std::invalid_argument); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/372f2d6f/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index f4ea89e..daf4a8f 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -120,7 +120,7 @@ int main(int argc, char **argv) { return -1; } - minifi::Configure *configure = minifi::Configure::getConfigure(); + std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(); configure->setHome(minifiHome); configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); @@ -160,7 +160,7 @@ int main(int argc, char **argv) { // Create repos for flow record and provenance std::shared_ptr<core::Repository> prov_repo = core::createRepository( prov_repo_class, true); - prov_repo->initialize(); + prov_repo->initialize(configure); configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class); @@ -168,17 +168,18 @@ int main(int argc, char **argv) { std::shared_ptr<core::Repository> flow_repo = core::createRepository( flow_repo_class, true); - flow_repo->initialize(); + flow_repo->initialize(configure); configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configure); std::unique_ptr<core::FlowConfiguration> flow_configuration = std::move( - core::createFlowConfiguration(prov_repo, flow_repo, + core::createFlowConfiguration(prov_repo, flow_repo, configure, stream_factory, nifi_configuration_class_name)); controller = std::unique_ptr<minifi::FlowController>( - new minifi::FlowController(prov_repo, flow_repo, + new minifi::FlowController(prov_repo, flow_repo, configure, std::move(flow_configuration))); // Load flow from specified configuration file
