Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 63c53bcfd -> 76f06675c


MINIFI-320: Move instantiation of OpenSSL to a singleton that's
lazily created. Added test. In order to make the test work as
expected I had to update the thread pool to use a non-blocking queue.

This closes #100.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/76f06675
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/76f06675
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/76f06675

Branch: refs/heads/master
Commit: 76f06675cdbf95e3ada102dff328e00f29c59f79
Parents: 63c53bc
Author: Marc Parisi <[email protected]>
Authored: Thu May 18 14:21:16 2017 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Fri May 19 16:44:16 2017 -0400

----------------------------------------------------------------------
 libminifi/include/io/tls/TLSSocket.h            | 31 ++++++++++
 libminifi/include/utils/ThreadPool.h            | 59 ++++++++++----------
 libminifi/src/SchedulingAgent.cpp               |  6 +-
 libminifi/src/io/tls/TLSSocket.cpp              | 41 ++++++++------
 .../ControllerServiceIntegrationTests.cpp       | 45 ++++++++-------
 libminifi/test/unit/MockClasses.h               |  3 +-
 libminifi/test/unit/SocketTests.cpp             | 58 +++++++++++++++++--
 libminifi/test/unit/ThreadPoolTests.cpp         |  3 +-
 8 files changed, 170 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/include/io/tls/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index c14170b..1185045 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -20,6 +20,7 @@
 
 #include <openssl/ssl.h>
 #include <openssl/err.h>
+#include <atomic>
 #include <cstdint>
 #include "../ClientSocket.h"
 
