http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/state/metrics/QueueMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/QueueMetrics.h b/libminifi/include/core/state/metrics/QueueMetrics.h index 018e70b..b55f164 100644 --- a/libminifi/include/core/state/metrics/QueueMetrics.h +++ b/libminifi/include/core/state/metrics/QueueMetrics.h @@ -50,7 +50,7 @@ class QueueMetrics : public Metrics { : Metrics("QueueMetrics", 0) { } - std::string getName() { + virtual std::string getName() const{ return "QueueMetrics"; }
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/state/metrics/RepositoryMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/RepositoryMetrics.h b/libminifi/include/core/state/metrics/RepositoryMetrics.h index fa37e94..8592257 100644 --- a/libminifi/include/core/state/metrics/RepositoryMetrics.h +++ b/libminifi/include/core/state/metrics/RepositoryMetrics.h @@ -50,7 +50,7 @@ class RepositoryMetrics : public Metrics { : Metrics("RepositoryMetrics", 0) { } - std::string getName() { + virtual std::string getName() const { return "RepositoryMetrics"; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/state/metrics/SystemMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/SystemMetrics.h b/libminifi/include/core/state/metrics/SystemMetrics.h index 4ac2b52..8bcf5be 100644 --- a/libminifi/include/core/state/metrics/SystemMetrics.h +++ b/libminifi/include/core/state/metrics/SystemMetrics.h @@ -53,7 +53,7 @@ class SystemInformation : public DeviceInformation { : DeviceInformation("SystemInformation", 0) { } - std::string getName() { + virtual std::string getName() const{ return "SystemInformation"; } @@ -62,7 +62,6 @@ class SystemInformation : public DeviceInformation { MetricResponse vcores; vcores.name = "vcores"; - int cpus[2] = { 0 }; size_t ncpus = std::thread::hardware_concurrency(); vcores.value = std::to_string(ncpus); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index c5153f4..7dc58f2 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -69,9 +69,9 @@ class YamlConfiguration : public FlowConfiguration { * @return the root ProcessGroup node of the flow * configuration tree */ - std::unique_ptr<core::ProcessGroup> getRoot(const std::string &yamlConfigFile) { + virtual std::unique_ptr<core::ProcessGroup> getRoot(const std::string &yamlConfigFile) { YAML::Node rootYamlNode = YAML::LoadFile(yamlConfigFile); - return getRoot(&rootYamlNode); + return getYamlRoot(&rootYamlNode); } /** @@ -85,9 +85,9 @@ class YamlConfiguration : public FlowConfiguration { * @return the root ProcessGroup node of the flow * configuration tree */ - std::unique_ptr<core::ProcessGroup> getRoot(std::istream &yamlConfigStream) { + std::unique_ptr<core::ProcessGroup> getYamlRoot(std::istream &yamlConfigStream) { YAML::Node rootYamlNode = YAML::Load(yamlConfigStream); - return getRoot(&rootYamlNode); + return getYamlRoot(&rootYamlNode); } /** @@ -103,7 +103,7 @@ class YamlConfiguration : public FlowConfiguration { */ std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) { YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload); - return getRoot(&rootYamlNode); + return getYamlRoot(&rootYamlNode); } protected: @@ -119,7 +119,7 @@ class YamlConfiguration : public FlowConfiguration { * @return the root ProcessGroup node of the flow * configuration tree */ - std::unique_ptr<core::ProcessGroup> getRoot(YAML::Node *rootYamlNode) { + std::unique_ptr<core::ProcessGroup> getYamlRoot(YAML::Node *rootYamlNode) { YAML::Node rootYaml = *rootYamlNode; YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY]; YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY]; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/io/AtomicEntryStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h index 181b69d..7ddf9df 100644 --- a/libminifi/include/io/AtomicEntryStream.h +++ b/libminifi/include/io/AtomicEntryStream.h @@ -129,7 +129,7 @@ void AtomicEntryStream<T>::seek(uint64_t offset) { template<typename T> int AtomicEntryStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen || invalid_stream_) + if ((int)buf.capacity() < buflen || invalid_stream_) return -1; return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); } @@ -159,7 +159,7 @@ int AtomicEntryStream<T>::readData(std::vector<uint8_t> &buf, int buflen) { if (invalid_stream_) { return -1; } - if (buf.capacity() < buflen) { + if ((int)buf.capacity() < buflen) { buf.resize(buflen); } int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/io/CRCStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h index 7031332..0ca00ba 100644 --- a/libminifi/include/io/CRCStream.h +++ b/libminifi/include/io/CRCStream.h @@ -45,14 +45,13 @@ class CRCStream : public BaseStream { explicit CRCStream(CRCStream<T> &&move); virtual ~CRCStream() { - } - T *getstream(){ + T *getstream() { return child_stream_; } - void disableEncoding(){ + void disableEncoding() { disable_encoding_ = true; } @@ -140,7 +139,6 @@ class CRCStream : public BaseStream { return crc_; } - void reset(); protected: @@ -165,20 +163,22 @@ class CRCStream : public BaseStream { template<typename T> CRCStream<T>::CRCStream(T *other) - : child_stream_(other), disable_encoding_(false) { + : child_stream_(other), + disable_encoding_(false) { crc_ = crc32(0L, Z_NULL, 0); } template<typename T> CRCStream<T>::CRCStream(CRCStream<T> &&move) - : crc_(std::move(move.crc_)), disable_encoding_(false), - child_stream_(std::move(move.child_stream_)) { + : crc_(std::move(move.crc_)), + child_stream_(std::move(move.child_stream_)), + disable_encoding_(false) { } template<typename T> int CRCStream<T>::readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) + if ((int)buf.capacity() < buflen) buf.resize(buflen); return readData((uint8_t*) &buf[0], buflen); } @@ -193,7 +193,7 @@ int CRCStream<T>::readData(uint8_t *buf, int buflen) { template<typename T> int CRCStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) + if ((int)buf.capacity() < buflen) buf.resize(buflen); return writeData((uint8_t*) &buf[0], buflen); } @@ -219,7 +219,7 @@ template<typename T> int CRCStream<T>::write(uint64_t base_value, bool is_little_endian) { if (disable_encoding_) - is_little_endian=false; + is_little_endian = false; const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value; uint8_t bytes[sizeof value]; std::copy(static_cast<const char*>(static_cast<const void*>(&value)), static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, bytes); @@ -229,7 +229,7 @@ int CRCStream<T>::write(uint64_t base_value, bool is_little_endian) { template<typename T> int CRCStream<T>::write(uint32_t base_value, bool is_little_endian) { if (disable_encoding_) - is_little_endian=false; + is_little_endian = false; const uint32_t value = is_little_endian ? htonl(base_value) : base_value; uint8_t bytes[sizeof value]; std::copy(static_cast<const char*>(static_cast<const void*>(&value)), static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, bytes); @@ -239,7 +239,7 @@ int CRCStream<T>::write(uint32_t base_value, bool is_little_endian) { template<typename T> int CRCStream<T>::write(uint16_t base_value, bool is_little_endian) { if (disable_encoding_) - is_little_endian=false; + is_little_endian = false; const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value; uint8_t bytes[sizeof value]; std::copy(static_cast<const char*>(static_cast<const void*>(&value)), static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, bytes); @@ -249,7 +249,7 @@ int CRCStream<T>::write(uint16_t base_value, bool is_little_endian) { template<typename T> int CRCStream<T>::read(uint64_t &value, bool is_little_endian) { if (disable_encoding_) - is_little_endian=false; + is_little_endian = false; auto buf = readBuffer(value); if (is_little_endian) { @@ -265,7 +265,7 @@ int CRCStream<T>::read(uint64_t &value, bool is_little_endian) { template<typename T> int CRCStream<T>::read(uint32_t &value, bool is_little_endian) { if (disable_encoding_) - is_little_endian=false; + is_little_endian = false; auto buf = readBuffer(value); if (is_little_endian) { @@ -281,7 +281,7 @@ int CRCStream<T>::read(uint32_t &value, bool is_little_endian) { template<typename T> int CRCStream<T>::read(uint16_t &value, bool is_little_endian) { if (disable_encoding_) - is_little_endian=false; + is_little_endian = false; auto buf = readBuffer(value); if (is_little_endian) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/io/DataStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h index be7ebe5..78874f4 100644 --- a/libminifi/include/io/DataStream.h +++ b/libminifi/include/io/DataStream.h @@ -42,7 +42,7 @@ class DataStream { } - ~DataStream() { + virtual ~DataStream() { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/io/NonConvertingStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/NonConvertingStream.h b/libminifi/include/io/NonConvertingStream.h index 3196617..aa16995 100644 --- a/libminifi/include/io/NonConvertingStream.h +++ b/libminifi/include/io/NonConvertingStream.h @@ -60,7 +60,7 @@ class NonConvertingStream : public BaseStream { int writeData(uint8_t *value, int size); - virtual void seek(uint32_t offset) { + virtual void seek(uint64_t offset) { if (composable_stream_ != this) { composable_stream_->seek(offset); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/io/Serializable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h index 326c7b7..aa987b5 100644 --- a/libminifi/include/io/Serializable.h +++ b/libminifi/include/io/Serializable.h @@ -107,7 +107,7 @@ class Serializable { * @param value non encoded value * @return resulting write size **/ - int write(bool value); + int write(bool value, DataStream *stream); /** * write UTF string to stream http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/io/validation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h index 70de439..1cce663 100644 --- a/libminifi/include/io/validation.h +++ b/libminifi/include/io/validation.h @@ -73,18 +73,4 @@ static auto IsNullOrEmpty(std::shared_ptr<T> object) -> typename std::enable_if< return (nullptr == object || nullptr == object.get()); } -/** - * Determines if the variable is null or strlen(str) == 0 - */ -static auto IsNullOrEmpty(const char *str)-> decltype(NULL !=str, bool()) { - return (NULL == str || strlen(str) == 0); -} - -/** - * Determines if the variable is null or strlen(str) == 0 - */ -static auto IsNullOrEmpty(char *str)-> decltype(NULL !=str, bool()) { - return (NULL == str || strlen(str) == 0); -} - #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/processors/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index c193a8d..07d3939 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -35,12 +35,13 @@ namespace processors { struct GetFileRequest { std::string directory = ".";bool recursive = true;bool keepSourceFile = false; - int64_t minAge = 0; - int64_t maxAge = 0; - int64_t minSize = 0; - int64_t maxSize = 0;bool ignoreHiddenFile = true; - int64_t pollInterval = 0; - int64_t batchSize = 10; + uint64_t minAge = 0; + uint64_t maxAge = 0; + uint64_t minSize = 0; + uint64_t maxSize = 0; + bool ignoreHiddenFile = true; + uint64_t pollInterval = 0; + uint64_t batchSize = 10; std::string fileFilter = "[^\\.].*"; }; @@ -62,7 +63,7 @@ class GetFileMetrics : public state::metrics::Metrics { virtual ~GetFileMetrics() { } - virtual std::string getName() { + virtual std::string getName() const { return core::Connectable::getName(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/processors/GetTCP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetTCP.h b/libminifi/include/processors/GetTCP.h index 5ee263f..7b5436f 100644 --- a/libminifi/include/processors/GetTCP.h +++ b/libminifi/include/processors/GetTCP.h @@ -67,16 +67,16 @@ class SocketAfterExecute : public utils::AfterExecute<int> { return false; } - virtual int64_t wait_time(){ + virtual int64_t wait_time() { // wait 500ms return 500; } protected: std::atomic<bool> running_; - std::map<std::string, std::future<int>*> *list_; - std::mutex *mutex_; std::string endpoint_; + std::mutex *mutex_; + std::map<std::string, std::future<int>*> *list_; }; class DataHandlerCallback : public OutputStreamCallback { @@ -132,7 +132,7 @@ class GetTCPMetrics : public state::metrics::Metrics { virtual ~GetTCPMetrics() { } - virtual std::string getName() { + virtual std::string getName() const { return core::Connectable::getName(); } @@ -178,14 +178,13 @@ class GetTCP : public core::Processor, public state::metrics::MetricsSource { */ explicit GetTCP(std::string name, uuid_t uuid = NULL) : Processor(name, uuid), - connection_attempts_(3), - reconnect_interval_(5000), - receive_buffer_size_(16 * 1024 * 1024), + running_(false), stay_connected_(true), + concurrent_handlers_(2), endOfMessageByte(13), - running_(false), + reconnect_interval_(5000), + receive_buffer_size_(16 * 1024 * 1024), connection_attempt_limit_(3), - concurrent_handlers_(2), logger_(logging::LoggerFactory<GetTCP>::getLogger()) { metrics_ = std::make_shared<GetTCPMetrics>(); } @@ -266,8 +265,6 @@ class GetTCP : public core::Processor, public state::metrics::MetricsSource { int64_t receive_buffer_size_; - int8_t connection_attempts_; - uint16_t connection_attempt_limit_; std::shared_ptr<GetTCPMetrics> metrics_; @@ -280,6 +277,7 @@ class GetTCP : public core::Processor, public state::metrics::MetricsSource { // as the top level time. std::shared_ptr<logging::Logger> logger_; + }; REGISTER_RESOURCE(GetTCP); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/processors/TailFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h index 5e166cf..fd59cb5 100644 --- a/libminifi/include/processors/TailFile.h +++ b/libminifi/include/processors/TailFile.h @@ -88,7 +88,6 @@ class TailFile : public core::Processor { // determine if state is recovered; bool _stateRecovered; uint64_t _currentTailFilePosition; - uint64_t _currentTailFileCreatedTime; static const int BUFFER_SIZE = 512; // Utils functions for parse state file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/sitetosite/Peer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h index 4617a7a..e33b9c6 100644 --- a/libminifi/include/sitetosite/Peer.h +++ b/libminifi/include/sitetosite/Peer.h @@ -29,6 +29,7 @@ #include <mutex> #include <atomic> #include <memory> +#include "io/EndianCheck.h" #include "core/Property.h" #include "core/logging/LoggerConfiguration.h" @@ -89,10 +90,10 @@ class Peer { } protected: - uint16_t port_; - std::string host_; + uint16_t port_; + uuid_t port_id_; // secore comms @@ -152,11 +153,11 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { } explicit SiteToSitePeer(const std::string &host, uint16_t port) - : host_(host), + : stream_(nullptr), + host_(host), port_(port), - stream_(nullptr), - yield_expiration_(0), timeout_(30000), + yield_expiration_(0), logger_(logging::LoggerFactory<SiteToSitePeer>::getLogger()) { url_ = "nifi://" + host_ + ":" + std::to_string(port_); yield_expiration_ = 0; @@ -266,6 +267,8 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { } void setStream(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream) { + stream_ = nullptr; + if (stream) stream_ = std::move(stream); } @@ -273,24 +276,22 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { return stream_.get(); } - int write(uint8_t value) { + int write(uint8_t value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::write(value, stream_.get()); } - int write(char value) { + int write(char value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::write(value, stream_.get()); } - int write(uint32_t value) { - + int write(uint32_t value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::write(value, stream_.get()); - } - int write(uint16_t value) { + int write(uint16_t value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::write(value, stream_.get()); } int write(uint8_t *value, int len) { return Serializable::write(value, len, stream_.get()); } - int write(uint64_t value) { + int write(uint64_t value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::write(value, stream_.get()); } int write(bool value) { @@ -303,7 +304,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { int read(uint8_t &value) { return Serializable::read(value, stream_.get()); } - int read(uint16_t &value) { + int read(uint16_t &value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::read(value, stream_.get()); } int read(char &value) { @@ -312,10 +313,10 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { int read(uint8_t *value, int len) { return Serializable::read(value, len, stream_.get()); } - int read(uint32_t &value) { + int read(uint32_t &value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::read(value, stream_.get()); } - int read(uint64_t &value) { + int read(uint64_t &value, bool is_little_endian = minifi::io::EndiannessCheck::IS_LITTLE) { return Serializable::read(value, stream_.get()); } int readUTF(std::string &str, bool widen = false) { @@ -350,6 +351,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream_; std::string host_; + uint16_t port_; // Mutex for protection @@ -358,18 +360,14 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { std::string url_; // socket timeout; std::atomic<uint64_t> timeout_; - // Logger - std::shared_ptr<logging::Logger> logger_; // Yield Period in Milliseconds std::atomic<uint64_t> yield_period_msec_; // Yield Expiration std::atomic<uint64_t> yield_expiration_; // Yield Expiration per destination PortID std::map<std::string, uint64_t> yield_expiration_PortIdMap; - // OpenSSL connection state - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - + // Logger + std::shared_ptr<logging::Logger> logger_; }; } /* namespace sitetosite */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/sitetosite/SiteToSite.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h index 953c5b1..8219f58 100644 --- a/libminifi/include/sitetosite/SiteToSite.h +++ b/libminifi/include/sitetosite/SiteToSite.h @@ -31,6 +31,10 @@ namespace apache { namespace nifi { namespace minifi { namespace sitetosite { +#if defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-variable" +#endif // Resource Negotiated Status Code #define RESOURCE_OK 20 @@ -245,7 +249,8 @@ typedef enum { // Respond Code Class typedef struct { RespondCode code; - const char *description;bool hasDescription; + const char *description; + bool hasDescription; } RespondCodeContext; // Respond Code Context @@ -265,7 +270,8 @@ class Transaction { * Create a new transaction */ explicit Transaction(TransferDirection direction, org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> &stream) - : crcStream(std::move(stream)), closed_(false) { + : closed_(false), + crcStream(std::move(stream)) { _state = TRANSACTION_STARTED; _direction = direction; _dataAvailable = false; @@ -297,7 +303,6 @@ class Transaction { uuid_parse(str.c_str(), uuid_); } - // getState TransactionState getState() { return _state; @@ -406,6 +411,9 @@ class SiteToSiteClientConfiguration { std::shared_ptr<controllers::SSLContextService> ssl_service_; }; +#if defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic pop +#endif } /* namespace sitetosite */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/sitetosite/SiteToSiteClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index 98404cb..59539a1 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -46,10 +46,9 @@ class DataPacket { } std::map<std::string, std::string> _attributes; uint64_t _size; - std::shared_ptr<logging::Logger> logger_reference_; std::shared_ptr<Transaction> transaction_; const std::string & payload_; - + std::shared_ptr<logging::Logger> logger_reference_; }; class SiteToSiteClient : public core::Connectable { @@ -233,8 +232,6 @@ class SiteToSiteClient : public core::Connectable { // Peer State PeerState peer_state_; - std::shared_ptr<logging::Logger> logger_; - // portIDStr std::string port_id_str_; @@ -262,6 +259,10 @@ class SiteToSiteClient : public core::Connectable { uint32_t _currentCodecVersion; int _currentCodecVersionIndex; + private: + std::shared_ptr<logging::Logger> logger_; + + }; // Nest Callback Class for write stream http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/sitetosite/SiteToSiteFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h index dda8337..4beefc6 100644 --- a/libminifi/include/sitetosite/SiteToSiteFactory.h +++ b/libminifi/include/sitetosite/SiteToSiteFactory.h @@ -39,7 +39,7 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( client_configuration.getStreamFactory()->createSocket(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort())); auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(std::move(str), client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort())); - return std::move(peer); + return peer; } /** @@ -51,7 +51,7 @@ static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientC client_configuration.getPeer()->getPortId(uuid); auto ptr = std::unique_ptr<SiteToSiteClient>(new RawSiteToSiteClient(createStreamingPeer(client_configuration))); ptr->setPortId(uuid); - return std::move(ptr); + return ptr; } /** @@ -73,7 +73,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort())); ptr->setPortId(uuid); ptr->setPeer(std::move(peer)); - return std::move(ptr); + return ptr; } return nullptr; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/utils/ByteArrayCallback.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h index 2ecd29b..f0a83ee 100644 --- a/libminifi/include/utils/ByteArrayCallback.h +++ b/libminifi/include/utils/ByteArrayCallback.h @@ -17,8 +17,6 @@ #ifndef LIBMINIFI_INCLUDE_UTILS_BYTEARRAYCALLBACK_H_ #define LIBMINIFI_INCLUDE_UTILS_BYTEARRAYCALLBACK_H_ -#include <fstream> -#include <iterator> #include "concurrentqueue.h" #include "FlowFileRecord.h" #include "core/logging/LoggerConfiguration.h" @@ -97,8 +95,7 @@ class ByteOutputCallback : public OutputStreamCallback { ByteOutputCallback() = delete; explicit ByteOutputCallback(size_t max_size, bool wait_on_read=false) - : ptr(nullptr), - max_size_(max_size), + : max_size_(max_size), read_started_( wait_on_read ? false : true ), logger_(logging::LoggerFactory<ByteOutputCallback>::getLogger()) { current_str_pos = 0; @@ -145,8 +142,6 @@ class ByteOutputCallback : public OutputStreamCallback { // flag to wait on writes until we have a consumer. std::atomic<bool> read_started_; - char *ptr; - size_t current_str_pos; std::string current_str; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/utils/HTTPClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h index 0a9278e..0c4b179 100644 --- a/libminifi/include/utils/HTTPClient.h +++ b/libminifi/include/utils/HTTPClient.h @@ -67,7 +67,7 @@ struct HTTPHeaderResponse { } void append(const std::string &header) { - if (max_tokens_ == -1 || header_tokens_.size() <= max_tokens_) { + if (max_tokens_ == -1 || (int32_t)header_tokens_.size() <= max_tokens_) { header_tokens_.push_back(header); } } @@ -76,7 +76,7 @@ struct HTTPHeaderResponse { header_mapping_[key].append(value); } - int max_tokens_; + int32_t max_tokens_; std::vector<std::string> header_tokens_; std::map<std::string, std::string> header_mapping_; @@ -145,7 +145,7 @@ class HTTPRequestResponse { return 0x10000000; size_t buffer_size = callback->ptr->getBufferSize(); if (callback->getPos() <= buffer_size) { - int len = buffer_size - callback->pos; + size_t len = buffer_size - callback->pos; if (len <= 0) return 0; char *ptr = callback->ptr->getBuffer(callback->getPos()); @@ -276,67 +276,9 @@ class BaseHTTPClient { }; -static std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password) { +extern std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password); - if (nullptr == client) { - return ""; - } - std::string token; - - client->setContentType("application/x-www-form-urlencoded"); - - client->set_request_method("POST"); - - std::string payload = "username=" + username + "&" + "password=" + password; - - client->setPostFields(client->escape(payload)); - - client->submit(); - - if (client->submit() && client->getResponseCode() == 200) { - - const std::string &response_body = std::string(client->getResponseBody().data(), client->getResponseBody().size()); - - if (!response_body.empty()) { - token = "Bearer " + response_body; - } - } - - return token; -} - -static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) { - - std::string http("http://"); - std::string https("https://"); - - if (url.compare(0, http.size(), http) == 0) - protocol = http; - - if (url.compare(0, https.size(), https) == 0) - protocol = https; - - if (!protocol.empty()) { - size_t pos = url.find_first_of(":", protocol.size()); - - if (pos == std::string::npos) { - pos = url.size(); - } - - host = url.substr(protocol.size(), pos - protocol.size()); - - if (pos < url.size() && url[pos] == ':') { - size_t ppos = url.find_first_of("/", pos); - if (ppos == std::string::npos) { - ppos = url.size(); - } - std::string portStr(url.substr(pos + 1, ppos - pos - 1)); - if (portStr.size() > 0) { - port = std::stoi(portStr); - } - } - } -} +extern void parse_url(std::string *url, std::string *host, int *port, std::string *protocol); } /* namespace utils */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/utils/StringUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h index df48086..5dce17b 100644 --- a/libminifi/include/utils/StringUtils.h +++ b/libminifi/include/utils/StringUtils.h @@ -95,8 +95,8 @@ class StringUtils { static std::vector<std::string> split(const std::string &str, const std::string &delimiter) { std::vector<std::string> result; - int last = 0; - int next = 0; + size_t last = 0; + size_t next = 0; while ((next = str.find(delimiter, last)) != std::string::npos) { result.push_back(str.substr(last, next - last)); last = next + delimiter.length(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 0ae2e5b..4f80829 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -69,18 +69,18 @@ template<typename T> class Worker { public: explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant) - : task(task), - run_determinant_(std::move(run_determinant)), - identifier_(identifier), - time_slice_(0) { + : identifier_(identifier), + time_slice_(0), + task(task), + run_determinant_(std::move(run_determinant)) { promise = std::make_shared<std::promise<T>>(); } explicit Worker(std::function<T()> &task, const std::string &identifier) - : task(task), - run_determinant_(nullptr), - identifier_(identifier), - time_slice_(0) { + : identifier_(identifier), + time_slice_(0), + task(task), + run_determinant_(nullptr) { promise = std::make_shared<std::promise<T>>(); } @@ -97,11 +97,11 @@ class Worker { * Move constructor for worker tasks */ Worker(Worker &&other) - : task(std::move(other.task)), - promise(other.promise), + : identifier_(std::move(other.identifier_)), time_slice_(std::move(other.time_slice_)), - identifier_(std::move(other.identifier_)), - run_determinant_(std::move(other.run_determinant_)) { + task(std::move(other.task)), + run_determinant_(std::move(other.run_determinant_)), + promise(other.promise) { } /** @@ -192,15 +192,15 @@ class ThreadPool { public: ThreadPool(int max_worker_threads = 2, bool daemon_threads = false) - : max_worker_threads_(max_worker_threads), - daemon_threads_(daemon_threads), + : daemon_threads_(daemon_threads), + max_worker_threads_(max_worker_threads), running_(false) { current_workers_ = 0; } ThreadPool(const ThreadPool<T> &&other) - : max_worker_threads_(std::move(other.max_worker_threads_)), - daemon_threads_(std::move(other.daemon_threads_)), + : daemon_threads_(std::move(other.daemon_threads_)), + max_worker_threads_(std::move(other.max_worker_threads_)), running_(false) { current_workers_ = 0; } @@ -407,7 +407,7 @@ void ThreadPool<T>::run_tasks() { auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count(); // if our differential is < 10% of the wait time we will not put the task into a wait state // since requeuing will break the time slice contract. - if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt * .10)) { + if ((double)task.getTimeSlice() > ms && ((double)(task.getTimeSlice() - ms)) > (wt * .10)) { wait_to_run = true; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 0975032..95b9c52 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -172,7 +172,7 @@ FlowController::~FlowController() { bool FlowController::applyConfiguration(const std::string &configurePayload) { std::unique_ptr<core::ProcessGroup> newRoot; try { - newRoot = std::move(flow_configuration_->getRootFromPayload(configurePayload)); + newRoot = flow_configuration_->getRootFromPayload(configurePayload); } catch (...) { logger_->log_error("Invalid configuration payload"); return false; @@ -623,9 +623,7 @@ int16_t FlowController::clearConnection(const std::string &connection) { return -1; } -int16_t FlowController::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint8_t metricsClass) { - auto now = std::chrono::steady_clock::now(); - auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_metrics_capture_).count(); +int16_t FlowController::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint16_t metricsClass) { std::lock_guard<std::mutex> lock(metrics_mutex_); if (metricsClass == 0) { for (auto metric : metrics_) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 8604c6e..76932de 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -68,18 +68,18 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP // create if (url_.empty()) { sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, port_, ssl_service != nullptr), client_type_); - nextProtocol = std::move(sitetosite::createClient(config)); + nextProtocol = sitetosite::createClient(config); } else if (peer_index_ >= 0) { std::lock_guard<std::mutex> lock(peer_mutex_); logger_->log_info("Creating client from peer %d", peer_index_.load()); sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), client_type_); peer_index_++; - if (peer_index_ >= peers_.size()) { + if (peer_index_ >= static_cast<int>(peers_.size())) { peer_index_ = 0; } - nextProtocol = std::move(sitetosite::createClient(config)); + nextProtocol = sitetosite::createClient(config); } else { logger_->log_info("Refreshing the peer list since there are none configured."); refreshPeerList(); @@ -87,11 +87,11 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP } } logger_->log_info("Obtained protocol from available_protocols_"); - return std::move(nextProtocol); + return nextProtocol; } void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) { - int count = peers_.size(); + auto count = peers_.size(); if (max_concurrent_tasks_ > count) count = max_concurrent_tasks_; if (available_protocols_.size_approx() >= count) { @@ -132,18 +132,18 @@ void RemoteProcessorGroupPort::initialize() { } // populate the site2site protocol for load balancing between them if (peers_.size() > 0) { - int count = peers_.size(); + auto count = peers_.size(); if (max_concurrent_tasks_ > count) count = max_concurrent_tasks_; - for (int i = 0; i < count; i++) { + for (uint32_t i = 0; i < count; i++) { std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr; sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), client_type_); peer_index_++; - if (peer_index_ >= peers_.size()) { + if (peer_index_ >= static_cast<int>(peers_.size())) { peer_index_ = 0; } logger_->log_info("Creating client"); - nextProtocol = std::move(sitetosite::createClient(config)); + nextProtocol = sitetosite::createClient(config); logger_->log_info("Created client, moving into available protocols"); returnProtocol(std::move(nextProtocol)); } @@ -298,7 +298,7 @@ void RemoteProcessorGroupPort::refreshPeerList() { std::unique_ptr<sitetosite::SiteToSiteClient> protocol; sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, site2site_port_, ssl_service != nullptr), client_type_); - protocol = std::move(sitetosite::createClient(config)); + protocol = sitetosite::createClient(config); protocol->getPeerList(peers_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/c2/C2Agent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 58b5514..ae1629f 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -341,7 +341,7 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { case Operation::RESTART: { update_sink_->stop(true); C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); - C2Payload && ret = protocol_.load()->consumePayload(std::move(response)); + protocol_.load()->consumePayload(std::move(response)); exit(1); } break; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/c2/C2Payload.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp index 2a737d4..8a30e2a 100644 --- a/libminifi/src/c2/C2Payload.cpp +++ b/libminifi/src/c2/C2Payload.cpp @@ -190,7 +190,7 @@ C2Payload &C2Payload::operator=(const C2Payload &&other) { op_ = std::move(other.op_); raw_ = other.raw_; if (raw_) { - raw_data_ = std::move(raw_data_); + raw_data_ = std::move(other.raw_data_); } label_ = std::move(other.label_); payloads_ = std::move(other.payloads_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/core/Core.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp index 995c001..da9462b 100644 --- a/libminifi/src/core/Core.cpp +++ b/libminifi/src/core/Core.cpp @@ -60,7 +60,7 @@ void CoreComponent::setName(const std::string name) { name_ = name; } // Get Process Name -std::string CoreComponent::getName() { +std::string CoreComponent::getName() const { return name_; } } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 2d8ee08..7f31647 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -139,7 +139,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core std::shared_ptr<core::FlowFile> record = this->create(parent); if (record) { if (parent->getResourceClaim()) { - if ((offset + size) > parent->getSize()) { + if ((uint64_t)(offset + size) > parent->getSize()) { // Set offset and size logger_->log_error("clone offset %d and size %d exceed parent size %d", offset, size, parent->getSize()); // Remove the Add FlowFile for the session @@ -326,7 +326,7 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre */ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<core::FlowFile> &flow) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository()); - int max_read = getpagesize(); + size_t max_read = getpagesize(); std::vector<uint8_t> charBuffer; charBuffer.resize(max_read); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/core/repository/VolatileContentRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 77eb1b2..60adc1b 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -130,34 +130,30 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar } bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) { - int size = 0; - { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim->getContentFullPath()); - if (claim_check != master_list_.end()) { - auto ent = claim_check->second->takeOwnership(); - if (ent == nullptr) { - return false; - } - return true; + std::lock_guard<std::mutex> lock(map_mutex_); + auto claim_check = master_list_.find(claim->getContentFullPath()); + if (claim_check != master_list_.end()) { + auto ent = claim_check->second->takeOwnership(); + if (ent == nullptr) { + return false; } + return true; } + return false; } std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) { - int size = 0; - { - std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_check = master_list_.find(claim->getContentFullPath()); - if (claim_check != master_list_.end()) { - auto ent = claim_check->second->takeOwnership(); - if (ent == nullptr) { - return nullptr; - } - return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); + std::lock_guard<std::mutex> lock(map_mutex_); + auto claim_check = master_list_.find(claim->getContentFullPath()); + if (claim_check != master_list_.end()) { + auto ent = claim_check->second->takeOwnership(); + if (ent == nullptr) { + return nullptr; } + return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); } + return nullptr; } @@ -181,11 +177,13 @@ bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceCla } } else { std::lock_guard<std::mutex> lock(map_mutex_); - auto size = master_list_[claim->getContentFullPath()]->getLength(); - delete master_list_[claim->getContentFullPath()]; - master_list_.erase(claim->getContentFullPath()); - current_size_ -= size; - + auto claim_item = master_list_.find(claim->getContentFullPath()); + if (claim_item != master_list_.end()) { + auto size = claim_item->second->getLength(); + delete claim_item->second; + master_list_.erase(claim->getContentFullPath()); + current_size_ -= size; + } return true; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 135fc60..661cfe8 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -105,7 +105,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass); // Determine the processor name only from the Java class - int lastOfIdx = procCfg.javaClass.find_last_of("."); + auto lastOfIdx = procCfg.javaClass.find_last_of("."); if (lastOfIdx != std::string::npos) { lastOfIdx++; // if a value is found, increment to move beyond the . int nameLength = procCfg.javaClass.length() - lastOfIdx; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/io/BaseStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp index 137bf1a..a750586 100644 --- a/libminifi/src/io/BaseStream.cpp +++ b/libminifi/src/io/BaseStream.cpp @@ -84,7 +84,7 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) { **/ int BaseStream::write(bool value) { uint8_t v = value; - return Serializable::write(v); + return Serializable::write(v, reinterpret_cast<DataStream*>(composable_stream_)); } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index d702620..c520249 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -149,8 +149,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { } int16_t Socket::initialize() { - struct sockaddr_in servAddr; - addrinfo hints = { sizeof(addrinfo) }; memset(&hints, 0, sizeof hints); // make sure the struct is empty hints.ai_family = AF_UNSPEC; @@ -175,10 +173,8 @@ int16_t Socket::initialize() { h = gethostbyname(requested_hostname_.c_str()); #else const char *host; - uint16_t port; host = requested_hostname_.c_str(); - port = port_; char buf[1024]; struct hostent he; int hh_errno; @@ -263,19 +259,16 @@ int16_t Socket::select_descriptor(const uint16_t msec) { int16_t Socket::setSocketOptions(const int sock) { int opt = 1; - bool nagle_off = true; #ifndef __MACH__ - if (nagle_off) { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return -1; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return -1; - } + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return -1; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return -1; } int sndsize = 256 * 1024; @@ -303,7 +296,7 @@ std::string Socket::getHostname() const { } int Socket::writeData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) + if (static_cast<int>(buf.capacity()) < buflen) return -1; return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); } @@ -386,7 +379,7 @@ int Socket::read(uint16_t &value, bool is_little_endian) { } int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes) { - if (buf.capacity() < buflen) { + if (static_cast<int>(buf.capacity()) < buflen) { buf.resize(buflen); } return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen, retrieve_all_bytes); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/io/DataStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp index 14ff5f0..656bcdb 100644 --- a/libminifi/src/io/DataStream.cpp +++ b/libminifi/src/io/DataStream.cpp @@ -94,7 +94,7 @@ int DataStream::readData(std::vector<uint8_t> &buf, int buflen) { return -1; } - if (buf.capacity() < buflen) + if (static_cast<int>(buf.capacity()) < buflen) buf.resize(buflen); buf.insert(buf.begin(), &buffer[readBuffer], &buffer[readBuffer + buflen]); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/io/FileStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp index 2b605eb..abf2ce2 100644 --- a/libminifi/src/io/FileStream.cpp +++ b/libminifi/src/io/FileStream.cpp @@ -83,7 +83,7 @@ void FileStream::seek(uint64_t offset) { } int FileStream::writeData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) { + if (static_cast<int>(buf.capacity()) < buflen) { return -1; } return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); @@ -119,7 +119,7 @@ inline std::vector<uint8_t> FileStream::readBuffer(const T& t) { } int FileStream::readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) { + if (static_cast<int>(buf.capacity()) < buflen) { buf.resize(buflen); } int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/io/NonConvertingStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/NonConvertingStream.cpp b/libminifi/src/io/NonConvertingStream.cpp index f40bede..07bb130 100644 --- a/libminifi/src/io/NonConvertingStream.cpp +++ b/libminifi/src/io/NonConvertingStream.cpp @@ -84,7 +84,7 @@ int NonConvertingStream::write(uint64_t base_value, bool is_little_endian) { **/ int NonConvertingStream::write(bool value) { uint8_t v = value; - return Serializable::write(v); + return Serializable::write(v, reinterpret_cast<DataStream*>(composable_stream_)); } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/io/Serializable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp index 342d2ba..7d3c79a 100644 --- a/libminifi/src/io/Serializable.cpp +++ b/libminifi/src/io/Serializable.cpp @@ -64,9 +64,9 @@ int Serializable::write(uint8_t *value, int len, DataStream *stream) { return stream->writeData(value, len); } -int Serializable::write(bool value) { +int Serializable::write(bool value, DataStream *stream) { uint8_t temp = value; - return write(temp); + return stream->writeData(&temp, 1); } int Serializable::read(uint8_t &value, DataStream *stream) { @@ -150,9 +150,7 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) { } int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) { - int inLength = str.length(); uint32_t utflen = 0; - int currentPtr = 0; utflen = str.length(); @@ -176,8 +174,6 @@ int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) { utf_to_write.resize(utflen); } - int i = 0; - uint8_t *underlyingPtr = &utf_to_write[0]; for (auto c : str) { writeData(c, underlyingPtr++); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/processors/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp index 323d69a..622f30a 100644 --- a/libminifi/src/processors/ExecuteProcess.cpp +++ b/libminifi/src/processors/ExecuteProcess.cpp @@ -105,7 +105,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi argc++; } argv[argc] = NULL; - int status, died; + int status; if (!_processRunning) { _processRunning = true; // if the process has not launched yet @@ -180,7 +180,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi } break; } else { - if (numRead == (sizeof(buffer) - totalRead)) { + if (numRead == static_cast<int>((sizeof(buffer) - totalRead))) { // we reach the max buffer size logger_->log_info("Execute Command Max Respond %d", sizeof(buffer)); ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); @@ -205,7 +205,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi } } - died = wait(&status); + wait(&status); if (WIFEXITED(status)) { logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/processors/ExtractText.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExtractText.cpp b/libminifi/src/processors/ExtractText.cpp index 1bb4d9f..2175415 100644 --- a/libminifi/src/processors/ExtractText.cpp +++ b/libminifi/src/processors/ExtractText.cpp @@ -65,7 +65,7 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) { int64_t ret = 0; - int64_t size_limit = flowFile_->getSize(); + uint64_t size_limit = flowFile_->getSize(); uint64_t read_size = 0; uint64_t loop_read = max_read_; @@ -82,7 +82,7 @@ int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> strea std::string contentStr; while (read_size < size_limit) { - if (size_limit - read_size < max_read_) + if (size_limit - read_size < (uint64_t)max_read_) loop_read = size_limit - read_size; ret = stream->readData(buffer_, loop_read); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index 5676c23..1dd3858 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -202,10 +202,10 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe struct stat statbuf; if (stat(fullName.c_str(), &statbuf) == 0) { - if (request.minSize > 0 && statbuf.st_size < request.minSize) + if (request.minSize > 0 && statbuf.st_size < (int32_t)request.minSize) return false; - if (request.maxSize > 0 && statbuf.st_size > request.maxSize) + if (request.maxSize > 0 && statbuf.st_size > (int32_t)request.maxSize) return false; uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/processors/GetTCP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetTCP.cpp b/libminifi/src/processors/GetTCP.cpp index 0db082f..215bb5e 100644 --- a/libminifi/src/processors/GetTCP.cpp +++ b/libminifi/src/processors/GetTCP.cpp @@ -237,7 +237,7 @@ void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con std::vector<std::string> hostAndPort = utils::StringUtils::split(endpoint, ":"); if (hostAndPort.size() == 2) { logger_->log_debug("Opening another socket to %s:%s", hostAndPort.at(0), hostAndPort.at(1)); - std::unique_ptr<io::Socket> socket = std::move(stream_factory_->createSocket(hostAndPort.at(0), std::stoi(hostAndPort.at(1)))); + std::unique_ptr<io::Socket> socket = stream_factory_->createSocket(hostAndPort.at(0), std::stoi(hostAndPort.at(1))); if (socket->initialize() != -1) { socket->setNonBlocking(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp index 1e2241d..c67b357 100644 --- a/libminifi/src/processors/ListenHTTP.cpp +++ b/libminifi/src/processors/ListenHTTP.cpp @@ -306,8 +306,8 @@ int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> strea while (tlen == -1 || nlen < tlen) { rlen = tlen == -1 ? sizeof(buf) : tlen - nlen; - if (rlen > sizeof(buf)) { - rlen = sizeof(buf); + if (rlen > (int64_t)sizeof(buf)) { + rlen = (int64_t)sizeof(buf); } // Read a buffer of data from client http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/processors/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp index 054d585..a71b69f 100644 --- a/libminifi/src/processors/ListenSyslog.cpp +++ b/libminifi/src/processors/ListenSyslog.cpp @@ -153,7 +153,7 @@ void ListenSyslog::runThread() { clilen = sizeof(cli_addr); int newsockfd = accept(_serverSocket, reinterpret_cast<struct sockaddr *>(&cli_addr), &clilen); if (newsockfd > 0) { - if (_clientSockets.size() < _maxConnections) { + if (_clientSockets.size() < (uint64_t)_maxConnections) { _clientSockets.push_back(newsockfd); logger_->log_info("ListenSysLog new client socket %d connection", newsockfd); continue; @@ -166,7 +166,7 @@ void ListenSyslog::runThread() { struct sockaddr_in cli_addr; clilen = sizeof(cli_addr); int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, (struct sockaddr *) &cli_addr, &clilen); - if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) { + if (recvlen > 0 && (uint64_t)(recvlen + getEventQueueByteSize()) <= _recvBufSize) { uint8_t *payload = new uint8_t[recvlen]; memcpy(payload, _buffer, recvlen); putEvent(payload, recvlen); @@ -183,7 +183,7 @@ void ListenSyslog::runThread() { logger_->log_info("ListenSysLog client socket %d close", clientSocket); it = _clientSockets.erase(it); } else { - if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) { + if ((uint64_t)(recvlen + getEventQueueByteSize()) <= _recvBufSize) { uint8_t *payload = new uint8_t[recvlen]; memcpy(payload, _buffer, recvlen); putEvent(payload, recvlen); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/processors/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index 711f226..b0b47ec 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -236,7 +236,7 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se struct stat statbuf; if (stat(fullPath.c_str(), &statbuf) == 0) { - if (statbuf.st_size <= this->_currentTailFilePosition) { + if ((uint64_t)statbuf.st_size <= this->_currentTailFilePosition) { // there are no new input for the current tail file context->yield(); return; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/sitetosite/RawSocketProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp index e162a36..43c3157 100644 --- a/libminifi/src/sitetosite/RawSocketProtocol.cpp +++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp @@ -312,7 +312,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) { return false; } - for (int i = 0; i < number; i++) { + for (uint32_t i = 0; i < number; i++) { std::string host; status = peer_->readUTF(host); if (status <= 0) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/sitetosite/SiteToSiteClient.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index e521bbd..7094fd3 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -534,7 +534,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co } ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len); - if (ret != len) { + if (ret != (int64_t)len) { logger_->log_info("ret != len"); return -1; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/src/utils/HTTPClient.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/HTTPClient.cpp b/libminifi/src/utils/HTTPClient.cpp new file mode 100644 index 0000000..e7c9d64 --- /dev/null +++ b/libminifi/src/utils/HTTPClient.cpp @@ -0,0 +1,87 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/HTTPClient.h" +#include <string> +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password) { + if (nullptr == client) { + return ""; + } + std::string token; + + client->setContentType("application/x-www-form-urlencoded"); + + client->set_request_method("POST"); + + std::string payload = "username=" + username + "&" + "password=" + password; + + client->setPostFields(client->escape(payload)); + + client->submit(); + + if (client->submit() && client->getResponseCode() == 200) { + const std::string &response_body = std::string(client->getResponseBody().data(), client->getResponseBody().size()); + if (!response_body.empty()) { + token = "Bearer " + response_body; + } + } + return token; +} + +void parse_url(std::string *url, std::string *host, int *port, std::string *protocol) { + std::string http("http://"); + std::string https("https://"); + + if (url->compare(0, http.size(), http) == 0) + *protocol = http; + + if (url->compare(0, https.size(), https) == 0) + *protocol = https; + + if (!protocol->empty()) { + size_t pos = url->find_first_of(":", protocol->size()); + + if (pos == std::string::npos) { + pos = url->size(); + } + + *host = url->substr(protocol->size(), pos - protocol->size()); + + if (pos < url->size() && (*url)[pos] == ':') { + size_t ppos = url->find_first_of("/", pos); + if (ppos == std::string::npos) { + ppos = url->size(); + } + std::string portStr(url->substr(pos + 1, ppos - pos - 1)); + if (portStr.size() > 0) { + *port = std::stoi(portStr); + } + } + } +} + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/test/TestBase.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index d61bdac..a384d21 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -27,8 +27,8 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s content_repo_(content_repo), flow_repo_(flow_repo), prov_repo_(prov_repo), - location(-1), finalized(false), + location(-1), current_flowfile_(nullptr), logger_(logging::LoggerFactory<TestPlan>::getLogger()) { stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()); @@ -117,7 +117,7 @@ bool linkToPrevious) { bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { std::lock_guard<std::recursive_mutex> guard(mutex); - int i = 0; + uint32_t i = 0; logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName()); for (i = 0; i < processor_queue_.size(); i++) { if (processor_queue_.at(i) == proc) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index c4bc7d7..c6cebb2 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -184,16 +184,12 @@ class TestPlan { protected: - std::shared_ptr<logging::Logger> logger_; - void finalize(); std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false); std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory; - std::atomic<bool> finalized; - std::shared_ptr<core::ContentRepository> content_repo_; std::shared_ptr<core::Repository> flow_repo_; @@ -203,6 +199,8 @@ class TestPlan { std::recursive_mutex mutex; + std::atomic<bool> finalized; + int location; std::shared_ptr<core::ProcessSession> current_session_; @@ -217,6 +215,10 @@ class TestPlan { std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_; std::vector<std::shared_ptr<minifi::Connection>> relationships_; core::Relationship termination_; + + private: + + std::shared_ptr<logging::Logger> logger_; }; class TestController { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/test/TestServer.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h index 29f6345..06f996c 100644 --- a/libminifi/test/TestServer.h +++ b/libminifi/test/TestServer.h @@ -27,20 +27,6 @@ /* Server context handle */ static std::string resp_str; -static int responder(struct mg_connection *conn, void *response) { - const char *msg = resp_str.c_str(); - - mg_printf(conn, "HTTP/1.1 200 OK\r\n" - "Content-Length: %lu\r\n" - "Content-Type: text/plain\r\n" - "Connection: close\r\n\r\n", - resp_str.size()); - - mg_write(conn, msg, resp_str.size()); - - return 200; -} - void init_webserver() { mg_init_library(0); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/test/archive-tests/util/ArchiveTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/archive-tests/util/ArchiveTests.cpp b/libminifi/test/archive-tests/util/ArchiveTests.cpp index f6e7d7e..b164032 100644 --- a/libminifi/test/archive-tests/util/ArchiveTests.cpp +++ b/libminifi/test/archive-tests/util/ArchiveTests.cpp @@ -28,175 +28,177 @@ #include "../../TestBase.h" TAE_MAP_T build_test_archive_map(int NUM_FILES, const char** FILE_NAMES, const char** FILE_CONTENT) { - TAE_MAP_T test_entries; - - for (int i = 0; i < NUM_FILES; i++) { - std::string name {FILE_NAMES[i]}; - TestArchiveEntry entry; - - entry.name = name; - entry.content = FILE_CONTENT[i]; - entry.size = strlen(FILE_CONTENT[i]); - entry.type = AE_IFREG; - entry.perms = 0765; - entry.uid = 12; entry.gid = 34; - entry.mtime = time(nullptr); - entry.mtime_nsec = 3.14159; - - test_entries[name] = entry; - } - - return test_entries; + TAE_MAP_T test_entries; + + for (int i = 0; i < NUM_FILES; i++) { + std::string name { FILE_NAMES[i] }; + TestArchiveEntry entry; + + entry.name = name; + entry.content = FILE_CONTENT[i]; + entry.size = strlen(FILE_CONTENT[i]); + entry.type = AE_IFREG; + entry.perms = 0765; + entry.uid = 12; + entry.gid = 34; + entry.mtime = time(nullptr); + entry.mtime_nsec = 3; + + test_entries[name] = entry; + } + + return test_entries; } FN_VEC_T build_test_archive_order(int NUM_FILES, const char** FILE_NAMES) { - FN_VEC_T ret; - for (int i = 0; i < NUM_FILES; i++) ret.push_back(FILE_NAMES[i]); - return ret; + FN_VEC_T ret; + for (int i = 0; i < NUM_FILES; i++) + ret.push_back(FILE_NAMES[i]); + return ret; } OrderedTestArchive build_ordered_test_archive(int NUM_FILES, const char** FILE_NAMES, const char** FILE_CONTENT) { - OrderedTestArchive ret; - ret.map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); - ret.order = build_test_archive_order(NUM_FILES, FILE_NAMES); - return ret; + OrderedTestArchive ret; + ret.map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); + ret.order = build_test_archive_order(NUM_FILES, FILE_NAMES); + return ret; } void build_test_archive(std::string path, TAE_MAP_T entries, FN_VEC_T order) { - std::cout << "Creating " << path << std::endl; - archive * test_archive = archive_write_new(); + std::cout << "Creating " << path << std::endl; + archive * test_archive = archive_write_new(); - archive_write_set_format_ustar(test_archive); - archive_write_open_filename(test_archive, path.c_str()); - struct archive_entry* entry = archive_entry_new(); + archive_write_set_format_ustar(test_archive); + archive_write_open_filename(test_archive, path.c_str()); + struct archive_entry* entry = archive_entry_new(); - if (order.empty()) { // Use map sort order - for (auto &kvp : entries) - order.push_back(kvp.first); - } + if (order.empty()) { // Use map sort order + for (auto &kvp : entries) + order.push_back(kvp.first); + } - for (std::string name : order) { - TestArchiveEntry test_entry = entries.at(name); + for (std::string name : order) { + TestArchiveEntry test_entry = entries.at(name); - std::cout << "Adding entry: " << name << std::endl; + std::cout << "Adding entry: " << name << std::endl; - archive_entry_set_filetype(entry, test_entry.type); - archive_entry_set_pathname(entry, test_entry.name.c_str()); - archive_entry_set_size(entry, test_entry.size); - archive_entry_set_perm(entry, test_entry.perms); - archive_entry_set_uid(entry, test_entry.uid); - archive_entry_set_gid(entry, test_entry.gid); - archive_entry_set_mtime(entry, test_entry.mtime, test_entry.mtime_nsec); + archive_entry_set_filetype(entry, test_entry.type); + archive_entry_set_pathname(entry, test_entry.name.c_str()); + archive_entry_set_size(entry, test_entry.size); + archive_entry_set_perm(entry, test_entry.perms); + archive_entry_set_uid(entry, test_entry.uid); + archive_entry_set_gid(entry, test_entry.gid); + archive_entry_set_mtime(entry, test_entry.mtime, test_entry.mtime_nsec); - archive_write_header(test_archive, entry); - archive_write_data(test_archive, test_entry.content, test_entry.size); + archive_write_header(test_archive, entry); + archive_write_data(test_archive, test_entry.content, test_entry.size); - archive_entry_clear(entry); - } + archive_entry_clear(entry); + } - archive_entry_free(entry); - archive_write_close(test_archive); + archive_entry_free(entry); + archive_write_close(test_archive); } void build_test_archive(std::string path, OrderedTestArchive ordered_archive) { - build_test_archive(path, ordered_archive.map, ordered_archive.order); + build_test_archive(path, ordered_archive.map, ordered_archive.order); } bool check_archive_contents(std::string path, TAE_MAP_T entries, bool check_attributes, FN_VEC_T order) { - FN_VEC_T read_names; - FN_VEC_T extra_names; - bool ok = true; - struct archive *a = archive_read_new(); - struct archive_entry *entry; - - archive_read_support_format_all(a); - archive_read_support_filter_all(a); + FN_VEC_T read_names; + FN_VEC_T extra_names; + bool ok = true; + struct archive *a = archive_read_new(); + struct archive_entry *entry; + + archive_read_support_format_all(a); + archive_read_support_filter_all(a); + + int r = archive_read_open_filename(a, path.c_str(), 16384); + + if (r != ARCHIVE_OK) { + std::cout << "Unable to open archive " << path << " for checking!" << std::endl; + return false; + } + + while (archive_read_next_header(a, &entry) == ARCHIVE_OK) { + std::string name { archive_entry_pathname(entry) }; + auto it = entries.find(name); + if (it == entries.end()) { + extra_names.push_back(name); + } else { + read_names.push_back(name); + TestArchiveEntry test_entry = it->second; + size_t size = archive_entry_size(entry); + + std::cout << "Checking archive entry: " << name << std::endl; + + REQUIRE(size == test_entry.size); + + if (size > 0) { + int rlen, nlen = 0; + const char* buf[size]; + bool read_ok = true; + + for (;;) { + rlen = archive_read_data(a, buf, size); + nlen += rlen; + if (rlen == 0) + break; + if (rlen < 0) { + std::cout << "FAIL: Negative size read?" << std::endl; + read_ok = false; + break; + } + } - int r = archive_read_open_filename(a, path.c_str(), 16384); + if (read_ok) { + REQUIRE(nlen == size); + REQUIRE(memcmp(buf, test_entry.content, size) == 0); + } + } - if (r != ARCHIVE_OK) { - std::cout << "Unable to open archive " << path << " for checking!" << std::endl; - return false; - } + REQUIRE(archive_entry_filetype(entry) == test_entry.type); - while (archive_read_next_header(a, &entry) == ARCHIVE_OK) { - std::string name {archive_entry_pathname(entry)}; - auto it = entries.find(name); - if (it == entries.end()) { - extra_names.push_back(name); - } else { - read_names.push_back(name); - TestArchiveEntry test_entry = it->second; - size_t size = archive_entry_size(entry); - - std::cout << "Checking archive entry: " << name << std::endl; - - REQUIRE(size == test_entry.size); - - if (size > 0) { - int rlen, nlen = 0; - const char* buf[size]; - bool read_ok = true; - - for (;;) { - rlen = archive_read_data(a, buf, size); - nlen += rlen; - if (rlen == 0) break; - if (rlen < 0) { - std::cout << "FAIL: Negative size read?" << std::endl; - read_ok = false; - break; - } - } - - if (read_ok) { - REQUIRE(nlen == size); - REQUIRE(memcmp(buf, test_entry.content, size) == 0); - } - } - - REQUIRE(archive_entry_filetype(entry) == test_entry.type); - - if (check_attributes) { - REQUIRE(archive_entry_uid(entry) == test_entry.uid); - REQUIRE(archive_entry_gid(entry) == test_entry.gid); - REQUIRE(archive_entry_perm(entry) == test_entry.perms); - REQUIRE(archive_entry_mtime(entry) == test_entry.mtime); - } - } + if (check_attributes) { + REQUIRE(archive_entry_uid(entry) == test_entry.uid); + REQUIRE(archive_entry_gid(entry) == test_entry.gid); + REQUIRE(archive_entry_perm(entry) == test_entry.perms); + REQUIRE(archive_entry_mtime(entry) == test_entry.mtime); + } } + } - archive_read_close(a); - archive_read_free(a); + archive_read_close(a); + archive_read_free(a); - if (!extra_names.empty()) { - ok = false; - std::cout << "Extra files found: "; - for (std::string filename : extra_names) std::cout << filename << " "; - std::cout << std::endl; - } + if (!extra_names.empty()) { + ok = false; + std::cout << "Extra files found: "; + for (std::string filename : extra_names) + std::cout << filename << " "; + std::cout << std::endl; + } - REQUIRE(extra_names.empty()); + REQUIRE(extra_names.empty()); - if (!order.empty()) { - REQUIRE(order.size() == entries.size()); - } + if (!order.empty()) { + REQUIRE(order.size() == entries.size()); + } - if (!order.empty()) { - REQUIRE(read_names == order); - } else { - std::set<std::string> read_names_set(read_names.begin(), read_names.end()); - std::set<std::string> test_file_entries_set; - std::transform(entries.begin(), entries.end(), - std::inserter(test_file_entries_set, test_file_entries_set.end()), - [](std::pair<std::string, TestArchiveEntry> p){ return p.first; }); + if (!order.empty()) { + REQUIRE(read_names == order); + } else { + std::set<std::string> read_names_set(read_names.begin(), read_names.end()); + std::set<std::string> test_file_entries_set; + std::transform(entries.begin(), entries.end(), std::inserter(test_file_entries_set, test_file_entries_set.end()), [](std::pair<std::string, TestArchiveEntry> p) {return p.first;}); - REQUIRE(read_names_set == test_file_entries_set); - } + REQUIRE(read_names_set == test_file_entries_set); + } - return ok; + return ok; } bool check_archive_contents(std::string path, OrderedTestArchive archive, bool check_attributes) { - return check_archive_contents(path, archive.map, check_attributes, archive.order); + return check_archive_contents(path, archive.map, check_attributes, archive.order); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h b/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h index 3fe4623..911d6d4 100644 --- a/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h +++ b/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h @@ -19,8 +19,6 @@ #include "CivetServer.h" #include "concurrentqueue.h" - - #include "CivetStream.h" #include "io/CRCStream.h" #ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ @@ -35,8 +33,8 @@ class FlowObj { } explicit FlowObj(const FlowObj &&other) - : attributes(std::move(other.attributes)), - total_size(std::move(other.total_size)), + : total_size(std::move(other.total_size)), + attributes(std::move(other.attributes)), data(std::move(other.data)) { } @@ -146,24 +144,23 @@ class TransactionResponder : public CivetHandler { return transaction_id_str; } protected: - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; + std::string base_url; std::string transaction_id_str; - std::string base_url;bool wrong_uri;bool empty_transaction_uri;bool input_port; + bool wrong_uri; + bool empty_transaction_uri; + bool input_port; std::string port_id; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; }; class FlowFileResponder : public CivetHandler { - - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_; - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; - public: explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum) : wrong_uri(wrong_uri), input_port(input_port), - flow_files_feed_(nullptr), - invalid_checksum(invalid_checksum) { + invalid_checksum(invalid_checksum), + flow_files_feed_(nullptr) { } moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() { @@ -269,30 +266,32 @@ class FlowFileResponder : public CivetHandler { } protected: -// base url + // base url std::string base_url; -// set the wrong url + // set the wrong url bool wrong_uri; -// we are running an input port + // we are running an input port bool input_port; -// invalid checksum is returned. + // invalid checksum is returned. bool invalid_checksum; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; }; class DeleteTransactionResponder : public CivetHandler { public: explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code) - : base_url(base_url), - response_code(response_code), - flow_files_feed_(nullptr) { + : flow_files_feed_(nullptr), + base_url(base_url), + response_code(response_code) { expected_resp_code_str = std::to_string(expected_resp_code); } explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) - : base_url(base_url), - response_code(response_code), - flow_files_feed_(feed) { + : flow_files_feed_(feed), + base_url(base_url), + response_code(response_code) { } bool handleDelete(CivetServer *server, struct mg_connection *conn) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/test/gps-tests/GPSTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/gps-tests/GPSTests.cpp b/libminifi/test/gps-tests/GPSTests.cpp index 01464bf..db34def 100644 --- a/libminifi/test/gps-tests/GPSTests.cpp +++ b/libminifi/test/gps-tests/GPSTests.cpp @@ -28,6 +28,7 @@ #include "../TestBase.h" #include "core/Core.h" #include "core/FlowFile.h" +#include "processors/GetFile.h" #include "../unit/ProvenanceTestHelper.h" #include "core/Processor.h" #include "core/ProcessContext.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/002b0743/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index d3bccd0..db7b35a 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -27,11 +27,18 @@ #include <utility> #include <vector> #include "core/repository/VolatileContentRepository.h" -#include "../../include/core/Processor.h" -#include "../../include/Connection.h" -#include "../../include/FlowController.h" -#include "../../include/properties/Configure.h" -#include "../../include/provenance/Provenance.h" +#include "core/Processor.h" +#include "Connection.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "provenance/Provenance.h" +#if defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Woverloaded-virtual" +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Woverloaded-virtual" +#endif /** * Test repository @@ -39,7 +46,8 @@ class TestRepository : public core::Repository { public: TestRepository() - : Repository("repo_name", "./dir", 1000, 100, 0), core::SerializableComponent("repo_name",0) { + : core::SerializableComponent("repo_name", 0), + Repository("repo_name", "./dir", 1000, 100, 0) { } // initialize bool initialize() { @@ -127,7 +135,7 @@ class TestRepository : public core::Repository { void getProvenanceRecord(std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, int maxSize) { for (auto entry : repositoryResults) { - if (records.size() >= maxSize) + if (records.size() >= (uint64_t)maxSize) break; std::shared_ptr<provenance::ProvenanceEventRecord> eventRead = std::make_shared<provenance::ProvenanceEventRecord>(); @@ -147,7 +155,8 @@ class TestRepository : public core::Repository { class TestFlowRepository : public core::Repository { public: TestFlowRepository() - : core::Repository("ff", "./dir", 1000, 100, 0), core::SerializableComponent("ff",0) { + : core::SerializableComponent("ff", 0), + core::Repository("ff", "./dir", 1000, 100, 0) { } // initialize bool initialize() { @@ -185,7 +194,7 @@ class TestFlowRepository : public core::Repository { void getProvenanceRecord(std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, int maxSize) { for (auto entry : repositoryResults) { - if (records.size() >= maxSize) + if (records.size() >= (uint64_t)maxSize) break; std::shared_ptr<provenance::ProvenanceEventRecord> eventRead = std::make_shared<provenance::ProvenanceEventRecord>(); @@ -266,5 +275,9 @@ class TestFlowController : public minifi::FlowController { void initializePaths(const std::string &adjustedFilename) { } }; - +#if defined(__clang__) +#pragma clang diagnostic pop +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic pop +#endif #endif /* LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ */