Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 1257e5291 -> d7885d6bd


MINIFICPP-677: Change behavior of async callback

This closes #441.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/d7885d6b
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/d7885d6b
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/d7885d6b

Branch: refs/heads/master
Commit: d7885d6bd8dea00d584fe9fcbc3eb21a12d43205
Parents: 1257e52
Author: Marc Parisi <[email protected]>
Authored: Thu Nov 15 15:30:46 2018 -0500
Committer: Aldrin Piri <[email protected]>
Committed: Thu Nov 15 20:00:30 2018 -0500

----------------------------------------------------------------------
 extensions/http-curl/client/HTTPClient.cpp      |  4 ++
 extensions/http-curl/client/HTTPStream.cpp      | 15 +++---
 extensions/http-curl/client/HTTPStream.h        |  3 ++
 .../http-curl/tests/HTTPIntegrationBase.h       | 15 ++++--
 .../http-curl/tests/HTTPSiteToSiteTests.cpp     |  2 +-
 libminifi/include/utils/ByteArrayCallback.h     |  2 +-
 libminifi/test/integration/IntegrationBase.h    | 52 +++++++++-----------
 7 files changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPClient.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.cpp 
b/extensions/http-curl/client/HTTPClient.cpp
index 7940c6e..d607664 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -99,6 +99,10 @@ HTTPClient::~HTTPClient() {
     curl_easy_cleanup(http_session_);
     http_session_ = nullptr;
   }
+  // forceClose ended up not being the issue in MINIFICPP-677, but leaving it here
+  // out of good hygiene.
+  forceClose();
+  read_callback_.close();
   logger_->log_trace("Closing HTTPClient for %s", url_);
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPStream.cpp 
b/extensions/http-curl/client/HTTPStream.cpp
index 608870b..8735b61 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -37,9 +37,9 @@ HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> 
client)
       written(0),
       // given the nature of the stream we don't want to slow libCURL, we will produce
       // a warning instead allowing us to adjust it server side or through the local configuration.
-      http_read_callback_(66560,true),
+      http_read_callback_(66560, true),
       started_(false),
