Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 1257e5291 -> d7885d6bd
MINIFICPP-677: Change behavior of async callback This closes #441. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/d7885d6b Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/d7885d6b Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/d7885d6b Branch: refs/heads/master Commit: d7885d6bd8dea00d584fe9fcbc3eb21a12d43205 Parents: 1257e52 Author: Marc Parisi <[email protected]> Authored: Thu Nov 15 15:30:46 2018 -0500 Committer: Aldrin Piri <[email protected]> Committed: Thu Nov 15 20:00:30 2018 -0500 ---------------------------------------------------------------------- extensions/http-curl/client/HTTPClient.cpp | 4 ++ extensions/http-curl/client/HTTPStream.cpp | 15 +++--- extensions/http-curl/client/HTTPStream.h | 3 ++ .../http-curl/tests/HTTPIntegrationBase.h | 15 ++++-- .../http-curl/tests/HTTPSiteToSiteTests.cpp | 2 +- libminifi/include/utils/ByteArrayCallback.h | 2 +- libminifi/test/integration/IntegrationBase.h | 52 +++++++++----------- 7 files changed, 50 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPClient.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp index 7940c6e..d607664 100644 --- a/extensions/http-curl/client/HTTPClient.cpp +++ b/extensions/http-curl/client/HTTPClient.cpp @@ -99,6 +99,10 @@ HTTPClient::~HTTPClient() { curl_easy_cleanup(http_session_); http_session_ = nullptr; } + // forceClose ended up not being the issue in MINIFICPP-667, but leaving here + // out of good hygiene. + forceClose(); + read_callback_.close(); logger_->log_trace("Closing HTTPClient for %s", url_); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp index 608870b..8735b61 100644 --- a/extensions/http-curl/client/HTTPStream.cpp +++ b/extensions/http-curl/client/HTTPStream.cpp @@ -37,9 +37,9 @@ HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client) written(0), // given the nature of the stream we don't want to slow libCURL, we will produce // a warning instead allowing us to adjust it server side or through the local configuration. - http_read_callback_(66560,true), + http_read_callback_(66560, true), started_(false), - logger_(logging::LoggerFactory<HttpStream>::getLogger()){ + logger_(logging::LoggerFactory<HttpStream>::getLogger()) { // submit early on } @@ -54,7 +54,7 @@ void HttpStream::seek(uint64_t offset) { } int HttpStream::writeData(std::vector<uint8_t> &buf, int buflen) { - if ((int)buf.capacity() < buflen) { + if ((int) buf.capacity() < buflen) { return -1; } return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); @@ -70,11 +70,11 @@ int HttpStream::writeData(uint8_t *value, int size) { callback_.ptr = &http_callback_; callback_.pos = 0; http_client_->setUploadCallback(&callback_); - http_client_future_ = std::async(submit_client, http_client_); + http_client_future_ = std::async(std::launch::async, submit_client, http_client_); started_ = true; } } - http_callback_.process(value,size); + http_callback_.process(value, size); return size; } else { return -1; @@ -90,7 +90,7 @@ inline std::vector<uint8_t> HttpStream::readBuffer(const T& t) { } int HttpStream::readData(std::vector<uint8_t> &buf, int buflen) { - if ((int)buf.capacity() < buflen) { + if ((int) buf.capacity() < buflen) { buf.resize(buflen); } int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); @@ -109,11 +109,10 @@ int HttpStream::readData(uint8_t *buf, int buflen) { read_callback_.ptr = &http_read_callback_; read_callback_.pos = 0; http_client_->setReadCallback(&read_callback_); - http_client_future_ = std::async(submit_read_client, http_client_, &http_read_callback_); + http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, &http_read_callback_); started_ = true; } } - return http_read_callback_.readFully((char*) buf, buflen); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h index d3e5bca..3829e94 100644 --- a/extensions/http-curl/client/HTTPStream.h +++ b/extensions/http-curl/client/HTTPStream.h @@ -59,6 +59,9 @@ class HttpStream : public io::BaseStream { void forceClose() { if (started_) { + // lock shouldn't be needed here as call paths currently guarantee + // flow, but we should be safe anyway. + std::lock_guard<std::mutex> lock(mutex_); closeStream(); http_client_->forceClose(); if (http_client_future_.valid()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/tests/HTTPIntegrationBase.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h index 8defc56..1bc4c72 100644 --- a/extensions/http-curl/tests/HTTPIntegrationBase.h +++ b/extensions/http-curl/tests/HTTPIntegrationBase.h @@ -34,18 +34,25 @@ int ssl_enable(void *ssl_context, void *user_data) { class HTTPIntegrationBase : public IntegrationBase { public: - HTTPIntegrationBase() : IntegrationBase(), server(nullptr) {} + HTTPIntegrationBase(uint64_t waitTime = 60000) + : IntegrationBase(waitTime), + server(nullptr) { + } void setUrl(std::string url, CivetHandler *handler); virtual ~HTTPIntegrationBase(); + void shutdownBeforeFlowController() { + stop_webserver(server); + } + protected: CivetServer *server; }; HTTPIntegrationBase::~HTTPIntegrationBase() { - stop_webserver(server); + } void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) { @@ -53,8 +60,8 @@ void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) { parse_http_components(url, port, scheme, path); struct mg_callbacks callback; if (url.find("localhost") != std::string::npos) { - if (server != nullptr){ - server->addHandler(path,handler); + if (server != nullptr) { + server->addHandler(path, handler); return; } if (scheme == "https" && !key_dir.empty()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp index dd457b6..b908208 100644 --- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp +++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp @@ -53,7 +53,7 @@ class SiteToSiteTestHarness : public HTTPIntegrationBase { public: explicit SiteToSiteTestHarness(bool isSecure) - : isSecure(isSecure) { + : HTTPIntegrationBase(2000), isSecure(isSecure) { char format[] = "/tmp/ssth.XXXXXX"; dir = testController.createTempDirectory(format); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/libminifi/include/utils/ByteArrayCallback.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h index 49249a7..6c3bf81 100644 --- a/libminifi/include/utils/ByteArrayCallback.h +++ b/libminifi/include/utils/ByteArrayCallback.h @@ -104,7 +104,7 @@ class ByteOutputCallback : public OutputStreamCallback { } virtual ~ByteOutputCallback() { - + close(); } virtual int64_t process(std::shared_ptr<io::BaseStream> stream); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/libminifi/test/integration/IntegrationBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h index cb86b7e..4d7e303 100644 --- a/libminifi/test/integration/IntegrationBase.h +++ b/libminifi/test/integration/IntegrationBase.h @@ -31,7 +31,7 @@ class IntegrationBase { public: - IntegrationBase(); + IntegrationBase(uint64_t waitTime = 60000); virtual ~IntegrationBase(); @@ -44,6 +44,10 @@ class IntegrationBase { virtual void testSetup() = 0; + virtual void shutdownBeforeFlowController() { + + } + virtual void cleanup() = 0; virtual void runAssertions() = 0; @@ -60,17 +64,18 @@ class IntegrationBase { void configureSecurity(); std::shared_ptr<minifi::Configure> configuration; + uint64_t wait_time_; std::string port, scheme, path; std::string key_dir; }; -IntegrationBase::IntegrationBase() - : configuration(std::make_shared<minifi::Configure>()) { +IntegrationBase::IntegrationBase(uint64_t waitTime) + : configuration(std::make_shared<minifi::Configure>()), + wait_time_(waitTime) { mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); } -IntegrationBase::~IntegrationBase() -{ +IntegrationBase::~IntegrationBase() { rmdir("./content_repository"); } @@ -87,48 +92,37 @@ void IntegrationBase::configureSecurity() { void IntegrationBase::run(std::string test_file_location) { testSetup(); - std::shared_ptr<core::Repository> test_repo = - std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< - TestFlowRepository>(); + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); - configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); content_repo->initialize(configuration); std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr - <core::YamlConfiguration - >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, - configuration, - test_file_location)); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, - configuration, - test_file_location); + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup - >(ptr.get()); + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); queryRootProcessGroup(pg); ptr.release(); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast - <TestRepository>(test_repo); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = - std::make_shared<minifi::FlowController - >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); controller->load(); controller->start(); waitToVerifyProcessor(); - controller->waitUnload(60000); + shutdownBeforeFlowController(); + controller->unload(); runAssertions();