@@ -37,6 +38,36 @@ namespace io {
 #define TLS_ERROR_KEY_ERROR 4
 #define TLS_ERROR_CERT_ERROR 5
 
+class OpenSSLInitializer
+{
+ public:
+  static OpenSSLInitializer *getInstance() {
+    OpenSSLInitializer* atomic_context = context_instance.load(
+         std::memory_order_relaxed);
+     std::atomic_thread_fence(std::memory_order_acquire);
+     if (atomic_context == nullptr) {
+       std::lock_guard<std::mutex> lock(context_mutex);
+       atomic_context = context_instance.load(std::memory_order_relaxed);
+       if (atomic_context == nullptr) {
+         atomic_context = new OpenSSLInitializer();
+         std::atomic_thread_fence(std::memory_order_release);
+         context_instance.store(atomic_context, std::memory_order_relaxed);
+       }
+     }
+     return atomic_context;
+   }
+
+  OpenSSLInitializer()
+  {
+    SSL_library_init();
+    OpenSSL_add_all_algorithms();
+    SSL_load_error_strings();
+  }
+ private:
+  static std::atomic<OpenSSLInitializer*> context_instance;
+  static std::mutex context_mutex;
+};
+
 class TLSContext: public SocketContext {
 
  public:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index e3c15d8..4c399a7 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -17,6 +17,7 @@
 #ifndef LIBMINIFI_INCLUDE_THREAD_POOL_H
 #define LIBMINIFI_INCLUDE_THREAD_POOL_H
 
+#include <chrono>
 #include <iostream>
 #include <atomic>
 #include <mutex>
@@ -24,6 +25,7 @@
 #include <queue>
 #include <future>
 #include <thread>
+#include "concurrentqueue.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -43,6 +45,9 @@ class Worker {
     promise = std::make_shared<std::promise<T>>();
   }
 
+  explicit Worker() {
+  }
+
   /**
    * Move constructor for worker tasks
    */
@@ -116,9 +121,10 @@ class ThreadPool {
    * a future
    * @param task this thread pool will subsume ownership of
    * the worker task
-   * @return future with the impending result.
+   * @param future future to move new promise to
+   * @return true if future can be created and thread pool is in a running state.
    */
-  std::future<T> execute(Worker<T> &&task);
+  bool execute(Worker<T> &&task, std::future<T> &future);
   /**
    * Starts the Thread Pool
    */
@@ -160,6 +166,7 @@ class ThreadPool {
 
     thread_queue_ = std::move(other.thread_queue_);
     worker_queue_ = std::move(other.worker_queue_);
+
     if (!running_) {
       start();
     }
@@ -189,7 +196,7 @@ class ThreadPool {
 // atomic running boolean
   std::atomic<bool> running_;
 // worker queue of worker objects
-  std::queue<Worker<T>> worker_queue_;
+  moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
 // notification for available work
   std::condition_variable tasks_available_;
 // manager mutex
@@ -206,20 +213,17 @@ class ThreadPool {
    * Runs worker tasks
    */
   void run_tasks();
-}
-;
+};
 
 template<typename T>
-std::future<T> ThreadPool<T>::execute(Worker<T> &&task) {
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
 
-  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-  bool wasEmpty = worker_queue_.empty();
-  std::future<T> future = task.getPromise()->get_future();
-  worker_queue_.push(std::move(task));
-  if (wasEmpty) {
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
     tasks_available_.notify_one();
   }
-  return future;
+  return enqueued;
 }
 
 template<typename T>
@@ -241,20 +245,15 @@ void ThreadPool<T>::startWorkers() {
 }
 template<typename T>
 void ThreadPool<T>::run_tasks() {
+  auto waitperiod = std::chrono::milliseconds(1) * 100;
   while (running_.load()) {
-    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-    if (worker_queue_.empty()) {
-
-      tasks_available_.wait(lock);
-    }
-
-    if (!running_.load())
-      break;
 
-    if (worker_queue_.empty())
+    Worker<T> task;
+    if (!worker_queue_.try_dequeue(task)) {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait_for(lock, waitperiod);
       continue;
-    Worker<T> task = std::move(worker_queue_.front());
-    worker_queue_.pop();
+    }
     task.run();
   }
   current_workers_--;
@@ -266,16 +265,16 @@ void ThreadPool<T>::start() {
   if (!running_) {
     running_ = true;
     manager_thread_ = std::thread(&ThreadPool::startWorkers, this);
-
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
   }
 }
 
 template<typename T>
 void ThreadPool<T>::shutdown() {
-
-  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
   if (running_.load()) {
-
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
     running_.store(false);
 
     drain();
@@ -285,8 +284,10 @@ void ThreadPool<T>::shutdown() {
       std::unique_lock<std::mutex> lock(worker_queue_mutex_);
       thread_queue_.clear();
       current_workers_ = 0;
-      while (!worker_queue_.empty())
-        worker_queue_.pop();
+      while (worker_queue_.size_approx() > 0) {
+        Worker<T> task;
+        worker_queue_.try_dequeue(task);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index fc979fd..c582c52 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -53,7 +53,8 @@ void SchedulingAgent::enableControllerService(
   utils::Worker<bool> functor(f_ex);
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
-  component_lifecycle_thread_pool_.execute(std::move(functor));
+  std::future<bool> future;
+  component_lifecycle_thread_pool_.execute(std::move(functor), future);
 }
 
 void SchedulingAgent::disableControllerService(
@@ -67,7 +68,8 @@ void SchedulingAgent::disableControllerService(
   utils::Worker<bool> functor(f_ex);
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
-  component_lifecycle_thread_pool_.execute(std::move(functor));
+  std::future<bool> future;
+  component_lifecycle_thread_pool_.execute(std::move(functor), future);
 }
 
 bool SchedulingAgent::hasTooMuchOutGoing(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index f938e0a..4c3bf51 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -32,8 +32,12 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
+std::atomic<OpenSSLInitializer*> OpenSSLInitializer::context_instance;
+std::mutex OpenSSLInitializer::context_mutex;
+
 TLSContext::TLSContext(const std::shared_ptr<Configure> &configure)
-    : SocketContext(configure), error_value(0),
+    : SocketContext(configure),
+      error_value(0),
       ctx(0),
       logger_(logging::Logger::getLogger()),
       configure_(configure) {
@@ -45,20 +49,20 @@ int16_t TLSContext::initialize() {
   if (ctx != 0) {
     return error_value;
   }
+
+  if (nullptr == OpenSSLInitializer::getInstance()) {
+    return error_value;
+  }
+
   std::string clientAuthStr;
   bool needClientCert = true;
-  if (!(configure_->get(Configure::nifi_security_need_ClientAuth,
-                           clientAuthStr)
+  if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr)
       && org::apache::nifi::minifi::utils::StringUtils::StringToBool(
           clientAuthStr, needClientCert))) {
     needClientCert = true;
   }
 
-  SSL_library_init();
   const SSL_METHOD *method;
-
-  OpenSSL_add_all_algorithms();
-  SSL_load_error_strings();
   method = TLSv1_2_client_method();
   ctx = SSL_CTX_new(method);
   if (ctx == NULL) {
@@ -74,9 +78,9 @@ int16_t TLSContext::initialize() {
     std::string caCertificate;
 
     if (!(configure_->get(Configure::nifi_security_client_certificate,
-                             certificate)
+                          certificate)
         && configure_->get(Configure::nifi_security_client_private_key,
-                              privatekey))) {
+                           privatekey))) {
       logger_->log_error(
           "Certificate and Private Key PEM file not configured, error: %s.",
           std::strerror(errno));
@@ -92,10 +96,11 @@ int16_t TLSContext::initialize() {
       return error_value;
     }
     if (configure_->get(Configure::nifi_security_client_pass_phrase,
-                           passphrase)) {
+                        passphrase)) {
       // if the private key has passphase
       SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
-      SSL_CTX_set_default_passwd_cb_userdata(ctx, static_cast<void*>(configure_.get()));
+      SSL_CTX_set_default_passwd_cb_userdata(
+          ctx, static_cast<void*>(configure_.get()));
     }
 
     int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
@@ -117,7 +122,7 @@ int16_t TLSContext::initialize() {
     }
     // load CA certificates
     if (configure_->get(Configure::nifi_security_client_ca_certificate,
-                           caCertificate)) {
+                        caCertificate)) {
       retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
       if (retp == 0) {
         logger_->log_error("Can not load CA certificate, Exiting, error : %s",
@@ -143,23 +148,25 @@ TLSSocket::~TLSSocket() {
  * @param port connecting port
  * @param listeners number of listeners in the queue
  */
-TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port,
+TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context,
+                     const std::string &hostname, const uint16_t port,
                      const uint16_t listeners)
     : Socket(context, hostname, port, listeners),
       ssl(0) {
-        context_ = context;
+  context_ = context;
 }
 
-TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port)
+TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context,
+                     const std::string &hostname, const uint16_t port)
     : Socket(context, hostname, port, 0),
       ssl(0) {
-        context_ = context;
+  context_ = context;
 }
 
 TLSSocket::TLSSocket(const TLSSocket &&d)
     : Socket(std::move(d)),
       ssl(0) {
-        context_ = d.context_;
+  context_ = d.context_;
 }
 
 int16_t TLSSocket::initialize() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
index 9bfee2b..4668bb9 100644
--- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
+++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
@@ -61,6 +61,7 @@ void waitToVerifyProcessor() {
   std::this_thread::sleep_for(std::chrono::seconds(2));
 }
 
+
 int main(int argc, char **argv) {
   std::string test_file_location;
   std::string key_dir;
@@ -82,12 +83,6 @@ int main(int argc, char **argv) {
 
   configuration->set(minifi::Configure::nifi_flow_configuration_file,
                      test_file_location);
-  /*
-   * nifi.security.client.certificate=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.crt.pem
-   nifi.security.client.private.key=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.ckey.pem
-   nifi.security.client.pass.phrase=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.pass
-   nifi.security.client.ca.certificate=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/nifi-cert.pem
-   */
   std::string client_cert = "cn.crt.pem";
   std::string priv_key_file = "cn.ckey.pem";
   std::string passphrase = "cn.pass";
@@ -147,7 +142,6 @@ int main(int argc, char **argv) {
       "STARTING FLOW CONTROLLER INTEGRATION TEST");
   controller->load();
   controller->start();
-  waitToVerifyProcessor();
   std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont =
       controller->getControllerServiceNode("SSLClientServiceTest");
   ssl_client_cont->enable();
@@ -163,23 +157,34 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller
       ->getControllerServiceNode("ID");
   assert(cs_id != nullptr);
-  controller->disableControllerService(cs_id);
-  disabled = true;
-  waitToVerifyProcessor();
-  controller->enableControllerService(cs_id);
-  disabled = false;
-  waitToVerifyProcessor();
+  {
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->disableControllerService(cs_id);
+    disabled = true;
+    waitToVerifyProcessor();
+  }
+  {
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->enableControllerService(cs_id);
+    disabled = false;
+    waitToVerifyProcessor();
+  }
   std::shared_ptr<core::controller::ControllerServiceNode> mock_cont =
       controller->getControllerServiceNode("MockItLikeIts1995");
   assert(cs_id->enabled());
-
-  controller->disableReferencingServices(mock_cont);
-  disabled = true;
-  waitToVerifyProcessor();
+  {
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->disableReferencingServices(mock_cont);
+    disabled = true;
+    waitToVerifyProcessor();
+  }
   assert(cs_id->enabled() == false);
-  controller->enableReferencingServices(mock_cont);
-  disabled = false;
-  waitToVerifyProcessor();
+  {
+    std::lock_guard<std::mutex> lock(control_mutex);
+    controller->enableReferencingServices(mock_cont);
+    disabled = false;
+    waitToVerifyProcessor();
+  }
   assert(cs_id->enabled() == true);
 
   controller->waitUnload(60000);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/unit/MockClasses.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h
index d32184b..0aa4235 100644
--- a/libminifi/test/unit/MockClasses.h
+++ b/libminifi/test/unit/MockClasses.h
@@ -24,6 +24,7 @@
 #include "core/ProcessSession.h"
 
 std::atomic<bool> disabled;
+std::mutex control_mutex;
 
 class MockControllerService : public core::controller::ControllerService {
  public:
@@ -112,7 +113,7 @@ class MockProcessor : public core::Processor {
 
       std::shared_ptr<core::controller::ControllerService> service = context
           ->getControllerService(linked_service);
-
+      std::lock_guard<std::mutex> lock(control_mutex);
       if (!disabled.load()) {
         assert(true == context->isControllerServiceEnabled(linked_service));
         assert(nullptr != service);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 5a53b47..059e3c0 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -17,11 +17,16 @@
  */
 
 #define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
-
-#include "../TestBase.h"
-#include <memory>
+#include <thread>
+#include <random>
+#include <chrono>
 #include <vector>
+#include <memory>
+#include <utility>
+#include "../TestBase.h"
 #include "io/ClientSocket.h"
+#include "io/tls/TLSSocket.h"
+#include "utils/ThreadPool.h"
 
 TEST_CASE("TestSocket", "[TestSocket1]") {
   org::apache::nifi::minifi::io::Socket socket(
@@ -53,7 +58,6 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") {
 TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
   std::vector<uint8_t> buffer;
   buffer.push_back('a');
-
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context =
       std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
           std::make_shared<minifi::Configure>());
@@ -89,7 +93,6 @@ TEST_CASE("TestGetHostName", "[TestSocket4]") {
 TEST_CASE("TestWriteEndian64", "[TestSocket4]") {
   std::vector<uint8_t> buffer;
   buffer.push_back('a');
-
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context =
       std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
           std::make_shared<minifi::Configure>());
@@ -127,7 +130,6 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") {
 
   org::apache::nifi::minifi::io::Socket server(socket_context, "localhost",
                                                9183, 1);
-
   REQUIRE(-1 != server.initialize());
 
   org::apache::nifi::minifi::io::Socket client(socket_context, "localhost",
@@ -190,3 +192,47 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") {
 
   server.closeStream();
 }
+
+std::atomic<uint8_t> counter;
+std::mt19937_64 seed { std::random_device { }() };
+bool createSocket() {
+  int mine = counter++;
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+      minifi::Configure>();
+
+  std::uniform_int_distribution<> distribution { 10, 100 };
+  std::this_thread::sleep_for(std::chrono::milliseconds { distribution(seed) });
+
+  for (int i = 0; i < 50; i++) {
+    std::shared_ptr<org::apache::nifi::minifi::io::TLSContext> socketA =
+        std::make_shared<org::apache::nifi::minifi::io::TLSContext>(
+            configuration);
+    socketA->initialize();
+  }
+
+  return true;
+}
+/**
+ * MINIFI-320 was created to address reallocations within TLSContext
+ * This test will create 20 threads that attempt to create contexts
+ * to ensure we no longer see the segfaults.
+ */
+TEST_CASE("TestTLSContextCreation", "[TestSocket6]") {
+  utils::ThreadPool<bool> pool(20, true);
+
+  std::vector<std::future<bool>> futures;
+  for (int i = 0; i < 20; i++) {
+    std::function<bool()> f_ex = createSocket;
+    utils::Worker<bool> functor(f_ex);
+    std::future<bool> fut;
+    REQUIRE(true == pool.execute(std::move(functor), fut));
+    futures.push_back(std::move(fut));
+  }
+  pool.start();
+  for (auto &&future : futures) {
+    future.wait();
+  }
+
+  REQUIRE(20 == counter.load());
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/76f06675/libminifi/test/unit/ThreadPoolTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index 0fa75e6..0bba767 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -31,7 +31,8 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") {
   std::function<bool()> f_ex = function;
   utils::Worker<bool> functor(f_ex);
   pool.start();
-  std::future<bool> fut = pool.execute(std::move(functor));
+  std::future<bool> fut;
+  REQUIRE(true == pool.execute(std::move(functor), fut));
   fut.wait();
   REQUIRE(true == fut.get());
 }

Reply via email to