-      logger_(logging::LoggerFactory<HttpStream>::getLogger()){
+      logger_(logging::LoggerFactory<HttpStream>::getLogger()) {
   // submit early on
 }
 
@@ -54,7 +54,7 @@ void HttpStream::seek(uint64_t offset) {
 }
 
 int HttpStream::writeData(std::vector<uint8_t> &buf, int buflen) {
-  if ((int)buf.capacity() < buflen) {
+  if ((int) buf.capacity() < buflen) {
     return -1;
   }
   return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
@@ -70,11 +70,11 @@ int HttpStream::writeData(uint8_t *value, int size) {
         callback_.ptr = &http_callback_;
         callback_.pos = 0;
         http_client_->setUploadCallback(&callback_);
-        http_client_future_ = std::async(submit_client, http_client_);
+        http_client_future_ = std::async(std::launch::async, submit_client, 
http_client_);
         started_ = true;
       }
     }
-    http_callback_.process(value,size);
+    http_callback_.process(value, size);
     return size;
   } else {
     return -1;
@@ -90,7 +90,7 @@ inline std::vector<uint8_t> HttpStream::readBuffer(const T& 
t) {
 }
 
 int HttpStream::readData(std::vector<uint8_t> &buf, int buflen) {
-  if ((int)buf.capacity() < buflen) {
+  if ((int) buf.capacity() < buflen) {
     buf.resize(buflen);
   }
   int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
@@ -109,11 +109,10 @@ int HttpStream::readData(uint8_t *buf, int buflen) {
         read_callback_.ptr = &http_read_callback_;
         read_callback_.pos = 0;
         http_client_->setReadCallback(&read_callback_);
-        http_client_future_ = std::async(submit_read_client, http_client_, 
&http_read_callback_);
+        http_client_future_ = std::async(std::launch::async, 
submit_read_client, http_client_, &http_read_callback_);
         started_ = true;
       }
     }
-
     return http_read_callback_.readFully((char*) buf, buflen);
 
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPStream.h 
b/extensions/http-curl/client/HTTPStream.h
index d3e5bca..3829e94 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -59,6 +59,9 @@ class HttpStream : public io::BaseStream {
 
   void forceClose() {
     if (started_) {
+      // lock shouldn't be needed here as call paths currently guarantee
+      // flow, but we should be safe anyway.
+      std::lock_guard<std::mutex> lock(mutex_);
       closeStream();
       http_client_->forceClose();
       if (http_client_future_.valid()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/tests/HTTPIntegrationBase.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h 
b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 8defc56..1bc4c72 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -34,18 +34,25 @@ int ssl_enable(void *ssl_context, void *user_data) {
 
 class HTTPIntegrationBase : public IntegrationBase {
  public:
-  HTTPIntegrationBase() : IntegrationBase(), server(nullptr) {}
+  HTTPIntegrationBase(uint64_t waitTime = 60000)
+      : IntegrationBase(waitTime),
+        server(nullptr) {
+  }
 
   void setUrl(std::string url, CivetHandler *handler);
 
   virtual ~HTTPIntegrationBase();
 
+  void shutdownBeforeFlowController() {
+    stop_webserver(server);
+  }
+
  protected:
   CivetServer *server;
 };
 
 HTTPIntegrationBase::~HTTPIntegrationBase() {
-  stop_webserver(server);
+
 }
 
 void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
@@ -53,8 +60,8 @@ void HTTPIntegrationBase::setUrl(std::string url, 
CivetHandler *handler) {
   parse_http_components(url, port, scheme, path);
   struct mg_callbacks callback;
   if (url.find("localhost") != std::string::npos) {
-    if (server != nullptr){
-      server->addHandler(path,handler);
+    if (server != nullptr) {
+      server->addHandler(path, handler);
       return;
     }
     if (scheme == "https" && !key_dir.empty()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp 
b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index dd457b6..b908208 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -53,7 +53,7 @@
 class SiteToSiteTestHarness : public HTTPIntegrationBase {
  public:
   explicit SiteToSiteTestHarness(bool isSecure)
-      : isSecure(isSecure) {
+      : HTTPIntegrationBase(2000), isSecure(isSecure) {
     char format[] = "/tmp/ssth.XXXXXX";
     dir = testController.createTempDirectory(format);
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/libminifi/include/utils/ByteArrayCallback.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ByteArrayCallback.h 
b/libminifi/include/utils/ByteArrayCallback.h
index 49249a7..6c3bf81 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -104,7 +104,7 @@ class ByteOutputCallback : public OutputStreamCallback {
   }
 
   virtual ~ByteOutputCallback() {
-
+         close();
   }
 
   virtual int64_t process(std::shared_ptr<io::BaseStream> stream);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/libminifi/test/integration/IntegrationBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/IntegrationBase.h 
b/libminifi/test/integration/IntegrationBase.h
index cb86b7e..4d7e303 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -31,7 +31,7 @@
 
 class IntegrationBase {
  public:
-  IntegrationBase();
+  IntegrationBase(uint64_t waitTime = 60000);
 
   virtual ~IntegrationBase();
 
@@ -44,6 +44,10 @@ class IntegrationBase {
 
   virtual void testSetup() = 0;
 
+  virtual void shutdownBeforeFlowController() {
+
+  }
+
   virtual void cleanup() = 0;
 
   virtual void runAssertions() = 0;
@@ -60,17 +64,18 @@ class IntegrationBase {
 
   void configureSecurity();
   std::shared_ptr<minifi::Configure> configuration;
+  uint64_t wait_time_;
   std::string port, scheme, path;
   std::string key_dir;
 };
 
-IntegrationBase::IntegrationBase() 
-  : configuration(std::make_shared<minifi::Configure>()) {
+IntegrationBase::IntegrationBase(uint64_t waitTime)
+    : configuration(std::make_shared<minifi::Configure>()),
+      wait_time_(waitTime) {
   mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 }
 
-IntegrationBase::~IntegrationBase()
-{
+IntegrationBase::~IntegrationBase() {
   rmdir("./content_repository");
 }
 
@@ -87,48 +92,37 @@ void IntegrationBase::configureSecurity() {
 void IntegrationBase::run(std::string test_file_location) {
   testSetup();
 
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
-      TestFlowRepository>();
+  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
-  configuration->set(minifi::Configure::nifi_flow_configuration_file,
-                     test_file_location);
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
 
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
-      <core::YamlConfiguration
-      >(new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory,
-                                    configuration,
-                                    test_file_location));
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory,
-                                      configuration,
-                                      test_file_location);
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
 
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
-                                                                
test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup
-      >(ptr.get());
+  std::unique_ptr<core::ProcessGroup> ptr = 
yaml_config.getRoot(test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = 
std::shared_ptr<core::ProcessGroup>(ptr.get());
 
   queryRootProcessGroup(pg);
 
   ptr.release();
 
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
-      <TestRepository>(test_repo);
+  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller =
-      std::make_shared<minifi::FlowController
-      >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+  std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+                                                                               
                 true);
 
   controller->load();
   controller->start();
   waitToVerifyProcessor();
 
-  controller->waitUnload(60000);
+  shutdownBeforeFlowController();
+  controller->unload();
 
   runAssertions();
 

Reply via email to