http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Site2SitePeer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h index e89bb74..de3a42f 100644 --- a/libminifi/include/Site2SitePeer.h +++ b/libminifi/include/Site2SitePeer.h @@ -29,242 +29,261 @@ #include <mutex> #include <atomic> #include <memory> -#include "Logger.h" -#include "Configure.h" -#include "Property.h" + +#include "core/Property.h" +#include "core/logging/Logger.h" +#include "properties/Configure.h" #include "io/ClientSocket.h" #include "io/BaseStream.h" #include "utils/TimeUtil.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { static const char MAGIC_BYTES[] = { 'N', 'i', 'F', 'i' }; -//! Site2SitePeer Class -class Site2SitePeer : public BaseStream{ -public: - +// Site2SitePeer Class +class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { + public: + + Site2SitePeer() + : stream_(nullptr), + host_(""), + port_(-1) { + + } + /* + * Create a new site2site peer + */ + explicit Site2SitePeer( + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, + const std::string host_, uint16_t port_) + : host_(host_), + port_(port_), + stream_(injected_socket.release()) { + logger_ = logging::Logger::getLogger(); + configure_ = Configure::getConfigure(); + _yieldExpiration = 0; + _timeOut = 30000; // 30 seconds + _url = "nifi://" + host_ + ":" + std::to_string(port_); + } - Site2SitePeer() : stream_(nullptr),host_(""),port_(-1){ - - } - /* - * Create a new site2site peer - */ - explicit Site2SitePeer(std::unique_ptr<DataStream> injected_socket, const std::string host_, uint16_t port_ ) : - host_(host_), port_(port_), stream_(injected_socket.release()){ - logger_ = Logger::getLogger(); - configure_ = Configure::getConfigure(); - _yieldExpiration = 0; - _timeOut = 30000; // 30 seconds - _url = "nifi://" + host_ + ":" + std::to_string(port_); - } - - explicit Site2SitePeer(Site2SitePeer &&ss) : stream_( ss.stream_.release()), host_( std::move(ss.host_) ), port_ (std::move(ss.port_) ) - { - logger_ = Logger::getLogger(); - configure_ = Configure::getConfigure(); - _yieldExpiration.store(ss._yieldExpiration); - _timeOut.store(ss._timeOut); - _url = std::move(ss._url); - } - //! Destructor - virtual ~Site2SitePeer() { - Close(); - } - //! Set Processor yield period in MilliSecond - void setYieldPeriodMsec(uint64_t period) { - _yieldPeriodMsec = period; - } - //! get URL - std::string getURL() { - return _url; - } - //! Get Processor yield period in MilliSecond - uint64_t getYieldPeriodMsec(void) { - return (_yieldPeriodMsec); - } - //! Yield based on the yield period - void yield() { - _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); - } - //! setHostName - void setHostName(std::string host_) { - this->host_ = host_; - _url = "nifi://" + host_ + ":" + std::to_string(port_); - } - //! setPort - void setPort(uint16_t port_) { - this->port_ = port_; - _url = "nifi://" + host_ + ":" + std::to_string(port_); - } - //! getHostName - std::string getHostName() { - return host_; - } - //! getPort - uint16_t getPort() { - return port_; - } - //! Yield based on the input time - void yield(uint64_t time) { - _yieldExpiration = (getTimeMillis() + time); - } - //! whether need be to yield - bool isYield() { - if (_yieldExpiration > 0) - return (_yieldExpiration >= getTimeMillis()); - else - return false; - } - // clear yield expiration - void clearYield() { - _yieldExpiration = 0; - } - //! Yield based on the yield period - void yield(std::string portId) { - std::lock_guard<std::mutex> lock(_mtx); - uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); - _yieldExpirationPortIdMap[portId] = yieldExpiration; - } - //! Yield based on the input time - void yield(std::string portId, uint64_t time) { - std::lock_guard<std::mutex> lock(_mtx); - uint64_t yieldExpiration = (getTimeMillis() + time); - _yieldExpirationPortIdMap[portId] = yieldExpiration; - } - //! whether need be to yield - bool isYield(std::string portId) { - std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string, uint64_t>::iterator it = - this->_yieldExpirationPortIdMap.find(portId); - if (it != _yieldExpirationPortIdMap.end()) { - uint64_t yieldExpiration = it->second; - return (yieldExpiration >= getTimeMillis()); - } else { - return false; - } - } - //! clear yield expiration - void clearYield(std::string portId) { - std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string, uint64_t>::iterator it = - this->_yieldExpirationPortIdMap.find(portId); - if (it != _yieldExpirationPortIdMap.end()) { - _yieldExpirationPortIdMap.erase(portId); - } - } - //! setTimeOut - void setTimeOut(uint64_t time) { - _timeOut = time; - } - //! getTimeOut - uint64_t getTimeOut() { - return _timeOut; - } - int write(uint8_t value) { - return Serializable::write(value,stream_.get()); - } - int write(char value) { - return Serializable::write(value,stream_.get()); - } - int write(uint32_t value) { + explicit Site2SitePeer(Site2SitePeer &&ss) + : stream_(ss.stream_.release()), + host_(std::move(ss.host_)), + port_(std::move(ss.port_)) { + logger_ = logging::Logger::getLogger(); + configure_ = Configure::getConfigure(); + _yieldExpiration.store(ss._yieldExpiration); + _timeOut.store(ss._timeOut); + _url = std::move(ss._url); + } + // Destructor + virtual ~Site2SitePeer() { + Close(); + } + // Set Processor yield period in MilliSecond + void setYieldPeriodMsec(uint64_t period) { + _yieldPeriodMsec = period; + } + // get URL + std::string getURL() { + return _url; + } + // Get Processor yield period in MilliSecond + uint64_t getYieldPeriodMsec(void) { + return (_yieldPeriodMsec); + } + // Yield based on the yield period + void yield() { + _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); + } + // setHostName + void setHostName(std::string host_) { + this->host_ = host_; + _url = "nifi://" + host_ + ":" + std::to_string(port_); + } + // setPort + void setPort(uint16_t port_) { + this->port_ = port_; + _url = "nifi://" + host_ + ":" + std::to_string(port_); + } + // getHostName + std::string getHostName() { + return host_; + } + // getPort + uint16_t getPort() { + return port_; + } + // Yield based on the input time + void yield(uint64_t time) { + _yieldExpiration = (getTimeMillis() + time); + } + // whether need be to yield + bool isYield() { + if (_yieldExpiration > 0) + return (_yieldExpiration >= getTimeMillis()); + else + return false; + } + // clear yield expiration + void clearYield() { + _yieldExpiration = 0; + } + // Yield based on the yield period + void yield(std::string portId) { + std::lock_guard<std::mutex> lock(mutex_); + uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); + _yieldExpirationPortIdMap[portId] = yieldExpiration; + } + // Yield based on the input time + void yield(std::string portId, uint64_t time) { + std::lock_guard<std::mutex> lock(mutex_); + uint64_t yieldExpiration = (getTimeMillis() + time); + _yieldExpirationPortIdMap[portId] = yieldExpiration; + } + // whether need be to yield + bool isYield(std::string portId) { + std::lock_guard<std::mutex> lock(mutex_); + std::map<std::string, uint64_t>::iterator it = this + ->_yieldExpirationPortIdMap.find(portId); + if (it != _yieldExpirationPortIdMap.end()) { + uint64_t yieldExpiration = it->second; + return (yieldExpiration >= getTimeMillis()); + } else { + return false; + } + } + // clear yield expiration + void clearYield(std::string portId) { + std::lock_guard<std::mutex> lock(mutex_); + std::map<std::string, uint64_t>::iterator it = this + ->_yieldExpirationPortIdMap.find(portId); + if (it != _yieldExpirationPortIdMap.end()) { + _yieldExpirationPortIdMap.erase(portId); + } + } + // setTimeOut + void setTimeOut(uint64_t time) { + _timeOut = time; + } + // getTimeOut + uint64_t getTimeOut() { + return _timeOut; + } + int write(uint8_t value) { + return Serializable::write(value, stream_.get()); + } + int write(char value) { + return Serializable::write(value, stream_.get()); + } + int write(uint32_t value) { - return Serializable::write(value,stream_.get()); + return Serializable::write(value, stream_.get()); - } - int write(uint16_t value) { - return Serializable::write(value,stream_.get()); - } - int write(uint8_t *value, int len) { - return Serializable::write(value,len,stream_.get()); - } - int write(uint64_t value) { - return Serializable::write(value,stream_.get()); - } - int write(bool value) { - uint8_t temp = value; - return Serializable::write(temp,stream_.get()); - } - int writeUTF(std::string str, bool widen = false){ - return Serializable::writeUTF(str,stream_.get(),widen); - } - int read(uint8_t &value) { - return Serializable::read(value,stream_.get()); - } - int read(uint16_t &value) { - return Serializable::read(value,stream_.get()); - } - int read(char &value) { - return Serializable::read(value,stream_.get()); - } - int read(uint8_t *value, int len) { - return Serializable::read(value,len,stream_.get()); - } - int read(uint32_t &value) { - return Serializable::read(value,stream_.get()); - } - int read(uint64_t &value) { - return Serializable::read(value,stream_.get()); - } - int readUTF(std::string &str, bool widen = false) - { - return Serializable::readUTF(str,stream_.get(),widen); - } - //! open connection to the peer - bool Open(); - //! close connection to the peer - void Close(); - - /** - * Move assignment operator. - */ - Site2SitePeer& operator=(Site2SitePeer&& other) - { - stream_ = std::unique_ptr<DataStream>( other.stream_.release()); - host_ = std::move(other.host_); - port_ = std::move(other.port_); - logger_ = Logger::getLogger(); - configure_ = Configure::getConfigure(); - _yieldExpiration = 0; - _timeOut = 30000; // 30 seconds - _url = "nifi://" + host_ + ":" + std::to_string(port_); - - return *this; - } + } + int write(uint16_t value) { + return Serializable::write(value, stream_.get()); + } + int write(uint8_t *value, int len) { + return Serializable::write(value, len, stream_.get()); + } + int write(uint64_t value) { + return Serializable::write(value, stream_.get()); + } + int write(bool value) { + uint8_t temp = value; + return Serializable::write(temp, stream_.get()); + } + int writeUTF(std::string str, bool widen = false) { + return Serializable::writeUTF(str, stream_.get(), widen); + } + int read(uint8_t &value) { + return Serializable::read(value, stream_.get()); + } + int read(uint16_t &value) { + return Serializable::read(value, stream_.get()); + } + int read(char &value) { + return Serializable::read(value, stream_.get()); + } + int read(uint8_t *value, int len) { + return Serializable::read(value, len, stream_.get()); + } + int read(uint32_t &value) { + return Serializable::read(value, stream_.get()); + } + int read(uint64_t &value) { + return Serializable::read(value, stream_.get()); + } + int readUTF(std::string &str, bool widen = false) { + return org::apache::nifi::minifi::io::Serializable::readUTF(str, + stream_.get(), + widen); + } + // open connection to the peer + bool Open(); + // close connection to the peer + void Close(); - Site2SitePeer(const Site2SitePeer &parent) = delete; - Site2SitePeer &operator=(const Site2SitePeer &parent) = delete; + /** + * Move assignment operator. + */ + Site2SitePeer& operator=(Site2SitePeer&& other) { + stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( + other.stream_.release()); + host_ = std::move(other.host_); + port_ = std::move(other.port_); + logger_ = logging::Logger::getLogger(); + configure_ = Configure::getConfigure(); + _yieldExpiration = 0; + _timeOut = 30000; // 30 seconds + _url = "nifi://" + host_ + ":" + std::to_string(port_); -protected: + return *this; + } -private: + Site2SitePeer(const Site2SitePeer &parent) = delete; + Site2SitePeer &operator=(const Site2SitePeer &parent) = delete; - std::unique_ptr<DataStream> stream_; + protected: - std::string host_; - uint16_t port_; + private: - //! Mutex for protection - std::mutex _mtx; - //! URL - std::string _url; - //! socket timeout; - std::atomic<uint64_t> _timeOut; - //! Logger - std::shared_ptr<Logger> logger_; - //! Configure - Configure *configure_; - //! Yield Period in Milliseconds - std::atomic<uint64_t> _yieldPeriodMsec; - //! Yield Expiration - std::atomic<uint64_t> _yieldExpiration; - //! Yield Expiration per destination PortID - std::map<std::string, uint64_t> _yieldExpirationPortIdMap; - //! OpenSSL connection state - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream_; + + std::string host_; + uint16_t port_; + + // Mutex for protection + std::mutex mutex_; + // URL + std::string _url; + // socket timeout; + std::atomic<uint64_t> _timeOut; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Configure + Configure *configure_; + // Yield Period in Milliseconds + std::atomic<uint64_t> _yieldPeriodMsec; + // Yield Expiration + std::atomic<uint64_t> _yieldExpiration; + // Yield Expiration per destination PortID + std::map<std::string, uint64_t> _yieldExpirationPortIdMap; + // OpenSSL connection state + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer }; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/TailFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TailFile.h b/libminifi/include/TailFile.h deleted file mode 100644 index d68748e..0000000 --- a/libminifi/include/TailFile.h +++ /dev/null @@ -1,93 +0,0 @@ -/** - * @file TailFile.h - * TailFile class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __TAIL_FILE_H__ -#define __TAIL_FILE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! TailFile Class -class TailFile : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - TailFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - logger_ = Logger::getLogger(); - _stateRecovered = false; - } - //! Destructor - virtual ~TailFile() - { - storeState(); - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property FileName; - static Property StateFile; - //! Supported Relationships - static Relationship Success; - -public: - //! OnTrigger method, implemented by NiFi TailFile - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi TailFile - virtual void initialize(void); - //! recoverState - void recoverState(); - //! storeState - void storeState(); - -protected: - -private: - //! Logger - std::shared_ptr<Logger> logger_; - std::string _fileLocation; - //! Property Specified Tailed File Name - std::string _fileName; - //! File to save state - std::string _stateFile; - //! State related to the tailed file - std::string _currentTailFileName; - uint64_t _currentTailFilePosition; - bool _stateRecovered; - uint64_t _currentTailFileCreatedTime; - //! Utils functions for parse state file - std::string trimLeft(const std::string& s); - std::string trimRight(const std::string& s); - void parseStateFileLine(char *buf); - void checkRollOver(); - -}; - -//! Matched File Item for Roll over check -typedef struct { - std::string fileName; - uint64_t modifiedTime; -} TailMatchedFileItem; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index 5eb5d8a..4e39da3 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -20,51 +20,60 @@ #ifndef __THREADED_SCHEDULING_AGENT_H__ #define __THREADED_SCHEDULING_AGENT_H__ -#include "Configure.h" -#include "Logger.h" -#include "Processor.h" -#include "ProcessContext.h" +#include "properties/Configure.h" +#include "core/logging/Logger.h" +#include "core/Processor.h" +#include "core/Repository.h" +#include "core/ProcessContext.h" #include "SchedulingAgent.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + /** * An abstract scheduling agent which creates and manages a pool of threads for * each processor scheduled. */ -class ThreadedSchedulingAgent : public SchedulingAgent -{ -public: - //! Constructor - /*! - * Create a new processor - */ - ThreadedSchedulingAgent() - : SchedulingAgent() - { - } - //! Destructor - virtual ~ThreadedSchedulingAgent() - { - } +class ThreadedSchedulingAgent : public SchedulingAgent { + public: + // Constructor + /*! + * Create a new processor + */ + ThreadedSchedulingAgent(std::shared_ptr<core::Repository> repo) + : SchedulingAgent(repo) { + } + // Destructor + virtual ~ThreadedSchedulingAgent() { + } - //! Run function for the thread - virtual void run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory) = 0; + // Run function for the thread + virtual void run(std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory) = 0; -public: - //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent - virtual void schedule(Processor *processor); - //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent - virtual void unschedule(Processor *processor); + public: + // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent + virtual void schedule(std::shared_ptr<core::Processor> processor); + // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent + virtual void unschedule(std::shared_ptr<core::Processor> processor); -protected: - //! Threads - std::map<std::string, std::vector<std::thread *>> _threads; + protected: + // Threads + std::map<std::string, std::vector<std::thread *>> _threads; -private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent); - ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent); + private: + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent); + ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent); }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 389ccf6..7da2abd 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -20,36 +20,47 @@ #ifndef __TIMER_DRIVEN_SCHEDULING_AGENT_H__ #define __TIMER_DRIVEN_SCHEDULING_AGENT_H__ -#include "Logger.h" -#include "Processor.h" -#include "ProcessContext.h" +#include "core/logging/Logger.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/Repository.h" #include "ThreadedSchedulingAgent.h" -//! TimerDrivenSchedulingAgent Class -class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent -{ -public: - //! Constructor - /*! - * Create a new processor - */ - TimerDrivenSchedulingAgent() - : ThreadedSchedulingAgent() - { - } - //! Destructor - virtual ~TimerDrivenSchedulingAgent() - { - } - //! Run function for the thread - void run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory); +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +// TimerDrivenSchedulingAgent Class +class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { + public: + // Constructor + /*! + * Create a new processor + */ + TimerDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo) + : ThreadedSchedulingAgent(repo) { + } + // Destructor + virtual ~TimerDrivenSchedulingAgent() { + } + /** + * Run function that accepts the processor, context and session factory. + */ + void run(std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory); -private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent); - TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent); + private: + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent); + TimerDrivenSchedulingAgent &operator=( + const TimerDrivenSchedulingAgent &parent); }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ConfigurableComponent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h new file mode 100644 index 0000000..c0cc623 --- /dev/null +++ b/libminifi/include/core/ConfigurableComponent.h @@ -0,0 +1,104 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_ +#define LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_ + +#include "Property.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +/** + * Represents a configurable component + * Purpose: Extracts configuration items for all components and localized them + */ +class ConfigurableComponent { + public: + + + ConfigurableComponent() = delete; + + + explicit ConfigurableComponent(std::shared_ptr<logging::Logger> logger); + + explicit ConfigurableComponent(const ConfigurableComponent &&other); + + /** + * Get property using the provided name. + * @param name property name. + * @param value value passed in by reference + * @return result of getting property. + */ + bool getProperty(const std::string name, std::string &value); + /** + * Sets the property using the provided name + * @param property name + * @param value property value. + * @return result of setting property. + */ + bool setProperty(const std::string name, std::string value); + /** + * Sets the property using the provided name + * @param property name + * @param value property value. + * @return whether property was set or not + */ + bool setProperty(Property &prop, std::string value); + + /** + * Sets supported properties for the ConfigurableComponent + * @param supported properties + * @return result of set operation. + */ + bool setSupportedProperties(std::set<Property> properties); + /** + * Sets supported properties for the ConfigurableComponent + * @param supported properties + * @return result of set operation. + */ + + virtual ~ConfigurableComponent(); + + protected: + + + /** + * Returns true if the instance can be edited. + * @return true/false + */ + virtual bool canEdit()= 0; + + std::mutex configuration_mutex_; + std::shared_ptr<logging::Logger> logger_; + // Supported properties + std::map<std::string, Property> properties_; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ConfigurationFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h new file mode 100644 index 0000000..19ed5f4 --- /dev/null +++ b/libminifi/include/core/ConfigurationFactory.h @@ -0,0 +1,65 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_ +#define LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_ + +#include "FlowConfiguration.h" +#include <type_traits> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + + + + +template<typename T> +typename std::enable_if<!class_operations<T>::value, T*>::type instantiate( + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) { + throw std::runtime_error("Cannot instantiate class"); +} + +template<typename T> +typename std::enable_if<class_operations<T>::value, T*>::type instantiate( + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) { + return new T(repo,flow_file_repo,path); +} + + +/** + * Configuration factory is used to create a new FlowConfiguration + * object. + */ + std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, + const std::string configuration_class_name, const std::string path = "", + bool fail_safe = false); + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Connectable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h new file mode 100644 index 0000000..15e618f --- /dev/null +++ b/libminifi/include/core/Connectable.h @@ -0,0 +1,165 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_ +#define LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_ + +#include <set> +#include "core.h" +#include <condition_variable> +#include "core/logging/Logger.h" +#include "Relationship.h" +#include "Scheduling.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +/** + * Represents the base connectable component + * Purpose: As in NiFi, this represents a connection point and allows the derived + * object to be connected to other connectables. + */ +class Connectable : public CoreComponent { + public: + + explicit Connectable(std::string name, uuid_t uuid); + + explicit Connectable(const Connectable &&other); + + bool setSupportedRelationships(std::set<Relationship> relationships); + + // Whether the relationship is supported + bool isSupportedRelationship(Relationship relationship); + + /** + * Sets auto terminated relationships + * @param relationships + * @return result of set operation. + */ + bool setAutoTerminatedRelationships(std::set<Relationship> relationships); + + // Check whether the relationship is auto terminated + bool isAutoTerminated(Relationship relationship); + + // Get Processor penalization period in MilliSecond + uint64_t getPenalizationPeriodMsec(void) { + return (_penalizationPeriodMsec); + } + + /** + * Get outgoing connection based on relationship + * @return set of outgoing connections. + */ + std::set<std::shared_ptr<Connectable>> getOutGoingConnections( + std::string relationship); + + /** + * Get next incoming connection + * @return next incoming connection + */ + std::shared_ptr<Connectable> getNextIncomingConnection(); + + /** + * @return true if incoming connections > 0 + */ + bool hasIncomingConnections() { + return (_incomingConnections.size() > 0); + } + + uint8_t getMaxConcurrentTasks() { + return max_concurrent_tasks_; + } + + void setMaxConcurrentTasks(const uint8_t tasks) { + max_concurrent_tasks_ = tasks; + } + /** + * Yield + */ + virtual void yield() = 0; + + virtual ~Connectable(); + + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() = 0; + + /** + * Block until work is available on any input connection, or the given duration elapses + * @param timeoutMs timeout in milliseconds + */ + void waitForWork(uint64_t timeoutMs); + /** + * Notify this processor that work may be available + */ + + void notifyWork(); + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. + */ + virtual bool isWorkAvailable() = 0; + + protected: + + // Penalization Period in MilliSecond + std::atomic<uint64_t> _penalizationPeriodMsec; + + uint8_t max_concurrent_tasks_; + + // Supported relationships + std::map<std::string, core::Relationship> relationships_; + // Autoterminated relationships + std::map<std::string, core::Relationship> auto_terminated_relationships_; + + // Incoming connection Iterator + std::set<std::shared_ptr<Connectable>>::iterator incoming_connections_Iter; + // Incoming connections + std::set<std::shared_ptr<Connectable>> _incomingConnections; + // Outgoing connections map based on Relationship name + std::map<std::string, std::set<std::shared_ptr<Connectable>>>_outGoingConnections; + + // Mutex for protection + std::mutex relationship_mutex_; + + ///// work conditionals and locking mechanisms + + // Concurrent condition mutex for whether there is incoming work to do + std::mutex work_available_mutex_; + // Condition for whether there is incoming work to do + std::atomic<bool> has_work_; + // Scheduling Strategy + std::atomic<SchedulingStrategy> strategy_; + // Concurrent condition variable for whether there is incoming work to do + std::condition_variable work_condition_; + +}; + +} +/* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h new file mode 100644 index 0000000..c7eedd2 --- /dev/null +++ b/libminifi/include/core/FlowConfiguration.h @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ +#define LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ + +#include "core/core.h" +#include "Connection.h" +#include "RemoteProcessorGroupPort.h" +#include "provenance/Provenance.h" +#include "processors/GetFile.h" +#include "processors/PutFile.h" +#include "processors/TailFile.h" +#include "processors/ListenSyslog.h" +#include "processors/GenerateFlowFile.h" +#include "processors/RealTimeDataCollector.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" +#include "processors/ExecuteProcess.h" +#include "processors/AppendHostInfo.h" +#include "core/Processor.h" +#include "core/logging/Logger.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessGroup.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +/** + * Purpose: Flow configuration defines the mechanism + * by which we will configure our flow controller + */ +class FlowConfiguration : public CoreComponent { + public: + /** + * Constructor that will be used for configuring + * the flow controller. + */ + FlowConfiguration(std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, + const std::string path) + : CoreComponent(core::getClassName<FlowConfiguration>()), + flow_file_repo_(flow_file_repo), + config_path_(path) { + + } + + virtual ~FlowConfiguration(); + + // Create Processor (Node/Input/Output Port) based on the name + std::shared_ptr<core::Processor> createProcessor(std::string name, + uuid_t uuid); + // Create Root Processor Group + std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, + uuid_t uuid); + // Create Remote Processor Group + std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, + uuid_t uuid); + // Create Connection + std::shared_ptr<minifi::Connection> createConnection(std::string name, + uuid_t uuid); + + /** + * Returns the configuration path string + * @return config_path_ + */ + const std::string &getConfigurationPath() { + return config_path_; + } + + virtual std::unique_ptr<core::ProcessGroup> getRoot() { + return getRoot(config_path_); + } + + /** + * Base implementation that returns a null root pointer. + * @return Extensions should return a non-null pointer in order to + * properly configure flow controller. + */ + virtual std::unique_ptr<core::ProcessGroup> getRoot( + const std::string &from_config) { + return nullptr; + } + + protected: + // configuration path + std::string config_path_; + // flow file repo + std::shared_ptr<core::Repository> flow_file_repo_; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/FlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h new file mode 100644 index 0000000..247ad26 --- /dev/null +++ b/libminifi/include/core/FlowFile.h @@ -0,0 +1,283 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef RECORD_H +#define RECORD_H + +#include "utils/TimeUtil.h" +#include "ResourceClaim.h" +#include "Connectable.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +class FlowFile { + public: + FlowFile(); + ~FlowFile(); + FlowFile& operator=(const FlowFile& other); + + /** + * Returns a pointer to this flow file record's + * claim + */ + std::shared_ptr<ResourceClaim> getResourceClaim(); + /** + * Sets _claim to the inbound claim argument + */ + void setResourceClaim(std::shared_ptr<ResourceClaim> &claim); + + /** + * clear the resource claim + */ + void clearResourceClaim(); + + /** + * Get lineage identifiers + */ + std::set<std::string> &getlineageIdentifiers(); + + /** + * Returns whether or not this flow file record + * is marked as deleted. + * @return marked deleted + */ + bool isDeleted(); + + /** + * Sets whether to mark this flow file record + * as deleted + * @param deleted deleted flag + */ + void setDeleted(const bool deleted); + + /** + * Get entry date for this record + * @return entry date uint64_t + */ + uint64_t getEntryDate(); + + /** + * Gets the event time. + * @return event time. + */ + uint64_t getEventTime(); + /** + * Get lineage start date + * @return lineage start date uint64_t + */ + uint64_t getlineageStartDate(); + + /** + * Sets the lineage start date + * @param date new lineage start date + */ + void setLineageStartDate(const uint64_t date); + + void setLineageIdentifiers(std::set<std::string> lineage_Identifiers) { + lineage_Identifiers_ = lineage_Identifiers; + } + /** + * Obtains an attribute if it exists. If it does the value is + * copied into value + * @param key key to look for + * @param value value to set + * @return result of finding key + */ + bool getAttribute(std::string key, std::string &value); + + /** + * Updates the value in the attribute map that corresponds + * to key + * @param key attribute name + * @param value value to set to attribute name + * @return result of finding key + */ + bool updateAttribute(const std::string key, const std::string value); + + /** + * Removes the attribute + * @param key attribute name to remove + * @return result of finding key + */ + bool removeAttribute(const std::string key); + + /** + * setAttribute, if attribute already there, update it, else, add it + */ + void setAttribute(const std::string &key, const std::string &value) { + attributes_[key] = value; + } + + /** + * Returns the map of attributes + * @return attributes. + */ + std::map<std::string, std::string> getAttributes() { + return attributes_; + } + + /** + * adds an attribute if it does not exist + * + */ + bool addAttribute(const std::string &key, const std::string &value); + + /** + * Set the size of this record. + * @param size size of record to set.Ã + */ + void setSize(const uint64_t size) { + size_ = size; + } + /** + * Returns the size of corresponding flow file + * @return size as a uint64_t + */ + uint64_t getSize(); + + /** + * Sets the offset + * @param offset offset to apply to this record. + */ + void setOffset(const uint64_t offset) { + offset_ = offset; + } + + /** + * Sets the penalty expiration + * @param penaltyExp new penalty expiration + */ + void setPenaltyExpiration(const uint64_t penaltyExp) { + penaltyExpiration_ms_ = penaltyExp; + } + + uint64_t getPenaltyExpiration() { + return penaltyExpiration_ms_; + } + + /** + * Gets the offset within the flow file + * @return size as a uint64_t + */ + uint64_t getOffset(); + + // Get the UUID as string + std::string getUUIDStr() { + return uuid_str_; + } + + bool getUUID(uuid_t other) + { + uuid_copy(other,uuid_); + return true; + } + + // Check whether it is still being penalized + bool isPenalized() { + return ( + penaltyExpiration_ms_ > 0 ? + penaltyExpiration_ms_ > getTimeMillis() : false); + } + + /** + * Sets the original connection with a shared pointer. + * @param connection shared connection. + */ + void setConnection(std::shared_ptr<core::Connectable> &connection); + + /** + * Sets the original connection with a shared pointer. + * @param connection shared connection. + */ + void setConnection(std::shared_ptr<core::Connectable> &&connection); + + /** + * Returns the connection referenced by this record. + * @return shared connection pointer. + */ + std::shared_ptr<core::Connectable> getConnection(); + /** + * Sets the original connection with a shared pointer. + * @param connection shared connection. + */ + void setOriginalConnection(std::shared_ptr<core::Connectable> &connection); + /** + * Returns the original connection referenced by this record. + * @return shared original connection pointer. + */ + std::shared_ptr<core::Connectable> getOriginalConnection(); + + void setStoredToRepository(bool storedInRepository) { + stored = storedInRepository; + } + + bool isStored() { + return stored; + } + + protected: + bool stored; + // Mark for deletion + bool marked_delete_; + // Date at which the flow file entered the flow + uint64_t entry_date_; + // event time + uint64_t event_time_; + // Date at which the origin of this flow file entered the flow + uint64_t lineage_start_date_; + // Date at which the flow file was queued + uint64_t last_queue_date_; + // Size in bytes of the data corresponding to this flow file + uint64_t size_; + // A global unique identifier + uuid_t uuid_; + // A local unique identifier + uint64_t id_; + // Offset to the content + uint64_t offset_; + // Penalty expiration + uint64_t penaltyExpiration_ms_; + // Attributes key/values pairs for the flow record + std::map<std::string, std::string> attributes_; + // Pointer to the associated content resource claim + std::shared_ptr<ResourceClaim> claim_; + // UUID string + std::string uuid_str_; + // UUID string for all parents + std::set<std::string> lineage_Identifiers_; + + // Logger + std::shared_ptr<logging::Logger> logger_; + + // Connection queue that this flow file will be transfer or current in + std::shared_ptr<core::Connectable> connection_; + // Orginal connection queue that this flow file was dequeued from + std::shared_ptr<core::Connectable> original_connection_; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif // RECORD_H http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h new file mode 100644 index 0000000..1da85cd --- /dev/null +++ b/libminifi/include/core/ProcessContext.h @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PROCESS_CONTEXT_H__ +#define __PROCESS_CONTEXT_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <algorithm> + +#include "Property.h" +#include "core/logging/Logger.h" +#include "ProcessorNode.h" +#include "core/Repository.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// ProcessContext Class +class ProcessContext { + public: + // Constructor + /*! + * Create a new process context associated with the processor/controller service/state manager + */ + ProcessContext(ProcessorNode &processor, + std::shared_ptr<core::Repository> repo) + : processor_node_(processor) { + logger_ = logging::Logger::getLogger(); + repo_ = repo; + } + // Destructor + virtual ~ProcessContext() { + } + // Get Processor associated with the Process Context + ProcessorNode &getProcessorNode() { + return processor_node_; + } + bool getProperty(std::string name, std::string &value) { + return processor_node_.getProperty(name, value); + } + // Sets the property value using the property's string name + bool setProperty(std::string name, std::string value) { + return processor_node_.setProperty(name, value); + } + // Sets the property value using the Property object + bool setProperty(Property prop, std::string value) { + return processor_node_.setProperty(prop, value); + } + // Whether the relationship is supported + bool isSupportedRelationship(Relationship relationship) { + return processor_node_.isSupportedRelationship(relationship); + } + + // Check whether the relationship is auto terminated + bool isAutoTerminated(Relationship relationship) { + return processor_node_.isAutoTerminated(relationship); + } + // Get ProcessContext Maximum Concurrent Tasks + uint8_t getMaxConcurrentTasks(void) { + return processor_node_.getMaxConcurrentTasks(); + } + // Yield based on the yield period + void yield() { + processor_node_.yield(); + } + + std::shared_ptr<core::Repository> getProvenanceRepository() { + return repo_; + } + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + ProcessContext(const ProcessContext &parent) = delete; + ProcessContext &operator=(const ProcessContext &parent) = delete; + + private: + + // repository shared pointer. + std::shared_ptr<core::Repository> repo_; + // Processor + ProcessorNode processor_node_; + // Logger + std::shared_ptr<logging::Logger> logger_; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h new file mode 100644 index 0000000..75bb0ba --- /dev/null +++ b/libminifi/include/core/ProcessGroup.h @@ -0,0 +1,190 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PROCESS_GROUP_H__ +#define __PROCESS_GROUP_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <algorithm> +#include <set> + +#include "Processor.h" +#include "Exception.h" +#include "TimerDrivenSchedulingAgent.h" +#include "EventDrivenSchedulingAgent.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// Process Group Type +enum ProcessGroupType { + ROOT_PROCESS_GROUP = 0, + REMOTE_PROCESS_GROUP, + MAX_PROCESS_GROUP_TYPE +}; + +// ProcessGroup Class +class ProcessGroup { + public: + // Constructor + /*! + * Create a new process group + */ + ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, + ProcessGroup *parent = NULL); + // Destructor + virtual ~ProcessGroup(); + // Set Processor Name + void setName(std::string name) { + name_ = name; + } + // Get Process Name + std::string getName(void) { + return (name_); + } + // Set URL + void setURL(std::string url) { + url_ = url; + } + // Get URL + std::string getURL(void) { + return (url_); + } + // SetTransmitting + void setTransmitting(bool val) { + transmitting_ = val; + } + // Get Transmitting + bool getTransmitting() { + return transmitting_; + } + // setTimeOut + void setTimeOut(uint64_t time) { + timeOut_ = time; + } + uint64_t getTimeOut() { + return timeOut_; + } + // Set Processor yield period in MilliSecond + void setYieldPeriodMsec(uint64_t period) { + yield_period_msec_ = period; + } + // Get Processor yield period in MilliSecond + uint64_t getYieldPeriodMsec(void) { + return (yield_period_msec_); + } + // Set UUID + void setUUID(uuid_t uuid) { + uuid_copy(uuid_, uuid); + } + // Get UUID + bool getUUID(uuid_t uuid) { + if (uuid) { + uuid_copy(uuid, uuid_); + return true; + } else + return false; + } + // Start Processing + void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler); + // Stop Processing + void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, + EventDrivenSchedulingAgent *eventScheduler); + // Whether it is root process group + bool isRootProcessGroup(); + // set parent process group + void setParent(ProcessGroup *parent) { + std::lock_guard<std::mutex> lock(mutex_); + parent_process_group_ = parent; + } + // get parent process group + ProcessGroup *getParent(void) { + std::lock_guard<std::mutex> lock(mutex_); + return parent_process_group_; + } + // Add processor + void addProcessor(std::shared_ptr<Processor> processor); + // Remove processor + void removeProcessor(std::shared_ptr<Processor> processor); + // Add child processor group + void addProcessGroup(ProcessGroup *child); + // Remove child processor group + void removeProcessGroup(ProcessGroup *child); + // ! Add connections + void addConnection(std::shared_ptr<Connection> connection); + // findProcessor based on UUID + std::shared_ptr<Processor> findProcessor(uuid_t uuid); + // findProcessor based on name + std::shared_ptr<Processor> findProcessor(const std::string &processorName); + // removeConnection + void removeConnection(std::shared_ptr<Connection> connection); + // update property value + void updatePropertyValue(std::string processorName, std::string propertyName, + std::string propertyValue); + + void getConnections( + std::map<std::string, std::shared_ptr<Connection>> &connectionMap); + + protected: + // A global unique identifier + uuid_t uuid_; + // Processor Group Name + std::string name_; + // Process Group Type + ProcessGroupType type_; + // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port + std::set<std::shared_ptr<Processor> > processors_; + std::set<ProcessGroup *> child_process_groups_; + // Connections between the processor inside the group; + std::set<std::shared_ptr<Connection> > connections_; + // Parent Process Group + ProcessGroup* parent_process_group_; + // Yield Period in Milliseconds + std::atomic<uint64_t> yield_period_msec_; + std::atomic<uint64_t> timeOut_; + // URL + std::string url_; + // Transmitting + std::atomic<bool> transmitting_; + + private: + + // Mutex for protection + std::mutex mutex_; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + ProcessGroup(const ProcessGroup &parent); + ProcessGroup &operator=(const ProcessGroup &parent); +}; +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h new file mode 100644 index 0000000..b516817 --- /dev/null +++ b/libminifi/include/core/ProcessSession.h @@ -0,0 +1,167 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PROCESS_SESSION_H__ +#define __PROCESS_SESSION_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <algorithm> +#include <set> + +#include "ProcessContext.h" +#include "FlowFileRecord.h" +#include "Exception.h" +#include "core/logging/Logger.h" +#include "FlowFile.h" +#include "provenance/Provenance.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// ProcessSession Class +class ProcessSession { + public: + // Constructor + /*! + * Create a new process session + */ + ProcessSession(ProcessContext *processContext = NULL) + : process_context_(processContext) { + logger_ = logging::Logger::getLogger(); + logger_->log_trace("ProcessSession created for %s", + process_context_->getProcessorNode().getName().c_str()); + auto repo = processContext->getProvenanceRepository(); + provenance_report_ = new provenance::ProvenanceReporter( + repo, process_context_->getProcessorNode().getUUIDStr(), + process_context_->getProcessorNode().getName()); + } + +// Destructor + virtual ~ProcessSession() { + if (provenance_report_) + delete provenance_report_; + } +// Commit the session + void commit(); +// Roll Back the session + void rollback(); +// Get Provenance Report + provenance::ProvenanceReporter *getProvenanceReporter() { + return provenance_report_; + } +// +// Get the FlowFile from the highest priority queue + std::shared_ptr<core::FlowFile> get(); +// Create a new UUID FlowFile with no content resource claim and without parent + std::shared_ptr<core::FlowFile> create(); +// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent + std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent); +// Clone a new UUID FlowFile from parent both for content resource claim and attributes + std::shared_ptr<core::FlowFile> clone( + std::shared_ptr<core::FlowFile> &parent); +// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim + std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent, + long offset, long size); +// Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session + std::shared_ptr<core::FlowFile> duplicate( + std::shared_ptr<core::FlowFile> &original); +// Transfer the FlowFile to the relationship + void transfer(std::shared_ptr<core::FlowFile> &flow, + Relationship relationship); + void transfer(std::shared_ptr<core::FlowFile> &&flow, + Relationship relationship); +// Put Attribute + void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, + std::string value); + void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, + std::string value); +// Remove Attribute + void removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key); + void removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key); +// Remove Flow File + void remove(std::shared_ptr<core::FlowFile> &flow); + void remove(std::shared_ptr<core::FlowFile> &&flow); +// Execute the given read callback against the content + void read(std::shared_ptr<core::FlowFile> &flow, + InputStreamCallback *callback); + void read(std::shared_ptr<core::FlowFile> &&flow, + InputStreamCallback *callback); +// Execute the given write callback against the content + void write(std::shared_ptr<core::FlowFile> &flow, + OutputStreamCallback *callback); + void write(std::shared_ptr<core::FlowFile> &&flow, + OutputStreamCallback *callback); +// Execute the given write/append callback against the content + void append(std::shared_ptr<core::FlowFile> &flow, + OutputStreamCallback *callback); + void append(std::shared_ptr<core::FlowFile> &&flow, + OutputStreamCallback *callback); +// Penalize the flow + void penalize(std::shared_ptr<core::FlowFile> &flow); + void penalize(std::shared_ptr<core::FlowFile> &&flow); +// Import the existed file into the flow + void import(std::string source, std::shared_ptr<core::FlowFile> &flow, + bool keepSource = true, uint64_t offset = 0); + void import(std::string source, std::shared_ptr<core::FlowFile> &&flow, + bool keepSource = true, uint64_t offset = 0); + +// Prevent default copy constructor and assignment operation +// Only support pass by reference or pointer + ProcessSession(const ProcessSession &parent) = delete; + ProcessSession &operator=(const ProcessSession &parent) = delete; + + protected: +// FlowFiles being modified by current process session + std::map<std::string, std::shared_ptr<core::FlowFile> > _updatedFlowFiles; +// Copy of the original FlowFiles being modified by current process session as above + std::map<std::string, std::shared_ptr<core::FlowFile> > _originalFlowFiles; +// FlowFiles being added by current process session + std::map<std::string, std::shared_ptr<core::FlowFile> > _addedFlowFiles; +// FlowFiles being deleted by current process session + std::map<std::string, std::shared_ptr<core::FlowFile> > _deletedFlowFiles; +// FlowFiles being transfered to the relationship + std::map<std::string, Relationship> _transferRelationship; +// FlowFiles being cloned for multiple connections per relationship + std::map<std::string, std::shared_ptr<core::FlowFile> > _clonedFlowFiles; + + private: +// Clone the flow file during transfer to multiple connections for a relationship + std::shared_ptr<core::FlowFile> cloneDuringTransfer( + std::shared_ptr<core::FlowFile> &parent); +// ProcessContext + ProcessContext *process_context_; +// Logger + std::shared_ptr<logging::Logger> logger_; +// Provenance Report + provenance::ProvenanceReporter *provenance_report_; + +} +; +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessSessionFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSessionFactory.h b/libminifi/include/core/ProcessSessionFactory.h new file mode 100644 index 0000000..e0ebe18 --- /dev/null +++ b/libminifi/include/core/ProcessSessionFactory.h @@ -0,0 +1,64 @@ +/** + * @file ProcessSessionFactory.h + * ProcessSessionFactory class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PROCESS_SESSION_FACTORY_H__ +#define __PROCESS_SESSION_FACTORY_H__ + +#include <memory> + +#include "ProcessContext.h" +#include "ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// ProcessSessionFactory Class +class ProcessSessionFactory { + public: + // Constructor + /*! + * Create a new process session factory + */ + explicit ProcessSessionFactory(ProcessContext *processContext) + : process_context_(processContext) { + } + + // Create the session + std::unique_ptr<ProcessSession> createSession(); + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + ProcessSessionFactory(const ProcessSessionFactory &parent) = delete; + ProcessSessionFactory &operator=(const ProcessSessionFactory &parent) = delete; + + private: + // ProcessContext + ProcessContext *process_context_; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h new file mode 100644 index 0000000..fd0411f --- /dev/null +++ b/libminifi/include/core/Processor.h @@ -0,0 +1,270 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PROCESSOR_H__ +#define __PROCESSOR_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <memory> +#include <condition_variable> +#include <atomic> +#include <algorithm> +#include <set> +#include <chrono> +#include <functional> + +#include "Connectable.h" +#include "ConfigurableComponent.h" +#include "Property.h" +#include "utils/TimeUtil.h" +#include "Relationship.h" +#include "Connection.h" +#include "ProcessContext.h" +#include "ProcessSession.h" +#include "ProcessSessionFactory.h" +#include "Scheduling.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// Minimum scheduling period in Nano Second +#define MINIMUM_SCHEDULING_NANOS 30000 + +// Default yield period in second +#define DEFAULT_YIELD_PERIOD_SECONDS 1 + +// Default penalization period in second +#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30 + +// Processor Class +class Processor : public Connectable, public ConfigurableComponent, + public std::enable_shared_from_this<Processor> { + + public: + // Constructor + /*! + * Create a new processor + */ + Processor(std::string name, uuid_t uuid = NULL); + // Destructor + virtual ~Processor() { + } + + bool isRunning(); + // Set Processor Scheduled State + void setScheduledState(ScheduledState state); + // Get Processor Scheduled State + ScheduledState getScheduledState(void) { + return state_; + } + // Set Processor Scheduling Strategy + void setSchedulingStrategy(SchedulingStrategy strategy) { + strategy_ = strategy; + } + // Get Processor Scheduling Strategy + SchedulingStrategy getSchedulingStrategy(void) { + return strategy_; + } + // Set Processor Loss Tolerant + void setlossTolerant(bool lossTolerant) { + loss_tolerant_ = lossTolerant; + } + // Get Processor Loss Tolerant + bool getlossTolerant(void) { + return loss_tolerant_; + } + // Set Processor Scheduling Period in Nano Second + void setSchedulingPeriodNano(uint64_t period) { + uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS; + scheduling_period_nano_ = std::max(period, minPeriod); + } + // Get Processor Scheduling Period in Nano Second + uint64_t getSchedulingPeriodNano(void) { + return scheduling_period_nano_; + } + // Set Processor Run Duration in Nano Second + void setRunDurationNano(uint64_t period) { + run_durantion_nano_ = period; + } + // Get Processor Run Duration in Nano Second + uint64_t getRunDurationNano(void) { + return (run_durantion_nano_); + } + // Set Processor yield period in MilliSecond + void setYieldPeriodMsec(uint64_t period) { + yield_period_msec_ = period; + } + // Get Processor yield period in MilliSecond + uint64_t getYieldPeriodMsec(void) { + return (yield_period_msec_); + } + // Set Processor penalization period in MilliSecond + void setPenalizationPeriodMsec(uint64_t period) { + _penalizationPeriodMsec = period; + } + + + // Set Processor Maximum Concurrent Tasks + void setMaxConcurrentTasks(uint8_t tasks) { + max_concurrent_tasks_ = tasks; + } + // Get Processor Maximum Concurrent Tasks + uint8_t getMaxConcurrentTasks(void) { + return (max_concurrent_tasks_); + } + // Set Trigger when empty + void setTriggerWhenEmpty(bool value) { + _triggerWhenEmpty = value; + } + // Get Trigger when empty + bool getTriggerWhenEmpty(void) { + return (_triggerWhenEmpty); + } + // Get Active Task Counts + uint8_t getActiveTasks(void) { + return (active_tasks_); + } + // Increment Active Task Counts + void incrementActiveTasks(void) { + active_tasks_++; + } + // decrement Active Task Counts + void decrementActiveTask(void) { + active_tasks_--; + } + void clearActiveTask(void) { + active_tasks_ = 0; + } + // Yield based on the yield period + void yield() { + yield_expiration_ = (getTimeMillis() + yield_period_msec_); + } + // Yield based on the input time + void yield(uint64_t time) { + yield_expiration_ = (getTimeMillis() + time); + } + // whether need be to yield + bool isYield() { + if (yield_expiration_ > 0) + return (yield_expiration_ >= getTimeMillis()); + else + return false; + } + // clear yield expiration + void clearYield() { + yield_expiration_ = 0; + } + // get yield time + uint64_t getYieldTime() { + uint64_t curTime = getTimeMillis(); + if (yield_expiration_ > curTime) + return (yield_expiration_ - curTime); + else + return 0;; + } + // Whether flow file queued in incoming connection + bool flowFilesQueued(); + // Whether flow file queue full in any of the outgoin connection + bool flowFilesOutGoingFull(); + + // Get outgoing connections based on relationship name + std::set<std::shared_ptr<Connection> > getOutGoingConnections( + std::string relationship); + // Add connection + bool addConnection(std::shared_ptr<Connectable> connection); + // Remove connection + void removeConnection(std::shared_ptr<Connectable> connection); + // Get the UUID as string + std::string getUUIDStr() { + return uuidStr_; + } + // Get the Next RoundRobin incoming connection + std::shared_ptr<Connection> getNextIncomingConnection(); + // On Trigger + void onTrigger(ProcessContext *context, + ProcessSessionFactory *sessionFactory); + + virtual bool canEdit() { + return !isRunning(); + } + + public: + + + // OnTrigger method, implemented by NiFi Processor Designer + virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0; + // Initialize, overridden by NiFi Process Designer + virtual void initialize() { + } + // Scheduled event hook, overridden by NiFi Process Designer + virtual void onSchedule(ProcessContext *context, + ProcessSessionFactory *sessionFactory) { + } + + protected: + + // Processor state + std::atomic<ScheduledState> state_; + + // lossTolerant + std::atomic<bool> loss_tolerant_; + // SchedulePeriod in Nano Seconds + std::atomic<uint64_t> scheduling_period_nano_; + // Run Duration in Nano Seconds + std::atomic<uint64_t> run_durantion_nano_; + // Yield Period in Milliseconds + std::atomic<uint64_t> yield_period_msec_; + + // Active Tasks + std::atomic<uint8_t> active_tasks_; + // Trigger the Processor even if the incoming connection is empty + std::atomic<bool> _triggerWhenEmpty; + +private: + + // Mutex for protection + std::mutex mutex_; + // Yield Expiration + std::atomic<uint64_t> yield_expiration_; + + + // Check all incoming connections for work + bool isWorkAvailable(); + // Logger + std::shared_ptr<logging::Logger> logger_; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Processor(const Processor &parent); + Processor &operator=(const Processor &parent); + +}; + +} +/* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessorConfig.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h new file mode 100644 index 0000000..6b4a00a --- /dev/null +++ b/libminifi/include/core/ProcessorConfig.h @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ +#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ + +#include "core/core.h" +#include "core/Property.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + + +struct ProcessorConfig { + std::string id; + std::string name; + std::string javaClass; + std::string maxConcurrentTasks; + std::string schedulingStrategy; + std::string schedulingPeriod; + std::string penalizationPeriod; + std::string yieldPeriod; + std::string runDurationNanos; + std::vector<std::string> autoTerminatedRelationships; + std::vector<core::Property> properties; +}; + + + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + + +#endif /* LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessorNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h new file mode 100644 index 0000000..8836f62 --- /dev/null +++ b/libminifi/include/core/ProcessorNode.h @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_ +#define LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_ + +#include "ConfigurableComponent.h" +#include "Connectable.h" +#include "Property.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +/** + * Processor node functions as a pass through to the implementing Connectables + * ProcessorNode can be used by itself or with a pass through object, in which case + * we need to function as a passthrough or not. + */ +class ProcessorNode : public ConfigurableComponent, public Connectable { + public: + explicit ProcessorNode(const std::shared_ptr<Connectable> processor); + + explicit ProcessorNode(const ProcessorNode &other); + + /** + * Get property using the provided name. + * @param name property name. + * @param value value passed in by reference + * @return result of getting property. + */ + std::shared_ptr<Connectable> getProcessor() const { + return processor_; + } + + void yield() { + processor_->yield(); + } + + /** + * Get property using the provided name. + * @param name property name. + * @param value value passed in by reference + * @return result of getting property. + */ + bool getProperty(const std::string name, std::string &value) { + const std::shared_ptr<ConfigurableComponent> processor_cast = + std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + if (nullptr != processor_cast) + return processor_cast->getProperty(name, value); + else { + return ConfigurableComponent::getProperty(name, value); + } + } + /** + * Sets the property using the provided name + * @param property name + * @param value property value. + * @return result of setting property. + */ + bool setProperty(const std::string name, std::string value) { + const std::shared_ptr<ConfigurableComponent> processor_cast = + std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + bool ret = ConfigurableComponent::setProperty(name, value); + if (nullptr != processor_cast) + ret = processor_cast->setProperty(name, value); + + return ret; + + } + + /** + * Sets the property using the provided name + * @param property name + * @param value property value. + * @return whether property was set or not + */ + bool setProperty(Property &prop, std::string value) { + const std::shared_ptr<ConfigurableComponent> processor_cast = + std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + bool ret = ConfigurableComponent::setProperty(prop, value); + if (nullptr != processor_cast) + ret = processor_cast->setProperty(prop, value); + + return ret; + } + + /** + * Sets supported properties for the ConfigurableComponent + * @param supported properties + * @return result of set operation. + */ + bool setSupportedProperties(std::set<Property> properties) { + const std::shared_ptr<ConfigurableComponent> processor_cast = + std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + bool ret = ConfigurableComponent::setSupportedProperties(properties); + if (nullptr != processor_cast) + ret = processor_cast->setSupportedProperties(properties); + + return ret; + } + /** + * Sets supported properties for the ConfigurableComponent + * @param supported properties + * @return result of set operation. + */ + + bool setAutoTerminatedRelationships(std::set<Relationship> relationships) { + return processor_->setAutoTerminatedRelationships(relationships); + } + + bool isAutoTerminated(Relationship relationship) { + return processor_->isAutoTerminated(relationship); + } + + bool setSupportedRelationships(std::set<Relationship> relationships) { + return processor_->setSupportedRelationships(relationships); + } + + bool isSupportedRelationship(Relationship relationship) { + return processor_->isSupportedRelationship(relationship); + } + + /** + * Set name. + * @param name + */ + void setName(const std::string name) { + Connectable::setName(name); + processor_->setName(name); + } + + /** + * Set UUID in this instance + * @param uuid uuid to apply to the internal representation. + */ + void setUUID(uuid_t uuid) { + Connectable::setUUID(uuid); + processor_->setUUID(uuid); + } + +// Get Processor penalization period in MilliSecond + uint64_t getPenalizationPeriodMsec(void) { + return processor_->getPenalizationPeriodMsec(); + } + + /** + * Get outgoing connection based on relationship + * @return set of outgoing connections. + */ + std::set<std::shared_ptr<Connectable>> getOutGoingConnections( + std::string relationship) { + return processor_->getOutGoingConnections(relationship); + } + + /** + * Get next incoming connection + * @return next incoming connection + */ + std::shared_ptr<Connectable> getNextIncomingConnection() { + return processor_->getNextIncomingConnection(); + } + + /** + * @return true if incoming connections > 0 + */ + bool hasIncomingConnections() { + return processor_->hasIncomingConnections(); + } + + /** + * Returns the UUID through the provided object. + * @param uuid uuid struct to which we will copy the memory + * @return success of request + */ + bool getUUID(uuid_t uuid) { + return processor_->getUUID(uuid); + } + + unsigned const char *getUUID() { + return processor_->getUUID(); + } + /** + * Return the UUID string + * @param constant reference to the UUID str + */ + const std::string & getUUIDStr() const { + return processor_->getUUIDStr(); + } + +// Get Process Name + std::string getName() const { + return processor_->getName(); + } + + uint8_t getMaxConcurrentTasks() { + return processor_->getMaxConcurrentTasks(); + } + + void setMaxConcurrentTasks(const uint8_t tasks) { + processor_->setMaxConcurrentTasks(tasks); + } + + virtual bool isRunning(); + + virtual bool isWorkAvailable(); + + virtual ~ProcessorNode(); + + protected: + + virtual bool canEdit() { + return !processor_->isRunning(); + } + + /** + * internal connectable. + */ + std::shared_ptr<Connectable> processor_; + +} +; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_ */
