MINIFICPP-12: Resolve warnings found during compilation MINIFICPP-12: Remove linux warnings
MINIFICPP-12: Update property and stringutils MINIFICPP-12: Resolve linter issues MINIFICPP-12: Update extension readme This closes #212. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/002b0743 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/002b0743 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/002b0743 Branch: refs/heads/master Commit: 002b07434a963ec5ccdd984f499e584ff3af1e19 Parents: 9032c0f Author: Marc Parisi <[email protected]> Authored: Wed Nov 29 09:05:00 2017 -0500 Committer: Aldrin Piri <[email protected]> Committed: Thu Dec 7 15:05:59 2017 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 27 +- Extensions.md | 22 +- README.md | 22 +- extensions/http-curl/client/HTTPCallback.h | 6 +- extensions/http-curl/client/HTTPClient.cpp | 40 +-- extensions/http-curl/client/HTTPClient.h | 54 ++-- extensions/http-curl/client/HTTPStream.cpp | 10 +- extensions/http-curl/client/HTTPStream.h | 60 +++-- extensions/http-curl/processors/InvokeHTTP.cpp | 2 - extensions/http-curl/processors/InvokeHTTP.h | 14 +- extensions/http-curl/protocols/RESTProtocol.cpp | 2 +- extensions/http-curl/protocols/RESTReceiver.cpp | 9 +- extensions/http-curl/protocols/RESTSender.cpp | 4 +- .../http-curl/sitetosite/HTTPProtocol.cpp | 8 +- extensions/http-curl/sitetosite/HTTPProtocol.h | 3 +- .../http-curl/sitetosite/HTTPTransaction.h | 1 - extensions/libarchive/CompressContent.h | 1 - extensions/libarchive/FocusArchiveEntry.cpp | 5 +- extensions/libarchive/MergeContent.h | 1 - extensions/pcap/CapturePacket.h | 2 +- .../rocksdb-repos/DatabaseContentRepository.h | 4 +- extensions/rocksdb-repos/FlowFileRepository.cpp | 4 +- extensions/rocksdb-repos/FlowFileRepository.h | 4 +- .../rocksdb-repos/ProvenanceRepository.cpp | 4 +- extensions/rocksdb-repos/ProvenanceRepository.h | 2 +- 
extensions/rocksdb-repos/RocksDbStream.cpp | 6 +- extensions/script/python/PyBaseStream.cpp | 2 +- extensions/usb-camera/GetUSBCamera.cpp | 103 +++---- libminifi/CMakeLists.txt | 2 + libminifi/include/FlowController.h | 4 +- libminifi/include/FlowFileRecord.h | 2 +- libminifi/include/RemoteProcessorGroupPort.h | 14 +- libminifi/include/SchedulingAgent.h | 4 +- libminifi/include/ThreadedSchedulingAgent.h | 5 +- libminifi/include/core/Core.h | 8 +- libminifi/include/core/FlowConfiguration.h | 2 +- libminifi/include/core/ProcessContext.h | 8 +- libminifi/include/core/Property.h | 108 +++++++- .../core/controller/ControllerServiceNode.h | 4 +- .../StandardControllerServiceProvider.h | 11 +- .../include/core/repository/AtomicRepoEntries.h | 9 +- .../core/repository/VolatileContentRepository.h | 14 +- .../core/repository/VolatileRepository.h | 22 +- .../core/state/metrics/DeviceInformation.h | 8 +- .../include/core/state/metrics/MetricsBase.h | 4 +- .../core/state/metrics/MetricsListener.h | 6 +- .../include/core/state/metrics/ProcessMetrics.h | 2 +- .../include/core/state/metrics/QueueMetrics.h | 2 +- .../core/state/metrics/RepositoryMetrics.h | 2 +- .../include/core/state/metrics/SystemMetrics.h | 3 +- libminifi/include/core/yaml/YamlConfiguration.h | 12 +- libminifi/include/io/AtomicEntryStream.h | 4 +- libminifi/include/io/CRCStream.h | 30 +-- libminifi/include/io/DataStream.h | 2 +- libminifi/include/io/NonConvertingStream.h | 2 +- libminifi/include/io/Serializable.h | 2 +- libminifi/include/io/validation.h | 14 - libminifi/include/processors/GetFile.h | 15 +- libminifi/include/processors/GetTCP.h | 20 +- libminifi/include/processors/TailFile.h | 1 - libminifi/include/sitetosite/Peer.h | 40 ++- libminifi/include/sitetosite/SiteToSite.h | 14 +- libminifi/include/sitetosite/SiteToSiteClient.h | 9 +- .../include/sitetosite/SiteToSiteFactory.h | 6 +- libminifi/include/utils/ByteArrayCallback.h | 7 +- libminifi/include/utils/HTTPClient.h | 68 +---- 
libminifi/include/utils/StringUtils.h | 4 +- libminifi/include/utils/ThreadPool.h | 34 +-- libminifi/src/FlowController.cpp | 6 +- libminifi/src/RemoteProcessorGroupPort.cpp | 20 +- libminifi/src/c2/C2Agent.cpp | 2 +- libminifi/src/c2/C2Payload.cpp | 2 +- libminifi/src/core/Core.cpp | 2 +- libminifi/src/core/ProcessSession.cpp | 4 +- .../repository/VolatileContentRepository.cpp | 48 ++-- libminifi/src/core/yaml/YamlConfiguration.cpp | 2 +- libminifi/src/io/BaseStream.cpp | 2 +- libminifi/src/io/ClientSocket.cpp | 29 +- libminifi/src/io/DataStream.cpp | 2 +- libminifi/src/io/FileStream.cpp | 4 +- libminifi/src/io/NonConvertingStream.cpp | 2 +- libminifi/src/io/Serializable.cpp | 8 +- libminifi/src/processors/ExecuteProcess.cpp | 6 +- libminifi/src/processors/ExtractText.cpp | 4 +- libminifi/src/processors/GetFile.cpp | 4 +- libminifi/src/processors/GetTCP.cpp | 2 +- libminifi/src/processors/ListenHTTP.cpp | 4 +- libminifi/src/processors/ListenSyslog.cpp | 6 +- libminifi/src/processors/TailFile.cpp | 2 +- libminifi/src/sitetosite/RawSocketProtocol.cpp | 2 +- libminifi/src/sitetosite/SiteToSiteClient.cpp | 2 +- libminifi/src/utils/HTTPClient.cpp | 87 ++++++ libminifi/test/TestBase.cpp | 4 +- libminifi/test/TestBase.h | 10 +- libminifi/test/TestServer.h | 14 - .../test/archive-tests/util/ArchiveTests.cpp | 270 ++++++++++--------- .../curl-tests/sitetositehttp/HTTPHandlers.h | 43 ++- libminifi/test/gps-tests/GPSTests.cpp | 1 + libminifi/test/unit/ProvenanceTestHelper.h | 33 ++- libminifi/test/unit/ThreadPoolTests.cpp | 4 +- libminifi/test/unit/YamlConfigurationTests.cpp | 4 +- main/MiNiFiMain.cpp | 23 +- 102 files changed, 846 insertions(+), 762 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e01f10..175b945 100644 --- 
a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,6 +24,7 @@ set(PROJECT_VERSION_MAJOR 0) set(PROJECT_VERSION_MINOR 4) set(PROJECT_VERSION_PATCH 0) option(SKIP_TESTS "Skips building all tests." OFF) +option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON) include(FeatureSummary) @@ -50,6 +51,16 @@ else() message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.") endif() +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") + +if (NOT PORTABLE) + if(MSVC) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /arch:AVX2") + else() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native") + endif() +endif() + set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED ON) @@ -121,10 +132,6 @@ endif() add_subdirectory(libminifi) -if (EXCLUDE_BOOST) - message(" -- Excluding Boost dependent packages...") -endif(EXCLUDE_BOOST) - #### EXTENSION option(DISABLE_CURL "Disables libCurl Properties." OFF) if (NOT DISABLE_CURL) @@ -151,18 +158,18 @@ if (NOT DISABLE_LIBARCHIVE) endif() option(ENABLE_GPS "Enables the GPS extension." OFF) -if (ENABLE_GPS) +if (ENABLE_ALL OR ENABLE_GPS) createExtension(GPS-EXTENSION "GPS EXTENSIONS" "Enables LibGPS Functionality and the GetGPS processor." "extensions/gps" "${TEST_DIR}/gps-tests") -endif(ENABLE_GPS) +endif() option(ENABLE_PCAP "Enables the PCAP extension." OFF) -if(ENABLE_PCAP) +if(ENABLE_ALL OR ENABLE_PCAP) createExtension(PCAP-EXTENSION "PCAP EXTENSIONS" "Enables libPCAP Functionality and the PacketCapture processor." "extensions/pcap" "${TEST_DIR}/pcap-tests") -endif(ENABLE_PCAP) +endif() ## Create LibRdKafka Extension option(ENABLE_LIBRDKAFKA "Enables the librdkafka extension." 
OFF) -if (ENABLE_LIBRDKAFKA) +if (ENABLE_ALL OR ENABLE_LIBRDKAFKA) createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests" "TRUE" "thirdparty/librdkafka-0.11.1") endif() @@ -175,7 +182,7 @@ endif() ## USB camera extensions option(ENABLE_USB_CAMERA "Enables USB camera support." OFF) -if (ENABLE_USB_CAMERA) +if (ENABLE_ALL OR ENABLE_USB_CAMERA) createExtension(USB-CAMERA-EXTENSIONS "USB CAMERA EXTENSIONS" "This enables USB camera support" "extensions/usb-camera" "${TEST_DIR}/usb-camera-tests" "TRUE" "thirdparty/libuvc-0.0.6") endif() http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/Extensions.md ---------------------------------------------------------------------- diff --git a/Extensions.md b/Extensions.md index acebf75..24ef07e 100644 --- a/Extensions.md +++ b/Extensions.md @@ -12,7 +12,27 @@ See the License for the specific language governing permissions and limitations under the License. --> -# Apache MiNiFi Extensions HowTo +# Apache MiNiFi Extensions Guide + +Currently we support the following extension flags: + - `-DDISABLE_CURL=TRUE` + - `-DDISABLE_ROCKSDB=TRUE` + - `-DDISABLE_LIBARCHIVE=TRUE` + - `-DDISABLE_USB_CAMERA=TRUE` + - `-DDISABLE_LIBRDKAFKA=TRUE` + - `-DDISABLE_SCRIPTING=TRUE` + - `-DDISABLE_EXPRESSION_LANGUAGE=TRUE` + - `-DDISABLE_PYTHON_SCRIPTING=TRUE` + - `-DENABLE_LUA_SCRIPTING=TRUE` + - `-DENABLE_PCAP=TRUE` + - `-DENABLE_GPS=TRUE` + - `-DENABLE_TENSORFLOW=TRUE` + +For more information on these extensions, please visit [the Extension How-To on our wiki](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74685143) + +To enable all extensions for your platform, you may use -DENABLE_ALL=TRUE + +# Extensions by example Extensions consist of modules that are conditionally built into your client. 
Reasons why you may wish to do this with your modules/processors http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index a6286d8..716f9c6 100644 --- a/README.md +++ b/README.md @@ -222,16 +222,8 @@ $ sudo brew install libpcap ``` - Perform a `cmake ..` to generate the project files - - Optionally, disable or enable features using any combination of the following flags (more information is available - on the [wiki](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74685143)): - - `-DDISABLE_CURL=1` - - `-DDISABLE_ROCKSDB=1` - - `-DDISABLE_LIBARCHIVE=1` - - `-DDISABLE_USB_CAMERA=1` - - `-DDISABLE_LIBRDKAFKA=1` - - `-DDISABLE_SCRIPTING=1` - - `-DDISABLE_PYTHON_SCRIPTING=1` - - `-DENABLE_LUA_SCRIPTING=1` + - Optionally disable or enable extensions. Please visit our guide [extensions guide](Extensions.md) for flags or our wiki entry on + [customizing builds](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74685143) for more information on this topic. ``` # ~/Development/code/apache/nifi-minifi-cpp on git:master $ cmake .. @@ -241,16 +233,6 @@ $ sudo brew install libpcap -- Build files have been written to: /Users/apiri/Development/code/apache/nifi-minifi-cpp/build ``` -- To build with GPS support perform a 'cmake -DBUILD_GPS=ON ..' to generate the project files - ``` - # ~/Development/code/apache/nifi-minifi-cpp on git:master - $ cmake -DBUILD_GPS=ON .. - ... 
- -- Configuring done - -- Generating done - -- Build files have been written to: /Users/apiri/Development/code/apache/nifi-minifi-cpp/build - ``` - - Perform a build ``` # ~/Development/code/apache/nifi-minifi-cpp on git:master http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/client/HTTPCallback.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h index aeca2a9..170f40a 100644 --- a/extensions/http-curl/client/HTTPCallback.h +++ b/extensions/http-curl/client/HTTPCallback.h @@ -38,8 +38,8 @@ namespace utils { class HttpStreamingCallback : public ByteInputCallBack { public: HttpStreamingCallback() - : ptr(nullptr), - is_alive_(true) { + : is_alive_(true), + ptr(nullptr) { previous_pos_ = 0; rolling_count_ = 0; } @@ -121,8 +121,6 @@ class HttpStreamingCallback : public ByteInputCallBack { size_t absolute_position = pos - previous_pos_; current_pos_ = pos; - for (int i = 0; i < current_vec_.size(); i++) { - } return ptr + absolute_position; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/client/HTTPClient.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp index d83d04e..707d301 100644 --- a/extensions/http-curl/client/HTTPClient.cpp +++ b/extensions/http-curl/client/HTTPClient.cpp @@ -33,18 +33,19 @@ HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::con : core::Connectable("HTTPClient", 0), ssl_context_service_(ssl_context_service), url_(url), - logger_(logging::LoggerFactory<HTTPClient>::getLogger()), connect_timeout_(0), read_timeout_(0), - content_type(nullptr), + content_type_str_(nullptr), headers_(nullptr), callback(nullptr), write_callback_(nullptr), http_code(0), read_callback_(INT_MAX), header_response_(-1), 
- res(CURLE_OK) { + res(CURLE_OK), + logger_(logging::LoggerFactory<HTTPClient>::getLogger()) { HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance(); + initializer->initialize(); http_session_ = curl_easy_init(); } @@ -52,37 +53,39 @@ HTTPClient::HTTPClient(std::string name, uuid_t uuid) : core::Connectable(name, uuid), ssl_context_service_(nullptr), url_(), - logger_(logging::LoggerFactory<HTTPClient>::getLogger()), connect_timeout_(0), read_timeout_(0), + content_type_str_(nullptr), + headers_(nullptr), callback(nullptr), write_callback_(nullptr), - content_type(nullptr), - read_callback_(INT_MAX), - headers_(nullptr), http_code(0), + read_callback_(INT_MAX), header_response_(-1), - res(CURLE_OK) { + res(CURLE_OK), + logger_(logging::LoggerFactory<HTTPClient>::getLogger()) { HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance(); + initializer->initialize(); http_session_ = curl_easy_init(); } HTTPClient::HTTPClient() : core::Connectable("HTTPClient", 0), ssl_context_service_(nullptr), - callback(nullptr), - write_callback_(nullptr), url_(), - logger_(logging::LoggerFactory<HTTPClient>::getLogger()), connect_timeout_(0), read_timeout_(0), - content_type(nullptr), + content_type_str_(nullptr), headers_(nullptr), + callback(nullptr), + write_callback_(nullptr), http_code(0), read_callback_(INT_MAX), header_response_(-1), - res(CURLE_OK) { + res(CURLE_OK), + logger_(logging::LoggerFactory<HTTPClient>::getLogger()) { HTTPClientInitializer *initializer = HTTPClientInitializer::getInstance(); + initializer->initialize(); http_session_ = curl_easy_init(); } @@ -95,6 +98,7 @@ HTTPClient::~HTTPClient() { curl_easy_cleanup(http_session_); http_session_ = nullptr; } + logger_->log_info("Closing HTTPClient for %s", url_); } void HTTPClient::forceClose() { @@ -148,7 +152,7 @@ void HTTPClient::setReadCallback(HTTPReadCallback *callbackObj) { } void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) { - 
logger_->log_info("Setting callback"); + logger_->log_info("Setting callback for %s", url_); write_callback_ = callbackObj; if (method_ == "put" || method_ == "PUT") { curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t ) callbackObj->ptr->getBufferSize()); @@ -230,7 +234,7 @@ bool HTTPClient::submit() { read_callback_.close(); } curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code); - curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type); + curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type_str_); if (res != CURLE_OK) { logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res)); return false; @@ -239,7 +243,7 @@ bool HTTPClient::submit() { logger_->log_info("Finished with %s", url_); std::string key = ""; for (auto header_line : header_response_.header_tokens_) { - int i = 0; + unsigned int i = 0; for (i = 0; i < header_line.length(); i++) { if (header_line.at(i) == ':') { break; @@ -279,12 +283,12 @@ int64_t &HTTPClient::getResponseCode() { } const char *HTTPClient::getContentType() { - return content_type; + return content_type_str_; } const std::vector<char> &HTTPClient::getResponseBody() { if (response_body_.size() == 0) - response_body_ = std::move(read_callback_.to_string()); + response_body_ = read_callback_.to_string(); return response_body_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/client/HTTPClient.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h index 1d858b2..85000d9 100644 --- a/extensions/http-curl/client/HTTPClient.h +++ b/extensions/http-curl/client/HTTPClient.h @@ -50,6 +50,9 @@ class HTTPClientInitializer { static HTTPClientInitializer initializer; return &initializer; } + void initialize() { + + } private: ~HTTPClientInitializer() { curl_global_cleanup(); @@ -78,7 +81,7 @@ class HTTPClient 
: public BaseHTTPClient, public core::Connectable { ~HTTPClient(); - virtual void setVerbose() override ; + virtual void setVerbose() override; void forceClose(); @@ -90,7 +93,7 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { virtual void setUploadCallback(HTTPUploadCallback *callbackObj) override; - virtual void setReadCallback(HTTPReadCallback *callbackObj) ; + virtual void setReadCallback(HTTPReadCallback *callbackObj); struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes); @@ -122,19 +125,23 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { void setDisablePeerVerification() override; - const std::vector<std::string> &getHeaders() override{ + std::string getURL() const{ + return url_; + } + + const std::vector<std::string> &getHeaders() override { return header_response_.header_tokens_; } - virtual const std::map<std::string, std::string> &getParsedHeaders() override{ + virtual const std::map<std::string, std::string> &getParsedHeaders() override { return header_response_.header_mapping_; } /** * Determines if we are connected and operating */ - virtual bool isRunning() override{ + virtual bool isRunning() override { return true; } @@ -145,7 +152,7 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { void waitForWork(uint64_t timeoutMs) { } - virtual void yield() override{ + virtual void yield() override { } @@ -153,7 +160,7 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { * Determines if work is available by this connectable * @return boolean if work is available. 
*/ - virtual bool isWorkAvailable() override{ + virtual bool isWorkAvailable() override { return true; } @@ -171,29 +178,36 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable { void configure_secure_connection(CURL *http_session); - HTTPReadCallback *callback; - HTTPUploadCallback *write_callback_; - bool isSecure(const std::string &url); - struct curl_slist *headers_; + HTTPReadCallback content_; + + std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; + std::string url_; + int64_t connect_timeout_; + // read timeout. + int64_t read_timeout_; + char *content_type_str_; + std::string content_type_; + struct curl_slist *headers_; + HTTPReadCallback *callback; + HTTPUploadCallback *write_callback_; + int64_t http_code; ByteOutputCallback read_callback_; utils::HTTPHeaderResponse header_response_; + CURLcode res; - int64_t http_code; - char *content_type; - int64_t connect_timeout_; -// read timeout. - int64_t read_timeout_; - std::string content_type_; - std::shared_ptr<logging::Logger> logger_; + + CURL *http_session_; - std::string url_; + std::string method_; - std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; + + std::shared_ptr<logging::Logger> logger_; + }; } /* namespace utils */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/client/HTTPStream.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp index 2d702b3..608870b 100644 --- a/extensions/http-curl/client/HTTPStream.cpp +++ b/extensions/http-curl/client/HTTPStream.cpp @@ -33,13 +33,13 @@ namespace minifi { namespace io { HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client) - : logger_(logging::LoggerFactory<HttpStream>::getLogger()), - http_client_(client), + : http_client_(client), written(0), // given the nature of the stream we don't want to slow libCURL, we 
will produce // a warning instead allowing us to adjust it server side or through the local configuration. http_read_callback_(66560,true), - started_(false) { + started_(false), + logger_(logging::LoggerFactory<HttpStream>::getLogger()){ // submit early on } @@ -54,7 +54,7 @@ void HttpStream::seek(uint64_t offset) { } int HttpStream::writeData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) { + if ((int)buf.capacity() < buflen) { return -1; } return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); @@ -90,7 +90,7 @@ inline std::vector<uint8_t> HttpStream::readBuffer(const T& t) { } int HttpStream::readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) { + if ((int)buf.capacity() < buflen) { buf.resize(buflen); } int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/client/HTTPStream.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h index c534dbc..d3e5bca 100644 --- a/extensions/http-curl/client/HTTPStream.h +++ b/extensions/http-curl/client/HTTPStream.h @@ -43,21 +43,32 @@ class HttpStream : public io::BaseStream { explicit HttpStream(std::shared_ptr<utils::HTTPClient> http_client_); virtual ~HttpStream() { - closeStream(); - http_client_future_.get(); + forceClose(); } virtual void closeStream() override; + const std::shared_ptr<utils::HTTPClient> &getClientRef() { + return http_client_; + } + const std::shared_ptr<utils::HTTPClient> &getClient() { http_client_future_.get(); return http_client_; } - void forceClose(){ - closeStream(); - http_client_->forceClose(); - http_client_future_.get(); + void forceClose() { + if (started_) { + closeStream(); + http_client_->forceClose(); + if (http_client_future_.valid()) { + http_client_future_.get(); + } else { + logger_->log_warn("Future status 
already cleared for %s, continuing", http_client_->getURL()); + } + + started_ = false; + } } /** * Skip to the specified offset. @@ -65,7 +76,7 @@ class HttpStream : public io::BaseStream { */ virtual void seek(uint64_t offset) override; - virtual const uint64_t getSize() const override{ + virtual const uint64_t getSize() const override { return written; } @@ -107,40 +118,38 @@ class HttpStream : public io::BaseStream { } static bool submit_client(std::shared_ptr<utils::HTTPClient> client) { + if (client == nullptr) + return false; bool submit_status = client->submit(); - return submit_status; } - static bool submit_read_client( std::shared_ptr<utils::HTTPClient> client, utils::ByteOutputCallback *callback){ - if (!client) + static bool submit_read_client(std::shared_ptr<utils::HTTPClient> client, utils::ByteOutputCallback *callback) { + if (client == nullptr) return false; bool submit_status = client->submit(); callback->close(); return submit_status; } - inline bool isFinished(int seconds = 0) { + inline bool isFinished(int seconds = 0) { if (http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready && (http_read_callback_.getSize() == 0 && http_read_callback_.waitingOps())) { return true; - } - else{ + } else { return false; } } - /** - * Waits for more data to become available. - */ - bool waitForDataAvailable(){ - do{ - logger_->log_trace("Waiting for more data"); - }while(http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready && http_read_callback_.getSize() == 0); - - return http_read_callback_.getSize() > 0; - } - + /** + * Waits for more data to become available. 
+ */ + bool waitForDataAvailable() { + do { + logger_->log_trace("Waiting for more data"); + } while (http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready && http_read_callback_.getSize() == 0); + return http_read_callback_.getSize() > 0; + } protected: @@ -163,7 +172,6 @@ class HttpStream : public io::BaseStream { size_t written; std::mutex mutex_; - std::atomic<bool> started_; utils::HttpStreamingCallback http_callback_; @@ -173,6 +181,8 @@ class HttpStream : public io::BaseStream { utils::HTTPReadCallback read_callback_; + std::atomic<bool> started_; + private: std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/processors/InvokeHTTP.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp index ef13be4..a8bcbab 100644 --- a/extensions/http-curl/processors/InvokeHTTP.cpp +++ b/extensions/http-curl/processors/InvokeHTTP.cpp @@ -327,9 +327,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, flowFile->addAttribute(TRANSACTION_ID, tx_id); bool isSuccess = ((int32_t) (http_code / 100)) == 2; - bool output_body_to_requestAttr = (!isSuccess || putToAttribute) && flowFile != nullptr; bool output_body_to_content = isSuccess && !putToAttribute; - bool body_empty = IsNullOrEmpty(response_body); logger_->log_info("isSuccess: %d, response code %d", isSuccess, http_code); std::shared_ptr<FlowFileRecord> response_flow = nullptr; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/processors/InvokeHTTP.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h index 1aa3faa..f1aff50 100644 --- a/extensions/http-curl/processors/InvokeHTTP.h +++ 
b/extensions/http-curl/processors/InvokeHTTP.h @@ -51,16 +51,17 @@ class InvokeHTTP : public core::Processor { */ InvokeHTTP(std::string name, uuid_t uuid = NULL) : Processor(name, uuid), + ssl_context_service_(nullptr), date_header_include_(true), connect_timeout_(20000), - penalize_no_retry_(false), read_timeout_(20000), always_output_response_(false), - disable_peer_verification_(false), - ssl_context_service_(nullptr), use_chunked_encoding_(false), + penalize_no_retry_(false), + disable_peer_verification_(false), logger_(logging::LoggerFactory<InvokeHTTP>::getLogger()) { static utils::HTTPClientInitializer *initializer = utils::HTTPClientInitializer::getInstance(); + initializer->initialize(); } // Destructor virtual ~InvokeHTTP(); @@ -131,7 +132,8 @@ class InvokeHTTP : public core::Processor { * @param isSuccess success code or not * @param statuscode http response code. */ - void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, const std::shared_ptr<core::ProcessSession> &session, const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int statusCode); + void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, const std::shared_ptr<core::ProcessSession> &session, + const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int statusCode); /** * Determine if we should emit a new flowfile based on our activity * @param method method type @@ -157,12 +159,12 @@ class InvokeHTTP : public core::Processor { std::string put_attribute_name_; // determine if we always output a response. bool always_output_response_; - // penalize on no retry - bool penalize_no_retry_; // content type. std::string content_type_; // use chunked encoding. 
bool use_chunked_encoding_; + // penalize on no retry + bool penalize_no_retry_; // disable peer verification ( makes susceptible for MITM attacks ) bool disable_peer_verification_; private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/protocols/RESTProtocol.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTProtocol.cpp b/extensions/http-curl/protocols/RESTProtocol.cpp index 9d58c36..afbe3c9 100644 --- a/extensions/http-curl/protocols/RESTProtocol.cpp +++ b/extensions/http-curl/protocols/RESTProtocol.cpp @@ -73,7 +73,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const new_payload.addPayload(std::move(nested_payload)); } // we have a response for this request - return std::move(new_payload); + return new_payload; } } } catch (...) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/protocols/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp index a84f30a..4c46516 100644 --- a/extensions/http-curl/protocols/RESTReceiver.cpp +++ b/extensions/http-curl/protocols/RESTReceiver.cpp @@ -36,7 +36,6 @@ int log_message(const struct mg_connection *conn, const char *message) { } int ssl_protocol_en(void *ssl_context, void *user_data) { - struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; return 0; } @@ -57,9 +56,9 @@ void RESTReceiver::initialize(const std::shared_ptr<core::controller::Controller if (!listeningPort.empty() && !rootUri.empty()) { handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol()); if (!caCert.empty()) { - listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert)); + listener = start_webserver(listeningPort, rootUri, 
dynamic_cast<CivetHandler*>(handler.get()), caCert); } else { - listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()))); + listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get())); } } } @@ -117,7 +116,7 @@ std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &po "ssl_verify_peer", "no", "num_threads", "1", 0 }; std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { cpp_options.push_back(options[i]); } std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); @@ -131,7 +130,7 @@ std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &po const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 }; std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { cpp_options.push_back(options[i]); } std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/protocols/RESTSender.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index 9959814..ebf532a 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -81,7 +81,7 @@ C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &pa outputConfig = writer.write(json_payload); } - return std::move(sendPayload(url, direction, payload, outputConfig)); + return sendPayload(url, direction, payload, outputConfig); } C2Payload 
RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) { @@ -125,7 +125,7 @@ const C2Payload RESTSender::sendPayload(const std::string url, const Direction d C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true); response_payload.setRawData(client.getResponseBody()); - return std::move(response_payload); + return response_payload; } return parseJsonResponse(payload, client.getResponseBody()); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/sitetosite/HTTPProtocol.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp index 59c5259..d70ab45 100644 --- a/extensions/http-curl/sitetosite/HTTPProtocol.cpp +++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp @@ -66,7 +66,8 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(std::string client->setUseChunkedEncoding(); client->setPostFields(""); client->submit(); - peer_->setStream(nullptr); + if (peer_->getStream() != nullptr) + logger_->log_info("Closing %s",((io::HttpStream*)peer_->getStream())->getClientRef()->getURL()); if (client->getResponseCode() == 201) { // parse the headers auto headers = client->getParsedHeaders(); @@ -105,6 +106,7 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(std::string logger_->log_debug("Could not create transaction, intent is %s", intent_name); } } else { + peer_->setStream(nullptr); logger_->log_debug("Could not create transaction, received %d", client->getResponseCode()); } return nullptr; @@ -208,7 +210,7 @@ bool HttpSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) { std::shared_ptr<minifi::utils::HTTPClient> HttpSiteToSiteClient::openConnectionForSending(const std::shared_ptr<HttpTransaction> &transaction) { std::stringstream uri; uri << transaction->getTransactionUrl() 
<< "/flow-files"; - std::shared_ptr<minifi::utils::HTTPClient> client = std::move(create_http_client(uri.str(), "POST")); + std::shared_ptr<minifi::utils::HTTPClient> client = create_http_client(uri.str(), "POST"); client->setContentType("application/octet-stream"); client->appendHeader("Accept", "text/plain"); client->setUseChunkedEncoding(); @@ -218,7 +220,7 @@ std::shared_ptr<minifi::utils::HTTPClient> HttpSiteToSiteClient::openConnectionF std::shared_ptr<minifi::utils::HTTPClient> HttpSiteToSiteClient::openConnectionForReceive(const std::shared_ptr<HttpTransaction> &transaction) { std::stringstream uri; uri << transaction->getTransactionUrl() << "/flow-files"; - std::shared_ptr<minifi::utils::HTTPClient> client = std::move(create_http_client(uri.str(), "GET")); + std::shared_ptr<minifi::utils::HTTPClient> client = create_http_client(uri.str(), "GET"); return client; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/sitetosite/HTTPProtocol.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h b/extensions/http-curl/sitetosite/HTTPProtocol.h index bbef3c9..26b59b3 100644 --- a/extensions/http-curl/sitetosite/HTTPProtocol.h +++ b/extensions/http-curl/sitetosite/HTTPProtocol.h @@ -179,9 +179,8 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient { private: - std::shared_ptr<logging::Logger> logger_; - RespondCode current_code; + std::shared_ptr<logging::Logger> logger_; // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer HttpSiteToSiteClient(const HttpSiteToSiteClient &parent); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/http-curl/sitetosite/HTTPTransaction.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/sitetosite/HTTPTransaction.h b/extensions/http-curl/sitetosite/HTTPTransaction.h 
index 6a0dbc3..a3c83bd 100644 --- a/extensions/http-curl/sitetosite/HTTPTransaction.h +++ b/extensions/http-curl/sitetosite/HTTPTransaction.h @@ -39,7 +39,6 @@ class HttpTransaction : public sitetosite::Transaction { explicit HttpTransaction(sitetosite::TransferDirection direction, org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> &stream) : Transaction(direction, stream), client_ref_(nullptr) { - } ~HttpTransaction(){ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/libarchive/CompressContent.h ---------------------------------------------------------------------- diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h index 6041668..03e3646 100644 --- a/extensions/libarchive/CompressContent.h +++ b/extensions/libarchive/CompressContent.h @@ -206,7 +206,6 @@ public: } int64_t process(std::shared_ptr<io::BaseStream> stream) { - int64_t ret = 0; struct archive *arch; int r; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/libarchive/FocusArchiveEntry.cpp ---------------------------------------------------------------------- diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp index b3ed1fa..afcef8b 100644 --- a/extensions/libarchive/FocusArchiveEntry.cpp +++ b/extensions/libarchive/FocusArchiveEntry.cpp @@ -295,8 +295,9 @@ int64_t FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream> return nlen; } -FocusArchiveEntry::ReadCallback::ReadCallback(core::Processor *processor,fileutils::FileManager *file_man, ArchiveMetadata *archiveMetadata) - : proc_(processor), file_man_(file_man) { +FocusArchiveEntry::ReadCallback::ReadCallback(core::Processor *processor, fileutils::FileManager *file_man, ArchiveMetadata *archiveMetadata) + : file_man_(file_man), + proc_(processor) { logger_ = logging::LoggerFactory<FocusArchiveEntry>::getLogger(); _archiveMetadata = archiveMetadata; } 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/libarchive/MergeContent.h ---------------------------------------------------------------------- diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h index 312790c..d73eb71 100644 --- a/extensions/libarchive/MergeContent.h +++ b/extensions/libarchive/MergeContent.h @@ -193,7 +193,6 @@ public: } int64_t process(std::shared_ptr<io::BaseStream> stream) { - int64_t ret = 0; struct archive *arch; arch = archive_write_new(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/pcap/CapturePacket.h ---------------------------------------------------------------------- diff --git a/extensions/pcap/CapturePacket.h b/extensions/pcap/CapturePacket.h index 0e5f6a6..a63d94c 100644 --- a/extensions/pcap/CapturePacket.h +++ b/extensions/pcap/CapturePacket.h @@ -45,8 +45,8 @@ class CapturePacketMechanism { public: explicit CapturePacketMechanism(const std::string &base_path, const std::string &file, int64_t *max_size) : writer_(nullptr), - file_(file), path_(base_path), + file_(file), max_size_(max_size) { atomic_count_.store(0); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/rocksdb-repos/DatabaseContentRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index 676e9dd..3c21e04 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -72,9 +72,9 @@ class DatabaseContentRepository : public core::ContentRepository, public core::C DatabaseContentRepository(std::string name = getClassName<DatabaseContentRepository>(), uuid_t uuid = 0) : core::Connectable(name, uuid), - logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()), is_valid_(false), - db_(nullptr) { + db_(nullptr), 
+ logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) { } virtual ~DatabaseContentRepository() { stop(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/rocksdb-repos/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index df2be6e..d5b403c 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -72,17 +72,15 @@ void FlowFileRepository::flush() { void FlowFileRepository::run() { // threshold for purge - uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; while (running_) { std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); - uint64_t curTime = getTimeMillis(); flush(); uint64_t size = getRepoSize(); - if (size > max_partition_bytes_) + if (size > (uint64_t)max_partition_bytes_) repo_full_ = true; else repo_full_ = false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/rocksdb-repos/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index f473415..a826bad 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -55,8 +55,8 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr FlowFileRepository(const std::string repo_name = "", std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD) : core::SerializableComponent(repo_name,0), Repository(repo_name.length() > 0 ? 
repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), - logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()), - content_repo_(nullptr) { + content_repo_(nullptr), + logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()){ db_ = NULL; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/rocksdb-repos/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 4540a4a..6ff4056 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -68,7 +68,7 @@ void ProvenanceRepository::run() { std::string key = it->key().ToString(); uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size()); if (eventTime > 0) { - if ((curTime - eventTime) > max_partition_millis_) + if ((curTime - eventTime) > (uint64_t)max_partition_millis_) Delete(key); } else { logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str()); @@ -79,7 +79,7 @@ void ProvenanceRepository::run() { } flush(); size = getRepoSize(); - if (size > max_partition_bytes_) + if (size > (uint64_t)max_partition_bytes_) repo_full_ = true; else repo_full_ = false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/rocksdb-repos/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h index 67072eb..38d63e0 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.h +++ b/extensions/rocksdb-repos/ProvenanceRepository.h @@ -190,7 +190,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ for (it->SeekToFirst(); 
it->Valid(); it->Next()) { std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); std::string key = it->key().ToString(); - if (records.size() >= maxSize) + if (records.size() >= (uint64_t)maxSize) break; if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { records.push_back(eventRead); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/rocksdb-repos/RocksDbStream.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp index 73f691e..469b537 100644 --- a/extensions/rocksdb-repos/RocksDbStream.cpp +++ b/extensions/rocksdb-repos/RocksDbStream.cpp @@ -30,10 +30,10 @@ namespace io { RocksDbStream::RocksDbStream(const std::string &path, rocksdb::DB *db, bool write_enable) : BaseStream(), - logger_(logging::LoggerFactory<RocksDbStream>::getLogger()), - db_(db), path_(path), - write_enable_(write_enable) { + write_enable_(write_enable), + db_(db), + logger_(logging::LoggerFactory<RocksDbStream>::getLogger()) { rocksdb::Status status; status = db_->Get(rocksdb::ReadOptions(), path_, &value_); if (status.ok()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/script/python/PyBaseStream.cpp ---------------------------------------------------------------------- diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp index 3e32e5b..fac0b3b 100644 --- a/extensions/script/python/PyBaseStream.cpp +++ b/extensions/script/python/PyBaseStream.cpp @@ -52,7 +52,7 @@ py::bytes PyBaseStream::read(size_t len) { auto read = stream_->readData(buffer.data(), static_cast<int>(len)); auto result = py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read)); - return std::move(result); + return result; } size_t PyBaseStream::write(py::bytes buf) { 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/extensions/usb-camera/GetUSBCamera.cpp ---------------------------------------------------------------------- diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp index 5f87b45..761431f 100644 --- a/extensions/usb-camera/GetUSBCamera.cpp +++ b/extensions/usb-camera/GetUSBCamera.cpp @@ -30,31 +30,19 @@ namespace minifi { namespace processors { core::Property GetUSBCamera::FPS( // NOLINT - "FPS", - "Frames per second to capture from USB camera", - "1"); + "FPS", "Frames per second to capture from USB camera", "1"); core::Property GetUSBCamera::Format( // NOLINT - "Format", - "Frame format (currently only PNG and RAW are supported; RAW is a binary pixel buffer of RGB values)", - "PNG"); + "Format", "Frame format (currently only PNG and RAW are supported; RAW is a binary pixel buffer of RGB values)", "PNG"); core::Property GetUSBCamera::VendorID( // NOLINT - "USB Vendor ID", - "USB Vendor ID of camera device, in hexadecimal format", - "0x0"); + "USB Vendor ID", "USB Vendor ID of camera device, in hexadecimal format", "0x0"); core::Property GetUSBCamera::ProductID( // NOLINT - "USB Product ID", - "USB Product ID of camera device, in hexadecimal format", - "0x0"); + "USB Product ID", "USB Product ID of camera device, in hexadecimal format", "0x0"); core::Property GetUSBCamera::SerialNo( // NOLINT - "USB Serial No.", - "USB Serial No. of camera device", - ""); + "USB Serial No.", "USB Serial No. 
of camera device", ""); core::Relationship GetUSBCamera::Success( // NOLINT - "success", - "Sucessfully captured images sent here"); + "success", "Sucessfully captured images sent here"); core::Relationship GetUSBCamera::Failure( // NOLINT - "failure", - "Failures sent here"); + "failure", "Failures sent here"); void GetUSBCamera::initialize() { std::set<core::Property> properties; @@ -109,18 +97,12 @@ void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr) { std::shared_ptr<OutputStreamCallback> write_cb; if (cb_data->format == "PNG") { - write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx, - cb_data->frame_buffer, - cb_data->device_width, - cb_data->device_height); + write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx, cb_data->frame_buffer, cb_data->device_width, cb_data->device_height); } else if (cb_data->format == "RAW") { write_cb = std::make_shared<GetUSBCamera::RawWriteCallback>(cb_data->frame_buffer); } else { cb_data->logger->log_warn("Invalid format specified (%s); defaulting to PNG", cb_data->format); - write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx, - cb_data->frame_buffer, - cb_data->device_width, - cb_data->device_height); + write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx, cb_data->frame_buffer, cb_data->device_width, cb_data->device_height); } session->write(flow_file, write_cb.get()); @@ -133,8 +115,7 @@ void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr) { } } -void GetUSBCamera::onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *session_factory) { +void GetUSBCamera::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) { std::lock_guard<std::recursive_mutex> lock(*dev_access_mtx_); double default_fps = 1; @@ -179,7 +160,7 @@ void GetUSBCamera::onSchedule(core::ProcessContext *context, cleanupUvc(); logger_->log_info("Beginning to capture frames from 
USB camera"); - uvc_stream_ctrl_t ctrl{}; + uvc_stream_ctrl_t ctrl { }; uvc_error_t res; res = uvc_init(&ctx_, nullptr); @@ -192,11 +173,7 @@ void GetUSBCamera::onSchedule(core::ProcessContext *context, logger_->log_info("UVC initialized"); // Locate device - res = uvc_find_device( - ctx_, &dev_, - usb_vendor_id, - usb_product_id, - usb_serial_no); + res = uvc_find_device(ctx_, &dev_, usb_vendor_id, usb_product_id, usb_serial_no); if (res < 0) { logger_->log_error("Unable to find device: %s", uvc_strerror(res)); @@ -234,25 +211,23 @@ void GetUSBCamera::onSchedule(core::ProcessContext *context, } } - case UVC_VS_FORMAT_MJPEG:logger_->log_info("Skipping MJPEG frame formats"); + case UVC_VS_FORMAT_MJPEG: + logger_->log_info("Skipping MJPEG frame formats"); - default:logger_->log_info("Found unknown format"); + default: + logger_->log_info("Found unknown format"); } } if (fps == 0) { logger_->log_error("Could not find suitable frame format from device. " - "Try changing configuration (lower FPS) or device."); + "Try changing configuration (lower FPS) or device."); return; } logger_->log_info("Negotiating stream profile (looking for %dx%d @ %d)", width, height, fps); - res = uvc_get_stream_ctrl_format_size( - devh_, &ctrl, - UVC_FRAME_FORMAT_UNCOMPRESSED, - width, height, fps - ); + res = uvc_get_stream_ctrl_format_size(devh_, &ctrl, UVC_FRAME_FORMAT_UNCOMPRESSED, width, height, fps); if (res < 0) { logger_->log_error("Failed to find a matching stream profile: %s", uvc_strerror(res)); @@ -329,8 +304,7 @@ void GetUSBCamera::cleanupUvc() { } } -void GetUSBCamera::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void GetUSBCamera::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { auto flowFile = session->get(); if (flowFile) { @@ -339,15 +313,12 @@ void GetUSBCamera::onTrigger(core::ProcessContext *context, } } -GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx, - uvc_frame_t *frame, - 
uint32_t width, - uint32_t height) - : logger_(logging::LoggerFactory<PNGWriteCallback>::getLogger()), +GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx, uvc_frame_t *frame, uint32_t width, uint32_t height) + : png_write_mtx_(std::move(write_mtx)), frame_(frame), width_(width), height_(height), - png_write_mtx_(std::move(write_mtx)) { + logger_(logging::LoggerFactory<PNGWriteCallback>::getLogger()) { } int64_t GetUSBCamera::PNGWriteCallback::process(std::shared_ptr<io::BaseStream> stream) { @@ -375,27 +346,23 @@ int64_t GetUSBCamera::PNGWriteCallback::process(std::shared_ptr<io::BaseStream> try { png_set_write_fn(png, this, [](png_structp out_png, - png_bytep out_data, - png_size_t num_bytes) { + png_bytep out_data, + png_size_t num_bytes) { auto this_callback = reinterpret_cast<PNGWriteCallback *>(png_get_io_ptr(out_png)); std::copy(out_data, out_data + num_bytes, std::back_inserter(this_callback->png_output_buf_)); - }, [](png_structp flush_png) {}); - - png_set_IHDR( - png, - info, - width_, height_, - 8, - PNG_COLOR_TYPE_RGB, - PNG_INTERLACE_NONE, - PNG_COMPRESSION_TYPE_DEFAULT, - PNG_FILTER_TYPE_DEFAULT - ); + }, + [](png_structp flush_png) {}); + + png_set_IHDR(png, info, width_, height_, 8, + PNG_COLOR_TYPE_RGB, + PNG_INTERLACE_NONE, + PNG_COMPRESSION_TYPE_DEFAULT, + PNG_FILTER_TYPE_DEFAULT); png_write_info(png, info); png_bytep row_pointers[height_]; - for (int y = 0; y < height_; y++) { + for (uint32_t y = 0; y < height_; y++) { row_pointers[y] = reinterpret_cast<png_byte *>(frame_->data) + width_ * y * 3; } @@ -414,8 +381,8 @@ int64_t GetUSBCamera::PNGWriteCallback::process(std::shared_ptr<io::BaseStream> } GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame) - : logger_(logging::LoggerFactory<RawWriteCallback>::getLogger()), - frame_(frame) { + : frame_(frame), + logger_(logging::LoggerFactory<RawWriteCallback>::getLogger()) { } int64_t 
GetUSBCamera::RawWriteCallback::process(std::shared_ptr<io::BaseStream> stream) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 0829072..d826a24 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -50,6 +50,8 @@ else() message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.") endif() +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-reorder") + include_directories(../thirdparty/spdlog-20170710/include) include_directories(../thirdparty/yaml-cpp-yaml-cpp-20171024/include) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index e79999f..6957c4e 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -175,7 +175,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi bool applyConfiguration(const std::string &configurePayload); // get name - std::string getName() { + std::string getName() const{ if (root_ != nullptr) return root_->getName(); else @@ -304,7 +304,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi * 1 No error condition, but cannot obtain lock in timely manner. 
* -1 failure */ - virtual int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint8_t metricsClass); + virtual int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint16_t metricsClass); virtual uint64_t getUptime(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/FlowFileRecord.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index b82e08c..93ea74b 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -114,8 +114,8 @@ class FlowFileRecord : public core::FlowFile, public io::Serializable { explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo) : FlowFile(), - content_repo_(content_repo), flow_repository_(flow_repository), + content_repo_(content_repo), snapshot_("") { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index e89d443..386e4d7 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -50,9 +50,11 @@ class RemoteProcessorGroupPort : public core::Processor { direction_(sitetosite::SEND), transmitting_(false), timeout_(0), - logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()), url_(url), - ssl_service(nullptr) { + http_enabled_(false), + ssl_service(nullptr), + logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()){ + client_type_ = sitetosite::CLIENT_TYPE::RAW; stream_factory_ = stream_factory; if (uuid != nullptr) { uuid_copy(protocol_uuid_, uuid); @@ -62,7 +64,7 @@ class 
RemoteProcessorGroupPort : public core::Processor { peer_index_ = -1; // REST API port and host port_ = -1; - utils::parse_url(url_, host_, port_, protocol_); + utils::parse_url(&url_, &host_, &port_, &protocol_); } // Destructor virtual ~RemoteProcessorGroupPort() { @@ -102,7 +104,7 @@ class RemoteProcessorGroupPort : public core::Processor { // setURL void setURL(std::string val) { url_ = val; - utils::parse_url(url_, host_, port_, protocol_); + utils::parse_url(&url_, &host_, &port_, &protocol_); if (port_ == -1) { if (protocol_.find("https") != std::string::npos) { port_ = 443; @@ -127,8 +129,6 @@ class RemoteProcessorGroupPort : public core::Processor { moodycamel::ConcurrentQueue<std::unique_ptr<sitetosite::SiteToSiteClient>> available_protocols_; std::shared_ptr<Configure> configure_; - // Logger - std::shared_ptr<logging::Logger> logger_; // Transaction Direction sitetosite::TransferDirection direction_; // Transmitting @@ -160,6 +160,8 @@ class RemoteProcessorGroupPort : public core::Processor { std::shared_ptr<controllers::SSLContextService> ssl_service; private: + // Logger + std::shared_ptr<logging::Logger> logger_; static const char* RPG_SSL_CONTEXT_SERVICE_NAME; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index a4f2ade..71aceb8 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -53,9 +53,9 @@ class SchedulingAgent { */ SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) - : configure_(configuration), - admin_yield_duration_(0), + : admin_yield_duration_(0), 
bored_yield_duration_(0), + configure_(configuration), content_repo_(content_repo), controller_service_provider_(controller_service_provider), logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index d9fc7be..b01d740 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -39,9 +39,8 @@ namespace minifi { class TimerAwareMonitor : public utils::AfterExecute<uint64_t> { public: TimerAwareMonitor(std::atomic<bool> *run_monitor) - : run_monitor_(run_monitor), - current_wait_(0) { - + : current_wait_(0), + run_monitor_(run_monitor) { } explicit TimerAwareMonitor(TimerAwareMonitor &&other) : AfterExecute(std::move(other)), http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index ef31ecc..3ba051d 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -99,15 +99,15 @@ class CoreComponent { /** * Constructor that sets the name and uuid. */ - explicit CoreComponent(const std::string name, uuid_t uuid = 0) + explicit CoreComponent(const std::string name, uuid_t uuid = nullptr) : name_(name) { - if (!uuid) + if (nullptr == uuid) // Generate the global UUID for the flow record id_generator_->generate(uuid_); else uuid_copy(uuid_, uuid); - char uuidStr[37]; + char uuidStr[37] = {0}; uuid_unparse_lower(uuid_, uuidStr); uuidStr_ = uuidStr; } @@ -125,7 +125,7 @@ class CoreComponent { } // Get component name Name - virtual std::string getName(); + virtual std::string getName() const; /** * Set name. 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index a45958e..6e644ef 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -52,9 +52,9 @@ class FlowConfiguration : public CoreComponent { explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<io::StreamFactory> stream_factory, std::shared_ptr<Configure> configuration, const std::string path) : CoreComponent(core::getClassName<FlowConfiguration>()), + config_path_(path), flow_file_repo_(flow_file_repo), content_repo_(content_repo), - config_path_(path), stream_factory_(stream_factory), configuration_(configuration), logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index a9e0013..9759c16 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -52,11 +52,11 @@ class ProcessContext : public controller::ControllerServiceLookup { */ ProcessContext(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<core::ContentRepository> &content_repo = std::make_shared<core::repository::FileSystemRepository>()) - : processor_node_(processor), - 
controller_service_provider_(controller_service_provider), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()), + : controller_service_provider_(controller_service_provider), + flow_repo_(flow_repo), content_repo_(content_repo), - flow_repo_(flow_repo) { + processor_node_(processor), + logger_(logging::LoggerFactory<ProcessContext>::getLogger()) { repo_ = repo; } // Destructor http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/Property.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h index 7ee0c15..806f870 100644 --- a/libminifi/include/core/Property.h +++ b/libminifi/include/core/Property.h @@ -58,21 +58,21 @@ class Property { */ Property(const std::string name, const std::string description, const std::string value) : name_(name), - isCollection(false), - description_(description) { + description_(description), + isCollection(false) { values_.push_back(std::string(value.c_str())); } Property(const std::string name, const std::string description) : name_(name), - isCollection(true), - description_(description) { + description_(description), + isCollection(true) { } Property() - : isCollection(false), - name_(""), - description_("") { + : name_(""), + description_(""), + isCollection(false) { } @@ -98,7 +98,8 @@ class Property { bool operator <(const Property & right) const; // Convert TimeUnit to MilliSecond - static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t &out) { + template<typename T> + static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, T &out) { if (unit == MILLISECOND) { out = input; return true; @@ -121,8 +122,18 @@ class Property { return false; } } + + static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t &out) { + return ConvertTimeUnitToMS<int64_t>(input, unit, out); + } + + static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, uint64_t &out) 
{ + return ConvertTimeUnitToMS<uint64_t>(input, unit, out); + } + // Convert TimeUnit to NanoSecond - static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t &out) { + template<typename T> + static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, T &out) { if (unit == MILLISECOND) { out = input * 1000 * 1000; return true; @@ -142,6 +153,71 @@ class Property { return false; } } + + // Convert TimeUnit to NanoSecond + static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, uint64_t &out) { + return ConvertTimeUnitToNS<uint64_t>(input, unit, out); + } + + // Convert TimeUnit to NanoSecond + static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t &out) { + return ConvertTimeUnitToNS<int64_t>(input, unit, out); + } + + // Convert String + static bool StringToTime(std::string input, uint64_t &output, TimeUnit &timeunit) { + if (input.size() == 0) { + return false; + } + + const char *cvalue = input.c_str(); + char *pEnd; + long int ival = strtol(cvalue, &pEnd, 0); + + if (pEnd[0] == '\0') { + return false; + } + + while (*pEnd == ' ') { + // Skip the space + pEnd++; + } + + std::string unit(pEnd); + std::transform(unit.begin(), unit.end(), unit.begin(), ::tolower); + + if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds" || unit == "secs") { + timeunit = SECOND; + output = ival; + return true; + } else if (unit == "msec" || unit == "ms" || unit == "millisecond" || unit == "milliseconds" || unit == "msecs") { + timeunit = MILLISECOND; + output = ival; + return true; + } else if (unit == "min" || unit == "m" || unit == "mins" || unit == "minute" || unit == "minutes") { + timeunit = MINUTE; + output = ival; + return true; + } else if (unit == "ns" || unit == "nano" || unit == "nanos" || unit == "nanoseconds") { + timeunit = NANOSECOND; + output = ival; + return true; + } else if (unit == "ms" || unit == "milli" || unit == "millis" || unit == "milliseconds") { + timeunit = MILLISECOND; + output = ival; + return 
true; + } else if (unit == "h" || unit == "hr" || unit == "hour" || unit == "hrs" || unit == "hours") { + timeunit = HOUR; + output = ival; + return true; + } else if (unit == "d" || unit == "day" || unit == "days") { + timeunit = DAY; + output = ival; + return true; + } else + return false; + } + // Convert String static bool StringToTime(std::string input, int64_t &output, TimeUnit &timeunit) { if (input.size() == 0) { @@ -197,7 +273,8 @@ class Property { } // Convert String to Integer - static bool StringToInt(std::string input, int64_t &output) { + template<typename T> + static bool StringToInt(std::string input, T &output) { if (input.size() == 0) { return false; } @@ -263,12 +340,21 @@ class Property { return false; } + static bool StringToInt(std::string input, int64_t &output) { + return StringToInt<int64_t>(input, output); + } + + // Convert String to Integer + static bool StringToInt(std::string input, uint64_t &output) { + return StringToInt<uint64_t>(input, output); + } + protected: - bool isCollection; // Name std::string name_; // Description std::string description_; + bool isCollection; // Value std::vector<std::string> values_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/controller/ControllerServiceNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceNode.h b/libminifi/include/core/controller/ControllerServiceNode.h index 8f30e4e..341ba90 100644 --- a/libminifi/include/core/controller/ControllerServiceNode.h +++ b/libminifi/include/core/controller/ControllerServiceNode.h @@ -45,9 +45,9 @@ class ControllerServiceNode : public CoreComponent, public ConfigurableComponent explicit ControllerServiceNode(std::shared_ptr<ControllerService> service, const std::string &id, std::shared_ptr<Configure> configuration) : CoreComponent(id), ConfigurableComponent(), - controller_service_(service), + active(false), 
configuration_(configuration), - active(false) { + controller_service_(service){ if (service == nullptr || IsNullOrEmpty(service.get())) { throw Exception(GENERAL_EXCEPTION, "Service must be properly configured"); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/controller/StandardControllerServiceProvider.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index 4cb33a7..5063954 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -44,9 +44,9 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, std::shared_ptr<ProcessGroup> root_group, std::shared_ptr<Configure> configuration, std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader = ClassLoader::getDefaultClassLoader()) : ControllerServiceProvider(services), - root_group_(root_group), agent_(agent), extension_loader_(loader), + root_group_(root_group), configuration_(configuration), logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) { } @@ -54,18 +54,18 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, std::shared_ptr<ProcessGroup> root_group, std::shared_ptr<Configure> configuration, ClassLoader &loader = ClassLoader::getDefaultClassLoader()) : ControllerServiceProvider(services), - root_group_(root_group), - agent_(0), + agent_(nullptr), extension_loader_(loader), + root_group_(root_group), configuration_(configuration), logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) { } 
explicit StandardControllerServiceProvider(const StandardControllerServiceProvider && other) : ControllerServiceProvider(std::move(other)), - root_group_(std::move(other.root_group_)), agent_(std::move(other.agent_)), extension_loader_(other.extension_loader_), + root_group_(std::move(other.root_group_)), configuration_(other.configuration_), logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) { @@ -189,9 +189,10 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ ClassLoader &extension_loader_; + std::shared_ptr<ProcessGroup> root_group_; + std::shared_ptr<Configure> configuration_; - std::shared_ptr<ProcessGroup> root_group_; private: std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/repository/AtomicRepoEntries.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/AtomicRepoEntries.h b/libminifi/include/core/repository/AtomicRepoEntries.h index e96f19b..4235a23 100644 --- a/libminifi/include/core/repository/AtomicRepoEntries.h +++ b/libminifi/include/core/repository/AtomicRepoEntries.h @@ -28,8 +28,6 @@ #include <map> #include <iterator> -static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + sizeof(std::string) + sizeof(size_t); - namespace org { namespace apache { namespace nifi { @@ -183,13 +181,12 @@ class AtomicEntry { * size allowd by this and other atomic entries. 
*/ explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size) - : write_pending_(false), - has_value_(false), - accumulated_repo_size_(total_size), + : accumulated_repo_size_(total_size), max_repo_size_(max_size), + write_pending_(false), + has_value_(false), ref_count_(0), free_required(false) { - } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/repository/VolatileContentRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h index 81e7a29..b26df0b 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/VolatileContentRepository.h @@ -43,10 +43,10 @@ class VolatileContentRepository : public core::ContentRepository, public virtual static const char *minimal_locking; explicit VolatileContentRepository(std::string name = getClassName<VolatileContentRepository>()) - : core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name), - core::SerializableComponent(name,0), - logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()), - minimize_locking_(true) { + : core::SerializableComponent(name, 0), + core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name), + minimize_locking_(true), + logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) { max_count_ = 15000; } virtual ~VolatileContentRepository() { @@ -122,14 +122,14 @@ class VolatileContentRepository : public core::ContentRepository, public virtual std::function<bool(std::shared_ptr<minifi::ResourceClaim>)> resource_claim_check_; std::function<void(std::shared_ptr<minifi::ResourceClaim>)> claim_reclaimer_; - // logger - std::shared_ptr<logging::Logger> logger_; - // mutex and master list that represent a cache of Atomic entries. 
this exists so that we don't have to walk the atomic entry list. // The idea is to reduce the computational complexity while keeping access as maximally lock free as we can. std::mutex map_mutex_; std::map<std::string, AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>*> master_list_; + + // logger + std::shared_ptr<logging::Logger> logger_; }; } /* namespace repository */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/repository/VolatileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index 3b5d5e1..01bf165 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -34,7 +34,13 @@ namespace nifi { namespace minifi { namespace core { namespace repository { - +#if defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Woverloaded-virtual" +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Woverloaded-virtual" +#endif /** * Flow File repository * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate. @@ -52,10 +58,10 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) : core::SerializableComponent(repo_name, 0), Repository(repo_name.length() > 0 ? 
repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod), - max_size_(maxPartitionBytes * 0.75), + current_size_(0), current_index_(0), max_count_(10000), - current_size_(0), + max_size_(maxPartitionBytes * 0.75), logger_(logging::LoggerFactory<VolatileRepository>::getLogger()) { @@ -216,7 +222,7 @@ bool VolatileRepository<T>::initialize(const std::shared_ptr<Configure> &configu logger_->log_info("Resizing value_vector_ for %s count is %d", getName(), max_count_); logger_->log_info("Using a maximum size for %s of %u", getName(), max_size_); value_vector_.reserve(max_count_); - for (int i = 0; i < max_count_; i++) { + for (uint32_t i = 0; i < max_count_; i++) { value_vector_.emplace_back(new AtomicEntry<T>(&current_size_, &max_size_)); } return true; @@ -236,7 +242,7 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) { size_t reclaimed_size = 0; RepoValue<T> old_value; do { - int private_index = current_index_.fetch_add(1); + uint16_t private_index = current_index_.fetch_add(1); // round robin through the beginning if (private_index >= max_count_) { uint16_t new_index = 0; @@ -374,6 +380,12 @@ void VolatileRepository<T>::start() { thread_ = std::thread(&VolatileRepository<T>::run, std::enable_shared_from_this<VolatileRepository<T>>::shared_from_this()); logger_->log_info("%s Repository Monitor Thread Start", name_); } +#if defined(__clang__) +#pragma clang diagnostic pop +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic pop +#endif + } /* namespace repository */ } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/state/metrics/DeviceInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/DeviceInformation.h b/libminifi/include/core/state/metrics/DeviceInformation.h index 9579877..20e29b3 100644 --- 
a/libminifi/include/core/state/metrics/DeviceInformation.h +++ b/libminifi/include/core/state/metrics/DeviceInformation.h @@ -58,8 +58,6 @@ class Device { initialize(); } void initialize() { - struct sockaddr_in servAddr; - addrinfo hints = { sizeof(addrinfo) }; memset(&hints, 0, sizeof hints); // make sure the struct is empty hints.ai_family = AF_UNSPEC; @@ -172,7 +170,6 @@ class Device { struct ifreq ifr; struct ifconf ifc; char buf[1024]; - int success = 0; ifc.ifc_len = sizeof(buf); ifc.ifc_buf = buf; if (ioctl(sock, SIOCGIFCONF, &ifc) == -1) { /* handle error */} @@ -207,7 +204,6 @@ class Device { #elif( defined(__unix__) || defined(__APPLE__) || defined(__MACH__) || defined(BSD)) // should work on bsd variants as well std::string getDeviceId() { ifaddrs* iflist; - bool found = false; std::hash<std::string> hash_fn; std::set<std::string> macs; @@ -226,8 +222,6 @@ class Device { snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); ///macs += mac_add; macs.insert(mac_add); - } else if (cur->ifa_addr->sa_family == AF_INET) { - struct sockaddr_in* sockInfoIP = (struct sockaddr_in*) cur->ifa_addr; } } @@ -273,7 +267,7 @@ class DeviceInformation : public DeviceMetric { device_id_ = device.device_id_; } - std::string getName() { + std::string getName() const{ return "NetworkInfo"; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/state/metrics/MetricsBase.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/MetricsBase.h b/libminifi/include/core/state/metrics/MetricsBase.h index cc96298..b33ad9a 100644 --- a/libminifi/include/core/state/metrics/MetricsBase.h +++ b/libminifi/include/core/state/metrics/MetricsBase.h @@ -58,7 +58,7 @@ class Metrics : public core::Connectable { virtual ~Metrics() { } - virtual std::string getName() = 0; + virtual std::string getName() const = 0; virtual std::vector<MetricResponse> 
serialize() = 0; @@ -127,7 +127,7 @@ class MetricsReporter { * 1 No error condition, but cannot obtain lock in timely manner. * -1 failure */ - virtual int16_t getMetrics(std::vector<std::shared_ptr<Metrics>> &metric_vector, uint8_t metricsClass) = 0; + virtual int16_t getMetrics(std::vector<std::shared_ptr<Metrics>> &metric_vector, uint16_t metricsClass) = 0; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/state/metrics/MetricsListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/MetricsListener.h b/libminifi/include/core/state/metrics/MetricsListener.h index 34df476..c471f60 100644 --- a/libminifi/include/core/state/metrics/MetricsListener.h +++ b/libminifi/include/core/state/metrics/MetricsListener.h @@ -66,7 +66,7 @@ class MetricsWatcher : public utils::AfterExecute<Update> { return true; } } - virtual bool isCancelled(const UpdateStatus &result) { + virtual bool isCancelled(const Update &result) { return false; } @@ -79,8 +79,8 @@ class MetricsListener { public: MetricsListener(const std::shared_ptr<metrics::MetricsReporter> &source, const std::shared_ptr<metrics::MetricsSink> &sink) : running_(true), - sink_(sink), - source_(source) { + source_(source), + sink_(sink){ function_ = [&]() { while(running_) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/state/metrics/ProcessMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/ProcessMetrics.h b/libminifi/include/core/state/metrics/ProcessMetrics.h index f3f911d..f3b6d23 100644 --- a/libminifi/include/core/state/metrics/ProcessMetrics.h +++ b/libminifi/include/core/state/metrics/ProcessMetrics.h @@ -52,7 +52,7 @@ class ProcessMetrics : public Metrics { ProcessMetrics() { } - std::string getName() { + virtual std::string getName() const { return "ProcessMetrics"; }
