MINIFI-217: Updates namespaces and removes use of raw pointers for user facing API.
MINIFI-226: Separate YamlConfiguration from FlowController. This closes #63. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/44704b36 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/44704b36 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/44704b36 Branch: refs/heads/master Commit: 44704b363ccc422bb4fe2e1a6dcc184e9213bd68 Parents: bca0a06 Author: Marc Parisi <[email protected]> Authored: Mon Feb 27 14:02:58 2017 -0500 Committer: Aldrin Piri <[email protected]> Committed: Tue Mar 28 13:05:11 2017 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 6 + libminifi/CMakeLists.txt | 4 +- libminifi/include/AppendHostInfo.h | 67 - libminifi/include/BaseLogger.h | 207 -- libminifi/include/Configure.h | 127 - libminifi/include/Connection.h | 329 ++- libminifi/include/EventDrivenSchedulingAgent.h | 62 +- libminifi/include/Exception.h | 28 +- libminifi/include/ExecuteProcess.h | 112 - libminifi/include/FlowControlProtocol.h | 506 ++-- libminifi/include/FlowController.h | 452 ++-- libminifi/include/FlowFileRecord.h | 330 +-- libminifi/include/FlowFileRepository.h | 204 -- libminifi/include/GenerateFlowFile.h | 87 - libminifi/include/GetFile.h | 117 - libminifi/include/ListenHTTP.h | 116 - libminifi/include/ListenSyslog.h | 209 -- libminifi/include/LogAppenders.h | 298 --- libminifi/include/LogAttribute.h | 128 - libminifi/include/Logger.h | 200 -- libminifi/include/ProcessContext.h | 113 - libminifi/include/ProcessGroup.h | 187 -- libminifi/include/ProcessSession.h | 125 - libminifi/include/ProcessSessionFactory.h | 52 - libminifi/include/Processor.h | 365 --- libminifi/include/Property.h | 259 -- libminifi/include/Provenance.h | 604 ----- libminifi/include/PutFile.h | 88 - libminifi/include/RealTimeDataCollector.h | 131 - libminifi/include/Relationship.h | 87 - libminifi/include/RemoteProcessorGroupPort.h | 134 +- libminifi/include/Repository.h | 318 --- libminifi/include/ResourceClaim.h | 46 +- libminifi/include/SchedulingAgent.h | 138 +- libminifi/include/Site2SiteClientProtocol.h | 1003 ++++---- libminifi/include/Site2SitePeer.h | 455 ++-- libminifi/include/TailFile.h | 93 - libminifi/include/ThreadedSchedulingAgent.h | 77 +- libminifi/include/TimerDrivenSchedulingAgent.h | 63 +- libminifi/include/core/ConfigurableComponent.h | 104 + libminifi/include/core/ConfigurationFactory.h | 65 + libminifi/include/core/Connectable.h | 165 ++ libminifi/include/core/FlowConfiguration.h | 118 + libminifi/include/core/FlowFile.h | 283 +++ libminifi/include/core/ProcessContext.h | 114 + libminifi/include/core/ProcessGroup.h | 190 ++ libminifi/include/core/ProcessSession.h | 167 ++ libminifi/include/core/ProcessSessionFactory.h | 64 + libminifi/include/core/Processor.h | 270 ++ libminifi/include/core/ProcessorConfig.h | 53 + libminifi/include/core/ProcessorNode.h | 246 ++ libminifi/include/core/Property.h | 264 ++ libminifi/include/core/Relationship.h | 96 + libminifi/include/core/Repository.h | 153 ++ libminifi/include/core/RepositoryFactory.h | 44 + libminifi/include/core/Scheduling.h | 64 + libminifi/include/core/core.h | 177 ++ libminifi/include/core/logging/BaseLogger.h | 224 ++ libminifi/include/core/logging/LogAppenders.h | 301 +++ libminifi/include/core/logging/Logger.h | 214 ++ .../core/repository/FlowFileRepository.h | 169 ++ libminifi/include/core/yaml/YamlConfiguration.h | 99 + libminifi/include/io/BaseStream.h | 256 +- libminifi/include/io/CRCStream.h | 415 ++- libminifi/include/io/ClientSocket.h | 411 +-- libminifi/include/io/DataStream.h | 188 +- libminifi/include/io/EndianCheck.h | 44 +- libminifi/include/io/Serializable.h | 309 +-- libminifi/include/io/SocketFactory.h | 91 - libminifi/include/io/Sockets.h | 27 + libminifi/include/io/StreamFactory.h | 138 + libminifi/include/io/TLSSocket.h | 187 -- libminifi/include/io/tls/TLSSocket.h | 198 ++ libminifi/include/io/validation.h | 29 +- libminifi/include/processors/AppendHostInfo.h | 80 + libminifi/include/processors/ExecuteProcess.h | 125 + libminifi/include/processors/GenerateFlowFile.h | 98 + libminifi/include/processors/GetFile.h | 129 + libminifi/include/processors/ListenHTTP.h | 126 + libminifi/include/processors/ListenSyslog.h | 216 ++ libminifi/include/processors/LogAttribute.h | 130 + libminifi/include/processors/PutFile.h | 101 + .../include/processors/RealTimeDataCollector.h | 145 ++ libminifi/include/processors/TailFile.h | 105 + libminifi/include/properties/Configure.h | 131 + libminifi/include/provenance/Provenance.h | 560 +++++ .../include/provenance/ProvenanceRepository.h | 166 ++ libminifi/include/utils/FailurePolicy.h | 13 + libminifi/include/utils/StringUtils.h | 15 +- libminifi/src/AppendHostInfo.cpp | 97 - libminifi/src/BaseLogger.cpp | 153 -- libminifi/src/Configure.cpp | 40 +- libminifi/src/Connection.cpp | 302 +-- libminifi/src/EventDrivenSchedulingAgent.cpp | 55 +- libminifi/src/ExecuteProcess.cpp | 251 -- libminifi/src/FlowControlProtocol.cpp | 884 ++++--- libminifi/src/FlowController.cpp | 954 ++----- libminifi/src/FlowFileRecord.cpp | 530 ++-- libminifi/src/FlowFileRepository.cpp | 282 --- libminifi/src/GenerateFlowFile.cpp | 135 - libminifi/src/GetFile.cpp | 329 --- libminifi/src/ListenHTTP.cpp | 395 --- libminifi/src/ListenSyslog.cpp | 343 --- libminifi/src/LogAppenders.cpp | 25 - libminifi/src/LogAttribute.cpp | 159 -- libminifi/src/Logger.cpp | 28 - libminifi/src/ProcessGroup.cpp | 307 --- libminifi/src/ProcessSession.cpp | 790 ------ libminifi/src/ProcessSessionFactory.cpp | 28 - libminifi/src/Processor.cpp | 526 ---- libminifi/src/Provenance.cpp | 566 ----- libminifi/src/PutFile.cpp | 203 -- libminifi/src/RealTimeDataCollector.cpp | 481 ---- libminifi/src/RemoteProcessorGroupPort.cpp | 191 +- libminifi/src/Repository.cpp | 140 -- libminifi/src/ResourceClaim.cpp | 47 +- libminifi/src/SchedulingAgent.cpp | 107 +- libminifi/src/Site2SiteClientProtocol.cpp | 2377 +++++++++--------- libminifi/src/Site2SitePeer.cpp | 12 + libminifi/src/TailFile.cpp | 269 -- libminifi/src/ThreadedSchedulingAgent.cpp | 216 +- libminifi/src/TimerDrivenSchedulingAgent.cpp | 50 +- libminifi/src/core/ConfigurableComponent.cpp | 143 ++ libminifi/src/core/ConfigurationFactory.cpp | 81 + libminifi/src/core/Connectable.cpp | 174 ++ libminifi/src/core/Core.cpp | 51 + libminifi/src/core/FlowConfiguration.cpp | 110 + libminifi/src/core/ProcessGroup.cpp | 312 +++ libminifi/src/core/ProcessSession.cpp | 941 +++++++ libminifi/src/core/ProcessSessionFactory.cpp | 42 + libminifi/src/core/Processor.cpp | 272 ++ libminifi/src/core/ProcessorNode.cpp | 65 + libminifi/src/core/Property.cpp | 57 + libminifi/src/core/Record.cpp | 223 ++ libminifi/src/core/Repository.cpp | 65 + libminifi/src/core/RepositoryFactory.cpp | 69 + libminifi/src/core/logging/BaseLogger.cpp | 161 ++ libminifi/src/core/logging/LogAppenders.cpp | 39 + libminifi/src/core/logging/Logger.cpp | 40 + .../src/core/repository/FlowFileRepository.cpp | 109 + libminifi/src/core/yaml/YamlConfiguration.cpp | 490 ++++ libminifi/src/io/BaseStream.cpp | 39 +- libminifi/src/io/ClientSocket.cpp | 641 +++-- libminifi/src/io/DataStream.cpp | 12 + libminifi/src/io/EndianCheck.cpp | 13 +- libminifi/src/io/Serializable.cpp | 13 +- libminifi/src/io/SocketFactory.cpp | 24 - libminifi/src/io/StreamFactory.cpp | 37 + libminifi/src/io/TLSSocket.cpp | 237 -- libminifi/src/io/tls/TLSSocket.cpp | 249 ++ libminifi/src/processors/AppendHostInfo.cpp | 124 + libminifi/src/processors/ExecuteProcess.cpp | 255 ++ libminifi/src/processors/GenerateFlowFile.cpp | 145 ++ libminifi/src/processors/GetFile.cpp | 340 +++ libminifi/src/processors/ListenHTTP.cpp | 380 +++ libminifi/src/processors/ListenSyslog.cpp | 331 +++ libminifi/src/processors/LogAttribute.cpp | 176 ++ libminifi/src/processors/PutFile.cpp | 213 ++ .../src/processors/RealTimeDataCollector.cpp | 480 ++++ libminifi/src/processors/TailFile.cpp | 271 ++ libminifi/src/provenance/Provenance.cpp | 578 +++++ .../src/provenance/ProvenanceRepository.cpp | 75 + libminifi/test/Server.cpp | 38 +- libminifi/test/TestBase.h | 81 +- libminifi/test/nodefs/NoLevelDB.cpp | 34 + libminifi/test/nodefs/NoYamlConfiguration.cpp | 38 + libminifi/test/unit/CRCTests.cpp | 76 +- libminifi/test/unit/LoggerTests.cpp | 212 +- libminifi/test/unit/ProcessorTests.cpp | 308 ++- libminifi/test/unit/PropertyTests.cpp | 31 +- libminifi/test/unit/ProvenanceTestHelper.h | 273 +- libminifi/test/unit/ProvenanceTests.cpp | 112 +- libminifi/test/unit/SerializationTests.cpp | 10 +- libminifi/test/unit/SocketTests.cpp | 169 +- libminifi/test/unit/TimeUtilsTest.cpp | 21 + main/CMakeLists.txt | 2 +- main/MiNiFiMain.cpp | 237 +- 177 files changed, 19820 insertions(+), 17175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 418220d..5d7875e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,6 +128,12 @@ enable_testing(test) target_include_directories(tests PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) target_include_directories(tests PRIVATE BEFORE "include") target_include_directories(tests PRIVATE BEFORE "libminifi/include/") + target_include_directories(tests PRIVATE BEFORE "libminifi/include/core") + target_include_directories(tests PRIVATE BEFORE "libminifi/include/core/repository") + target_include_directories(tests PRIVATE BEFORE "libminifi/include/io") + target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils") + target_include_directories(tests PRIVATE BEFORE "libminifi/include/processors") + target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance") target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp) add_test(NAME LibMinifiTests COMMAND tests) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 69975ad..4c71cc1 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -48,7 +48,7 @@ include(CheckCXXCompilerFlag) CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X) if(COMPILER_SUPPORTS_CXX11) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -g") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") elseif(COMPILER_SUPPORTS_CXX0X) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x") else() @@ -60,7 +60,7 @@ include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include) include_directories(../thirdparty/civetweb-1.9.1/include) include_directories(include) -file(GLOB SOURCES "src/*.cpp" "src/io/*.cpp" "src/utils/*.cpp") +file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp") file(GLOB SPD_SOURCES "../include/spdlog/*") # Workaround the limitations of having a http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/AppendHostInfo.h ---------------------------------------------------------------------- diff --git a/libminifi/include/AppendHostInfo.h b/libminifi/include/AppendHostInfo.h deleted file mode 100644 index 8d9dd8f..0000000 --- a/libminifi/include/AppendHostInfo.h +++ /dev/null @@ -1,67 +0,0 @@ -/** - * @file AppendHostInfo.h - * AppendHostInfo class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __APPEND_HOSTINFO_H__ -#define __APPEND_HOSTINFO_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! AppendHostInfo Class -class AppendHostInfo : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - AppendHostInfo(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - logger_ = Logger::getLogger(); - } - //! Destructor - virtual ~AppendHostInfo() - { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property InterfaceName; - static Property HostAttribute; - static Property IPAttribute; - - //! Supported Relationships - static Relationship Success; - -public: - //! OnTrigger method, implemented by NiFi AppendHostInfo - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi AppendHostInfo - virtual void initialize(void); - -protected: - -private: - //! Logger - std::shared_ptr<Logger> logger_; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/BaseLogger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/BaseLogger.h b/libminifi/include/BaseLogger.h deleted file mode 100644 index e9d2a02..0000000 --- a/libminifi/include/BaseLogger.h +++ /dev/null @@ -1,207 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_BASELOGGER_H_ -#define LIBMINIFI_INCLUDE_BASELOGGER_H_ - -#include <string> -#include <memory> -#include "spdlog/spdlog.h" -#include <iostream> -//! 5M default log file size -#define DEFAULT_LOG_FILE_SIZE (5*1024*1024) -//! 3 log files rotation -#define DEFAULT_LOG_FILE_NUMBER 3 -#define LOG_NAME "minifi log" -#define LOG_FILE_NAME "minifi-app.log" - -/** - * Log level enumeration. - */ -typedef enum { - trace = 0, debug = 1, info = 2, warn = 3, err = 4, critical = 5, off = 6 -} LOG_LEVEL_E; - -#define LOG_BUFFER_SIZE 1024 -#define FILL_BUFFER char buffer[LOG_BUFFER_SIZE]; \ - va_list args; \ - va_start(args, format); \ - std::vsnprintf(buffer, LOG_BUFFER_SIZE,format, args); \ - va_end(args); - -/** - * Base class that represents a logger configuration. - */ -class BaseLogger { - -public: - static const char *nifi_log_level; - static const char *nifi_log_appender; - - /** - * Base Constructor - */ - BaseLogger() { - setLogLevel("info"); - logger_ = nullptr; - stderr_ = nullptr; - } - - /** - * Logger configuration constructorthat will set the base log level. - * @param config incoming configuration. - */ - BaseLogger(std::string log_level, std::shared_ptr<spdlog::logger> logger) : logger_(logger) { - setLogLevel(log_level); - - } - - virtual ~BaseLogger() { - - } - - /** - * Move constructor that will atomically swap configuration - * shared pointers. - */ - BaseLogger(const BaseLogger &&other) : - configured_level_(other.configured_level_.load()) { - // must atomically exchange the pointers - logger_ = std::move(other.logger_); - set_error_logger(other.stderr_); - - } - - /** - * Returns the log level for this instance. - */ - virtual LOG_LEVEL_E getLogLevel() const { - return configured_level_; - } - - /** - * @brief Log error message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - virtual void log_error(const char * const format, ...); - /** - * @brief Log warn message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - virtual void log_warn(const char * const format, ...); - /** - * @brief Log info message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - virtual void log_info(const char * const format, ...); - /** - * @brief Log debug message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - virtual void log_debug(const char * const format, ...); - /** - * @brief Log trace message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - virtual void log_trace(const char * const format, ...); - - /** - * @brief Log error message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - virtual void log_str(LOG_LEVEL_E level,const std::string &buffer); - - /** - * Sets the log level for this instance based on the string - * @param level desired log leve. - * @param defaultLevel default level if we cannot match level. - */ - virtual void setLogLevel(const std::string &level, - LOG_LEVEL_E defaultLevel = info); - - /** - * Sets the log level atomic and sets it - * within logger if it can - * @param level desired log level. - */ - virtual void setLogLevel(LOG_LEVEL_E level) { - configured_level_ = level; - setLogLevel(); - } - - bool shouldLog(LOG_LEVEL_E level) - { - return level >= configured_level_.load(std::memory_order_relaxed); - } - - /** - * Move operator overload - */ - BaseLogger &operator=(const BaseLogger &&other) { - configured_level_ = (other.configured_level_.load()); - // must atomically exchange the pointers - logger_ = std::move(other.logger_); - set_error_logger(other.stderr_); - return *this; - } - -protected: - - - - /** - * Logger configuration constructorthat will set the base log level. - * @param config incoming configuration. - */ - BaseLogger(std::string log_level) : logger_(nullptr) { - setLogLevel(log_level); - } - - - void setLogger(std::shared_ptr<spdlog::logger> logger) { - logger_ = logger; - } - - /** - * Since a thread may be using stderr and it can be null, - * we must atomically exchange the shared pointers. - * @param other other shared pointer. can be null ptr - */ - void set_error_logger(std::shared_ptr<spdlog::logger> other); - - /** - * Sets the log level on the spdlogger if it is not null. - */ - void setLogLevel() { - if (logger_ != nullptr) - logger_->set_level( - (spdlog::level::level_enum) configured_level_.load()); - - } - - std::atomic<LOG_LEVEL_E> configured_level_; - std::shared_ptr<spdlog::logger> logger_; - std::shared_ptr<spdlog::logger> stderr_; -}; - -#endif /* LIBMINIFI_INCLUDE_BASELOGGER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h deleted file mode 100644 index 6f0d198..0000000 --- a/libminifi/include/Configure.h +++ /dev/null @@ -1,127 +0,0 @@ -/** - * @file Configure.h - * Configure class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __CONFIGURE_H__ -#define __CONFIGURE_H__ - -#include <stdio.h> -#include <string> -#include <map> -#include <stdlib.h> -#include <errno.h> -#include <iostream> -#include <fstream> -#include "Logger.h" - -class Configure { -public: - //! Get the singleton logger instance - static Configure * getConfigure() - { - if (!configure_) - { - configure_ = new Configure(); - } - return configure_; - } - //! nifi.flow.configuration.file - static const char *nifi_flow_configuration_file; - static const char *nifi_administrative_yield_duration; - static const char *nifi_bored_yield_duration; - static const char *nifi_graceful_shutdown_seconds; - static const char *nifi_log_level; - static const char *nifi_server_name; - static const char *nifi_server_port; - static const char *nifi_server_report_interval; - static const char *nifi_provenance_repository_max_storage_time; - static const char *nifi_provenance_repository_max_storage_size; - static const char *nifi_provenance_repository_directory_default; - static const char *nifi_provenance_repository_enable; - static const char *nifi_flowfile_repository_max_storage_time; - static const char *nifi_flowfile_repository_max_storage_size; - static const char *nifi_flowfile_repository_directory_default; - static const char *nifi_flowfile_repository_enable; - static const char *nifi_remote_input_secure; - static const char *nifi_security_need_ClientAuth; - static const char *nifi_security_client_certificate; - static const char *nifi_security_client_private_key; - static const char *nifi_security_client_pass_phrase; - static const char *nifi_security_client_ca_certificate; - - //! Clear the load config - void clear() - { - std::lock_guard<std::mutex> lock(_mtx); - _properties.clear(); - } - //! Set the config value - void set(std::string key, std::string value) - { - std::lock_guard<std::mutex> lock(_mtx); - _properties[key] = value; - } - //! Check whether the config value existed - bool has(std::string key) - { - std::lock_guard<std::mutex> lock(_mtx); - return (_properties.find(key) != _properties.end()); - } - //! Get the config value - bool get(std::string key, std::string &value); - //! Parse one line in configure file like key=value - void parseConfigureFileLine(char *buf); - //! Load Configure File - void loadConfigureFile(const char *fileName); - //! Set the determined MINIFI_HOME - void setHome(std::string minifiHome) - { - _minifiHome = minifiHome; - } - - //! Get the determined MINIFI_HOME - std::string getHome() - { - return _minifiHome; - } - //! Parse Command Line - void parseCommandLine(int argc, char **argv); - -private: - //! Mutex for protection - std::mutex _mtx; - //! Logger - std::shared_ptr<Logger> logger_; - //! Home location for this executable - std::string _minifiHome; - - Configure() - { - logger_ = Logger::getLogger(); - } - virtual ~Configure() - { - - } - static Configure *configure_; - -protected: - std::map<std::string,std::string> _properties; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Connection.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index 5af0d2f..1c7b9a4 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -27,180 +27,171 @@ #include <mutex> #include <atomic> #include <algorithm> +#include "core/core.h" +#include "core/Connectable.h" +#include "core/logging/Logger.h" +#include "core/Relationship.h" +#include "core/Connectable.h" +#include "core/FlowFile.h" +#include "core/Repository.h" -#include "FlowFileRecord.h" -#include "Logger.h" -#include "Relationship.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +// Connection Class -//! Forwarder declaration -class Processor; +class Connection : public core::Connectable, + public std::enable_shared_from_this<Connection> { + public: + // Constructor + /* + * Create a new processor + */ + explicit Connection(std::shared_ptr<core::Repository> flow_repository, + std::string name, uuid_t uuid = NULL, uuid_t srcUUID = + NULL, + uuid_t destUUID = NULL); + // Destructor + virtual ~Connection() { + } -//! Connection Class -class Connection -{ -public: - //! Constructor - /*! - * Create a new processor - */ - explicit Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL); - //! Destructor - virtual ~Connection() {} - //! Set Connection Name - void setName(std::string name) { - _name = name; - } - //! Get Process Name - std::string getName(void) { - return (_name); - } - //! Set UUID - void setUUID(uuid_t uuid) { - uuid_copy(_uuid, uuid); - } - //! Set Source Processor UUID - void setSourceProcessorUUID(uuid_t uuid) { - uuid_copy(_srcUUID, uuid); - } - //! Set Destination Processor UUID - void setDestinationProcessorUUID(uuid_t uuid) { - uuid_copy(_destUUID, uuid); - } - //! Get Source Processor UUID - void getSourceProcessorUUID(uuid_t uuid) { - uuid_copy(uuid, _srcUUID); - } - //! Get Destination Processor UUID - void getDestinationProcessorUUID(uuid_t uuid) { - uuid_copy(uuid, _destUUID); - } - //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { - uuid_copy(uuid, _uuid); - return true; - } - else - return false; - } - //! Get UUID Str - std::string getUUIDStr() { - return _uuidStr; - } - //! Set Connection Source Processor - void setSourceProcessor(Processor *source) { - _srcProcessor = source; - } - // ! Get Connection Source Processor - Processor *getSourceProcessor() { - return _srcProcessor; - } - //! Set Connection Destination Processor - void setDestinationProcessor(Processor *dest) { - _destProcessor = dest; - } - // ! Get Connection Destination Processor - Processor *getDestinationProcessor() { - return _destProcessor; - } - //! Set Connection relationship - void setRelationship(Relationship relationship) { - _relationship = relationship; - } - // ! Get Connection relationship - Relationship getRelationship() { - return _relationship; - } - //! Set Max Queue Size - void setMaxQueueSize(uint64_t size) - { - _maxQueueSize = size; - } - //! Get Max Queue Size - uint64_t getMaxQueueSize() - { - return _maxQueueSize; - } - //! Set Max Queue Data Size - void setMaxQueueDataSize(uint64_t size) - { - _maxQueueDataSize = size; - } - //! Get Max Queue Data Size - uint64_t getMaxQueueDataSize() - { - return _maxQueueDataSize; - } - //! Set Flow expiration duration in millisecond - void setFlowExpirationDuration(uint64_t duration) - { - _expiredDuration = duration; - } - //! Get Flow expiration duration in millisecond - uint64_t getFlowExpirationDuration() - { - return _expiredDuration; - } - //! Check whether the queue is empty - bool isEmpty(); - //! Check whether the queue is full to apply back pressure - bool isFull(); - //! Get queue size - uint64_t getQueueSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _queue.size(); - } - //! Get queue data size - uint64_t getQueueDataSize() - { - return _maxQueueDataSize; - } - //! Put the flow file into queue - void put(FlowFileRecord *flow); - //! Poll the flow file from queue, the expired flow file record also being returned - FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords); - //! Drain the flow records - void drain(); + // Set Source Processor UUID + void setSourceUUID(uuid_t uuid) { + uuid_copy(src_uuid_, uuid); + } + // Set Destination Processor UUID + void setDestinationUUID(uuid_t uuid) { + uuid_copy(dest_uuid_, uuid); + } + // Get Source Processor UUID + void getSourceUUID(uuid_t uuid) { + uuid_copy(uuid, src_uuid_); + } + // Get Destination Processor UUID + void getDestinationUUID(uuid_t uuid) { + uuid_copy(uuid, dest_uuid_); + } -protected: - //! A global unique identifier - uuid_t _uuid; - //! Source Processor UUID - uuid_t _srcUUID; - //! Destination Processor UUID - uuid_t _destUUID; - //! Connection Name - std::string _name; - //! Relationship for this connection - Relationship _relationship; - //! Source Processor (ProcessNode/Port) - Processor *_srcProcessor; - //! Destination Processor (ProcessNode/Port) - Processor *_destProcessor; - //! Max queue size to apply back pressure - std::atomic<uint64_t> _maxQueueSize; - //! Max queue data size to apply back pressure - std::atomic<uint64_t> _maxQueueDataSize; - //! Flow File Expiration Duration in= MilliSeconds - std::atomic<uint64_t> _expiredDuration; - //! UUID string - std::string _uuidStr; + // Set Connection Source Processor + void setSource(std::shared_ptr<core::Connectable> source) { + source_connectable_ = source; + } + // ! Get Connection Source Processor + std::shared_ptr<core::Connectable> getSource() { + return source_connectable_; + } + // Set Connection Destination Processor + void setDestination(std::shared_ptr<core::Connectable> dest) { + dest_connectable_ = dest; + } + // ! Get Connection Destination Processor + std::shared_ptr<core::Connectable> getDestination() { + return dest_connectable_; + } + // Set Connection relationship + void setRelationship(core::Relationship relationship) { + relationship_ = relationship; + } + // ! Get Connection relationship + core::Relationship getRelationship() { + return relationship_; + } + // Set Max Queue Size + void setMaxQueueSize(uint64_t size) { + max_queue_size_ = size; + } + // Get Max Queue Size + uint64_t getMaxQueueSize() { + return max_queue_size_; + } + // Set Max Queue Data Size + void setMaxQueueDataSize(uint64_t size) { + max_data_queue_size_ = size; + } + // Get Max Queue Data Size + uint64_t getMaxQueueDataSize() { + return max_data_queue_size_; + } + // Set Flow expiration duration in millisecond + void setFlowExpirationDuration(uint64_t duration) { + expired_duration_ = duration; + } + // Get Flow expiration duration in millisecond + uint64_t getFlowExpirationDuration() { + return expired_duration_; + } + // Check whether the queue is empty + bool isEmpty(); + // Check whether the queue is full to apply back pressure + bool isFull(); + // Get queue size + uint64_t getQueueSize() { + std::lock_guard<std::mutex> lock(mutex_); + return queue_.size(); + } + // Get queue data size + uint64_t getQueueDataSize() { + return max_data_queue_size_; + } + // Put the flow file into queue + void put(std::shared_ptr<core::FlowFile> flow); + // Poll the flow file from queue, the expired flow file record also being returned + std::shared_ptr<core::FlowFile> poll( + std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords); + // Drain the flow records + void drain(); -private: - //! Mutex for protection - std::mutex _mtx; - //! Queued data size - std::atomic<uint64_t> _queuedDataSize; - //! Queue for the Flow File - std::queue<FlowFileRecord *> _queue; - //! Logger - std::shared_ptr<Logger> logger_; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Connection(const Connection &parent); - Connection &operator=(const Connection &parent); + void yield() { -}; + } + + bool isWorkAvailable() { + return !isEmpty(); + } + + bool isRunning() { + return true; + } + protected: + // Source Processor UUID + uuid_t src_uuid_; + // Destination Processor UUID + uuid_t dest_uuid_; + // Relationship for this connection + core::Relationship relationship_; + // Source Processor (ProcessNode/Port) + std::shared_ptr<core::Connectable> source_connectable_; + // Destination Processor (ProcessNode/Port) + std::shared_ptr<core::Connectable> dest_connectable_; + // Max queue size to apply back pressure + std::atomic<uint64_t> max_queue_size_; + // Max queue data size to apply back pressure + std::atomic<uint64_t> max_data_queue_size_; + // Flow File Expiration Duration in= MilliSeconds + std::atomic<uint64_t> expired_duration_; + // flow file repository + std::shared_ptr<core::Repository> flow_repository_; + + private: + // Mutex for protection + std::mutex mutex_; + // Queued data size + std::atomic<uint64_t> queued_data_size_; + // Queue for the Flow File + std::queue<std::shared_ptr<core::FlowFile>> queue_; + // flow repository + // Logger + std::shared_ptr<logging::Logger> logger_; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Connection(const Connection &parent); + Connection &operator=(const Connection &parent); + +}; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/EventDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index b32f84f..9d53c5c 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -20,36 +20,46 @@ #ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__ #define __EVENT_DRIVEN_SCHEDULING_AGENT_H__ -#include "Logger.h" -#include "Processor.h" -#include "ProcessContext.h" +#include "core/logging/Logger.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSessionFactory.h" #include "ThreadedSchedulingAgent.h" -//! EventDrivenSchedulingAgent Class -class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent -{ -public: - //! Constructor - /*! - * Create a new processor - */ - EventDrivenSchedulingAgent() - : ThreadedSchedulingAgent() - { - } - //! Destructor - virtual ~EventDrivenSchedulingAgent() - { - } - //! Run function for the thread - void run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory); +namespace org { +namespace apache { +namespace nifi { +namespace minifi { -private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent); - EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent); +// EventDrivenSchedulingAgent Class +class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { + public: + // Constructor + /*! + * Create a new processor + */ + EventDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo) + : ThreadedSchedulingAgent(repo) { + } + // Destructor + virtual ~EventDrivenSchedulingAgent() { + } + // Run function for the thread + void run(std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory); + + private: + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent); + EventDrivenSchedulingAgent &operator=( + const EventDrivenSchedulingAgent &parent); }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Exception.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h index 1e02fa5..a0c70e6 100644 --- a/libminifi/include/Exception.h +++ b/libminifi/include/Exception.h @@ -26,7 +26,13 @@ #include <errno.h> #include <string.h> -//! ExceptionType + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +// ExceptionType enum ExceptionType { FILE_OPERATION_EXCEPTION = 0, @@ -39,7 +45,7 @@ enum ExceptionType MAX_EXCEPTION }; -//! Exception String +// Exception String static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", @@ -51,7 +57,7 @@ static const char *ExceptionStr[MAX_EXCEPTION] = "General Operation" }; -//! Exception Type to String +// Exception Type to String inline const char *ExceptionTypeToString(ExceptionType type) { if (type < MAX_EXCEPTION) @@ -60,17 +66,17 @@ inline const char *ExceptionTypeToString(ExceptionType type) return NULL; } -//! Exception Class +// Exception Class class Exception : public std::exception { public: - //! Constructor + // Constructor /*! * Create a new flow record */ Exception(ExceptionType type, const char *errorMsg) : _type(type), _errorMsg(errorMsg) { } - //! Destructor + // Destructor virtual ~Exception() throw () {} virtual const char * what() const throw () { @@ -82,13 +88,17 @@ public: private: - //! Exception type + // Exception type ExceptionType _type; - //! Exception detailed information + // Exception detailed information std::string _errorMsg; - //! Hold the what result + // Hold the what result mutable std::string _whatStr; }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ExecuteProcess.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ExecuteProcess.h b/libminifi/include/ExecuteProcess.h deleted file mode 100644 index 3ddd815..0000000 --- a/libminifi/include/ExecuteProcess.h +++ /dev/null @@ -1,112 +0,0 @@ -/** - * @file ExecuteProcess.h - * ExecuteProcess class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __EXECUTE_PROCESS_H__ -#define __EXECUTE_PROCESS_H__ - -#include <stdio.h> -#include <unistd.h> -#include <string> -#include <errno.h> -#include <chrono> -#include <thread> -#include <unistd.h> -#include <sys/wait.h> -#include <iostream> -#include <sys/types.h> -#include <signal.h> -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! ExecuteProcess Class -class ExecuteProcess : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - ExecuteProcess(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - logger_ = Logger::getLogger(); - _redirectErrorStream = false; - _batchDuration = 0; - _workingDir = "."; - _processRunning = false; - _pid = 0; - } - //! Destructor - virtual ~ExecuteProcess() - { - if (_processRunning && _pid > 0) - kill(_pid, SIGTERM); - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property Command; - static Property CommandArguments; - static Property WorkingDir; - static Property BatchDuration; - static Property RedirectErrorStream; - //! Supported Relationships - static Relationship Success; - - //! Nest Callback Class for write stream - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(char *data, uint64_t size) - : _data(data), _dataSize(size) {} - char *_data; - uint64_t _dataSize; - void process(std::ofstream *stream) { - if (_data && _dataSize > 0) - stream->write(_data, _dataSize); - } - }; - -public: - //! OnTrigger method, implemented by NiFi ExecuteProcess - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi ExecuteProcess - virtual void initialize(void); - -protected: - -private: - //! Logger - std::shared_ptr<Logger> logger_; - //! Property - std::string _command; - std::string _commandArgument; - std::string _workingDir; - int64_t _batchDuration; - bool _redirectErrorStream; - //! Full command - std::string _fullCommand; - //! whether the process is running - bool _processRunning; - int _pipefd[2]; - pid_t _pid; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index ffc454a..c37c8f8 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -32,310 +32,288 @@ #include <errno.h> #include <chrono> #include <thread> -#include "Configure.h" -#include "Logger.h" -#include "Property.h" -//! Forwarder declaration +#include "core/Property.h" +#include "properties/Configure.h" +#include "core/logging/Logger.h" + + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +// Forwarder declaration class FlowController; #define DEFAULT_NIFI_SERVER_PORT 9000 #define DEFAULT_REPORT_INTERVAL 1000 // 1 sec #define MAX_READ_TIMEOUT 30000 // 30 seconds -//! FlowControl Protocol Msg Type +// FlowControl Protocol Msg Type typedef enum { - REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow YAML version - REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.YAML from server ask device to apply and also device report interval - REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow YAML name/version and other period report info - REPORT_RESP, // Report Respond from server to device, may ask device to update flow YAML or processor property - MAX_FLOW_CONTROL_MSG_TYPE + REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow YAML version + REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.YAML from server ask device to apply and also device report interval + REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow YAML name/version and other period report info + REPORT_RESP, // Report Respond from server to device, may ask device to update flow YAML or processor property + MAX_FLOW_CONTROL_MSG_TYPE } FlowControlMsgType; -//! FlowControl Protocol Msg Type String -static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = -{ - "REGISTER_REQ", - "REGISTER_RESP", - "REPORT_REQ", - "REPORT_RESP" -}; +// FlowControl Protocol Msg Type String +static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { + "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" }; -//! Flow Control Msg Type to String -inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) -{ - if (type < MAX_FLOW_CONTROL_MSG_TYPE) - return FlowControlMsgTypeStr[type]; - else - return NULL; +// Flow Control Msg Type to String +inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) { + if (type < MAX_FLOW_CONTROL_MSG_TYPE) + return FlowControlMsgTypeStr[type]; + else + return NULL; } -//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) +// FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) typedef enum { - //Fix length 8 bytes: client to server in register request, required field - FLOW_SERIAL_NUMBER, - // Flow YAML name TLV: client to server in register request and report request, required field - FLOW_YML_NAME, - // Flow YAML content, TLV: server to client in register respond, option field in case server want to ask client to load YAML from server - FLOW_YML_CONTENT, - // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field - REPORT_INTERVAL, - // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROCESSOR_NAME, - // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROPERTY_NAME, - // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROPERTY_VALUE, - // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server - REPORT_BLOB, - MAX_FLOW_MSG_ID + //Fix length 8 bytes: client to server in register request, required field + FLOW_SERIAL_NUMBER, + // Flow YAML name TLV: client to server in register request and report request, required field + FLOW_YML_NAME, + // Flow YAML content, TLV: server to client in register respond, option field in case server want to ask client to load YAML from server + FLOW_YML_CONTENT, + // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field + REPORT_INTERVAL, + // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROCESSOR_NAME, + // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROPERTY_NAME, + // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROPERTY_VALUE, + // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server + REPORT_BLOB, + MAX_FLOW_MSG_ID } FlowControlMsgID; -//! FlowControl Protocol Msg ID String -static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = -{ - "FLOW_SERIAL_NUMBER", - "FLOW_YAML_NAME", - "FLOW_YAML_CONTENT", - "REPORT_INTERVAL", - "PROCESSOR_NAME" - "PROPERTY_NAME", - "PROPERTY_VALUE", - "REPORT_BLOB" -}; +// FlowControl Protocol Msg ID String +static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { + "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", + "REPORT_INTERVAL", "PROCESSOR_NAME" + "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" }; #define TYPE_HDR_LEN 4 // Fix Hdr Type #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes -//! FlowControl Protocol Msg Len -inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) -{ - if (id == FLOW_SERIAL_NUMBER) - return (TYPE_HDR_LEN + 8); - else if (id == REPORT_INTERVAL) - return (TYPE_HDR_LEN + 4); - else if (id < MAX_FLOW_MSG_ID) - return (TLV_HDR_LEN + payLoadLen); - else - return -1; +// FlowControl Protocol Msg Len +inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) { + if (id == FLOW_SERIAL_NUMBER) + return (TYPE_HDR_LEN + 8); + else if (id == REPORT_INTERVAL) + return (TYPE_HDR_LEN + 4); + else if (id < MAX_FLOW_MSG_ID) + return (TLV_HDR_LEN + payLoadLen); + else + return -1; } -//! Flow Control Msg Id to String -inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) -{ - if (id < MAX_FLOW_MSG_ID) - return FlowControlMsgIDStr[id]; - else - return NULL; +// Flow Control Msg Id to String +inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) { + if (id < MAX_FLOW_MSG_ID) + return FlowControlMsgIDStr[id]; + else + return NULL; } -//! Flow Control Respond status code +// Flow Control Respond status code typedef enum { - RESP_SUCCESS, - RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register - RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller - RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller - RESP_FAILURE, - MAX_RESP_CODE + RESP_SUCCESS, + RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register + RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller + RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller + RESP_FAILURE, + MAX_RESP_CODE } FlowControlRespCode; -//! FlowControl Resp Code str -static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = -{ - "RESP_SUCCESS", - "RESP_TRIGGER_REGISTER", - "RESP_START_FLOW_CONTROLLER", - "RESP_STOP_FLOW_CONTROLLER", - "RESP_FAILURE" -}; +// FlowControl Resp Code str +static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", + "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", + "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" }; -//! Flow Control Resp Code to String -inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) -{ - if (code < MAX_RESP_CODE) - return FlowControlRespCodeStr[code]; - else - return NULL; +// Flow Control Resp Code to String +inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) { + if (code < MAX_RESP_CODE) + return FlowControlRespCodeStr[code]; + else + return NULL; } -//! Common FlowControlProtocol Header +// Common FlowControlProtocol Header typedef struct { - uint32_t msgType; //! Msg Type - uint32_t seqNumber; //! Seq Number to match Req with Resp - uint32_t status; //! Resp Code, see FlowControlRespCode - uint32_t payloadLen; //! Msg Payload length + uint32_t msgType; // Msg Type + uint32_t seqNumber; // Seq Number to match Req with Resp + uint32_t status; // Resp Code, see FlowControlRespCode + uint32_t payloadLen; // Msg Payload length } FlowControlProtocolHeader; -//! FlowControlProtocol Class -class FlowControlProtocol -{ -public: - //! Constructor - /*! - * Create a new control protocol - */ - FlowControlProtocol(FlowController *controller) { - _controller = controller; - logger_ = Logger::getLogger(); - configure_ = Configure::getConfigure(); - _socket = 0; - _serverName = "localhost"; - _serverPort = DEFAULT_NIFI_SERVER_PORT; - _registered = false; - _seqNumber = 0; - _reportBlob = NULL; - _reportBlobLen = 0; - _reportInterval = DEFAULT_REPORT_INTERVAL; - _running = false; +// FlowControlProtocol Class +class FlowControlProtocol { + public: + // Constructor + /*! + * Create a new control protocol + */ + FlowControlProtocol(FlowController *controller) { + _controller = controller; + logger_ = logging::Logger::getLogger(); + configure_ = Configure::getConfigure(); + _socket = 0; + _serverName = "localhost"; + _serverPort = DEFAULT_NIFI_SERVER_PORT; + _registered = false; + _seqNumber = 0; + _reportBlob = NULL; + _reportBlobLen = 0; + _reportInterval = DEFAULT_REPORT_INTERVAL; + running_ = false; - std::string value; + std::string value; - if (configure_->get(Configure::nifi_server_name, value)) - { - _serverName = value; - logger_->log_info("NiFi Server Name %s", _serverName.c_str()); - } - if (configure_->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort)) - { - logger_->log_info("NiFi Server Port: [%d]", _serverPort); - } - if (configure_->get(Configure::nifi_server_report_interval, value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _reportInterval, unit) && - Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) - { - logger_->log_info("NiFi server report interval: [%d] ms", _reportInterval); - } - } - else - _reportInterval = 0; - } - //! Destructor - virtual ~FlowControlProtocol() - { - stop(); - if (_socket) - close(_socket); - if (_reportBlob) - delete [] _reportBlob; - if (this->_thread) - delete this->_thread; - } + if (configure_->get(Configure::nifi_server_name, value)) { + _serverName = value; + logger_->log_info("NiFi Server Name %s", _serverName.c_str()); + } + if (configure_->get(Configure::nifi_server_port, value) + && core::Property::StringToInt( + value, _serverPort)) { + logger_->log_info("NiFi Server Port: [%d]", _serverPort); + } + if (configure_->get(Configure::nifi_server_report_interval, value)) { + core::TimeUnit unit; + if (core::Property::StringToTime( + value, _reportInterval, unit) + && core::Property::ConvertTimeUnitToMS( + _reportInterval, unit, _reportInterval)) { + logger_->log_info("NiFi server report interval: [%d] ms", + _reportInterval); + } + } else + _reportInterval = 0; + } + // Destructor + virtual ~FlowControlProtocol() { + stop(); + if (_socket) + close(_socket); + if (_reportBlob) + delete[] _reportBlob; + if (this->_thread) + delete this->_thread; + } -public: + public: - //! SendRegisterRequest and Process Register Respond, return 0 for success - int sendRegisterReq(); - //! SendReportReq and Process Report Respond, return 0 for success - int sendReportReq(); - //! Start the flow control protocol - void start(); - //! Stop the flow control protocol - void stop(); - //! Set Report BLOB for periodically report - void setReportBlob(char *blob, int len) - { - std::lock_guard<std::mutex> lock(_mtx); - if (_reportBlob && _reportBlobLen >= len) - { - memcpy(_reportBlob, blob, len); - _reportBlobLen = len; - } - else - { - if (_reportBlob) - delete[] _reportBlob; - _reportBlob = new char[len]; - _reportBlobLen = len; - } - } - //! Run function for the thread - static void run(FlowControlProtocol *protocol); - //! set 8 bytes SerialNumber - void setSerialNumber(uint8_t *number) - { - memcpy(_serialNumber, number, 8); - } + // SendRegisterRequest and Process Register Respond, return 0 for success + int sendRegisterReq(); + // SendReportReq and Process Report Respond, return 0 for success + int sendReportReq(); + // Start the flow control protocol + void start(); + // Stop the flow control protocol + void stop(); + // Set Report BLOB for periodically report + void setReportBlob(char *blob, int len) { + std::lock_guard<std::mutex> lock(mutex_); + if (_reportBlob && _reportBlobLen >= len) { + memcpy(_reportBlob, blob, len); + _reportBlobLen = len; + } else { + if (_reportBlob) + delete[] _reportBlob; + _reportBlob = new char[len]; + _reportBlobLen = len; + } + } + // Run function for the thread + static void run(FlowControlProtocol *protocol); + // set 8 bytes SerialNumber + void setSerialNumber(uint8_t *number) { + memcpy(_serialNumber, number, 8); + } -protected: + protected: -private: - //! Connect to the socket, return sock descriptor if success, 0 for failure - int connectServer(const char *host, uint16_t port); - //! Send Data via the socket, return -1 for failure - int sendData(uint8_t *buf, int buflen); - //! Read length into buf, return -1 for failure and 0 for EOF - int readData(uint8_t *buf, int buflen); - //! Select on the socket - int selectClient(int msec); - //! Read the header - int readHdr(FlowControlProtocolHeader *hdr); - //! encode uint32_t - uint8_t *encode(uint8_t *buf, uint32_t value) - { - *buf++ = (value & 0xFF000000) >> 24; - *buf++ = (value & 0x00FF0000) >> 16; - *buf++ = (value & 0x0000FF00) >> 8; - *buf++ = (value & 0x000000FF); - return buf; - } - //! encode uint32_t - uint8_t *decode(uint8_t *buf, uint32_t &value) - { - value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); - return (buf + 4); - } - //! encode byte array - uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) - { - memcpy(buf, bufArray, size); - buf += size; - return buf; - } - //! encode std::string - uint8_t *encode(uint8_t *buf, std::string value) - { - // add the \0 for size - buf = encode(buf, value.size()+1); - buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); - return buf; - } - //! Mutex for protection - std::mutex _mtx; - //! Logger - std::shared_ptr<Logger> logger_; - //! Configure - Configure *configure_ = NULL; - //! NiFi server Name - std::string _serverName; - //! NiFi server port - int64_t _serverPort; - //! Serial Number - uint8_t _serialNumber[8]; - //! socket to server - int _socket; - //! report interal in msec - int64_t _reportInterval; - //! whether it was registered to the NiFi server - bool _registered; - //! seq number - uint32_t _seqNumber; - //! FlowController - FlowController *_controller = NULL; - //! report Blob - char *_reportBlob; - //! report Blob len; - int _reportBlobLen; - //! thread - std::thread *_thread = NULL; - //! whether it is running - bool _running; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowControlProtocol(const FlowControlProtocol &parent); - FlowControlProtocol &operator=(const FlowControlProtocol &parent); + private: + // Connect to the socket, return sock descriptor if success, 0 for failure + int connectServer(const char *host, uint16_t port); + // Send Data via the socket, return -1 for failure + int sendData(uint8_t *buf, int buflen); + // Read length into buf, return -1 for failure and 0 for EOF + int readData(uint8_t *buf, int buflen); + // Select on the socket + int selectClient(int msec); + // Read the header + int readHdr(FlowControlProtocolHeader *hdr); + // encode uint32_t + uint8_t *encode(uint8_t *buf, uint32_t value) { + *buf++ = (value & 0xFF000000) >> 24; + *buf++ = (value & 0x00FF0000) >> 16; + *buf++ = (value & 0x0000FF00) >> 8; + *buf++ = (value & 0x000000FF); + return buf; + } + // encode uint32_t + uint8_t *decode(uint8_t *buf, uint32_t &value) { + value = ((buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | (buf[3])); + return (buf + 4); + } + // encode byte array + uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) { + memcpy(buf, bufArray, size); + buf += size; + return buf; + } + // encode std::string + uint8_t *encode(uint8_t *buf, std::string value) { + // add the \0 for size + buf = encode(buf, value.size() + 1); + buf = encode(buf, (uint8_t *) value.c_str(), value.size() + 1); + return buf; + } + // Mutex for protection + std::mutex mutex_; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Configure + Configure *configure_ = NULL; + // NiFi server Name + std::string _serverName; + // NiFi server port + int64_t _serverPort; + // Serial Number + uint8_t _serialNumber[8]; + // socket to server + int _socket; + // report interal in msec + int64_t _reportInterval; + // whether it was registered to the NiFi server + bool _registered; + // seq number + uint32_t _seqNumber; + // FlowController + FlowController *_controller = NULL; + // report Blob + char *_reportBlob; + // report Blob len; + int _reportBlobLen; + // thread + std::thread *_thread = NULL; + // whether it is running + bool running_; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + FlowControlProtocol(const FlowControlProtocol &parent); + FlowControlProtocol &operator=(const FlowControlProtocol &parent); }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 9db13ff..0475623 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -28,324 +28,168 @@ #include <atomic> #include <algorithm> #include <set> -#ifdef YAML_SUPPORT -#include "yaml-cpp/yaml.h" -#endif -#include "Configure.h" -#include "Property.h" -#include "Relationship.h" +#include "properties/Configure.h" +#include "core/Relationship.h" #include "FlowFileRecord.h" #include "Connection.h" -#include "Processor.h" -#include "ProcessContext.h" -#include "ProcessSession.h" -#include "ProcessGroup.h" -#include "GenerateFlowFile.h" -#include "LogAttribute.h" -#include "RealTimeDataCollector.h" +#include "core/Processor.h" +#include "core/logging/Logger.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessGroup.h" +#include "core/FlowConfiguration.h" #include "TimerDrivenSchedulingAgent.h" #include "EventDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" -#include "RemoteProcessorGroupPort.h" -#include "Provenance.h" -#include "FlowFileRepository.h" -#include "GetFile.h" -#include "PutFile.h" -#include "TailFile.h" -#include "ListenSyslog.h" -#include "ListenHTTP.h" -#include "ExecuteProcess.h" -#include "AppendHostInfo.h" -// OpenSSL related -#ifdef OPENSSL_SUPPORT -#include <openssl/ssl.h> -#include <openssl/err.h> -#endif -//! Default NiFi Root Group Name -#define DEFAULT_ROOT_GROUP_NAME "" -#define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml" -#define CONFIG_YAML_PROCESSORS_KEY "Processors" +#include "core/Property.h" -struct ProcessorConfig { - std::string id; - std::string name; - std::string javaClass; - std::string maxConcurrentTasks; - std::string schedulingStrategy; - std::string schedulingPeriod; - std::string penalizationPeriod; - std::string yieldPeriod; - std::string runDurationNanos; - std::vector<std::string> autoTerminatedRelationships; - std::vector<Property> properties; -}; +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +// Default NiFi Root Group Name +#define DEFAULT_ROOT_GROUP_NAME "" /** * Flow Controller class. Generally used by FlowController factory * as a singleton. */ -class FlowController { -public: - static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; - static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; - - - //! Destructor - virtual ~FlowController(){ - } - //! Set FlowController Name - virtual void setName(std::string name) { - _name = name; - } - //! Get Flow Controller Name - virtual std::string getName(void) { - return (_name); - } - //! Set UUID - virtual void setUUID(uuid_t uuid) { - uuid_copy(_uuid, uuid); - } - //! Get UUID - virtual bool getUUID(uuid_t uuid) { - if (uuid) { - uuid_copy(uuid, _uuid); - return true; - } else - return false; - } - //! Set MAX TimerDrivenThreads - virtual void setMaxTimerDrivenThreads(int number) { - _maxTimerDrivenThreads = number; - } - //! Get MAX TimerDrivenThreads - virtual int getMaxTimerDrivenThreads() { - return _maxTimerDrivenThreads; - } - //! Set MAX EventDrivenThreads - virtual void setMaxEventDrivenThreads(int number) { - _maxEventDrivenThreads = number; - } - //! Get MAX EventDrivenThreads - virtual int getMaxEventDrivenThreads() { - return _maxEventDrivenThreads; - } - //! Get the provenance repository - virtual ProvenanceRepository *getProvenanceRepository() { - return this->_provenanceRepo; - } - //! Get the flowfile repository - virtual FlowFileRepository *getFlowFileRepository() { - return this->_flowfileRepo; - } - //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows - virtual void load() = 0; - - //! Whether the Flow Controller is start running - virtual bool isRunning() { - return _running.load(); - } - //! Whether the Flow Controller has already been initialized (loaded flow XML) - virtual bool isInitialized() { - return _initialized.load(); - } - //! Start to run the Flow Controller which internally start the root process group and all its children - virtual bool start() = 0; - //! Unload the current flow YAML, clean the root process group and all its children - virtual void stop(bool force) = 0; - //! Asynchronous function trigger unloading and wait for a period of time - virtual void waitUnload(const uint64_t timeToWaitMs) = 0; - //! Unload the current flow xml, clean the root process group and all its children - virtual void unload() = 0; - //! Load new xml - virtual void reload(std::string yamlFile) = 0; - //! update property value - void updatePropertyValue(std::string processorName, - std::string propertyName, std::string propertyValue) { - if (_root) - _root->updatePropertyValue(processorName, propertyName, - propertyValue); - } - - //! Create Processor (Node/Input/Output Port) based on the name - virtual Processor *createProcessor(std::string name, uuid_t uuid) = 0; - //! Create Root Processor Group - virtual ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid) = 0; - //! Create Remote Processor Group - virtual ProcessGroup *createRemoteProcessGroup(std::string name, - uuid_t uuid) = 0; - //! Create Connection - virtual Connection *createConnection(std::string name, uuid_t uuid) = 0; - //! set 8 bytes SerialNumber - virtual void setSerialNumber(uint8_t *number) { - _protocol->setSerialNumber(number); - } - -protected: - - //! A global unique identifier - uuid_t _uuid; - //! FlowController Name - std::string _name; - //! Configuration File Name - std::string _configurationFileName; - //! NiFi property File Name - std::string _propertiesFileName; - //! Root Process Group - ProcessGroup *_root; - //! MAX Timer Driven Threads - int _maxTimerDrivenThreads; - //! MAX Event Driven Threads - int _maxEventDrivenThreads; - //! Config - //! FlowFile Repo - //! Whether it is running - std::atomic<bool> _running; - //! Whether it has already been initialized (load the flow XML already) - std::atomic<bool> _initialized; - //! Provenance Repo - ProvenanceRepository *_provenanceRepo; - //! FlowFile Repo - FlowFileRepository *_flowfileRepo; - //! Flow Engines - //! Flow Timer Scheduler - TimerDrivenSchedulingAgent _timerScheduler; - //! Flow Event Scheduler - EventDrivenSchedulingAgent _eventScheduler; - //! Controller Service - //! Config - //! Site to Site Server Listener - //! Heart Beat - //! FlowControl Protocol - FlowControlProtocol *_protocol; - - - FlowController() : - _root(0), _maxTimerDrivenThreads(0), _maxEventDrivenThreads(0), _running( - false), _initialized(false), _provenanceRepo(0), _flowfileRepo(0), _protocol( - 0), logger_(Logger::getLogger()){ - } - -private: - - //! Logger - std::shared_ptr<Logger> logger_; +class FlowController : public core::CoreComponent { + public: + static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; + static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; + + /** + * Flow controller constructor + */ + FlowController(std::shared_ptr<core::Repository> provenance_repo, + std::shared_ptr<core::Repository> flow_file_repo, + std::unique_ptr<core::FlowConfiguration> flow_configuration, + const std::string name = DEFAULT_ROOT_GROUP_NAME,bool headless_mode=false); + + // Destructor + virtual ~FlowController(); + + // Set MAX TimerDrivenThreads + virtual void setMaxTimerDrivenThreads(int number) { + max_timer_driven_threads_ = number; + } + // Get MAX TimerDrivenThreads + virtual int getMaxTimerDrivenThreads() { + return max_timer_driven_threads_; + } + // Set MAX EventDrivenThreads + virtual void setMaxEventDrivenThreads(int number) { + max_event_driven_threads_ = number; + } + // Get MAX EventDrivenThreads + virtual int getMaxEventDrivenThreads() { + return max_event_driven_threads_; + } + // Get the provenance repository + virtual std::shared_ptr<core::Repository> getProvenanceRepository() { + return this->provenance_repo_; + } + + // Get the flowfile repository + virtual std::shared_ptr<core::Repository> getFlowFileRepository() { + return this->flow_file_repo_; + } + + // Load flow xml from disk, after that, create the root process group and its children, initialize the flows + virtual void load(); + + // Whether the Flow Controller is start running + virtual bool isRunning() { + return running_.load(); + } + // Whether the Flow Controller has already been initialized (loaded flow XML) + virtual bool isInitialized() { + return initialized_.load(); + } + // Start to run the Flow Controller which internally start the root process group and all its children + virtual bool start(); + // Unload the current flow YAML, clean the root process group and all its children + virtual void stop(bool force); + // Asynchronous function trigger unloading and wait for a period of time + virtual void waitUnload(const uint64_t timeToWaitMs); + // Unload the current flow xml, clean the root process group and all its children + virtual void unload(); + // Load new xml + virtual void reload(std::string yamlFile); + // update property value + void updatePropertyValue(std::string processorName, std::string propertyName, + std::string propertyValue) { + if (root_ != nullptr) + root_->updatePropertyValue(processorName, propertyName, propertyValue); + } + + // set 8 bytes SerialNumber + virtual void setSerialNumber(uint8_t *number) { + protocol_->setSerialNumber(number); + } + + protected: + + // function to load the flow file repo. + void loadFlowRepo(); + + /** + * Initializes flow controller paths. + */ + virtual void initializePaths(const std::string &adjustedFilename); + + // flow controller mutex + std::recursive_mutex mutex_; + + // configuration object + Configure *configure_; + + // Configuration File Name + std::string configuration_file_name_; + // NiFi property File Name + std::string properties_file_name_; + // Root Process Group + std::unique_ptr<core::ProcessGroup> root_; + // MAX Timer Driven Threads + int max_timer_driven_threads_; + // MAX Event Driven Threads + int max_event_driven_threads_; + // FlowFile Repo + // Whether it is running + std::atomic<bool> running_; + // conifiguration filename + std::string configuration_filename_; + // Whether it has already been initialized (load the flow XML already) + std::atomic<bool> initialized_; + // Provenance Repo + std::shared_ptr<core::Repository> provenance_repo_; + + // FlowFile Repo + std::shared_ptr<core::Repository> flow_file_repo_; + + // Flow Engines + // Flow Timer Scheduler + TimerDrivenSchedulingAgent _timerScheduler; + // Flow Event Scheduler + EventDrivenSchedulingAgent _eventScheduler; + // Controller Service + // Config + // Site to Site Server Listener + // Heart Beat + // FlowControl Protocol + FlowControlProtocol *protocol_; + + // flow configuration object. + std::unique_ptr<core::FlowConfiguration> flow_configuration_; }; -/** - * Flow Controller implementation that defines the typical flow. - * of events. - */ -class FlowControllerImpl: public FlowController { -public: - - //! Destructor - virtual ~FlowControllerImpl(); - - //! Life Cycle related function - //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows - void load(); - //! Start to run the Flow Controller which internally start the root process group and all its children - bool start(); - //! Stop to run the Flow Controller which internally stop the root process group and all its children - void stop(bool force); - //! Asynchronous function trigger unloading and wait for a period of time - void waitUnload(const uint64_t timeToWaitMs); - //! Unload the current flow xml, clean the root process group and all its children - void unload(); - //! Load new xml - void reload(std::string yamlFile); - //! Load Flow File from persistent Flow Repo - void loadFlowRepo(); - //! update property value - void updatePropertyValue(std::string processorName, - std::string propertyName, std::string propertyValue) { - if (_root) - _root->updatePropertyValue(processorName, propertyName, - propertyValue); - } - - //! Create Processor (Node/Input/Output Port) based on the name - Processor *createProcessor(std::string name, uuid_t uuid); - //! Create Root Processor Group - ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid); - //! Create Remote Processor Group - ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); - //! Create Connection - Connection *createConnection(std::string name, uuid_t uuid); - - //! Constructor - /*! - * Create a new Flow Controller - */ - FlowControllerImpl(std::string name = DEFAULT_ROOT_GROUP_NAME); - - - - - friend class FlowControlFactory; - -private: - - - - //! Mutex for protection - std::mutex _mtx; - //! Logger - std::shared_ptr<Logger> logger_; - Configure *configure_; - -#ifdef YAML_SUPPORT - //! Process Processor Node YAML - void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent); - //! Process Port YAML - void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, - TransferDirection direction); - //! Process Root Processor Group YAML - void parseRootProcessGroupYaml(YAML::Node rootNode); - //! Process Property YAML - void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, - Processor *processor); - //! Process connection YAML - void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent); - //! Process Remote Process Group YAML - void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup *parent); - //! Parse Properties Node YAML for a processor - void parsePropertiesNodeYaml(YAML::Node *propertiesNode, - Processor *processor); -#endif - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowControllerImpl(const FlowController &parent); - FlowControllerImpl &operator=(const FlowController &parent); - -}; - -/** - * Flow Controller factory that creates flow controllers or gets the - * assigned instance. - */ -class FlowControllerFactory { -public: - //! Get the singleton flow controller - static FlowController * getFlowController(FlowController *instance = 0) { - if (!_flowController) { - if (NULL == instance) - _flowController = createFlowController(); - else - _flowController = instance; - } - return _flowController; - } - - //! Get the singleton flow controller - static FlowController * createFlowController() { - return dynamic_cast<FlowController*>(new FlowControllerImpl()); - } -private: - static FlowController *_flowController; -}; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/FlowFileRecord.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index ded0623..ca0856c 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -31,215 +31,155 @@ #include <fstream> #include <set> +#include "io/Serializable.h" +#include "core/FlowFile.h" #include "utils/TimeUtil.h" -#include "Logger.h" +#include "core/logging/Logger.h" #include "ResourceClaim.h" +#include "Connection.h" -class ProcessSession; -class Connection; -class FlowFileEventRecord; +namespace org { +namespace apache { +namespace nifi { +namespace minifi { #define DEFAULT_FLOWFILE_PATH "." -//! FlowFile Attribute -enum FlowAttribute -{ - //! The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename - PATH = 0, - //! The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename - ABSOLUTE_PATH, - //! The filename of the FlowFile. The filename should not contain any directory structure. - FILENAME, - //! A unique UUID assigned to this FlowFile. - UUID, - //! A numeric value indicating the FlowFile priority - priority, - //! The MIME Type of this FlowFile - MIME_TYPE, - //! Specifies the reason that a FlowFile is being discarded - DISCARD_REASON, - //! Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile. - ALTERNATE_IDENTIFIER, - MAX_FLOW_ATTRIBUTES +// FlowFile Attribute +enum FlowAttribute { + // The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename + PATH = 0, + // The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename + ABSOLUTE_PATH, + // The filename of the FlowFile. The filename should not contain any directory structure. + FILENAME, + // A unique UUID assigned to this FlowFile. + UUID, + // A numeric value indicating the FlowFile priority + priority, + // The MIME Type of this FlowFile + MIME_TYPE, + // Specifies the reason that a FlowFile is being discarded + DISCARD_REASON, + // Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile. + ALTERNATE_IDENTIFIER, + MAX_FLOW_ATTRIBUTES }; -//! FlowFile Attribute Key -static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = -{ - "path", - "absolute.path", - "filename", - "uuid", - "priority", - "mime.type", - "discard.reason", - "alternate.identifier" -}; - -//! FlowFile Attribute Enum to Key -inline const char *FlowAttributeKey(FlowAttribute attribute) -{ - if (attribute < MAX_FLOW_ATTRIBUTES) - return FlowAttributeKeyArray[attribute]; - else - return NULL; +// FlowFile Attribute Key +static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", + "absolute.path", "filename", "uuid", "priority", "mime.type", + "discard.reason", "alternate.identifier" }; + +// FlowFile Attribute Enum to Key +inline const char *FlowAttributeKey(FlowAttribute attribute) { + if (attribute < MAX_FLOW_ATTRIBUTES) + return FlowAttributeKeyArray[attribute]; + else + return NULL; } -//! FlowFile IO Callback functions for input and output -//! throw exception for error -class InputStreamCallback -{ -public: - virtual void process(std::ifstream *stream) = 0; +// FlowFile IO Callback functions for input and output +// throw exception for error +class InputStreamCallback { + public: + virtual void process(std::ifstream *stream) = 0; }; -class OutputStreamCallback -{ -public: - virtual void process(std::ofstream *stream) = 0; +class OutputStreamCallback { + public: + virtual void process(std::ofstream *stream) = 0; }; - -//! FlowFile Record Class -class FlowFileRecord -{ - friend class ProcessSession; -public: - //! Constructor - /*! - * Create a new flow record - */ - explicit FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL); - /*! - * Create a new flow record from repo flow event - */ - explicit FlowFileRecord(FlowFileEventRecord *event); - //! Destructor - virtual ~FlowFileRecord(); - //! addAttribute key is enum - bool addAttribute(FlowAttribute key, std::string value); - //! addAttribute key is string - bool addAttribute(std::string key, std::string value); - //! removeAttribute key is enum - bool removeAttribute(FlowAttribute key); - //! removeAttribute key is string - bool removeAttribute(std::string key); - //! updateAttribute key is enum - bool updateAttribute(FlowAttribute key, std::string value); - //! updateAttribute key is string - bool updateAttribute(std::string key, std::string value); - //! getAttribute key is enum - bool getAttribute(FlowAttribute key, std::string &value); - //! getAttribute key is string - bool getAttribute(std::string key, std::string &value); - //! setAttribute, if attribute already there, update it, else, add it - void setAttribute(std::string key, std::string value) { - _attributes[key] = value; - } - //! Get the UUID as string - std::string getUUIDStr() { - return _uuidStr; - } - //! Get Attributes - std::map<std::string, std::string> getAttributes() { - return _attributes; - } - //! Check whether it is still being penalized - bool isPenalized() { - return (_penaltyExpirationMs > 0 ? _penaltyExpirationMs > getTimeMillis() : false); - } - //! Get Size - uint64_t getSize() { - return _size; - } - // ! Get Offset - uint64_t getOffset() { - return _offset; - } - // ! Get Entry Date - uint64_t getEntryDate() { - return _entryDate; - } - // ! Get Lineage Start Date - uint64_t getlineageStartDate() { - return _lineageStartDate; - } - // ! Set Original connection - void setOriginalConnection (Connection *connection) { - _orginalConnection = connection; - } - //! Get Original connection - Connection * getOriginalConnection() { - return _orginalConnection; - } - //! Get Resource Claim - ResourceClaim *getResourceClaim() { - return _claim; - } - //! Get lineageIdentifiers - std::set<std::string> getlineageIdentifiers() - { - return _lineageIdentifiers; - } - //! Check whether it is stored to DB already - bool isStoredToRepository() - { - return _isStoredToRepo; - } - void setStoredToRepository(bool value) - { - _isStoredToRepo = value; - } - -protected: - - //! Date at which the flow file entered the flow - uint64_t _entryDate; - //! Date at which the origin of this flow file entered the flow - uint64_t _lineageStartDate; - //! Date at which the flow file was queued - uint64_t _lastQueueDate; - //! Size in bytes of the data corresponding to this flow file - uint64_t _size; - //! A global unique identifier - uuid_t _uuid; - //! A local unique identifier - uint64_t _id; - //! Offset to the content - uint64_t _offset; - //! Penalty expiration - uint64_t _penaltyExpirationMs; - //! Attributes key/values pairs for the flow record - std::map<std::string, std::string> _attributes; - //! Pointer to the associated content resource claim - ResourceClaim *_claim; - //! UUID string - std::string _uuidStr; - //! UUID string for all parents - std::set<std::string> _lineageIdentifiers; - //! whether it is stored to DB - bool _isStoredToRepo; - //! duplicate the original flow file - void duplicate(FlowFileRecord *original); - -private: - - //! Local flow sequence ID - static std::atomic<uint64_t> _localFlowSeqNumber; - //! Mark for deletion - bool _markedDelete; - //! Connection queue that this flow file will be transfer or current in - Connection *_connection; - //! Orginal connection queue that this flow file was dequeued from - Connection *_orginalConnection; - //! Logger - std::shared_ptr<Logger> logger_; - //! Snapshot flow record for session rollback - bool _snapshot; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowFileRecord(const FlowFileRecord &parent); - FlowFileRecord &operator=(const FlowFileRecord &parent); +class FlowFileRecord : public core::FlowFile, public io::Serializable { + public: + // Constructor + /* + * Create a new flow record + */ + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, + std::map<std::string, std::string> attributes, + std::shared_ptr<ResourceClaim> claim = nullptr); + + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, + std::shared_ptr<core::FlowFile> &event); + + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, + std::shared_ptr<core::FlowFile> &event, + const std::string &uuidConnection); + + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository) + : FlowFile(), + flow_repository_(flow_repository), + snapshot_("") { + + } + // Destructor + virtual ~FlowFileRecord(); + // addAttribute key is enum + bool addKeyedAttribute(FlowAttribute key, std::string value); + // removeAttribute key is enum + bool removeKeyedAttribute(FlowAttribute key); + // updateAttribute key is enum + bool updateKeyedAttribute(FlowAttribute key, std::string value); + // getAttribute key is enum + bool getKeyedAttribute(FlowAttribute key, std::string &value); + + //! Serialize and Persistent to the repository + bool Serialize(); + //! DeSerialize + bool DeSerialize(const uint8_t *buffer, const int bufferSize); + //! DeSerialize + bool DeSerialize(io::DataStream &stream) { + return DeSerialize(stream.getBuffer(), stream.getSize()); + } + //! DeSerialize + bool DeSerialize(std::string key); + + void setSnapShot(bool snapshot) { + snapshot_ = snapshot; + } + + /** + * gets the UUID connection. + * @return uuidConnection + */ + const std::string getConnectionUuid() { + return uuid_connection_; + } + + const std::string getContentFullPath() + { + return content_full_fath_; + } + + + FlowFileRecord &operator=(const FlowFileRecord &); + + FlowFileRecord(const FlowFileRecord &parent) = delete; + + protected: + + // connection uuid + std::string uuid_connection_; + // Full path to the content + std::string content_full_fath_; + + // Local flow sequence ID + static std::atomic<uint64_t> local_flow_seq_number_; + + // repository reference. + std::shared_ptr<core::Repository> flow_repository_; + + // Snapshot flow record for session rollback + bool snapshot_; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer }; +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + #endif
