Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 63c53bcfd -> 76f06675c
MINIFI-320: Move instantiation of OpenSSL to a singleton that's lazily created. Added test. In order to make the test work as expected I had to update thread pool to use a non blocking queue. This closes #100. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/76f06675 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/76f06675 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/76f06675 Branch: refs/heads/master Commit: 76f06675cdbf95e3ada102dff328e00f29c59f79 Parents: 63c53bc Author: Marc Parisi <[email protected]> Authored: Thu May 18 14:21:16 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Fri May 19 16:44:16 2017 -0400 ---------------------------------------------------------------------- libminifi/include/io/tls/TLSSocket.h | 31 ++++++++++ libminifi/include/utils/ThreadPool.h | 59 ++++++++++---------- libminifi/src/SchedulingAgent.cpp | 6 +- libminifi/src/io/tls/TLSSocket.cpp | 41 ++++++++------ .../ControllerServiceIntegrationTests.cpp | 45 ++++++++------- libminifi/test/unit/MockClasses.h | 3 +- libminifi/test/unit/SocketTests.cpp | 58 +++++++++++++++++-- libminifi/test/unit/ThreadPoolTests.cpp | 3 +- 8 files changed, 170 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/include/io/tls/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index c14170b..1185045 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -20,6 +20,7 @@ #include <openssl/ssl.h> #include <openssl/err.h> +#include <atomic> #include <cstdint> #include "../ClientSocket.h" @@ -37,6 +38,36 @@ namespace io { #define TLS_ERROR_KEY_ERROR 4 #define TLS_ERROR_CERT_ERROR 5 +class OpenSSLInitializer +{ + public: + static OpenSSLInitializer *getInstance() { + OpenSSLInitializer* atomic_context = context_instance.load( + std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (atomic_context == nullptr) { + std::lock_guard<std::mutex> lock(context_mutex); + atomic_context = context_instance.load(std::memory_order_relaxed); + if (atomic_context == nullptr) { + atomic_context = new OpenSSLInitializer(); + std::atomic_thread_fence(std::memory_order_release); + context_instance.store(atomic_context, std::memory_order_relaxed); + } + } + return atomic_context; + } + + OpenSSLInitializer() + { + SSL_library_init(); + OpenSSL_add_all_algorithms(); + SSL_load_error_strings(); + } + private: + static std::atomic<OpenSSLInitializer*> context_instance; + static std::mutex context_mutex; +}; + class TLSContext: public SocketContext { public: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index e3c15d8..4c399a7 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -17,6 +17,7 @@ #ifndef LIBMINIFI_INCLUDE_THREAD_POOL_H #define LIBMINIFI_INCLUDE_THREAD_POOL_H +#include <chrono> #include <iostream> #include <atomic> #include <mutex> @@ -24,6 +25,7 @@ #include <queue> #include <future> #include <thread> +#include "concurrentqueue.h" namespace org { namespace apache { namespace nifi { @@ -43,6 +45,9 @@ class Worker { promise = std::make_shared<std::promise<T>>(); } + explicit Worker() { + } + /** * Move constructor for worker tasks */ @@ -116,9 +121,10 @@ class ThreadPool { * a future * @param task this thread pool will subsume ownership of * the worker task - * @return future with the impending result. + * @param future future to move new promise to + * @return true if future can be created and thread pool is in a running state. */ - std::future<T> execute(Worker<T> &&task); + bool execute(Worker<T> &&task, std::future<T> &future); /** * Starts the Thread Pool */ @@ -160,6 +166,7 @@ class ThreadPool { thread_queue_ = std::move(other.thread_queue_); worker_queue_ = std::move(other.worker_queue_); + if (!running_) { start(); } @@ -189,7 +196,7 @@ class ThreadPool { // atomic running boolean std::atomic<bool> running_; // worker queue of worker objects - std::queue<Worker<T>> worker_queue_; + moodycamel::ConcurrentQueue<Worker<T>> worker_queue_; // notification for available work std::condition_variable tasks_available_; // manager mutex @@ -206,20 +213,17 @@ class ThreadPool { * Runs worker tasks */ void run_tasks(); -} -; +}; template<typename T> -std::future<T> ThreadPool<T>::execute(Worker<T> &&task) { +bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - bool wasEmpty = worker_queue_.empty(); - std::future<T> future = task.getPromise()->get_future(); - worker_queue_.push(std::move(task)); - if (wasEmpty) { + future = std::move(task.getPromise()->get_future()); + bool enqueued = worker_queue_.enqueue(std::move(task)); + if (running_) { tasks_available_.notify_one(); } - return future; + return enqueued; } template<typename T> @@ -241,20 +245,15 @@ void ThreadPool<T>::startWorkers() { } template<typename T> void ThreadPool<T>::run_tasks() { + auto waitperiod = std::chrono::milliseconds(1) * 100; while (running_.load()) { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - if (worker_queue_.empty()) { - - tasks_available_.wait(lock); - } - - if (!running_.load()) - break; - if (worker_queue_.empty()) + Worker<T> task; + if (!worker_queue_.try_dequeue(task)) { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + tasks_available_.wait_for(lock, waitperiod); continue; - Worker<T> task = std::move(worker_queue_.front()); - worker_queue_.pop(); + } task.run(); } current_workers_--; @@ -266,16 +265,16 @@ void ThreadPool<T>::start() { if (!running_) { running_ = true; manager_thread_ = std::thread(&ThreadPool::startWorkers, this); - + if (worker_queue_.size_approx() > 0) { + tasks_available_.notify_all(); + } } } template<typename T> void ThreadPool<T>::shutdown() { - - std::lock_guard<std::recursive_mutex> lock(manager_mutex_); if (running_.load()) { - + std::lock_guard<std::recursive_mutex> lock(manager_mutex_); running_.store(false); drain(); @@ -285,8 +284,10 @@ void ThreadPool<T>::shutdown() { std::unique_lock<std::mutex> lock(worker_queue_mutex_); thread_queue_.clear(); current_workers_ = 0; - while (!worker_queue_.empty()) - worker_queue_.pop(); + while (worker_queue_.size_approx() > 0) { + Worker<T> task; + worker_queue_.try_dequeue(task); + } } } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index fc979fd..c582c52 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -53,7 +53,8 @@ void SchedulingAgent::enableControllerService( utils::Worker<bool> functor(f_ex); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. - component_lifecycle_thread_pool_.execute(std::move(functor)); + std::future<bool> future; + component_lifecycle_thread_pool_.execute(std::move(functor), future); } void SchedulingAgent::disableControllerService( @@ -67,7 +68,8 @@ void SchedulingAgent::disableControllerService( utils::Worker<bool> functor(f_ex); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. - component_lifecycle_thread_pool_.execute(std::move(functor)); + std::future<bool> future; + component_lifecycle_thread_pool_.execute(std::move(functor), future); } bool SchedulingAgent::hasTooMuchOutGoing( http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/src/io/tls/TLSSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index f938e0a..4c3bf51 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -32,8 +32,12 @@ namespace nifi { namespace minifi { namespace io { +std::atomic<OpenSSLInitializer*> OpenSSLInitializer::context_instance; +std::mutex OpenSSLInitializer::context_mutex; + TLSContext::TLSContext(const std::shared_ptr<Configure> &configure) - : SocketContext(configure), error_value(0), + : SocketContext(configure), + error_value(0), ctx(0), logger_(logging::Logger::getLogger()), configure_(configure) { @@ -45,20 +49,20 @@ int16_t TLSContext::initialize() { if (ctx != 0) { return error_value; } + + if (nullptr == OpenSSLInitializer::getInstance()) { + return error_value; + } + std::string clientAuthStr; bool needClientCert = true; - if (!(configure_->get(Configure::nifi_security_need_ClientAuth, - clientAuthStr) + if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool( clientAuthStr, needClientCert))) { needClientCert = true; } - SSL_library_init(); const SSL_METHOD *method; - - OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); method = TLSv1_2_client_method(); ctx = SSL_CTX_new(method); if (ctx == NULL) { @@ -74,9 +78,9 @@ int16_t TLSContext::initialize() { std::string caCertificate; if (!(configure_->get(Configure::nifi_security_client_certificate, - certificate) + certificate) && configure_->get(Configure::nifi_security_client_private_key, - privatekey))) { + privatekey))) { logger_->log_error( "Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno)); @@ -92,10 +96,11 @@ int16_t TLSContext::initialize() { return error_value; } if (configure_->get(Configure::nifi_security_client_pass_phrase, - passphrase)) { + passphrase)) { // if the private key has passphase SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); - SSL_CTX_set_default_passwd_cb_userdata(ctx, static_cast<void*>(configure_.get())); + SSL_CTX_set_default_passwd_cb_userdata( + ctx, static_cast<void*>(configure_.get())); } int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), @@ -117,7 +122,7 @@ int16_t TLSContext::initialize() { } // load CA certificates if (configure_->get(Configure::nifi_security_client_ca_certificate, - caCertificate)) { + caCertificate)) { retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); if (retp == 0) { logger_->log_error("Can not load CA certificate, Exiting, error : %s", @@ -143,23 +148,25 @@ TLSSocket::~TLSSocket() { * @param port connecting port * @param listeners number of listeners in the queue */ -TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, +TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, + const std::string &hostname, const uint16_t port, const uint16_t listeners) : Socket(context, hostname, port, listeners), ssl(0) { - context_ = context; + context_ = context; } -TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port) +TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, + const std::string &hostname, const uint16_t port) : Socket(context, hostname, port, 0), ssl(0) { - context_ = context; + context_ = context; } TLSSocket::TLSSocket(const TLSSocket &&d) : Socket(std::move(d)), ssl(0) { - context_ = d.context_; + context_ = d.context_; } int16_t TLSSocket::initialize() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/integration/ControllerServiceIntegrationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp index 9bfee2b..4668bb9 100644 --- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp +++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp @@ -61,6 +61,7 @@ void waitToVerifyProcessor() { std::this_thread::sleep_for(std::chrono::seconds(2)); } + int main(int argc, char **argv) { std::string test_file_location; std::string key_dir; @@ -82,12 +83,6 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - /* - * nifi.security.client.certificate=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.crt.pem - nifi.security.client.private.key=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.ckey.pem - nifi.security.client.pass.phrase=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.pass - nifi.security.client.ca.certificate=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/nifi-cert.pem - */ std::string client_cert = "cn.crt.pem"; std::string priv_key_file = "cn.ckey.pem"; std::string passphrase = "cn.pass"; @@ -147,7 +142,6 @@ int main(int argc, char **argv) { "STARTING FLOW CONTROLLER INTEGRATION TEST"); controller->load(); controller->start(); - waitToVerifyProcessor(); std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest"); ssl_client_cont->enable(); @@ -163,23 +157,34 @@ int main(int argc, char **argv) { std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller ->getControllerServiceNode("ID"); assert(cs_id != nullptr); - controller->disableControllerService(cs_id); - disabled = true; - waitToVerifyProcessor(); - controller->enableControllerService(cs_id); - disabled = false; - waitToVerifyProcessor(); + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->disableControllerService(cs_id); + disabled = true; + waitToVerifyProcessor(); + } + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->enableControllerService(cs_id); + disabled = false; + waitToVerifyProcessor(); + } std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); assert(cs_id->enabled()); - - controller->disableReferencingServices(mock_cont); - disabled = true; - waitToVerifyProcessor(); + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->disableReferencingServices(mock_cont); + disabled = true; + waitToVerifyProcessor(); + } assert(cs_id->enabled() == false); - controller->enableReferencingServices(mock_cont); - disabled = false; - waitToVerifyProcessor(); + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->enableReferencingServices(mock_cont); + disabled = false; + waitToVerifyProcessor(); + } assert(cs_id->enabled() == true); controller->waitUnload(60000); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/unit/MockClasses.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h index d32184b..0aa4235 100644 --- a/libminifi/test/unit/MockClasses.h +++ b/libminifi/test/unit/MockClasses.h @@ -24,6 +24,7 @@ #include "core/ProcessSession.h" std::atomic<bool> disabled; +std::mutex control_mutex; class MockControllerService : public core::controller::ControllerService { public: @@ -112,7 +113,7 @@ class MockProcessor : public core::Processor { std::shared_ptr<core::controller::ControllerService> service = context ->getControllerService(linked_service); - + std::lock_guard<std::mutex> lock(control_mutex); if (!disabled.load()) { assert(true == context->isControllerServiceEnabled(linked_service)); assert(nullptr != service); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/unit/SocketTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index 5a53b47..059e3c0 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -17,11 +17,16 @@ */ #define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file - -#include "../TestBase.h" -#include <memory> +#include <thread> +#include <random> +#include <chrono> #include <vector> +#include <memory> +#include <utility> +#include "../TestBase.h" #include "io/ClientSocket.h" +#include "io/tls/TLSSocket.h" +#include "utils/ThreadPool.h" TEST_CASE("TestSocket", "[TestSocket1]") { org::apache::nifi::minifi::io::Socket socket( @@ -53,7 +58,6 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>( std::make_shared<minifi::Configure>()); @@ -89,7 +93,6 @@ TEST_CASE("TestGetHostName", "[TestSocket4]") { TEST_CASE("TestWriteEndian64", "[TestSocket4]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>( std::make_shared<minifi::Configure>()); @@ -127,7 +130,6 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") { org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1); - REQUIRE(-1 != server.initialize()); org::apache::nifi::minifi::io::Socket client(socket_context, "localhost", @@ -190,3 +192,47 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") { server.closeStream(); } + +std::atomic<uint8_t> counter; +std::mt19937_64 seed { std::random_device { }() }; +bool createSocket() { + int mine = counter++; + std::shared_ptr<minifi::Configure> configuration = std::make_shared< + minifi::Configure>(); + + std::uniform_int_distribution<> distribution { 10, 100 }; + std::this_thread::sleep_for(std::chrono::milliseconds { distribution(seed) }); + + for (int i = 0; i < 50; i++) { + std::shared_ptr<org::apache::nifi::minifi::io::TLSContext> socketA = + std::make_shared<org::apache::nifi::minifi::io::TLSContext>( + configuration); + socketA->initialize(); + } + + return true; +} +/** + * MINIFI-320 was created to address reallocations within TLSContext + * This test will create 20 threads that attempt to create contexts + * to ensure we no longer see the segfaults. + */ +TEST_CASE("TestTLSContextCreation", "[TestSocket6]") { + utils::ThreadPool<bool> pool(20, true); + + std::vector<std::future<bool>> futures; + for (int i = 0; i < 20; i++) { + std::function<bool()> f_ex = createSocket; + utils::Worker<bool> functor(f_ex); + std::future<bool> fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); + futures.push_back(std::move(fut)); + } + pool.start(); + for (auto &&future : futures) { + future.wait(); + } + + REQUIRE(20 == counter.load()); +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/unit/ThreadPoolTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index 0fa75e6..0bba767 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -31,7 +31,8 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") { std::function<bool()> f_ex = function; utils::Worker<bool> functor(f_ex); pool.start(); - std::future<bool> fut = pool.execute(std::move(functor)); + std::future<bool> fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); fut.wait(); REQUIRE(true == fut.get()); }
