http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRepository.h b/libminifi/include/FlowFileRepository.h deleted file mode 100644 index 50d2c41..0000000 --- a/libminifi/include/FlowFileRepository.h +++ /dev/null @@ -1,204 +0,0 @@ -/** - * @file FlowFileRepository - * Flow file repository class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __FLOWFILE_REPOSITORY_H__ -#define __FLOWFILE_REPOSITORY_H__ - -#include <ftw.h> -#include <uuid/uuid.h> -#include <atomic> -#include <cstdint> -#include <cstring> -#include <iostream> -#include <map> -#include <set> -#include <string> -#include <thread> -#include <vector> - -#include "Configure.h" -#include "Connection.h" -#include "FlowFileRecord.h" -#include "Logger.h" -#include "Property.h" -#include "ResourceClaim.h" -#include "io/Serializable.h" -#include "utils/TimeUtil.h" -#include "Repository.h" - -#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" -#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M -#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute -#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec - -//! FlowFile Repository -class FlowFileRepository : public Repository -{ -public: - //! Constructor - /*! - * Create a new provenance repository - */ - FlowFileRepository() - : Repository(Repository::FLOWFILE, FLOWFILE_REPOSITORY_DIRECTORY, - MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, FLOWFILE_REPOSITORY_PURGE_PERIOD) - { - } - //! Destructor - virtual ~FlowFileRepository() { - } - //! Load Repo to Connections - void loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap); - -protected: - -private: - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowFileRepository(const FlowFileRepository &parent); - FlowFileRepository &operator=(const FlowFileRepository &parent); -}; - -//! FlowFile Event Record -class FlowFileEventRecord : protected Serializable -{ -public: - //! Constructor - /*! - * Create a new provenance event record - */ - FlowFileEventRecord() - : _entryDate(0), _lineageStartDate(0), _size(0), _offset(0) - { - _eventTime = getTimeMillis(); - logger_ = Logger::getLogger(); - } - - //! Destructor - virtual ~FlowFileEventRecord() { - } - //! 
Get Attributes - std::map<std::string, std::string> getAttributes() { - return _attributes; - } - //! Get Size - uint64_t getFileSize() { - return _size; - } - // ! Get Offset - uint64_t getFileOffset() { - return _offset; - } - // ! Get Entry Date - uint64_t getFlowFileEntryDate() { - return _entryDate; - } - // ! Get Lineage Start Date - uint64_t getlineageStartDate() { - return _lineageStartDate; - } - // ! Get Event Time - uint64_t getEventTime() { - return _eventTime; - } - //! Get FlowFileUuid - std::string getFlowFileUuid() - { - return _uuid; - } - //! Get ConnectionUuid - std::string getConnectionUuid() - { - return _uuidConnection; - } - //! Get content full path - std::string getContentFullPath() - { - return _contentFullPath; - } - //! Get LineageIdentifiers - std::set<std::string> getLineageIdentifiers() - { - return _lineageIdentifiers; - } - //! fromFlowFile - void fromFlowFile(FlowFileRecord *flow, std::string uuidConnection) - { - _entryDate = flow->getEntryDate(); - _lineageStartDate = flow->getlineageStartDate(); - _lineageIdentifiers = flow->getlineageIdentifiers(); - _uuid = flow->getUUIDStr(); - _attributes = flow->getAttributes(); - _size = flow->getSize(); - _offset = flow->getOffset(); - _uuidConnection = uuidConnection; - if (flow->getResourceClaim()) - { - _contentFullPath = flow->getResourceClaim()->getContentFullPath(); - } - } - //! Serialize and Persistent to the repository - bool Serialize(FlowFileRepository *repo); - //! DeSerialize - bool DeSerialize(const uint8_t *buffer, const int bufferSize); - //! DeSerialize - bool DeSerialize(DataStream &stream) - { - return DeSerialize(stream.getBuffer(),stream.getSize()); - } - //! DeSerialize - bool DeSerialize(FlowFileRepository *repo, std::string key); - -protected: - - //! Date at which the event was created - uint64_t _eventTime; - //! Date at which the flow file entered the flow - uint64_t _entryDate; - //! 
Date at which the origin of this flow file entered the flow - uint64_t _lineageStartDate; - //! Size in bytes of the data corresponding to this flow file - uint64_t _size; - //! flow uuid - std::string _uuid; - //! connection uuid - std::string _uuidConnection; - //! Offset to the content - uint64_t _offset; - //! Full path to the content - std::string _contentFullPath; - //! Attributes key/values pairs for the flow record - std::map<std::string, std::string> _attributes; - //! UUID string for all parents - std::set<std::string> _lineageIdentifiers; - -private: - - //! Logger - std::shared_ptr<Logger> logger_; - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowFileEventRecord(const FlowFileEventRecord &parent); - FlowFileEventRecord &operator=(const FlowFileEventRecord &parent); - -}; - -#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/GenerateFlowFile.h b/libminifi/include/GenerateFlowFile.h deleted file mode 100644 index 27aa43b..0000000 --- a/libminifi/include/GenerateFlowFile.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * @file GenerateFlowFile.h - * GenerateFlowFile class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __GENERATE_FLOW_FILE_H__ -#define __GENERATE_FLOW_FILE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! GenerateFlowFile Class -class GenerateFlowFile : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - GenerateFlowFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _data = NULL; - _dataSize = 0; - } - //! Destructor - virtual ~GenerateFlowFile() - { - if (_data) - delete[] _data; - } - //! Processor Name - static const std::string ProcessorName; - //! 
Supported Properties - static Property FileSize; - static Property BatchSize; - static Property DataFormat; - static Property UniqueFlowFiles; - static const char *DATA_FORMAT_BINARY; - static const char *DATA_FORMAT_TEXT; - //! Supported Relationships - static Relationship Success; - //! Nest Callback Class for write stream - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(char *data, uint64_t size) - : _data(data), _dataSize(size) {} - char *_data; - uint64_t _dataSize; - void process(std::ofstream *stream) { - if (_data && _dataSize > 0) - stream->write(_data, _dataSize); - } - }; - -public: - //! OnTrigger method, implemented by NiFi GenerateFlowFile - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi GenerateFlowFile - virtual void initialize(void); - -protected: - -private: - //! Generated data - char * _data; - //! Size of the generate data - uint64_t _dataSize; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/GetFile.h b/libminifi/include/GetFile.h deleted file mode 100644 index 8f8068c..0000000 --- a/libminifi/include/GetFile.h +++ /dev/null @@ -1,117 +0,0 @@ -/** - * @file GetFile.h - * GetFile class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __GET_FILE_H__ -#define __GET_FILE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! GetFile Class -class GetFile : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - GetFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - logger_ = Logger::getLogger(); - _directory = "."; - _recursive = true; - _keepSourceFile = false; - _minAge = 0; - _maxAge = 0; - _minSize = 0; - _maxSize = 0; - _ignoreHiddenFile = true; - _pollInterval = 0; - _batchSize = 10; - _lastDirectoryListingTime = getTimeMillis(); - _fileFilter = "[^\\.].*"; - } - //! Destructor - virtual ~GetFile() - { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property Directory; - static Property Recurse; - static Property KeepSourceFile; - static Property MinAge; - static Property MaxAge; - static Property MinSize; - static Property MaxSize; - static Property IgnoreHiddenFile; - static Property PollInterval; - static Property BatchSize; - static Property FileFilter; - //! Supported Relationships - static Relationship Success; - -public: - //! OnTrigger method, implemented by NiFi GetFile - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi GetFile - virtual void initialize(void); - //! perform directory listing - void performListing(std::string dir); - -protected: - -private: - //! Logger - std::shared_ptr<Logger> logger_; - //! 
Queue for store directory list - std::queue<std::string> _dirList; - //! Get Listing size - uint64_t getListingSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _dirList.size(); - } - //! Whether the directory listing is empty - bool isListingEmpty(); - //! Put full path file name into directory listing - void putListing(std::string fileName); - //! Poll directory listing for files - void pollListing(std::queue<std::string> &list, int maxSize); - //! Check whether file can be added to the directory listing - bool acceptFile(std::string fullName, std::string name); - //! Mutex for protection of the directory listing - std::mutex _mtx; - std::string _directory; - bool _recursive; - bool _keepSourceFile; - int64_t _minAge; - int64_t _maxAge; - int64_t _minSize; - int64_t _maxSize; - bool _ignoreHiddenFile; - int64_t _pollInterval; - int64_t _batchSize; - uint64_t _lastDirectoryListingTime; - std::string _fileFilter; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ListenHTTP.h b/libminifi/include/ListenHTTP.h deleted file mode 100644 index 5a467f2..0000000 --- a/libminifi/include/ListenHTTP.h +++ /dev/null @@ -1,116 +0,0 @@ -/** - * @file ListenHTTP.h - * ListenHTTP class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LISTEN_HTTP_H__ -#define __LISTEN_HTTP_H__ - -#include <memory> -#include <regex> - -#include <CivetServer.h> - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - - -//! ListenHTTP Class -class ListenHTTP : public Processor -{ -public: - - //! Constructor - /*! - * Create a new processor - */ - ListenHTTP(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _logger = Logger::getLogger(); - } - //! Destructor - ~ListenHTTP() - { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property BasePath; - static Property Port; - static Property AuthorizedDNPattern; - static Property SSLCertificate; - static Property SSLCertificateAuthority; - static Property SSLVerifyPeer; - static Property SSLMinimumVersion; - static Property HeadersAsAttributesRegex; - //! Supported Relationships - static Relationship Success; - - void onTrigger(ProcessContext *context, ProcessSession *session); - void initialize(); - void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory); - - //! HTTP request handler - class Handler : public CivetHandler - { - public: - Handler(ProcessContext *context, - ProcessSessionFactory *sessionFactory, - std::string &&authDNPattern, - std::string &&headersAsAttributesPattern); - bool handlePost(CivetServer *server, struct mg_connection *conn); - - private: - //! Send HTTP 500 error response to client - void sendErrorResponse(struct mg_connection *conn); - //! 
Logger - std::shared_ptr<Logger> _logger; - - std::regex _authDNRegex; - std::regex _headersAsAttributesRegex; - ProcessContext *_processContext; - ProcessSessionFactory *_processSessionFactory; - }; - - //! Write callback for transferring data from HTTP request to content repo - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo); - void process(std::ofstream *stream); - - private: - //! Logger - std::shared_ptr<Logger> _logger; - - struct mg_connection *_conn; - const struct mg_request_info *_reqInfo; - }; - -protected: - -private: - //! Logger - std::shared_ptr<Logger> _logger; - - std::unique_ptr<CivetServer> _server; - std::unique_ptr<Handler> _handler; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ListenSyslog.h b/libminifi/include/ListenSyslog.h deleted file mode 100644 index 339dbc1..0000000 --- a/libminifi/include/ListenSyslog.h +++ /dev/null @@ -1,209 +0,0 @@ -/** - * @file ListenSyslog.h - * ListenSyslog class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LISTEN_SYSLOG_H__ -#define __LISTEN_SYSLOG_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <errno.h> -#include <sys/select.h> -#include <sys/time.h> -#include <sys/types.h> -#include <chrono> -#include <thread> -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! SyslogEvent -typedef struct { - uint8_t *payload; - uint64_t len; -} SysLogEvent; - -//! ListenSyslog Class -class ListenSyslog : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - ListenSyslog(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - logger_ = Logger::getLogger(); - _eventQueueByteSize = 0; - _serverSocket = 0; - _recvBufSize = 65507; - _maxSocketBufSize = 1024*1024; - _maxConnections = 2; - _maxBatchSize = 1; - _messageDelimiter = "\n"; - _protocol = "UDP"; - _port = 514; - _parseMessages = false; - _serverSocket = 0; - _maxFds = 0; - FD_ZERO(&_readfds); - _thread = NULL; - _resetServerSocket = false; - _serverTheadRunning = false; - } - //! Destructor - virtual ~ListenSyslog() - { - _serverTheadRunning = false; - if (this->_thread) - delete this->_thread; - // need to reset the socket - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) - { - int clientSocket = *it; - close(clientSocket); - } - _clientSockets.clear(); - if (_serverSocket > 0) - { - logger_->log_info("ListenSysLog Server socket %d close", _serverSocket); - close(_serverSocket); - _serverSocket = 0; - } - } - //! Processor Name - static const std::string ProcessorName; - //! 
Supported Properties - static Property RecvBufSize; - static Property MaxSocketBufSize; - static Property MaxConnections; - static Property MaxBatchSize; - static Property MessageDelimiter; - static Property ParseMessages; - static Property Protocol; - static Property Port; - //! Supported Relationships - static Relationship Success; - static Relationship Invalid; - //! Nest Callback Class for write stream - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(char *data, uint64_t size) - : _data(data), _dataSize(size) {} - char *_data; - uint64_t _dataSize; - void process(std::ofstream *stream) { - if (_data && _dataSize > 0) - stream->write(_data, _dataSize); - } - }; - -public: - //! OnTrigger method, implemented by NiFi ListenSyslog - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi ListenSyslog - virtual void initialize(void); - -protected: - -private: - //! Logger - std::shared_ptr<Logger> logger_; - //! Run function for the thread - static void run(ListenSyslog *process); - //! Run Thread - void runThread(); - //! Queue for store syslog event - std::queue<SysLogEvent> _eventQueue; - //! Size of Event queue in bytes - uint64_t _eventQueueByteSize; - //! Get event queue size - uint64_t getEventQueueSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _eventQueue.size(); - } - //! Get event queue byte size - uint64_t getEventQueueByteSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _eventQueueByteSize; - } - //! Whether the event queue is empty - bool isEventQueueEmpty() - { - std::lock_guard<std::mutex> lock(_mtx); - return _eventQueue.empty(); - } - //! Put event into directory listing - void putEvent(uint8_t *payload, uint64_t len) - { - std::lock_guard<std::mutex> lock(_mtx); - SysLogEvent event; - event.payload = payload; - event.len = len; - _eventQueue.push(event); - _eventQueueByteSize += len; - } - //! 
Read \n terminated line from TCP socket - int readline( int fd, char *bufptr, size_t len ); - //! start server socket and handling client socket - void startSocketThread(); - //! Poll event - void pollEvent(std::queue<SysLogEvent> &list, int maxSize) - { - std::lock_guard<std::mutex> lock(_mtx); - - while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) - { - SysLogEvent event = _eventQueue.front(); - _eventQueue.pop(); - _eventQueueByteSize -= event.len; - list.push(event); - } - return; - } - //! Mutex for protection of the directory listing - std::mutex _mtx; - int64_t _recvBufSize; - int64_t _maxSocketBufSize; - int64_t _maxConnections; - int64_t _maxBatchSize; - std::string _messageDelimiter; - std::string _protocol; - int64_t _port; - bool _parseMessages; - int _serverSocket; - std::vector<int> _clientSockets; - int _maxFds; - fd_set _readfds; - //! thread - std::thread *_thread; - //! whether to reset the server socket - bool _resetServerSocket; - bool _serverTheadRunning; - //! buffer for read socket - uint8_t _buffer[2048]; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/LogAppenders.h ---------------------------------------------------------------------- diff --git a/libminifi/include/LogAppenders.h b/libminifi/include/LogAppenders.h deleted file mode 100644 index ef28bb8..0000000 --- a/libminifi/include/LogAppenders.h +++ /dev/null @@ -1,298 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_LOGAPPENDERS_H_ -#define LIBMINIFI_INCLUDE_LOGAPPENDERS_H_ - -#include "BaseLogger.h" -#include "spdlog/sinks/null_sink.h" -#include "spdlog/sinks/ostream_sink.h" -#include <cxxabi.h> -#include "Configure.h" - -template<typename T> -static std::string getUniqueName() { - std::string name = LOG_NAME; - name += " -- "; - name += abi::__cxa_demangle(typeid(T).name(), 0, 0, 0); - spdlog::drop(name); - return name; -} - -/** - * Null appender sets a null sink, thereby performing no logging. - */ -class NullAppender: public BaseLogger { -public: - /** - * Base constructor that creates the null sink. - */ - explicit NullAppender() : - ::BaseLogger("off") { - auto null_sink = std::make_shared<spdlog::sinks::null_sink_st>(); - std::string unique_name = getUniqueName<NullAppender>(); - logger_ = std::make_shared<spdlog::logger>(unique_name, null_sink); - configured_level_ = off; - setLogLevel(); - } - - /** - * Move constructor for the null appender. - */ - explicit NullAppender(const NullAppender &&other) : - ::BaseLogger(std::move(other)) { - - } - -}; - -/** - * Basic output stream configuration that uses a supplied ostream - * - * Design : extends LoggerConfiguration using the logger and log level - * encapsulated within the base configuration class. - */ -class OutputStreamAppender: public BaseLogger { - -public: - - static const char *nifi_log_output_stream_error_stderr; - - /** - * Output stream move constructor. 
- */ - explicit OutputStreamAppender(const OutputStreamAppender &&other) : - ::BaseLogger(std::move(other)) { - - } - - /** - * Base constructor. Creates a ostream sink. - * @param stream incoming stream reference. - * @param config configuration. - */ - explicit OutputStreamAppender(Configure *config) : - ::BaseLogger("info") { - auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>( - std::cout); - - std::string unique_name = getUniqueName<OutputStreamAppender>(); - logger_ = std::make_shared<spdlog::logger>(unique_name, ostream_sink); - - std::string use_std_err; - - if (NULL != config - && config->get(nifi_log_output_stream_error_stderr, - use_std_err)) { - - std::transform(use_std_err.begin(), use_std_err.end(), - use_std_err.begin(), ::tolower); - - if (use_std_err == "true") { - std::string err_unique_name = - getUniqueName<OutputStreamAppender>(); - auto error_ostream_sink = std::make_shared< - spdlog::sinks::ostream_sink_mt>(std::cerr); - stderr_ = std::make_shared<spdlog::logger>(err_unique_name, - error_ostream_sink); - } - } else { - stderr_ = nullptr; - } - - std::string log_level; - if (NULL != config - && config->get(BaseLogger::nifi_log_level, log_level)) { - setLogLevel(log_level); - } else{ - setLogLevel("info"); - } - - } - - /** - * Base constructor. Creates a ostream sink. - * @param stream incoming stream reference. - * @param config configuration. 
- */ - OutputStreamAppender(std::ostream &stream, Configure *config) : - ::BaseLogger("info") { - auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>( - stream); - std::string unique_name = getUniqueName<OutputStreamAppender>(); - logger_ = std::make_shared<spdlog::logger>(unique_name, ostream_sink); - - stderr_ = nullptr; - - std::string log_level; - if (NULL != config - && config->get(BaseLogger::nifi_log_level, log_level)) { - setLogLevel(log_level); - } else { - setLogLevel("info"); - } - - } - -protected: - -}; - -/** - * Rolling configuration - * Design : extends LoggerConfiguration using the logger and log level - * encapsulated within the base configuration class. - */ -class RollingAppender: public BaseLogger { -public: - static const char *nifi_log_rolling_apender_file; - static const char *nifi_log_rolling_appender_max_files; - static const char *nifi_log_rolling_appender_max_file_size; - - /** - * RollingAppenderConfiguration move constructor. - */ - explicit RollingAppender(const RollingAppender&& other) : - ::BaseLogger(std::move(other)), max_files_( - std::move(other.max_files_)), file_name_( - std::move(other.file_name_)), max_file_size_( - std::move(other.max_file_size_)) { - } - /** - * Base Constructor. - * @param config pointer to the configuration for this instance. 
- */ - explicit RollingAppender(Configure * config = 0) : - ::BaseLogger("info") { - std::string file_name = ""; - if (NULL != config - && config->get(nifi_log_rolling_apender_file, file_name)) { - file_name_ = file_name; - } else{ - file_name_ = LOG_FILE_NAME; - } - - std::string max_files = ""; - if (NULL != config - && config->get(nifi_log_rolling_appender_max_files, - max_files)) { - try { - max_files_ = std::stoi(max_files); - } catch (const std::invalid_argument &ia) { - max_files_ = DEFAULT_LOG_FILE_NUMBER; - } catch (const std::out_of_range &oor) { - max_files_ = DEFAULT_LOG_FILE_NUMBER; - } - } else { - max_files_ = DEFAULT_LOG_FILE_NUMBER; - } - - std::string max_file_size = ""; - if (NULL != config - && config->get(nifi_log_rolling_appender_max_file_size, - max_file_size)) { - try { - max_file_size_ = std::stoi(max_file_size); - } catch (const std::invalid_argument &ia) { - max_file_size_ = DEFAULT_LOG_FILE_SIZE; - } catch (const std::out_of_range &oor) { - max_file_size_ = DEFAULT_LOG_FILE_SIZE; - } - } else { - max_file_size_ = DEFAULT_LOG_FILE_SIZE; - } - - std::string unique_name = getUniqueName<OutputStreamAppender>(); - logger_ = spdlog::rotating_logger_mt(unique_name, file_name_, - max_file_size_, max_files_); - - std::string log_level; - if (NULL != config - && config->get(BaseLogger::nifi_log_level, log_level)) { - setLogLevel(log_level); - } - } - - /** - * To maintain current functionality we will flush on write. - */ - void log_str(LOG_LEVEL_E level, const std::string &buffer) { - ::BaseLogger::log_str(level, buffer); - logger_->flush(); - } - -protected: - - /** - * file name. - */ - std::string file_name_; - /** - * maximum number of files to keep in the rotation. - */ - size_t max_files_; - /** - * Maximum file size per rotated file. - */ - size_t max_file_size_; - -}; - -class LogInstance { -public: - /** - * Returns a logger configuration based on - * the configuration within this instance. 
- * @param config configuration for this instance. - */ - static std::unique_ptr<BaseLogger> getConfiguredLogger(Configure *config) { - std::string appender = ""; - - if (config->get(BaseLogger::nifi_log_appender, appender)) { - std::transform(appender.begin(), appender.end(), appender.begin(), - ::tolower); - - if ("nullappender" == appender || "null appender" == appender - || "null" == appender) { - - return std::move( - std::unique_ptr<BaseLogger>(new NullAppender())); - - } else if ("rollingappender" == appender - || "rolling appender" == appender - || "rolling" == appender) { - - return std::move( - std::unique_ptr<BaseLogger>( - new RollingAppender(config))); - - } else if ("outputstream" == appender - || "outputstreamappender" == appender - || "outputstream appender" == appender) { - - return std::move( - std::unique_ptr<BaseLogger>( - new OutputStreamAppender(config))); - - } - } - return nullptr; - - } -}; - -#endif /* LIBMINIFI_INCLUDE_LOGAPPENDERS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/LogAttribute.h b/libminifi/include/LogAttribute.h deleted file mode 100644 index 429a594..0000000 --- a/libminifi/include/LogAttribute.h +++ /dev/null @@ -1,128 +0,0 @@ -/** - * @file LogAttribute.h - * LogAttribute class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LOG_ATTRIBUTE_H__ -#define __LOG_ATTRIBUTE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! LogAttribute Class -class LogAttribute : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - LogAttribute(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - logger_ = Logger::getLogger(); - } - //! Destructor - virtual ~LogAttribute() - { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property LogLevel; - static Property AttributesToLog; - static Property AttributesToIgnore; - static Property LogPayload; - static Property LogPrefix; - //! Supported Relationships - static Relationship Success; - enum LogAttrLevel { - LogAttrLevelTrace, LogAttrLevelDebug, LogAttrLevelInfo, LogAttrLevelWarn, LogAttrLevelError - }; - //! Convert log level from string to enum - bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) - { - if (logStr == "trace") - { - level = LogAttrLevelTrace; - return true; - } - else if (logStr == "debug") - { - level = LogAttrLevelDebug; - return true; - } - else if (logStr == "info") - { - level = LogAttrLevelInfo; - return true; - } - else if (logStr == "warn") - { - level = LogAttrLevelWarn; - return true; - } - else if (logStr == "error") - { - level = LogAttrLevelError; - return true; - } - else - return false; - } - //! 
Nest Callback Class for read stream - class ReadCallback : public InputStreamCallback - { - public: - ReadCallback(uint64_t size) - { - _bufferSize = size; - _buffer = new char[_bufferSize]; - } - ~ReadCallback() - { - if (_buffer) - delete[] _buffer; - } - void process(std::ifstream *stream) { - - stream->read(_buffer, _bufferSize); - if (!stream) - _readSize = stream->gcount(); - else - _readSize = _bufferSize; - } - char *_buffer; - uint64_t _bufferSize; - uint64_t _readSize; - }; - -public: - //! OnTrigger method, implemented by NiFi LogAttribute - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi LogAttribute - virtual void initialize(void); - -protected: - -private: - //! Logger - std::shared_ptr<Logger> logger_; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Logger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Logger.h b/libminifi/include/Logger.h deleted file mode 100644 index 81d4446..0000000 --- a/libminifi/include/Logger.h +++ /dev/null @@ -1,200 +0,0 @@ -/** - * @file Logger.h - * Logger class declaration - * This is a C++ wrapper for spdlog, a lightweight C++ logging library - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LOGGER_H__ -#define __LOGGER_H__ - -#include <string> -#include <atomic> -#include <memory> -#include <utility> -#include <algorithm> -#include <cstdio> -#include <iostream> - -#include "BaseLogger.h" -#include "spdlog/spdlog.h" - -/** - * Logger class - * Design: Extends BaseLogger, leaving this class to be the facade to the underlying - * logging mechanism. Is a facade to BaseLogger's underlying log stream. This allows - * the underlying implementation to be replaced real time. - */ -class Logger: public BaseLogger { -protected: - struct singleton; -public: - - /** - * Returns a shared pointer to the logger instance. - * Note that while there is no synchronization this is expected - * to be called and initialized first - * @returns shared pointer to the base logger. - */ - static std::shared_ptr<Logger> getLogger() { - - if (singleton_logger_ == nullptr) - singleton_logger_ = std::make_shared<Logger>(singleton { 0 }); - return singleton_logger_; - } - - /** - * Returns the log level for this instance. - */ - LOG_LEVEL_E getLogLevel() const { - return current_logger_.load()->getLogLevel(); - } - - /** - * Sets the log level atomic and sets it - * within logger if it can - * @param level desired log level. - */ - void setLogLevel(LOG_LEVEL_E level) { - current_logger_.load()->setLogLevel(level); - } - - /** - * Sets the log level for this instance based on the string - * @param level desired log leve. - * @param defaultLevel default level if we cannot match level. 
- */ - void setLogLevel(const std::string &level, - LOG_LEVEL_E defaultLevel = info) { - current_logger_.load()->setLogLevel(level, info); - } - - void updateLogger(std::unique_ptr<BaseLogger> logger) { - - if (logger == nullptr ) - return; - current_logger_.store(logger.release()); - } - - /** - * @brief Log error message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_error(const char * const format, ...) { - if (!current_logger_.load()->shouldLog(err)) - return; - FILL_BUFFER - current_logger_.load()->log_str(err, buffer); - } - /** - * @brief Log warn message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_warn(const char * const format, ...) { - if (!current_logger_.load()->shouldLog(warn)) - return; - FILL_BUFFER - current_logger_.load()->log_str(warn, buffer); - } - /** - * @brief Log info message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_info(const char * const format, ...) { - if (!current_logger_.load()->shouldLog(info)) - return; - FILL_BUFFER - current_logger_.load()->log_str(info, buffer); - } - /** - * @brief Log debug message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_debug(const char * const format, ...) 
{ - - if (!current_logger_.load()->shouldLog(debug)) - return; - FILL_BUFFER - current_logger_.load()->log_str(debug, buffer); - } - /** - * @brief Log trace message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_trace(const char * const format, ...) { - - if (!current_logger_.load()->shouldLog(trace)) - return; - FILL_BUFFER - current_logger_.load()->log_str(trace, buffer); - } - - /** - * @brief Log message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - virtual void log_str(LOG_LEVEL_E level, const std::string &buffer) { - current_logger_.load()->log_str(level, buffer); - } - - //! Destructor - ~Logger() { - } - - explicit Logger(const singleton &a) { - - /** - * flush on info to maintain current functionality - */ - std::shared_ptr<spdlog::logger> defaultsink = spdlog::rotating_logger_mt(LOG_NAME, - LOG_FILE_NAME, - DEFAULT_LOG_FILE_SIZE, DEFAULT_LOG_FILE_NUMBER); - defaultsink->flush_on(spdlog::level::level_enum::info); - - std::unique_ptr<BaseLogger> new_logger_ = std::unique_ptr<BaseLogger>( - new BaseLogger("info", defaultsink)); - - new_logger_->setLogLevel(info); - current_logger_.store(new_logger_.release()); - } - - Logger(const Logger &parent) = delete; - Logger &operator=(const Logger &parent) = delete; - -protected: - - /** - * Allows for a null constructor above so that we can have a public constructor that - * effectively limits us to being a singleton by having a protected argument in the constructor - */ - struct singleton { - explicit singleton(int) { - } - }; - - std::atomic<BaseLogger*> current_logger_; - -//! 
Singleton logger instance - static std::shared_ptr<Logger> singleton_logger_; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ProcessContext.h b/libminifi/include/ProcessContext.h deleted file mode 100644 index 585e187..0000000 --- a/libminifi/include/ProcessContext.h +++ /dev/null @@ -1,113 +0,0 @@ -/** - * @file ProcessContext.h - * ProcessContext class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESS_CONTEXT_H__ -#define __PROCESS_CONTEXT_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> - -#include "Logger.h" -#include "Processor.h" - -//! ProcessContext Class -class ProcessContext -{ -public: - //! Constructor - /*! - * Create a new process context associated with the processor/controller service/state manager - */ - ProcessContext(Processor *processor = NULL) : _processor(processor) { - logger_ = Logger::getLogger(); - } - //! Destructor - virtual ~ProcessContext() {} - //! 
Get Processor associated with the Process Context - Processor *getProcessor() { - return _processor; - } - bool getProperty(std::string name, std::string &value) { - if (_processor) - return _processor->getProperty(name, value); - else - return false; - } - //! Sets the property value using the property's string name - bool setProperty(std::string name, std::string value) - { - if (_processor) - return _processor->setProperty(name, value); - else - return false; - } - //! Sets the property value using the Property object - bool setProperty(Property prop, std::string value) { - if (_processor) - return _processor->setProperty(prop, value); - else - return false; - } - //! Whether the relationship is supported - bool isSupportedRelationship(Relationship relationship) { - if (_processor) - return _processor->isSupportedRelationship(relationship); - else - return false; - } - //! Check whether the relationship is auto terminated - bool isAutoTerminated(Relationship relationship) { - if (_processor) - return _processor->isAutoTerminated(relationship); - else - return false; - } - //! Get ProcessContext Maximum Concurrent Tasks - uint8_t getMaxConcurrentTasks(void) { - if (_processor) - return _processor->getMaxConcurrentTasks(); - else - return 0; - } - //! Yield based on the yield period - void yield() { - if (_processor) - _processor->yield(); - } - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProcessContext(const ProcessContext &parent) = delete; - ProcessContext &operator=(const ProcessContext &parent) = delete; - -private: - - //! Processor - Processor *_processor; - //! 
Logger - std::shared_ptr<Logger> logger_; - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h deleted file mode 100644 index dfec6c5..0000000 --- a/libminifi/include/ProcessGroup.h +++ /dev/null @@ -1,187 +0,0 @@ -/** - * @file ProcessGroup.h - * ProcessGroup class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESS_GROUP_H__ -#define __PROCESS_GROUP_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> -#include <set> - -#include "Processor.h" -#include "Exception.h" -#include "TimerDrivenSchedulingAgent.h" -#include "EventDrivenSchedulingAgent.h" -#include "Logger.h" - -//! Process Group Type -enum ProcessGroupType -{ - ROOT_PROCESS_GROUP = 0, - REMOTE_PROCESS_GROUP, - MAX_PROCESS_GROUP_TYPE -}; - -//! ProcessGroup Class -class ProcessGroup -{ -public: - //! Constructor - /*! 
- * Create a new process group - */ - ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, ProcessGroup *parent = NULL); - //! Destructor - virtual ~ProcessGroup(); - //! Set Processor Name - void setName(std::string name) { - name_ = name; - } - //! Get Process Name - std::string getName(void) { - return (name_); - } - //! Set URL - void setURL(std::string url) { - url_ = url; - } - //! Get URL - std::string getURL(void) { - return (url_); - } - //! SetTransmitting - void setTransmitting(bool val) - { - transmitting_ = val; - } - //! Get Transmitting - bool getTransmitting() - { - return transmitting_; - } - //! setTimeOut - void setTimeOut(uint64_t time) - { - timeOut_ = time; - } - uint64_t getTimeOut() - { - return timeOut_; - } - //! Set Processor yield period in MilliSecond - void setYieldPeriodMsec(uint64_t period) { - yield_period_msec_ = period; - } - //! Get Processor yield period in MilliSecond - uint64_t getYieldPeriodMsec(void) { - return(yield_period_msec_); - } - //! Set UUID - void setUUID(uuid_t uuid) { - uuid_copy(uuid_, uuid); - } - //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { - uuid_copy(uuid, uuid_); - return true; - } - else - return false; - } - //! Start Processing - void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler); - //! Stop Processing - void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler); - //! Whether it is root process group - bool isRootProcessGroup(); - //! set parent process group - void setParent(ProcessGroup *parent) { - std::lock_guard<std::mutex> lock(mtx_); - parent_process_group_ = parent; - } - //! get parent process group - ProcessGroup *getParent(void) { - std::lock_guard<std::mutex> lock(mtx_); - return parent_process_group_; - } - //! Add processor - void addProcessor(Processor *processor); - //! Remove processor - void removeProcessor(Processor *processor); - //! 
Add child processor group - void addProcessGroup(ProcessGroup *child); - //! Remove child processor group - void removeProcessGroup(ProcessGroup *child); - // ! Add connections - void addConnection(Connection *connection); - //! findProcessor based on UUID - Processor *findProcessor(uuid_t uuid); - //! findProcessor based on name - Processor *findProcessor(std::string processorName); - //! removeConnection - void removeConnection(Connection *connection); - //! update property value - void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue); - //! get connections under the process group - void getConnections(std::map<std::string, Connection*> *connectionMap); - -protected: - //! A global unique identifier - uuid_t uuid_; - //! Processor Group Name - std::string name_; - //! Process Group Type - ProcessGroupType type_; - //! Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port - std::set<Processor *> processors_; - std::set<ProcessGroup *> child_process_groups_; - //! Connections between the processor inside the group; - std::set<Connection *> connections_; - //! Parent Process Group - ProcessGroup* parent_process_group_; - //! Yield Period in Milliseconds - std::atomic<uint64_t> yield_period_msec_; - std::atomic<uint64_t> timeOut_; - //! URL - std::string url_; - //! Transmitting - std::atomic<bool> transmitting_; - -private: - - //! Mutex for protection - std::mutex mtx_; - //! 
Logger - std::shared_ptr<Logger> logger_; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProcessGroup(const ProcessGroup &parent); - ProcessGroup &operator=(const ProcessGroup &parent); -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ProcessSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ProcessSession.h b/libminifi/include/ProcessSession.h deleted file mode 100644 index 4e26758..0000000 --- a/libminifi/include/ProcessSession.h +++ /dev/null @@ -1,125 +0,0 @@ -/** - * @file ProcessSession.h - * ProcessSession class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESS_SESSION_H__ -#define __PROCESS_SESSION_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> -#include <set> - -#include "Processor.h" -#include "ProcessContext.h" -#include "FlowFileRecord.h" -#include "Exception.h" -#include "Logger.h" -#include "Provenance.h" - -//! ProcessSession Class -class ProcessSession -{ -public: - //! Constructor - /*! 
- * Create a new process session - */ - ProcessSession(ProcessContext *processContext = NULL); - //! Destructor - virtual ~ProcessSession() { - if (_provenanceReport) - delete _provenanceReport; - } - //! Commit the session - void commit(); - //! Roll Back the session - void rollback(); - //! Get Provenance Report - ProvenanceReporter *getProvenanceReporter() - { - return _provenanceReport; - } - //! - //! Get the FlowFile from the highest priority queue - FlowFileRecord *get(); - //! Create a new UUID FlowFile with no content resource claim and without parent - FlowFileRecord *create(); - //! Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent - FlowFileRecord *create(FlowFileRecord *parent); - //! Clone a new UUID FlowFile from parent both for content resource claim and attributes - FlowFileRecord *clone(FlowFileRecord *parent); - //! Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim - FlowFileRecord *clone(FlowFileRecord *parent, long offset, long size); - //! Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session - FlowFileRecord *duplicate(FlowFileRecord *orignal); - //! Transfer the FlowFile to the relationship - void transfer(FlowFileRecord *flow, Relationship relationship); - //! Put Attribute - void putAttribute(FlowFileRecord *flow, std::string key, std::string value); - //! Remove Attribute - void removeAttribute(FlowFileRecord *flow, std::string key); - //! Remove Flow File - void remove(FlowFileRecord *flow); - //! Execute the given read callback against the content - void read(FlowFileRecord *flow, InputStreamCallback *callback); - //! Execute the given write callback against the content - void write(FlowFileRecord *flow, OutputStreamCallback *callback); - //! Execute the given write/append callback against the content - void append(FlowFileRecord *flow, OutputStreamCallback *callback); - //! 
Penalize the flow - void penalize(FlowFileRecord *flow); - //! Import the existed file into the flow - void import(std::string source, FlowFileRecord *flow, bool keepSource = true, uint64_t offset = 0); - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProcessSession(const ProcessSession &parent) = delete; - ProcessSession &operator=(const ProcessSession &parent) = delete; - -protected: - //! FlowFiles being modified by current process session - std::map<std::string, FlowFileRecord *> _updatedFlowFiles; - //! Copy of the original FlowFiles being modified by current process session as above - std::map<std::string, FlowFileRecord *> _originalFlowFiles; - //! FlowFiles being added by current process session - std::map<std::string, FlowFileRecord *> _addedFlowFiles; - //! FlowFiles being deleted by current process session - std::map<std::string, FlowFileRecord *> _deletedFlowFiles; - //! FlowFiles being transfered to the relationship - std::map<std::string, Relationship> _transferRelationship; - //! FlowFiles being cloned for multiple connections per relationship - std::map<std::string, FlowFileRecord *> _clonedFlowFiles; - -private: - // Clone the flow file during transfer to multiple connections for a relationship - FlowFileRecord* cloneDuringTransfer(FlowFileRecord *parent); - //! ProcessContext - ProcessContext *_processContext; - //! Logger - std::shared_ptr<Logger> logger_; - //! 
Provenance Report - ProvenanceReporter *_provenanceReport; - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ProcessSessionFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ProcessSessionFactory.h b/libminifi/include/ProcessSessionFactory.h deleted file mode 100644 index 7fc3410..0000000 --- a/libminifi/include/ProcessSessionFactory.h +++ /dev/null @@ -1,52 +0,0 @@ -/** - * @file ProcessSessionFactory.h - * ProcessSessionFactory class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESS_SESSION_FACTORY_H__ -#define __PROCESS_SESSION_FACTORY_H__ - -#include <memory> - -#include "ProcessContext.h" -#include "ProcessSession.h" - -//! ProcessSessionFactory Class -class ProcessSessionFactory -{ -public: - //! Constructor - /*! - * Create a new process session factory - */ - explicit ProcessSessionFactory(ProcessContext *processContext) : _processContext(processContext) {} - - //! 
Create the session - std::unique_ptr<ProcessSession> createSession(); - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProcessSessionFactory(const ProcessSessionFactory &parent) = delete; - ProcessSessionFactory &operator=(const ProcessSessionFactory &parent) = delete; - -private: - //! ProcessContext - ProcessContext *_processContext; - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h deleted file mode 100644 index 1634bc0..0000000 --- a/libminifi/include/Processor.h +++ /dev/null @@ -1,365 +0,0 @@ -/** - * @file Processor.h - * Processor class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __PROCESSOR_H__ -#define __PROCESSOR_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <memory> -#include <condition_variable> -#include <atomic> -#include <algorithm> -#include <set> -#include <chrono> -#include <functional> - -#include "utils/TimeUtil.h" -#include "Property.h" -#include "Relationship.h" -#include "Connection.h" - -//! Forwarder declaration -class ProcessContext; -class ProcessSession; -class ProcessSessionFactory; - -//! Minimum scheduling period in Nano Second -#define MINIMUM_SCHEDULING_NANOS 30000 - -//! Default yield period in second -#define DEFAULT_YIELD_PERIOD_SECONDS 1 - -//! Default penalization period in second -#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30 - -/*! - * Indicates the valid values for the state of a entity - * with respect to scheduling the entity to run. - */ -enum ScheduledState { - - /** - * Entity cannot be scheduled to run - */ - DISABLED, - /** - * Entity can be scheduled to run but currently is not - */ - STOPPED, - /** - * Entity is currently scheduled to run - */ - RUNNING -}; - -/*! - * Scheduling Strategy - */ -enum SchedulingStrategy { - //! Event driven - EVENT_DRIVEN, - //! Timer driven - TIMER_DRIVEN, - //! Cron Driven - CRON_DRIVEN -}; - -//! Processor Class -class Processor -{ - friend class ProcessContext; -public: - //! Constructor - /*! - * Create a new processor - */ - Processor(std::string name, uuid_t uuid = NULL); - //! Destructor - virtual ~Processor(); - //! Set Processor Name - void setName(std::string name) { - _name = name; - } - //! Get Process Name - std::string getName(void) { - return (_name); - } - //! Set UUID - void setUUID(uuid_t uuid) { - uuid_copy(_uuid, uuid); - char uuidStr[37]; - uuid_unparse_lower(_uuid, uuidStr); - _uuidStr = uuidStr; - } - //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { - uuid_copy(uuid, _uuid); - return true; - } - else - { - return false; - } - } - //! 
Set the supported processor properties while the process is not running - bool setSupportedProperties(std::set<Property> properties); - //! Set the supported relationships while the process is not running - bool setSupportedRelationships(std::set<Relationship> relationships); - //! Get the supported property value by name - bool getProperty(std::string name, std::string &value); - //! Set the supported property value by name wile the process is not running - bool setProperty(std::string name, std::string value); - //! Set the supported property value by using the property itself. - bool setProperty(Property prop, std::string value); - //! Whether the relationship is supported - bool isSupportedRelationship(Relationship relationship); - //! Set the auto terminated relationships while the process is not running - bool setAutoTerminatedRelationships(std::set<Relationship> relationships); - //! Check whether the relationship is auto terminated - bool isAutoTerminated(Relationship relationship); - //! Check whether the processor is running - bool isRunning(); - //! Set Processor Scheduled State - void setScheduledState(ScheduledState state); - //! Get Processor Scheduled State - ScheduledState getScheduledState(void) { - return _state; - } - //! Set Processor Scheduling Strategy - void setSchedulingStrategy(SchedulingStrategy strategy) { - _strategy = strategy; - } - //! Get Processor Scheduling Strategy - SchedulingStrategy getSchedulingStrategy(void) { - return _strategy; - } - //! Set Processor Loss Tolerant - void setlossTolerant(bool lossTolerant) { - _lossTolerant = lossTolerant; - } - //! Get Processor Loss Tolerant - bool getlossTolerant(void) { - return _lossTolerant; - } - //! Set Processor Scheduling Period in Nano Second - void setSchedulingPeriodNano(uint64_t period) { - uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS; - _schedulingPeriodNano = std::max(period, minPeriod); - } - //! 
Get Processor Scheduling Period in Nano Second - uint64_t getSchedulingPeriodNano(void) { - return _schedulingPeriodNano; - } - //! Set Processor Run Duration in Nano Second - void setRunDurationNano(uint64_t period) { - _runDurantionNano = period; - } - //! Get Processor Run Duration in Nano Second - uint64_t getRunDurationNano(void) { - return(_runDurantionNano); - } - //! Set Processor yield period in MilliSecond - void setYieldPeriodMsec(uint64_t period) { - _yieldPeriodMsec = period; - } - //! Get Processor yield period in MilliSecond - uint64_t getYieldPeriodMsec(void) { - return(_yieldPeriodMsec); - } - //! Set Processor penalization period in MilliSecond - void setPenalizationPeriodMsec(uint64_t period) { - _penalizationPeriodMsec = period; - } - //! Get Processor penalization period in MilliSecond - uint64_t getPenalizationPeriodMsec(void) { - return(_penalizationPeriodMsec); - } - //! Set Processor Maximum Concurrent Tasks - void setMaxConcurrentTasks(uint8_t tasks) { - _maxConcurrentTasks = tasks; - } - //! Get Processor Maximum Concurrent Tasks - uint8_t getMaxConcurrentTasks(void) { - return(_maxConcurrentTasks); - } - //! Set Trigger when empty - void setTriggerWhenEmpty(bool value) { - _triggerWhenEmpty = value; - } - //! Get Trigger when empty - bool getTriggerWhenEmpty(void) { - return(_triggerWhenEmpty); - } - //! Get Active Task Counts - uint8_t getActiveTasks(void) { - return(_activeTasks); - } - //! Increment Active Task Counts - void incrementActiveTasks(void) { - _activeTasks++; - } - //! decrement Active Task Counts - void decrementActiveTask(void) { - _activeTasks--; - } - void clearActiveTask(void) { - _activeTasks = 0; - } - //! Yield based on the yield period - void yield() - { - _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); - } - //! Yield based on the input time - void yield(uint64_t time) - { - _yieldExpiration = (getTimeMillis() + time); - } - //! 
whether need be to yield - bool isYield() - { - // Read the atomic once so the test and the comparison see the same value - uint64_t expiration = _yieldExpiration; - if (expiration > 0) - return (expiration >= getTimeMillis()); - else - return false; - } - // clear yield expiration - void clearYield() - { - _yieldExpiration = 0; - } - // get remaining yield time, 0 when the yield has expired or was cleared - uint64_t getYieldTime() - { - uint64_t curTime = getTimeMillis(); - // Snapshot the atomic: reading it twice let a concurrent clearYield() land between the comparison and the subtraction, which wrapped the unsigned result around - uint64_t expiration = _yieldExpiration; - if (expiration > curTime) - return (expiration - curTime); - else - return 0; - } - //! Whether flow file queued in incoming connection - bool flowFilesQueued(); - //! Whether flow file queue full in any of the outgoing connection - bool flowFilesOutGoingFull(); - //! Get incoming connections - std::set<Connection *> getIncomingConnections() { - return _incomingConnections; - } - //! Has Incoming Connection - bool hasIncomingConnections() { - return (_incomingConnections.size() > 0); - } - //! Get outgoing connections based on relationship name - std::set<Connection *> getOutGoingConnections(std::string relationship); - //! Add connection - bool addConnection(Connection *connection); - //! Remove connection - void removeConnection(Connection *connection); - //! Get the UUID as string - std::string getUUIDStr() { - return _uuidStr; - } - //! Get the Next RoundRobin incoming connection - Connection *getNextIncomingConnection(); - //! On Trigger - void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory); - //! Block until work is available on any input connection, or the given duration elapses - void waitForWork(uint64_t timeoutMs); - //! Notify this processor that work may be available - void notifyWork(); - -public: - //! OnTrigger method, implemented by NiFi Processor Designer - virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0; - //! Initialize, overridden by NiFi Process Designer - virtual void initialize() {} - //! Scheduled event hook, overridden by NiFi Process Designer - virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) {} - -protected: - - //! 
A global unique identifier - uuid_t _uuid; - //! Processor name - std::string _name; - //! Supported properties - std::map<std::string, Property> _properties; - //! Supported relationships - std::map<std::string, Relationship> _relationships; - //! Autoterminated relationships - std::map<std::string, Relationship> _autoTerminatedRelationships; - //! Processor state - std::atomic<ScheduledState> _state; - //! Scheduling strategy - std::atomic<SchedulingStrategy> _strategy; - //! Loss-tolerant flag - std::atomic<bool> _lossTolerant; - //! Scheduling period in nanoseconds (read by getSchedulingPeriodNano) - std::atomic<uint64_t> _schedulingPeriodNano; - //! Run duration in nanoseconds; the identifier's spelling is historical, accessors are setRunDurationNano/getRunDurationNano - std::atomic<uint64_t> _runDurantionNano; - //! Yield period in milliseconds; yield() adds it to the current time - std::atomic<uint64_t> _yieldPeriodMsec; - //! Penalization period in milliseconds - std::atomic<uint64_t> _penalizationPeriodMsec; - //! Maximum concurrent tasks - std::atomic<uint8_t> _maxConcurrentTasks; - //! Active task count, changed by incrementActiveTasks/decrementActiveTask/clearActiveTask - std::atomic<uint8_t> _activeTasks; - //! Trigger the Processor even if the incoming connection is empty - std::atomic<bool> _triggerWhenEmpty; - //! Incoming connections - std::set<Connection *> _incomingConnections; - //! Outgoing connections map based on Relationship name - std::map<std::string, std::set<Connection *>> _outGoingConnections; - //! UUID string, returned by getUUIDStr - std::string _uuidStr; - -private: - - //! Mutex for protection (which members it guards is not visible in this header; confirm in the implementation) - std::mutex _mtx; - //! Yield expiration: yield() sets it to getTimeMillis() plus a period, clearYield() resets it to 0 - std::atomic<uint64_t> _yieldExpiration; - //! Iterator into the incoming connection set (presumably the cursor for getNextIncomingConnection; confirm in the implementation) - std::set<Connection *>::iterator _incomingConnectionsIter; - //! Condition for whether there is incoming work to do - std::atomic<bool> _hasWork; - //! Concurrent condition mutex for whether there is incoming work to do - std::mutex _workAvailableMtx; - //! Concurrent condition variable for whether there is incoming work to do - std::condition_variable _hasWorkCondition; - //! Check all incoming connections for work - bool isWorkAvailable(); - //! 
Logger - std::shared_ptr<Logger> logger_; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Processor(const Processor &parent); - Processor &operator=(const Processor &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Property.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Property.h b/libminifi/include/Property.h deleted file mode 100644 index bf33b35..0000000 --- a/libminifi/include/Property.h +++ /dev/null @@ -1,259 +0,0 @@ -/** - * @file Property.h - * Processor Property class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROPERTY_H__ -#define __PROPERTY_H__ - -#include <algorithm> -#include <sstream> -#include <string> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <functional> -#include <set> -#include <stdlib.h> -#include <math.h> - -//! Time Unit -enum TimeUnit { - DAY, HOUR, MINUTE, SECOND, MILLISECOND, NANOSECOND -}; - -//! Property Class -class Property { - -public: - //! Constructor - /*! 
- * Create a new property - */ - Property(const std::string name, const std::string description, - const std::string value) : - _name(name), _description(description), _value(value) { - } - Property() { - } - //! Destructor - virtual ~Property() { - } - //! Get Name for the property - std::string getName() { - return _name; - } - //! Get Description for the property - std::string getDescription() { - return _description; - } - //! Get value for the property - std::string getValue() { - return _value; - } - //! Set value for the property - void setValue(std::string value) { - _value = value; - } - //! Compare - bool operator <(const Property & right) const { - return _name < right._name; - } - - //! Convert TimeUnit to MilliSecond - static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, - int64_t &out) { - if (unit == MILLISECOND) { - out = input; - return true; - } else if (unit == SECOND) { - out = input * 1000; - return true; - } else if (unit == MINUTE) { - out = input * 60 * 1000; - return true; - } else if (unit == HOUR) { - out = input * 60 * 60 * 1000; - return true; - } else if (unit == DAY) { - out = 24 * 60 * 60 * 1000; - return true; - } else if (unit == NANOSECOND) { - out = input / 1000 / 1000; - return true; - } else { - return false; - } - } - //! Convert TimeUnit to NanoSecond - static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, - int64_t &out) { - if (unit == MILLISECOND) { - out = input * 1000 * 1000; - return true; - } else if (unit == SECOND) { - out = input * 1000 * 1000 * 1000; - return true; - } else if (unit == MINUTE) { - out = input * 60 * 1000 * 1000 * 1000; - return true; - } else if (unit == HOUR) { - out = input * 60 * 60 * 1000 * 1000 * 1000; - return true; - } else if (unit == NANOSECOND) { - out = input; - return true; - } else { - return false; - } - } - //! 
Convert String - static bool StringToTime(std::string input, int64_t &output, - TimeUnit &timeunit) { - if (input.size() == 0) { - return false; - } - - const char *cvalue = input.c_str(); - char *pEnd; - long int ival = strtol(cvalue, &pEnd, 0); - - if (pEnd[0] == '\0') { - return false; - } - - while (*pEnd == ' ') { - // Skip the space - pEnd++; - } - - std::string unit(pEnd); - - if (unit == "sec" || unit == "s" || unit == "second" - || unit == "seconds" || unit == "secs") { - timeunit = SECOND; - output = ival; - return true; - } else if (unit == "min" || unit == "m" || unit == "mins" - || unit == "minute" || unit == "minutes") { - timeunit = MINUTE; - output = ival; - return true; - } else if (unit == "ns" || unit == "nano" || unit == "nanos" - || unit == "nanoseconds") { - timeunit = NANOSECOND; - output = ival; - return true; - } else if (unit == "ms" || unit == "milli" || unit == "millis" - || unit == "milliseconds") { - timeunit = MILLISECOND; - output = ival; - return true; - } else if (unit == "h" || unit == "hr" || unit == "hour" - || unit == "hrs" || unit == "hours") { - timeunit = HOUR; - output = ival; - return true; - } else if (unit == "d" || unit == "day" || unit == "days") { - timeunit = DAY; - output = ival; - return true; - } else - return false; - } - - //! 
Convert String to Integer - static bool StringToInt(std::string input, int64_t &output) { - if (input.size() == 0) { - return false; - } - - const char *cvalue = input.c_str(); - char *pEnd; - long int ival = strtol(cvalue, &pEnd, 0); - - if (pEnd[0] == '\0') { - output = ival; - return true; - } - - while (*pEnd == ' ') { - // Skip the space - pEnd++; - } - - char end0 = toupper(pEnd[0]); - if ((end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 == 'T') - || (end0 == 'P')) { - if (pEnd[1] == '\0') { - unsigned long int multiplier = 1000; - - if ((end0 != 'K')) { - multiplier *= 1000; - if (end0 != 'M') { - multiplier *= 1000; - if (end0 != 'G') { - multiplier *= 1000; - if (end0 != 'T') { - multiplier *= 1000; - } - } - } - } - output = ival * multiplier; - return true; - - } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') - && (pEnd[2] == '\0')) { - - unsigned long int multiplier = 1024; - - if ((end0 != 'K')) { - multiplier *= 1024; - if (end0 != 'M') { - multiplier *= 1024; - if (end0 != 'G') { - multiplier *= 1024; - if (end0 != 'T') { - multiplier *= 1024; - } - } - } - } - output = ival * multiplier; - return true; - } - } - - return false; - } - -protected: - //! Name - std::string _name; - //! Description - std::string _description; - //! Value - std::string _value; - -private: - -}; - -#endif
