http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/DataStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h index 2f793ff..e37dbf7 100644 --- a/libminifi/include/io/DataStream.h +++ b/libminifi/include/io/DataStream.h @@ -22,99 +22,111 @@ #include <cstdint> #include <vector> #include "EndianCheck.h" - +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { /** * DataStream defines the mechanism through which * binary data will be written to a sink */ class DataStream { -public: - - DataStream() : - readBuffer(0) { - - } - - /** - * Constructor - **/ - explicit DataStream(const uint8_t *buf, const uint32_t buflen) : - DataStream() { - writeData((uint8_t*) buf, buflen); - - } - - virtual short initialize() { - buffer.clear(); - readBuffer = 0; - return 0; - } - - virtual void closeStream() - { - - } - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(std::vector<uint8_t> &buf, int buflen); - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(uint8_t *buf, int buflen); - - /** - * writes valiue to buffer - * @param value value to write - * @param size size of value - */ - virtual int writeData(uint8_t *value, int size); - - /** - * Reads a system word - * @param value value to write - */ - virtual int read(uint64_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Reads a uint32_t - * @param value value to write - */ - virtual int read(uint32_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Reads a system short - * @param value value to write - */ - virtual int read(uint16_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Returns the underlying buffer - * @return vector's array - **/ - const uint8_t *getBuffer() const { - return &buffer[0]; - } - - /** - * Retrieve size of data stream - * @return size of data stream - **/ - const uint32_t getSize() const { - return buffer.size(); - } - -protected: - // All serialization related method and internal buf - std::vector<uint8_t> buffer; - uint32_t readBuffer; + public: + + DataStream() + : readBuffer(0) { + + } + + ~DataStream() { + + } + + /** + * Constructor + **/ + explicit DataStream(const uint8_t *buf, const uint32_t buflen) + : DataStream() { + writeData((uint8_t*) buf, buflen); + + } + + virtual short initialize() { + buffer.clear(); + readBuffer = 0; + return 0; + } + + virtual void closeStream() { + + } + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen); + + /** + * writes valiue to buffer + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * Reads a system word + * @param value value to write + */ + virtual int read(uint64_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Reads a uint32_t + * @param value value to write + */ + virtual int read(uint32_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Reads a system short + * @param value value to write + */ + virtual int read(uint16_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const { + return &buffer[0]; + } + + /** + * Retrieve size of data stream + * @return size of data stream + **/ + const uint32_t getSize() const { + return buffer.size(); + } + + protected: + // All serialization related method and internal buf + std::vector<uint8_t> buffer; + uint32_t readBuffer; }; +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif /* LIBMINIFI_INCLUDE_IO_DATASTREAM_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/EndianCheck.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/EndianCheck.h b/libminifi/include/io/EndianCheck.h index ef900e0..3ceb19c 100644 --- a/libminifi/include/io/EndianCheck.h +++ b/libminifi/include/io/EndianCheck.h @@ -1,5 +1,5 @@ /** - * + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,32 +16,34 @@ * limitations under the License. */ - #ifndef LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_ #define LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_ - - - +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { /** * Mechanism to determine endianness of host. * Accounts for only BIG/LITTLE/BIENDIAN **/ -class EndiannessCheck -{ -public: - static bool IS_LITTLE; -private: - - static bool is_little_endian() { - /* do whatever is needed at static init time */ - unsigned int x = 1; - char *c = (char*) &x; - IS_LITTLE=*c==1; - return IS_LITTLE; - } +class EndiannessCheck { + public: + static bool IS_LITTLE; + private: + + static bool is_little_endian() { + /* do whatever is needed at static init time */ + unsigned int x = 1; + char *c = (char*) &x; + IS_LITTLE = *c == 1; + return IS_LITTLE; + } }; - - - +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif /* LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/Serializable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h index 59d3a73..5ee886b 100644 --- a/libminifi/include/io/Serializable.h +++ b/libminifi/include/io/Serializable.h @@ -1,5 +1,5 @@ /** - * + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -22,9 +22,11 @@ #include <string> #include "EndianCheck.h" #include "DataStream.h" - - - +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { /** * Serializable instances provide base functionality to * write certain objects/primitives to a data stream. @@ -32,155 +34,158 @@ */ class Serializable { -public: - - /** - * Inline function to write T to stream - **/ - template<typename T> - inline int writeData(const T &t,DataStream *stream); - - /** - * Inline function to write T to to_vec - **/ - template<typename T> - inline int writeData(const T &t, uint8_t *to_vec); - - /** - * Inline function to write T to to_vec - **/ - template<typename T> - inline int writeData(const T &t, std::vector<uint8_t> &to_vec); - - - /** - * write byte to stream - * @return resulting write size - **/ - int write(uint8_t value,DataStream *stream); - - /** - * write byte to stream - * @return resulting write size - **/ - int write(char value,DataStream *stream); - - /** - * write 4 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - int write(uint32_t base_value,DataStream *stream, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * write 2 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - int write(uint16_t base_value,DataStream *stream, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * write valueto stream - * @param value non encoded value - * @param len length of value - * @param strema output stream - * @return resulting write size - **/ - int write(uint8_t *value, int len,DataStream *stream); - - /** - * write 8 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - int write(uint64_t base_value,DataStream *stream, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * write bool to stream - * @param value non encoded value - * @return resulting write size - **/ - int write(bool value); - - /** - * write UTF string to stream - * @param str string to write - * @return resulting write size - **/ - int writeUTF(std::string str,DataStream *stream, bool widen = false); - - /** - * reads a byte from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - int read(uint8_t &value,DataStream *stream); - - /** - * reads two bytes from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - int read(uint16_t &base_value,DataStream *stream, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * reads a byte from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - int read(char &value,DataStream *stream); - - /** - * reads a byte array from the stream - * @param value reference in which will set the result - * @param len length to read - * @param stream stream from which we will read - * @return resulting read size - **/ - int read(uint8_t *value, int len,DataStream *stream); - - /** - * reads four bytes from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - int read(uint32_t &value,DataStream *stream, - bool is_little_endian = EndiannessCheck::IS_LITTLE); - - /** - * reads eight byte from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - int read(uint64_t &value,DataStream *stream, - bool is_little_endian = EndiannessCheck::IS_LITTLE); - - /** - * read UTF from stream - * @param str reference string - * @param stream stream from which we will read - * @return resulting read size - **/ - int readUTF(std::string &str,DataStream *stream, bool widen = false); - -protected: + public: + + /** + * Inline function to write T to stream + **/ + template<typename T> + inline int writeData(const T &t, DataStream *stream); + + /** + * Inline function to write T to to_vec + **/ + template<typename T> + inline int writeData(const T &t, uint8_t *to_vec); + + /** + * Inline function to write T to to_vec + **/ + template<typename T> + inline int writeData(const T &t, std::vector<uint8_t> &to_vec); + + /** + * write byte to stream + * @return resulting write size + **/ + int write(uint8_t value, DataStream *stream); + + /** + * write byte to stream + * @return resulting write size + **/ + int write(char value, DataStream *stream); + + /** + * write 4 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint32_t base_value, DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write 2 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint16_t base_value, DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write valueto stream + * @param value non encoded value + * @param len length of value + * @param strema output stream + * @return resulting write size + **/ + int write(uint8_t *value, int len, DataStream *stream); + + /** + * write 8 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint64_t base_value, DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write bool to stream + * @param value non encoded value + * @return resulting write size + **/ + int write(bool value); + + /** + * write UTF string to stream + * @param str string to write + * @return resulting write size + **/ + int writeUTF(std::string str, DataStream *stream, bool widen = false); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint8_t &value, DataStream *stream); + + /** + * reads two bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint16_t &base_value, DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(char &value, DataStream *stream); + + /** + * reads a byte array from the stream + * @param value reference in which will set the result + * @param len length to read + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint8_t *value, int len, DataStream *stream); + + /** + * reads four bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint32_t &value, DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * reads eight byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint64_t &value, DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * read UTF from stream + * @param str reference string + * @param stream stream from which we will read + * @return resulting read size + **/ + int readUTF(std::string &str, DataStream *stream, bool widen = false); + + protected: }; - +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif /* LIBMINIFI_INCLUDE_IO_SERIALIZABLE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/SocketFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/SocketFactory.h b/libminifi/include/io/SocketFactory.h deleted file mode 100644 index c8cbcb1..0000000 --- a/libminifi/include/io/SocketFactory.h +++ /dev/null @@ -1,91 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef SOCKET_FACTORY_H -#define SOCKET_FACTORY_H - -#include "ClientSocket.h" -#include "TLSSocket.h" -#include "ClientSocket.h" -#include "Configure.h" -#include "utils/StringUtils.h" - -/** - Purpose: Due to the current design this is the only mechanism by which we can - inject different socket types - -**/ -class SocketFactory{ -public: - - /** - * Build an instance, creating a memory fence, which - * allows us to avoid locking. This is tantamount to double checked locking. - * @returns new SocketFactory; - */ - static SocketFactory *getInstance() { - SocketFactory* atomic_context = context_instance_.load( - std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); - if (atomic_context == nullptr) { - std::lock_guard<std::mutex> lock(context_mutex_); - atomic_context = context_instance_.load(std::memory_order_relaxed); - if (atomic_context == nullptr) { - atomic_context = new SocketFactory(); - std::atomic_thread_fence(std::memory_order_release); - context_instance_.store(atomic_context, - std::memory_order_relaxed); - } - } - return atomic_context; - } - - /** - * Creates a socket and returns a unique ptr - * - */ - std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) { - Socket *socket = 0; - if (is_secure_) { -#ifdef OPENSSL_SUPPORT - socket = new TLSSocket(host, port); -#else - socket = 0; -#endif - } else { - socket = new Socket(host, port); - } - return std::unique_ptr<Socket>(socket); - } -protected: - SocketFactory() : - configure_(Configure::getConfigure()) { - std::string secureStr; - is_secure_ = false; - if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) { - StringUtils::StringToBool(secureStr, is_secure_); - } - } - - bool is_secure_; - static std::atomic<SocketFactory*> context_instance_; - static std::mutex context_mutex_; - - Configure *configure_; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/Sockets.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/Sockets.h b/libminifi/include/io/Sockets.h new file mode 100644 index 0000000..2c0b163 --- /dev/null +++ b/libminifi/include/io/Sockets.h @@ -0,0 +1,27 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_IO_SOCKET_H_ +#define LIBMINIFI_INCLUDE_IO_SOCKET_H_ + +#include "ClientSocket.h" + +#ifdef OPENSSL_SUPPORT +#include "tls/TLSSocket.h" +#endif + +#endif /* LIBMINIFI_INCLUDE_IO_TLS_SECURESOCKET_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/StreamFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h new file mode 100644 index 0000000..faa10b5 --- /dev/null +++ b/libminifi/include/io/StreamFactory.h @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SOCKET_FACTORY_H +#define SOCKET_FACTORY_H + +#include "properties/Configure.h" +#include "Sockets.h" +#include "utils/StringUtils.h" +#include "validation.h" + +#ifdef OPENSSL_SUPPORT +#include "tls/TLSSocket.h" +#endif + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +/** + * Purpose: Socket Creator is a class that will determine if the provided socket type + * exists per the compilation parameters + */ +template<typename T> +class SocketCreator { + + template<bool cond, typename U> + using TypeCheck = typename std::enable_if< cond, U >::type; + +public: + template<typename U = T> + TypeCheck<true, U> *create(const std::string &host, const uint16_t port) { + return new T(host, port); + } + template<typename U = T> + TypeCheck<false, U> *create(const std::string &host, const uint16_t port) { + return new Socket(host, port); + } + +}; + +/** + Purpose: Due to the current design this is the only mechanism by which we can + inject different socket types + + **/ +class StreamFactory { +public: + + /** + * Build an instance, creating a memory fence, which + * allows us to avoid locking. This is tantamount to double checked locking. + * @returns new StreamFactory; + */ + static StreamFactory *getInstance() { + StreamFactory* atomic_context = context_instance_.load( + std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (atomic_context == nullptr) { + std::lock_guard < std::mutex > lock(context_mutex_); + atomic_context = context_instance_.load(std::memory_order_relaxed); + if (atomic_context == nullptr) { + atomic_context = new StreamFactory(); + std::atomic_thread_fence(std::memory_order_release); + context_instance_.store(atomic_context, + std::memory_order_relaxed); + } + } + return atomic_context; + } + + /** + * Creates a socket and returns a unique ptr + * + */ + std::unique_ptr<Socket> createSocket(const std::string &host, + const uint16_t port) { + Socket *socket = 0; + + if (is_secure_) { + socket = createSocket<TLSSocket>(host, port); + } else { + socket = createSocket<Socket>(host, port); + } + return std::unique_ptr < Socket > (socket); + } + +protected: + + /** + * Creates a socket and returns a unique ptr + * + */ + template<typename T> + Socket *createSocket(const std::string &host, const uint16_t port) { + SocketCreator<T> creator; + return creator.create(host, port); + } + + StreamFactory() : + configure_(Configure::getConfigure()) { + std::string secureStr; + is_secure_ = false; + if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool( + secureStr, is_secure_); + } + } + + bool is_secure_; + static std::atomic<StreamFactory*> context_instance_; + static std::mutex context_mutex_; + + Configure *configure_; +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/TLSSocket.h b/libminifi/include/io/TLSSocket.h deleted file mode 100644 index 32645ca..0000000 --- a/libminifi/include/io/TLSSocket.h +++ /dev/null @@ -1,187 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ -#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ - -#ifdef OPENSSL_SUPPORT -#include <cstdint> -#include "ClientSocket.h" -#include <atomic> -#include <mutex> - -#include "Configure.h" -#include <openssl/ssl.h> -#include <openssl/err.h> - -#define TLS_ERROR_CONTEXT 1 -#define TLS_ERROR_PEM_MISSING 2 -#define TLS_ERROR_CERT_MISSING 3 -#define TLS_ERROR_KEY_ERROR 4 -#define TLS_ERROR_CERT_ERROR 5 - -class TLSContext { - -public: - - /** - * Build an instance, creating a memory fence, which - * allows us to avoid locking. This is tantamount to double checked locking. - * @returns new TLSContext; - */ - static TLSContext *getInstance() { - TLSContext* atomic_context = context_instance.load( - std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); - if (atomic_context == nullptr) { - std::lock_guard<std::mutex> lock(context_mutex); - atomic_context = context_instance.load(std::memory_order_relaxed); - if (atomic_context == nullptr) { - atomic_context = new TLSContext(); - atomic_context->initialize(); - std::atomic_thread_fence(std::memory_order_release); - context_instance.store(atomic_context, - std::memory_order_relaxed); - } - } - return atomic_context; - } - - virtual ~TLSContext() { - if (0 != ctx) - SSL_CTX_free(ctx); - } - - SSL_CTX *getContext() { - return ctx; - } - - short getError() { - return error_value; - } - - short initialize(); - -private: - - static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) { - std::string passphrase; - - if (Configure::getConfigure()->get( - Configure::nifi_security_client_pass_phrase, passphrase)) { - - std::ifstream file(passphrase.c_str(), std::ifstream::in); - if (!file.good()) { - memset(buf, 0x00, size); - return 0; - } - - std::string password; - password.assign((std::istreambuf_iterator<char>(file)), - std::istreambuf_iterator<char>()); - file.close(); - memset(buf,0x00,size); - memcpy(buf, password.c_str(), password.length()-1); - - return password.length()-1; - } - return 0; - } - - TLSContext(); - - std::shared_ptr<Logger> logger_; - Configure *configuration; - SSL_CTX *ctx; - - short error_value; - static std::atomic<TLSContext*> context_instance; - - static std::mutex context_mutex; -}; - -class TLSSocket: public Socket { -public: - - /** - * Constructor that accepts host name, port and listeners. With this - * contructor we will be creating a server socket - * @param hostname our host name - * @param port connecting port - * @param listeners number of listeners in the queue - */ - explicit TLSSocket(const std::string &hostname, const uint16_t port, - const uint16_t listeners); - - /** - * Constructor that creates a client socket. - * @param hostname hostname we are connecting to. - * @param port port we are connecting to. - */ - explicit TLSSocket(const std::string &hostname, const uint16_t port); - - /** - * Move constructor. - */ - explicit TLSSocket(const TLSSocket &&); - - virtual ~TLSSocket(); - - /** - * Initializes the socket - * @return result of the creation operation. - */ - short initialize(); - - /** - * Attempt to select the socket file descriptor - * @param msec timeout interval to wait - * @returns file descriptor - */ - virtual short select_descriptor(const uint16_t msec); - - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(uint8_t *buf, int buflen); - - /** - * Write value to the stream using std::vector - * @param buf incoming buffer - * @param buflen buffer to write - * - */ - int writeData(std::vector<uint8_t> &buf, int buflen); - - /** - * Write value to the stream using uint8_t ptr - * @param buf incoming buffer - * @param buflen buffer to write - * - */ - int writeData(uint8_t *value, int size); - -protected: - - SSL* ssl; - -}; -#endif - -#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/tls/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h new file mode 100644 index 0000000..f86f8bc --- /dev/null +++ b/libminifi/include/io/tls/TLSSocket.h @@ -0,0 +1,198 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ +#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ + +#include <cstdint> +#include "../ClientSocket.h" +#include <atomic> +#include <mutex> + +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +#include <openssl/ssl.h> +#include <openssl/err.h> + +#define TLS_ERROR_CONTEXT 1 +#define TLS_ERROR_PEM_MISSING 2 +#define TLS_ERROR_CERT_MISSING 3 +#define TLS_ERROR_KEY_ERROR 4 +#define TLS_ERROR_CERT_ERROR 5 + +class TLSContext { + + public: + + /** + * Build an instance, creating a memory fence, which + * allows us to avoid locking. This is tantamount to double checked locking. + * @returns new TLSContext; + */ + static TLSContext *getInstance() { + TLSContext* atomic_context = context_instance.load( + std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (atomic_context == nullptr) { + std::lock_guard<std::mutex> lock(context_mutex); + atomic_context = context_instance.load(std::memory_order_relaxed); + if (atomic_context == nullptr) { + atomic_context = new TLSContext(); + atomic_context->initialize(); + std::atomic_thread_fence(std::memory_order_release); + context_instance.store(atomic_context, std::memory_order_relaxed); + } + } + return atomic_context; + } + + virtual ~TLSContext() { + if (0 != ctx) + SSL_CTX_free(ctx); + } + + SSL_CTX *getContext() { + return ctx; + } + + short getError() { + return error_value; + } + + short initialize(); + + private: + + static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) { + std::string passphrase; + + if (Configure::getConfigure()->get( + Configure::nifi_security_client_pass_phrase, passphrase)) { + + std::ifstream file(passphrase.c_str(), std::ifstream::in); + if (!file.good()) { + memset(buf, 0x00, size); + return 0; + } + + std::string password; + password.assign((std::istreambuf_iterator<char>(file)), + std::istreambuf_iterator<char>()); + file.close(); + memset(buf, 0x00, size); + memcpy(buf, password.c_str(), password.length() - 1); + + return password.length() - 1; + } + return 0; + } + + TLSContext(); + + std::shared_ptr<logging::Logger> logger_; + Configure *configuration; + SSL_CTX *ctx; + + short error_value; + + static std::atomic<TLSContext*> context_instance; + static std::mutex context_mutex; + +}; + +class TLSSocket : public Socket { + public: + + /** + * Constructor that accepts host name, port and listeners. With this + * contructor we will be creating a server socket + * @param hostname our host name + * @param port connecting port + * @param listeners number of listeners in the queue + */ + explicit TLSSocket(const std::string &hostname, const uint16_t port, + const uint16_t listeners); + + /** + * Constructor that creates a client socket. + * @param hostname hostname we are connecting to. + * @param port port we are connecting to. + */ + explicit TLSSocket(const std::string &hostname, const uint16_t port); + + /** + * Move constructor. + */ + explicit TLSSocket(const TLSSocket &&); + + virtual ~TLSSocket(); + + /** + * Initializes the socket + * @return result of the creation operation. + */ + short initialize(); + + /** + * Attempt to select the socket file descriptor + * @param msec timeout interval to wait + * @returns file descriptor + */ + virtual short select_descriptor(const uint16_t msec); + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * Write value to the stream using uint8_t ptr + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + int writeData(uint8_t *value, int size); + + protected: + + SSL* ssl; + +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/validation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h index e1b4bb6..c66c412 100644 --- a/libminifi/include/io/validation.h +++ b/libminifi/include/io/validation.h @@ -18,35 +18,38 @@ #ifndef VALIDATION_H #define VALIDATION_H +#include <type_traits> #include <string> #include <cstring> + /** * A checker that will, at compile time, tell us * if the declared type has a size method. */ template<typename T> class size_function_functor_checker { - typedef char hasit; - typedef long doesnothaveit; + typedef char hasit; + typedef long doesnothaveit; - // look for the declared type - template<typename O> static hasit test(decltype(&O::size)); - template<typename O> static doesnothaveit test(...); + // look for the declared type + template<typename O> static hasit test(decltype(&O::size)); + template<typename O> static doesnothaveit test(...); -public: - enum { - has_size_function = sizeof(test<T>(0)) == sizeof(char) - }; + public: + enum { + has_size_function = sizeof(test<T>(0)) == sizeof(char) + }; }; + /** * Determines if the variable is null or ::size() == 0 */ template<typename T> static auto IsNullOrEmpty( T &object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type { - return object.size() == 0; + return object.size() == 0; } /** @@ -55,7 +58,7 @@ static auto IsNullOrEmpty( template<typename T> static auto IsNullOrEmpty( T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type { - return (0 == object || object->size() == 0); + return (0 == object || object->size() == 0); } /** @@ -64,13 +67,13 @@ static auto IsNullOrEmpty( template<typename T> static auto IsNullOrEmpty( T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type { - return (0 == object); + return (0 == object); } /** * Determines if the variable is null or strlen(str) == 0 */ static auto IsNullOrEmpty(char *str)-> decltype(NULL !=str, bool()) { - return (NULL == str || strlen(str) == 0); + return (NULL == str || strlen(str) == 0); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/AppendHostInfo.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h new file mode 100644 index 0000000..6515918 --- /dev/null +++ b/libminifi/include/processors/AppendHostInfo.h @@ -0,0 +1,80 @@ +/** + * @file AppendHostInfo.h + * AppendHostInfo class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __APPEND_HOSTINFO_H__ +#define __APPEND_HOSTINFO_H__ + +#include "core/Property.h" +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// AppendHostInfo Class +class AppendHostInfo : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + AppendHostInfo(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid) { + logger_ = logging::Logger::getLogger(); + } + // Destructor + virtual ~AppendHostInfo() { + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property InterfaceName; + static core::Property HostAttribute; + static core::Property IPAttribute; + + // Supported Relationships + static core::Relationship Success; + + public: + // OnTrigger method, implemented by NiFi AppendHostInfo + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi AppendHostInfo + virtual void initialize(void); + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> logger_; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ExecuteProcess.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h new file mode 100644 index 0000000..123eed3 --- /dev/null +++ b/libminifi/include/processors/ExecuteProcess.h @@ -0,0 +1,125 @@ +/** + * @file ExecuteProcess.h + * ExecuteProcess class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __EXECUTE_PROCESS_H__ +#define __EXECUTE_PROCESS_H__ + +#include <stdio.h> +#include <unistd.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include <unistd.h> +#include <sys/wait.h> +#include <iostream> +#include <sys/types.h> +#include <signal.h> +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// ExecuteProcess Class +class ExecuteProcess : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + ExecuteProcess(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) { + logger_ = logging::Logger::getLogger(); + _redirectErrorStream = false; + _batchDuration = 0; + _workingDir = "."; + _processRunning = false; + _pid = 0; + } + // Destructor + virtual ~ExecuteProcess() { + if (_processRunning && _pid > 0) + kill(_pid, SIGTERM); + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property Command; + static core::Property CommandArguments; + static core::Property WorkingDir; + static core::Property BatchDuration; + static core::Property RedirectErrorStream; + // Supported Relationships + static core::Relationship Success; + + // Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback { + public: + WriteCallback(char *data, uint64_t size) + : _data(data), + _dataSize(size) { + } + char *_data; + uint64_t _dataSize; + void process(std::ofstream *stream) { + if (_data && _dataSize > 0) + stream->write(_data, _dataSize); + } + }; + + public: + // OnTrigger method, implemented by NiFi ExecuteProcess + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi ExecuteProcess + virtual void initialize(void); + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> logger_; + // Property + std::string _command; + std::string _commandArgument; + std::string _workingDir; + int64_t _batchDuration; + bool _redirectErrorStream; + // Full command + std::string _fullCommand; + // whether the process is running + bool _processRunning; + int _pipefd[2]; + pid_t _pid; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h new file mode 100644 index 0000000..c4ab6fe --- /dev/null +++ b/libminifi/include/processors/GenerateFlowFile.h @@ -0,0 +1,98 @@ +/** + * @file GenerateFlowFile.h + * GenerateFlowFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __GENERATE_FLOW_FILE_H__ +#define __GENERATE_FLOW_FILE_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { +// GenerateFlowFile Class +class GenerateFlowFile : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + GenerateFlowFile(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) { + _data = NULL; + _dataSize = 0; + } + // Destructor + virtual ~GenerateFlowFile() { + if (_data) + delete[] _data; + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property FileSize; + static core::Property BatchSize; + static core::Property DataFormat; + static core::Property UniqueFlowFiles; + static const char *DATA_FORMAT_BINARY; + static const char *DATA_FORMAT_TEXT; + // Supported Relationships + static core::Relationship Success; + // Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback { + public: + WriteCallback(char *data, uint64_t size) + : _data(data), + _dataSize(size) { + } + char *_data; + uint64_t _dataSize; + void process(std::ofstream *stream) { + if (_data && _dataSize > 0) + stream->write(_data, _dataSize); + } + }; + + public: + // OnTrigger method, implemented by NiFi GenerateFlowFile + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi GenerateFlowFile + virtual void initialize(void); + + protected: + + private: + // Generated data + char * _data; + // Size of the generate data + uint64_t _dataSize; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h new file mode 100644 index 0000000..f1f0694 --- /dev/null +++ b/libminifi/include/processors/GetFile.h @@ -0,0 +1,129 @@ +/** + * @file GetFile.h + * GetFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __GET_FILE_H__ +#define __GET_FILE_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// GetFile Class +class GetFile : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + GetFile(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) { + logger_ = logging::Logger::getLogger(); + _directory = "."; + _recursive = true; + _keepSourceFile = false; + _minAge = 0; + _maxAge = 0; + _minSize = 0; + _maxSize = 0; + _ignoreHiddenFile = true; + _pollInterval = 0; + _batchSize = 10; + _lastDirectoryListingTime = getTimeMillis(); + _fileFilter = "[^\\.].*"; + } + // Destructor + virtual ~GetFile() { + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property Directory; + static core::Property Recurse; + static core::Property KeepSourceFile; + static core::Property MinAge; + static core::Property MaxAge; + static core::Property MinSize; + static core::Property MaxSize; + static core::Property IgnoreHiddenFile; + static core::Property PollInterval; + static core::Property BatchSize; + static core::Property FileFilter; + // Supported Relationships + static core::Relationship Success; + + public: + // OnTrigger method, implemented by NiFi GetFile + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi GetFile + virtual void initialize(void); + // perform directory listing + void performListing(std::string dir); + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> logger_; + // Queue for store directory list + std::queue<std::string> _dirList; + // Get Listing size + uint64_t getListingSize() { + std::lock_guard<std::mutex> lock(mutex_); + return _dirList.size(); + } + // Whether the directory listing is empty + bool isListingEmpty(); + // Put full path file name into directory listing + void putListing(std::string fileName); + // Poll directory listing for files + void pollListing(std::queue<std::string> &list, int maxSize); + // Check whether file can be added to the directory listing + bool acceptFile(std::string fullName, std::string name); + // Mutex for protection of the directory listing + std::mutex mutex_; + std::string _directory; + bool _recursive; + bool _keepSourceFile; + int64_t _minAge; + int64_t _maxAge; + int64_t _minSize; + int64_t _maxSize; + bool _ignoreHiddenFile; + int64_t _pollInterval; + int64_t _batchSize; + uint64_t _lastDirectoryListingTime; + std::string _fileFilter; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h new file mode 100644 index 0000000..adaefb1 --- /dev/null +++ b/libminifi/include/processors/ListenHTTP.h @@ -0,0 +1,126 @@ +/** + * @file ListenHTTP.h + * ListenHTTP class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LISTEN_HTTP_H__ +#define __LISTEN_HTTP_H__ + +#include <memory> +#include <regex> + +#include <CivetServer.h> + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// ListenHTTP Class +class ListenHTTP : public core::Processor { + public: + + // Constructor + /*! + * Create a new processor + */ + ListenHTTP(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) { + _logger = logging::Logger::getLogger(); + } + // Destructor + virtual ~ListenHTTP(); + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property BasePath; + static core::Property Port; + static core::Property AuthorizedDNPattern; + static core::Property SSLCertificate; + static core::Property SSLCertificateAuthority; + static core::Property SSLVerifyPeer; + static core::Property SSLMinimumVersion; + static core::Property HeadersAsAttributesRegex; + // Supported Relationships + static core::Relationship Success; + + void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); + void initialize(); + void onSchedule( + core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory); + + // HTTP request handler + class Handler : public CivetHandler { + public: + Handler( + core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory, + std::string &&authDNPattern, std::string &&headersAsAttributesPattern); + bool handlePost(CivetServer *server, struct mg_connection *conn); + + private: + // Send HTTP 500 error response to client + void sendErrorResponse(struct mg_connection *conn); + // Logger + std::shared_ptr<logging::Logger> _logger; + + std::regex _authDNRegex; + std::regex _headersAsAttributesRegex; + core::ProcessContext *_processContext; + core::ProcessSessionFactory *_processSessionFactory; + }; + + // Write callback for transferring data from HTTP request to content repo + class WriteCallback : public OutputStreamCallback { + public: + WriteCallback(struct mg_connection *conn, + const struct mg_request_info *reqInfo); + void process(std::ofstream *stream); + + private: + // Logger + std::shared_ptr<logging::Logger> _logger; + + struct mg_connection *_conn; + const struct mg_request_info *_reqInfo; + }; + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> _logger; + + std::unique_ptr<CivetServer> _server; + std::unique_ptr<Handler> _handler; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h new file mode 100644 index 0000000..1e1e11f --- /dev/null +++ b/libminifi/include/processors/ListenSyslog.h @@ -0,0 +1,216 @@ +/** + * @file ListenSyslog.h + * ListenSyslog class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LISTEN_SYSLOG_H__ +#define __LISTEN_SYSLOG_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <errno.h> +#include <sys/select.h> +#include <sys/time.h> +#include <sys/types.h> +#include <chrono> +#include <thread> +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// SyslogEvent +typedef struct { + uint8_t *payload; + uint64_t len; +} SysLogEvent; + +// ListenSyslog Class +class ListenSyslog : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + ListenSyslog(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) { + logger_ = logging::Logger::getLogger(); + _eventQueueByteSize = 0; + _serverSocket = 0; + _recvBufSize = 65507; + _maxSocketBufSize = 1024 * 1024; + _maxConnections = 2; + _maxBatchSize = 1; + _messageDelimiter = "\n"; + _protocol = "UDP"; + _port = 514; + _parseMessages = false; + _serverSocket = 0; + _maxFds = 0; + FD_ZERO(&_readfds); + _thread = NULL; + _resetServerSocket = false; + _serverTheadRunning = false; + } + // Destructor + virtual ~ListenSyslog() { + _serverTheadRunning = false; + if (this->_thread) + delete this->_thread; + // need to reset the socket + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { + int clientSocket = *it; + close(clientSocket); + } + _clientSockets.clear(); + if (_serverSocket > 0) { + logger_->log_info("ListenSysLog Server socket %d close", _serverSocket); + close(_serverSocket); + _serverSocket = 0; + } + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property RecvBufSize; + static core::Property MaxSocketBufSize; + static core::Property MaxConnections; + static core::Property MaxBatchSize; + static core::Property MessageDelimiter; + static core::Property ParseMessages; + static core::Property Protocol; + static core::Property Port; + // Supported Relationships + static core::Relationship Success; + static core::Relationship Invalid; + // Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback { + public: + WriteCallback(char *data, uint64_t size) + : _data(data), + _dataSize(size) { + } + char *_data; + uint64_t _dataSize; + void process(std::ofstream *stream) { + if (_data && _dataSize > 0) + stream->write(_data, _dataSize); + } + }; + + public: + // OnTrigger method, implemented by NiFi ListenSyslog + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi ListenSyslog + virtual void initialize(void); + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> logger_; + // Run function for the thread + static void run(ListenSyslog *process); + // Run Thread + void runThread(); + // Queue for store syslog event + std::queue<SysLogEvent> _eventQueue; + // Size of Event queue in bytes + uint64_t _eventQueueByteSize; + // Get event queue size + uint64_t getEventQueueSize() { + std::lock_guard<std::mutex> lock(mutex_); + return _eventQueue.size(); + } + // Get event queue byte size + uint64_t getEventQueueByteSize() { + std::lock_guard<std::mutex> lock(mutex_); + return _eventQueueByteSize; + } + // Whether the event queue is empty + bool isEventQueueEmpty() { + std::lock_guard<std::mutex> lock(mutex_); + return _eventQueue.empty(); + } + // Put event into directory listing + void putEvent(uint8_t *payload, uint64_t len) { + std::lock_guard<std::mutex> lock(mutex_); + SysLogEvent event; + event.payload = payload; + event.len = len; + _eventQueue.push(event); + _eventQueueByteSize += len; + } + // Read \n terminated line from TCP socket + int readline(int fd, char *bufptr, size_t len); + // start server socket and handling client socket + void startSocketThread(); + // Poll event + void pollEvent(std::queue<SysLogEvent> &list, int maxSize) { + std::lock_guard<std::mutex> lock(mutex_); + + while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) { + SysLogEvent event = _eventQueue.front(); + _eventQueue.pop(); + _eventQueueByteSize -= event.len; + list.push(event); + } + return; + } + // Mutex for protection of the directory listing + std::mutex mutex_; + int64_t _recvBufSize; + int64_t _maxSocketBufSize; + int64_t _maxConnections; + int64_t _maxBatchSize; + std::string _messageDelimiter; + std::string _protocol; + int64_t _port; + bool _parseMessages; + int _serverSocket; + std::vector<int> _clientSockets; + int _maxFds; + fd_set _readfds; + // thread + std::thread *_thread; + // whether to reset the server socket + bool _resetServerSocket; + bool _serverTheadRunning; + // buffer for read socket + uint8_t _buffer[2048]; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h new file mode 100644 index 0000000..37c0ec3 --- /dev/null +++ b/libminifi/include/processors/LogAttribute.h @@ -0,0 +1,130 @@ +/** + * @file LogAttribute.h + * LogAttribute class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LOG_ATTRIBUTE_H__ +#define __LOG_ATTRIBUTE_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// LogAttribute Class +class LogAttribute : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + LogAttribute(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) { + logger_ = logging::Logger::getLogger(); + } + // Destructor + virtual ~LogAttribute() { + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property LogLevel; + static core::Property AttributesToLog; + static core::Property AttributesToIgnore; + static core::Property LogPayload; + static core::Property LogPrefix; + // Supported Relationships + static core::Relationship Success; + enum LogAttrLevel { + LogAttrLevelTrace, + LogAttrLevelDebug, + LogAttrLevelInfo, + LogAttrLevelWarn, + LogAttrLevelError + }; + // Convert log level from string to enum + bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) { + if (logStr == "trace") { + level = LogAttrLevelTrace; + return true; + } else if (logStr == "debug") { + level = LogAttrLevelDebug; + return true; + } else if (logStr == "info") { + level = LogAttrLevelInfo; + return true; + } else if (logStr == "warn") { + level = LogAttrLevelWarn; + return true; + } else if (logStr == "error") { + level = LogAttrLevelError; + return true; + } else + return false; + } + // Nest Callback Class for read stream + class ReadCallback : public InputStreamCallback { + public: + ReadCallback(uint64_t size) { + _bufferSize = size; + _buffer = new char[_bufferSize]; + } + ~ReadCallback() { + if (_buffer) + delete[] _buffer; + } + void process(std::ifstream *stream) { + + stream->read(_buffer, _bufferSize); + if (!stream) + _readSize = stream->gcount(); + else + _readSize = _bufferSize; + } + char *_buffer; + uint64_t _bufferSize; + uint64_t _readSize; + }; + + public: + // OnTrigger method, implemented by NiFi LogAttribute + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi LogAttribute + virtual void initialize(void); + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> logger_; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h new file mode 100644 index 0000000..7653fac --- /dev/null +++ b/libminifi/include/processors/PutFile.h @@ -0,0 +1,101 @@ +/** + * @file PutFile.h + * PutFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUT_FILE_H__ +#define __PUT_FILE_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PutFile Class +class PutFile : public core::Processor { + public: + + static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE; + static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE; + static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL; + + // Constructor + /*! + * Create a new processor + */ + PutFile(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid) { + logger_ = logging::Logger::getLogger(); + } + // Destructor + virtual ~PutFile() { + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property Directory; + static core::Property ConflictResolution; + // Supported Relationships + static core::Relationship Success; + static core::Relationship Failure; + + // OnTrigger method, implemented by NiFi PutFile + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi PutFile + virtual void initialize(void); + + class ReadCallback : public InputStreamCallback { + public: + ReadCallback(const std::string &tmpFile, const std::string &destFile); + ~ReadCallback(); + virtual void process(std::ifstream *stream); + bool commit(); + + private: + std::shared_ptr<logging::Logger> logger_; + std::ofstream _tmpFileOs; + bool _writeSucceeded = false; + std::string _tmpFile; + std::string _destFile; + }; + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> logger_; + + bool putFile(core::ProcessSession *session, + std::shared_ptr<FlowFileRecord> flowFile, + const std::string &tmpFile, const std::string &destFile); +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/RealTimeDataCollector.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/RealTimeDataCollector.h b/libminifi/include/processors/RealTimeDataCollector.h new file mode 100644 index 0000000..41bd814 --- /dev/null +++ b/libminifi/include/processors/RealTimeDataCollector.h @@ -0,0 +1,145 @@ +/** + * @file RealTimeDataCollector.h + * RealTimeDataCollector class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __REAL_TIME_DATA_COLLECTOR_H__ +#define __REAL_TIME_DATA_COLLECTOR_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <string> +#include <errno.h> +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// RealTimeDataCollector Class +class RealTimeDataCollector : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + explicit RealTimeDataCollector(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid) { + _realTimeSocket = 0; + _batchSocket = 0; + logger_ = logging::Logger::getLogger(); + _firstInvoking = false; + _realTimeAccumulated = 0; + _batchAcccumulated = 0; + _queuedDataSize = 0; + } + // Destructor + virtual ~RealTimeDataCollector() { + if (_realTimeSocket) + close(_realTimeSocket); + if (_batchSocket) + close(_batchSocket); + if (_fileStream.is_open()) + _fileStream.close(); + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property REALTIMESERVERNAME; + static core::Property REALTIMESERVERPORT; + static core::Property BATCHSERVERNAME; + static core::Property BATCHSERVERPORT; + static core::Property FILENAME; + static core::Property ITERATION; + static core::Property REALTIMEMSGID; + static core::Property BATCHMSGID; + static core::Property REALTIMEINTERVAL; + static core::Property BATCHINTERVAL; + static core::Property BATCHMAXBUFFERSIZE; + // Supported Relationships + static core::Relationship Success; + // Connect to the socket + int connectServer(const char *host, uint16_t port); + int sendData(int socket, const char *buf, int buflen); + void onTriggerRealTime( + core::ProcessContext *context, + core::ProcessSession *session); + void onTriggerBatch(core::ProcessContext *context, + core::ProcessSession *session); + + public: + // OnTrigger method, implemented by NiFi RealTimeDataCollector + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi RealTimeDataCollector + virtual void initialize(void); + + protected: + + private: + // realtime server Name + std::string _realTimeServerName; + int64_t _realTimeServerPort; + std::string _batchServerName; + int64_t _batchServerPort; + int64_t _realTimeInterval; + int64_t _batchInterval; + int64_t _batchMaxBufferSize; + // Match pattern for Real time Message ID + std::vector<std::string> _realTimeMsgID; + // Match pattern for Batch Message ID + std::vector<std::string> _batchMsgID; + // file for which the realTime collector will tail + std::string _fileName; + // Whether we need to iterate from the beginning for demo + bool _iteration; + int _realTimeSocket; + int _batchSocket; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Mutex for protection + std::mutex mutex_; + // Queued data size + uint64_t _queuedDataSize; + // Queue for the batch process + std::queue<std::string> _queue; + std::thread::id _realTimeThreadId; + std::thread::id _batchThreadId; + std::atomic<bool> _firstInvoking; + int64_t _realTimeAccumulated; + int64_t _batchAcccumulated; + std::ifstream _fileStream; +}; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/TailFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h new file mode 100644 index 0000000..5be76e4 --- /dev/null +++ b/libminifi/include/processors/TailFile.h @@ -0,0 +1,105 @@ +/** + * @file TailFile.h + * TailFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __TAIL_FILE_H__ +#define __TAIL_FILE_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// TailFile Class +class TailFile : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + TailFile(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid) { + logger_ = logging::Logger::getLogger(); + _stateRecovered = false; + } + // Destructor + virtual ~TailFile() { + storeState(); + } + // Processor Name + static const std::string ProcessorName; + // Supported Properties + static core::Property FileName; + static core::Property StateFile; + // Supported Relationships + static core::Relationship Success; + + public: + // OnTrigger method, implemented by NiFi TailFile + virtual void onTrigger( + core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi TailFile + virtual void initialize(void); + // recoverState + void recoverState(); + // storeState + void storeState(); + + protected: + + private: + // Logger + std::shared_ptr<logging::Logger> logger_; + std::string _fileLocation; + // Property Specified Tailed File Name + std::string _fileName; + // File to save state + std::string _stateFile; + // State related to the tailed file + std::string _currentTailFileName; + uint64_t _currentTailFilePosition; + bool _stateRecovered; + uint64_t _currentTailFileCreatedTime; + // Utils functions for parse state file + std::string trimLeft(const std::string& s); + std::string trimRight(const std::string& s); + void parseStateFileLine(char *buf); + void checkRollOver(); + +}; + +// Matched File Item for Roll over check +typedef struct { + std::string fileName; + uint64_t modifiedTime; +} TailMatchedFileItem; + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h new file mode 100644 index 0000000..c0d9bd4 --- /dev/null +++ b/libminifi/include/properties/Configure.h @@ -0,0 +1,131 @@ +/** + * @file Configure.h + * Configure class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONFIGURE_H__ +#define __CONFIGURE_H__ + +#include <stdio.h> +#include <string> +#include <map> +#include <stdlib.h> +#include <errno.h> +#include <iostream> +#include <fstream> +#include "core/core.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +class Configure { + public: + // Get the singleton logger instance + static Configure * getConfigure() { + if (!configure_) { + configure_ = new Configure(); + } + return configure_; + } + // nifi.flow.configuration.file + static const char *nifi_flow_configuration_file; + static const char *nifi_administrative_yield_duration; + static const char *nifi_bored_yield_duration; + static const char *nifi_graceful_shutdown_seconds; + static const char *nifi_log_level; + static const char *nifi_server_name; + static const char *nifi_configuration_class_name; + static const char *nifi_flow_repository_class_name; + static const char *nifi_provenance_repository_class_name; + static const char *nifi_server_port; + static const char *nifi_server_report_interval; + static const char *nifi_provenance_repository_max_storage_time; + static const char *nifi_provenance_repository_max_storage_size; + static const char *nifi_provenance_repository_directory_default; + static const char *nifi_provenance_repository_enable; + static const char *nifi_flowfile_repository_max_storage_time; + static const char *nifi_flowfile_repository_max_storage_size; + static const char *nifi_flowfile_repository_directory_default; + static const char *nifi_flowfile_repository_enable; + static const char *nifi_remote_input_secure; + static const char *nifi_security_need_ClientAuth; + static const char *nifi_security_client_certificate; + static const char *nifi_security_client_private_key; + static const char *nifi_security_client_pass_phrase; + static const char *nifi_security_client_ca_certificate; + + // Clear the load config + void clear() { + std::lock_guard<std::mutex> lock(mutex_); + properties_.clear(); + } + // Set the config value + void set(std::string key, std::string value) { + std::lock_guard<std::mutex> lock(mutex_); + properties_[key] = value; + } + // Check whether the config value existed + bool has(std::string key) { + std::lock_guard<std::mutex> lock(mutex_); + return (properties_.find(key) != properties_.end()); + } + // Get the config value + bool get(std::string key, std::string &value); + // Parse one line in configure file like key=value + void parseConfigureFileLine(char *buf); + // Load Configure File + void loadConfigureFile(const char *fileName); + // Set the determined MINIFI_HOME + void setHome(std::string minifiHome) { + minifi_home_ = minifiHome; + } + + // Get the determined MINIFI_HOME + std::string getHome() { + return minifi_home_; + } + // Parse Command Line + void parseCommandLine(int argc, char **argv); + + private: + // Mutex for protection + std::mutex mutex_; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Home location for this executable + std::string minifi_home_; + + Configure() { + logger_ = logging::Logger::getLogger(); + } + virtual ~Configure() { + + } + static Configure *configure_; + + protected: + std::map<std::string, std::string> properties_; +}; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif
