http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/null_sink.h ---------------------------------------------------------------------- diff --git a/include/spdlog/sinks/null_sink.h b/include/spdlog/sinks/null_sink.h new file mode 100644 index 0000000..992b3b7 --- /dev/null +++ b/include/spdlog/sinks/null_sink.h @@ -0,0 +1,52 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once +#include <mutex> +#include "./base_sink.h" +#include "../details/null_mutex.h" + + +namespace spdlog +{ +namespace sinks +{ + +template <class Mutex> +class null_sink : public base_sink < Mutex > +{ +protected: + void _sink_it(const details::log_msg&) override + {} + + void flush() override + {} + +}; +typedef null_sink<details::null_mutex> null_sink_st; +typedef null_sink<std::mutex> null_sink_mt; + +} +} +
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/ostream_sink.h ---------------------------------------------------------------------- diff --git a/include/spdlog/sinks/ostream_sink.h b/include/spdlog/sinks/ostream_sink.h new file mode 100644 index 0000000..f2fe3b2 --- /dev/null +++ b/include/spdlog/sinks/ostream_sink.h @@ -0,0 +1,67 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#include <ostream> +#include <mutex> +#include <memory> + +#include "../details/null_mutex.h" +#include "./base_sink.h" + +namespace spdlog +{ +namespace sinks +{ +template<class Mutex> +class ostream_sink: public base_sink<Mutex> +{ +public: + explicit ostream_sink(std::ostream& os, bool force_flush=false) :_ostream(os), _force_flush(force_flush) {} + ostream_sink(const ostream_sink&) = delete; + ostream_sink& operator=(const ostream_sink&) = delete; + virtual ~ostream_sink() = default; + +protected: + void _sink_it(const details::log_msg& msg) override + { + _ostream.write(msg.formatted.data(), msg.formatted.size()); + if (_force_flush) + _ostream.flush(); + } + + void flush() override + { + _ostream.flush(); + } + + std::ostream& _ostream; + bool _force_flush; +}; + +typedef ostream_sink<std::mutex> ostream_sink_mt; +typedef ostream_sink<details::null_mutex> ostream_sink_st; +} +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/sink.h ---------------------------------------------------------------------- diff --git a/include/spdlog/sinks/sink.h b/include/spdlog/sinks/sink.h new file mode 100644 index 0000000..88c423a --- /dev/null +++ b/include/spdlog/sinks/sink.h @@ -0,0 +1,42 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#include "../details/log_msg.h" + +namespace spdlog +{ +namespace sinks +{ +class sink +{ +public: + virtual ~sink() {} + virtual void log(const details::log_msg& msg) = 0; + virtual void flush() = 0; +}; +} +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/stdout_sinks.h ---------------------------------------------------------------------- diff --git a/include/spdlog/sinks/stdout_sinks.h b/include/spdlog/sinks/stdout_sinks.h new file mode 100644 index 0000000..4ca16ac --- /dev/null +++ b/include/spdlog/sinks/stdout_sinks.h @@ -0,0 +1,71 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#include <iostream> +#include <mutex> +#include "./ostream_sink.h" +#include "spdlog/details/null_mutex.h" + +namespace spdlog +{ +namespace sinks +{ + +template <class Mutex> +class stdout_sink : public ostream_sink<Mutex> +{ + using MyType = stdout_sink<Mutex>; +public: + stdout_sink() : ostream_sink<Mutex>(std::cout, true) {} + static std::shared_ptr<MyType> instance() + { + static std::shared_ptr<MyType> instance = std::make_shared<MyType>(); + return instance; + } +}; + +typedef stdout_sink<details::null_mutex> stdout_sink_st; +typedef stdout_sink<std::mutex> stdout_sink_mt; + + +template <class Mutex> +class stderr_sink : public ostream_sink<Mutex> +{ + using MyType = stderr_sink<Mutex>; +public: + stderr_sink() : ostream_sink<Mutex>(std::cerr, true) {} + static std::shared_ptr<MyType> instance() + { + static std::shared_ptr<MyType> instance = std::make_shared<MyType>(); + return instance; + } + +}; + +typedef stderr_sink<std::mutex> stderr_sink_mt; +typedef stderr_sink<details::null_mutex> stderr_sink_st; +} +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/syslog_sink.h ---------------------------------------------------------------------- diff --git a/include/spdlog/sinks/syslog_sink.h b/include/spdlog/sinks/syslog_sink.h new file mode 100644 index 0000000..37b6513 --- /dev/null +++ b/include/spdlog/sinks/syslog_sink.h @@ -0,0 +1,102 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#ifdef __linux__ + +#include <array> +#include <string> +#include <syslog.h> + +#include "./sink.h" +#include "../common.h" +#include "../details/log_msg.h" + + +namespace spdlog +{ +namespace sinks +{ +/** + * Sink that write to syslog using the `syscall()` library call. + * + * Locking is not needed, as `syslog()` itself is thread-safe. + */ +class syslog_sink : public sink +{ +public: + // + syslog_sink(const std::string& ident = "", int syslog_option=0, int syslog_facility=LOG_USER): + _ident(ident) + { + _priorities[static_cast<int>(level::trace)] = LOG_DEBUG; + _priorities[static_cast<int>(level::debug)] = LOG_DEBUG; + _priorities[static_cast<int>(level::info)] = LOG_INFO; + _priorities[static_cast<int>(level::notice)] = LOG_NOTICE; + _priorities[static_cast<int>(level::warn)] = LOG_WARNING; + _priorities[static_cast<int>(level::err)] = LOG_ERR; + _priorities[static_cast<int>(level::critical)] = LOG_CRIT; + _priorities[static_cast<int>(level::alert)] = LOG_ALERT; + _priorities[static_cast<int>(level::emerg)] = LOG_EMERG; + _priorities[static_cast<int>(level::off)] = LOG_INFO; + + //set ident to be program name if empty + ::openlog(_ident.empty()? nullptr:_ident.c_str(), syslog_option, syslog_facility); + } + ~syslog_sink() + { + ::closelog(); + } + + syslog_sink(const syslog_sink&) = delete; + syslog_sink& operator=(const syslog_sink&) = delete; + + void log(const details::log_msg &msg) override + { + ::syslog(syslog_prio_from_level(msg), "%s", msg.formatted.str().c_str()); + } + + void flush() override + { + } + + +private: + std::array<int, 10> _priorities; + //must store the ident because the man says openlog might use the pointer as is and not a string copy + const std::string _ident; + + // + // Simply maps spdlog's log level to syslog priority level. + // + int syslog_prio_from_level(const details::log_msg &msg) const + { + return _priorities[static_cast<int>(msg.level)]; + } +}; +} +} + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/spdlog.h ---------------------------------------------------------------------- diff --git a/include/spdlog/spdlog.h b/include/spdlog/spdlog.h new file mode 100644 index 0000000..5cec562 --- /dev/null +++ b/include/spdlog/spdlog.h @@ -0,0 +1,155 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + + +// spdlog main header file. +//see example.cpp for usage example + +#pragma once + +#include "tweakme.h" +#include "common.h" +#include "logger.h" + +namespace spdlog +{ +// Return an existing logger or nullptr if a logger with such name doesn't exist. +// Examples: +// +// spdlog::get("mylog")->info("Hello"); +// auto logger = spdlog::get("mylog"); +// logger.info("This is another message" , x, y, z); +// logger.info() << "This is another message" << x << y << z; +std::shared_ptr<logger> get(const std::string& name); + +// +// Set global formatting +// example: spdlog::set_pattern("%Y-%m-%d %H:%M:%S.%e %l : %v"); +// +void set_pattern(const std::string& format_string); +void set_formatter(formatter_ptr f); + +// +// Set global logging level for +// +void set_level(level::level_enum log_level); + +// +// Turn on async mode (off by default) and set the queue size for each async_logger. +// effective only for loggers created after this call. +// queue_size: size of queue (must be power of 2): +// Each logger will pre-allocate a dedicated queue with queue_size entries upon construction. +// +// async_overflow_policy (optional, block_retry by default): +// async_overflow_policy::block_retry - if queue is full, block until queue has room for the new log entry. +// async_overflow_policy::discard_log_msg - never block and discard any new messages when queue overflows. +// +// worker_warmup_cb (optional): +// callback function that will be called in worker thread upon start (can be used to init stuff like thread affinity) +// +void set_async_mode(size_t queue_size, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const std::function<void()>& worker_warmup_cb = nullptr, const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); + +// Turn off async mode +void set_sync_mode(); + +// +// Create and register multi/single threaded rotating file logger +// +std::shared_ptr<logger> rotating_logger_mt(const std::string& logger_name, const std::string& filename, size_t max_file_size, size_t max_files, bool force_flush = false); +std::shared_ptr<logger> rotating_logger_st(const std::string& logger_name, const std::string& filename, size_t max_file_size, size_t max_files, bool force_flush = false); + +// +// Create file logger which creates new file on the given time (default in midnight): +// +std::shared_ptr<logger> daily_logger_mt(const std::string& logger_name, const std::string& filename, int hour=0, int minute=0, bool force_flush = false); +std::shared_ptr<logger> daily_logger_st(const std::string& logger_name, const std::string& filename, int hour=0, int minute=0, bool force_flush = false); + + +// +// Create and register stdout/stderr loggers +// +std::shared_ptr<logger> stdout_logger_mt(const std::string& logger_name); +std::shared_ptr<logger> stdout_logger_st(const std::string& logger_name); +std::shared_ptr<logger> stderr_logger_mt(const std::string& logger_name); +std::shared_ptr<logger> stderr_logger_st(const std::string& logger_name); + + +// +// Create and register a syslog logger +// +#ifdef __linux__ +std::shared_ptr<logger> syslog_logger(const std::string& logger_name, const std::string& ident = "", int syslog_option = 0); +#endif + + +// Create and register a logger with multiple sinks +std::shared_ptr<logger> create(const std::string& logger_name, sinks_init_list sinks); +template<class It> +std::shared_ptr<logger> create(const std::string& logger_name, const It& sinks_begin, const It& sinks_end); + + +// Create and register a logger with templated sink type +// Example: spdlog::create<daily_file_sink_st>("mylog", "dailylog_filename", "txt"); +template <typename Sink, typename... Args> +std::shared_ptr<spdlog::logger> create(const std::string& logger_name, const Args&...); + + +// Register the given logger with the given name +void register_logger(std::shared_ptr<logger> logger); + +// Drop the reference to the given logger +void drop(const std::string &name); + +// Drop all references +void drop_all(); + + +/////////////////////////////////////////////////////////////////////////////// +// +// Macros to be display source file & line +// Trace & Debug can be switched on/off at compile time for zero cost debug statements. +// Uncomment SPDLOG_DEBUG_ON/SPDLOG_TRACE_ON in teakme.h to enable. +// +// Example: +// spdlog::set_level(spdlog::level::debug); +// SPDLOG_DEBUG(my_logger, "Some debug message {} {}", 1, 3.2); +/////////////////////////////////////////////////////////////////////////////// + +#ifdef SPDLOG_TRACE_ON +#define SPDLOG_TRACE(logger, ...) logger->trace(__VA_ARGS__) << " (" << __FILE__ << " #" << __LINE__ <<")"; +#else +#define SPDLOG_TRACE(logger, ...) +#endif + +#ifdef SPDLOG_DEBUG_ON +#define SPDLOG_DEBUG(logger, ...) logger->debug(__VA_ARGS__) << " (" << __FILE__ << " #" << __LINE__ <<")"; +#else +#define SPDLOG_DEBUG(logger, ...) +#endif + + +} + + +#include "details/spdlog_impl.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/tweakme.h ---------------------------------------------------------------------- diff --git a/include/spdlog/tweakme.h b/include/spdlog/tweakme.h new file mode 100644 index 0000000..b651658 --- /dev/null +++ b/include/spdlog/tweakme.h @@ -0,0 +1,74 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + + +#pragma once + +/////////////////////////////////////////////////////////////////////////////// +// Edit this file to squeeze every last drop of performance out of spdlog. +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Under Linux, the much faster CLOCK_REALTIME_COARSE clock can be used. +// This clock is less accurate - can be off by dozens of millis - depending on the kernel HZ. +// Uncomment to use it instead of the regular (but slower) clock. +// #define SPDLOG_CLOCK_COARSE +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment if date/time logging is not needed. +// This will prevent spdlog from quering the clock on each log call. +// #define SPDLOG_NO_DATETIME +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment if thread id logging is not needed (i.e. no %t in the log pattern). +// This will prevent spdlog from quering the thread id on each log call. +// #define SPDLOG_NO_THREAD_ID +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment if logger name logging is not needed. +// This will prevent spdlog from copying the logger name on each log call. +// #define SPDLOG_NO_NAME +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment to enable the SPDLOG_DEBUG/SPDLOG_TRACE macros. +// #define SPDLOG_DEBUG_ON +// #define SPDLOG_TRACE_ON +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment to avoid locking in the registry operations (spdlog::get(), spdlog::drop() spdlog::register()). +// Use only if your code never modifes concurrently the registry. +// Note that upon creating a logger the registry is modified by spdlog.. +// #define SPDLOG_NO_REGISTRY_MUTEX +/////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt new file mode 100644 index 0000000..571b73d --- /dev/null +++ b/libminifi/CMakeLists.txt @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless qrequired by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cmake_minimum_required (VERSION 2.6) + +set(PROJECT "apache-nifi-minifi-cpp") +set(VERSION "0.1.0") + +#### Establish Project Configuration #### +# Enable usage of the VERSION specifier +# https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048 +cmake_policy(SET CMP0048 NEW) + +project(${PROJECT} + VERSION ${VERSION}) + +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +include_directories(../include) +include_directories(include) + +file(GLOB SOURCES "src/*.cpp") + +add_library(spdlog INTERFACE) +add_library(minifi STATIC ${SOURCES}) + +# Include libxml2 +find_package (LibXml2) +if (LIBXML2_FOUND) + include_directories(${LIBXML2_INCLUDE_DIR}) + target_link_libraries (minifi ${LIBXML2_LIBRARIES}) +else () + # Build from our local version +endif (LIBXML2_FOUND) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h new file mode 100644 index 0000000..d325fa0 --- /dev/null +++ b/libminifi/include/Configure.h @@ -0,0 +1,115 @@ +/** + * @file Configure.h + * Configure class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONFIGURE_H__ +#define __CONFIGURE_H__ + +#include <stdio.h> +#include <string> +#include <map> +#include <stdlib.h> +#include <errno.h> +#include <iostream> +#include <fstream> +#include "Logger.h" + +class Configure { +public: + //! Get the singleton logger instance + static Configure * getConfigure() + { + if (!_configure) + { + _configure = new Configure(); + } + return _configure; + } + //! nifi.flow.configuration.file + static const char *nifi_flow_configuration_file; + static const char *nifi_administrative_yield_duration; + static const char *nifi_bored_yield_duration; + static const char *nifi_server_name; + static const char *nifi_server_port; + static const char *nifi_server_report_interval; + + //! Clear the load config + void clear() + { + std::lock_guard<std::mutex> lock(_mtx); + _properties.clear(); + } + //! Set the config value + void set(std::string key, std::string value) + { + std::lock_guard<std::mutex> lock(_mtx); + _properties[key] = value; + } + //! Check whether the config value existed + bool has(std::string key) + { + std::lock_guard<std::mutex> lock(_mtx); + return (_properties.find(key) != _properties.end()); + } + //! Get the config value + bool get(std::string key, std::string &value); + // Trim String utils + std::string trim(const std::string& s); + std::string trimLeft(const std::string& s); + std::string trimRight(const std::string& s); + //! Parse one line in configure file like key=value + void parseConfigureFileLine(char *buf); + //! Load Configure File + void loadConfigureFile(const char *fileName); + //! Set the determined MINIFI_HOME + void setHome(std::string minifiHome) + { + _minifiHome = minifiHome; + } + + //! Get the determined MINIFI_HOME + std::string getHome() + { + return _minifiHome; + } + //! Parse Command Line + void parseCommandLine(int argc, char **argv); + +private: + //! Mutex for protection + std::mutex _mtx; + //! Logger + Logger *_logger; + //! Home location for this executable + std::string _minifiHome; + + Configure() + { + _logger = Logger::getLogger(); + } + virtual ~Configure() + { + + } + static Configure *_configure; + +protected: + std::map<std::string,std::string> _properties; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Connection.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h new file mode 100644 index 0000000..dc6b94b --- /dev/null +++ b/libminifi/include/Connection.h @@ -0,0 +1,201 @@ +/** + * @file Connection.h + * Connection class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONNECTION_H__ +#define __CONNECTION_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <algorithm> + +#include "FlowFileRecord.h" +#include "Relationship.h" +#include "Logger.h" + +//! Forwarder declaration +class Processor; + +//! Connection Class +class Connection +{ +public: + //! Constructor + /*! + * Create a new processor + */ + Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL); + //! Destructor + virtual ~Connection() {} + //! Set Connection Name + void setName(std::string name) { + _name = name; + } + //! Get Process Name + std::string getName(void) { + return (_name); + } + //! Set UUID + void setUUID(uuid_t uuid) { + uuid_copy(_uuid, uuid); + } + //! Set Source Processor UUID + void setSourceProcessorUUID(uuid_t uuid) { + uuid_copy(_srcUUID, uuid); + } + //! Set Destination Processor UUID + void setDestinationProcessorUUID(uuid_t uuid) { + uuid_copy(_destUUID, uuid); + } + //! Get Source Processor UUID + void getSourceProcessorUUID(uuid_t uuid) { + uuid_copy(uuid, _srcUUID); + } + //! Get Destination Processor UUID + void getDestinationProcessorUUID(uuid_t uuid) { + uuid_copy(uuid, _destUUID); + } + //! Get UUID + bool getUUID(uuid_t uuid) { + if (uuid) + { + uuid_copy(uuid, _uuid); + return true; + } + else + return false; + } + //! Set Connection Source Processor + void setSourceProcessor(Processor *source) { + _srcProcessor = source; + } + // ! Get Connection Source Processor + Processor *getSourceProcessor() { + return _srcProcessor; + } + //! Set Connection Destination Processor + void setDestinationProcessor(Processor *dest) { + _destProcessor = dest; + } + // ! Get Connection Destination Processor + Processor *getDestinationProcessor() { + return _destProcessor; + } + //! Set Connection relationship + void setRelationship(Relationship relationship) { + _relationship = relationship; + } + // ! Get Connection relationship + Relationship getRelationship() { + return _relationship; + } + //! Set Max Queue Size + void setMaxQueueSize(uint64_t size) + { + _maxQueueSize = size; + } + //! Get Max Queue Size + uint64_t getMaxQueueSize() + { + return _maxQueueSize; + } + //! Set Max Queue Data Size + void setMaxQueueDataSize(uint64_t size) + { + _maxQueueDataSize = size; + } + //! Get Max Queue Data Size + uint64_t getMaxQueueDataSize() + { + return _maxQueueDataSize; + } + //! Set Flow expiration duration in millisecond + void setFlowExpirationDuration(uint64_t duration) + { + _expiredDuration = duration; + } + //! Get Flow expiration duration in millisecond + uint64_t getFlowExpirationDuration() + { + return _expiredDuration; + } + //! Check whether the queue is empty + bool isEmpty(); + //! Check whether the queue is full to apply back pressure + bool isFull(); + //! Get queue size + uint64_t getQueueSize() { + std::lock_guard<std::mutex> lock(_mtx); + return _queue.size(); + } + //! Get queue data size + uint64_t getQueueDataSize() + { + return _maxQueueDataSize; + } + //! Put the flow file into queue + void put(FlowFileRecord *flow); + //! Poll the flow file from queue, the expired flow file record also being returned + FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords); + //! Drain the flow records + void drain(); + +protected: + //! A global unique identifier + uuid_t _uuid; + //! Source Processor UUID + uuid_t _srcUUID; + //! Destination Processor UUID + uuid_t _destUUID; + //! Connection Name + std::string _name; + //! Relationship for this connection + Relationship _relationship; + //! Source Processor (ProcessNode/Port) + Processor *_srcProcessor; + //! Destination Processor (ProcessNode/Port) + Processor *_destProcessor; + //! Max queue size to apply back pressure + std::atomic<uint64_t> _maxQueueSize; + //! Max queue data size to apply back pressure + std::atomic<uint64_t> _maxQueueDataSize; + //! Flow File Expiration Duration in= MilliSeconds + std::atomic<uint64_t> _expiredDuration; + + +private: + //! Mutex for protection + std::mutex _mtx; + //! Queued data size + std::atomic<uint64_t> _queuedDataSize; + //! Queue for the Flow File + std::queue<FlowFileRecord *> _queue; + //! Logger + Logger *_logger; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Connection(const Connection &parent); + Connection &operator=(const Connection &parent); + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Exception.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h new file mode 100644 index 0000000..d321454 --- /dev/null +++ b/libminifi/include/Exception.h @@ -0,0 +1,95 @@ +/** + * @file Exception.h + * Exception class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __EXCEPTION_H__ +#define __EXCEPTION_H__ + +#include <sstream> +#include <exception> +#include <stdexcept> +#include <errno.h> +#include <string.h> + +//! ExceptionType +enum ExceptionType +{ + FILE_OPERATION_EXCEPTION = 0, + FLOW_EXCEPTION, + PROCESSOR_EXCEPTION, + PROCESS_SESSION_EXCEPTION, + PROCESS_SCHEDULE_EXCEPTION, + SITE2SITE_EXCEPTION, + GENERAL_EXCEPTION, + MAX_EXCEPTION +}; + +//! Exception String +static const char *ExceptionStr[MAX_EXCEPTION] = +{ + "File Operation", + "Flow File Operation", + "Processor Operation", + "Process Session Operation", + "Process Schedule Operation", + "Site2Site Protocol", + "General Operation" +}; + +//! Exception Type to String +inline const char *ExceptionTypeToString(ExceptionType type) +{ + if (type < MAX_EXCEPTION) + return ExceptionStr[type]; + else + return NULL; +} + +//! Exception Class +class Exception : public std::exception +{ +public: + //! Constructor + /*! + * Create a new flow record + */ + Exception(ExceptionType type, const char *errorMsg) : _type(type), _errorMsg(errorMsg) { + } + //! Destructor + virtual ~Exception() throw () {} + virtual const char * what() const throw () { + + _whatStr = ExceptionTypeToString(_type); + + _whatStr += ":" + _errorMsg; + return _whatStr.c_str(); + } + +protected: + +private: + //! Exception type + ExceptionType _type; + //! Exception detailed information + std::string _errorMsg; + //! Hold the what result + mutable std::string _whatStr; + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h new file mode 100644 index 0000000..23f2d49 --- /dev/null +++ b/libminifi/include/FlowControlProtocol.h @@ -0,0 +1,339 @@ +/** + * @file FlowControlProtocol.h + * FlowControlProtocol class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __FLOW_CONTROL_PROTOCOL_H__ +#define __FLOW_CONTROL_PROTOCOL_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include "Logger.h" +#include "Configure.h" +#include "Property.h" + +//! Forwarder declaration +class FlowController; + +#define DEFAULT_NIFI_SERVER_PORT 9000 +#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec +#define MAX_READ_TIMEOUT 30000 // 30 seconds + +//! FlowControl Protocol Msg Type +typedef enum { + REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version + REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval + REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info + REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property + MAX_FLOW_CONTROL_MSG_TYPE +} FlowControlMsgType; + +//! FlowControl Protocol Msg Type String +static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = +{ + "REGISTER_REQ", + "REGISTER_RESP", + "REPORT_REQ", + "REPORT_RESP" +}; + +//! Flow Control Msg Type to String +inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) +{ + if (type < MAX_FLOW_CONTROL_MSG_TYPE) + return FlowControlMsgTypeStr[type]; + else + return NULL; +} + +//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) +typedef enum { + //Fix length 8 bytes: client to server in register request, required field + FLOW_SERIAL_NUMBER, + // Flow XML name TLV: client to server in register request and report request, required field + FLOW_XML_NAME, + // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server + FLOW_XML_CONTENT, + // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field + REPORT_INTERVAL, + // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROCESSOR_NAME, + // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROPERTY_NAME, + // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROPERTY_VALUE, + // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server + REPORT_BLOB, + MAX_FLOW_MSG_ID +} FlowControlMsgID; + +//! FlowControl Protocol Msg ID String +static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = +{ + "FLOW_SERIAL_NUMBER", + "FLOW_XML_NAME", + "FLOW_XML_CONTENT", + "REPORT_INTERVAL", + "PROCESSOR_NAME" + "PROPERTY_NAME", + "PROPERTY_VALUE", + "REPORT_BLOB" +}; + +#define TYPE_HDR_LEN 4 // Fix Hdr Type +#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes + +//! FlowControl Protocol Msg Len +inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) +{ + if (id == FLOW_SERIAL_NUMBER) + return (TYPE_HDR_LEN + 8); + else if (id == REPORT_INTERVAL) + return (TYPE_HDR_LEN + 4); + else if (id < MAX_FLOW_MSG_ID) + return (TLV_HDR_LEN + payLoadLen); + else + return -1; +} + +//! Flow Control Msg Id to String +inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) +{ + if (id < MAX_FLOW_MSG_ID) + return FlowControlMsgIDStr[id]; + else + return NULL; +} + +//! Flow Control Respond status code +typedef enum { + RESP_SUCCESS, + RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register + RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller + RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller + RESP_FAILURE, + MAX_RESP_CODE +} FlowControlRespCode; + +//! FlowControl Resp Code str +static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = +{ + "RESP_SUCCESS", + "RESP_TRIGGER_REGISTER", + "RESP_START_FLOW_CONTROLLER", + "RESP_STOP_FLOW_CONTROLLER", + "RESP_FAILURE" +}; + +//! Flow Control Resp Code to String +inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) +{ + if (code < MAX_RESP_CODE) + return FlowControlRespCodeStr[code]; + else + return NULL; +} + +//! Common FlowControlProtocol Header +typedef struct { + uint32_t msgType; //! Msg Type + uint32_t seqNumber; //! Seq Number to match Req with Resp + uint32_t status; //! Resp Code, see FlowControlRespCode + uint32_t payloadLen; //! Msg Payload length +} FlowControlProtocolHeader; + +//! FlowControlProtocol Class +class FlowControlProtocol +{ +public: + //! Constructor + /*! + * Create a new control protocol + */ + FlowControlProtocol(FlowController *controller) { + _controller = controller; + _logger = Logger::getLogger(); + _configure = Configure::getConfigure(); + _socket = 0; + _serverName = "localhost"; + _serverPort = DEFAULT_NIFI_SERVER_PORT; + _registered = false; + _seqNumber = 0; + _reportBlob = NULL; + _reportBlobLen = 0; + _reportInterval = DEFAULT_REPORT_INTERVAL; + _running = false; + + std::string value; + + if (_configure->get(Configure::nifi_server_name, value)) + { + _serverName = value; + _logger->log_info("NiFi Server Name %s", _serverName.c_str()); + } + if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort)) + { + _logger->log_info("NiFi Server Port: [%d]", _serverPort); + } + if (_configure->get(Configure::nifi_server_report_interval, value)) + { + TimeUnit unit; + if (Property::StringToTime(value, _reportInterval, unit) && + Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) + { + _logger->log_info("NiFi server report interval: [%d] ms", _reportInterval); + } + } + } + //! Destructor + virtual ~FlowControlProtocol() + { + stop(); + if (_socket) + close(_socket); + if (_reportBlob) + delete [] _reportBlob; + if (this->_thread) + delete this->_thread; + } + +public: + + //! SendRegisterRequest and Process Register Respond, return 0 for success + int sendRegisterReq(); + //! SendReportReq and Process Report Respond, return 0 for success + int sendReportReq(); + //! Start the flow control protocol + void start(); + //! Stop the flow control protocol + void stop(); + //! Set Report BLOB for periodically report + void setReportBlob(char *blob, int len) + { + std::lock_guard<std::mutex> lock(_mtx); + if (_reportBlob && _reportBlobLen >= len) + { + memcpy(_reportBlob, blob, len); + _reportBlobLen = len; + } + else + { + if (_reportBlob) + delete[] _reportBlob; + _reportBlob = new char[len]; + _reportBlobLen = len; + } + } + //! Run function for the thread + static void run(FlowControlProtocol *protocol); + //! set 8 bytes SerialNumber + void setSerialNumber(uint8_t *number) + { + memcpy(_serialNumber, number, 8); + } + +protected: + +private: + //! Connect to the socket, return sock descriptor if success, 0 for failure + int connectServer(const char *host, uint16_t port); + //! Send Data via the socket, return -1 for failure + int sendData(uint8_t *buf, int buflen); + //! Read length into buf, return -1 for failure and 0 for EOF + int readData(uint8_t *buf, int buflen); + //! Select on the socket + int selectClient(int msec); + //! Read the header + int readHdr(FlowControlProtocolHeader *hdr); + //! encode uint32_t + uint8_t *encode(uint8_t *buf, uint32_t value) + { + *buf++ = (value & 0xFF000000) >> 24; + *buf++ = (value & 0x00FF0000) >> 16; + *buf++ = (value & 0x0000FF00) >> 8; + *buf++ = (value & 0x000000FF); + return buf; + } + //! encode uint32_t + uint8_t *decode(uint8_t *buf, uint32_t &value) + { + value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); + return (buf + 4); + } + //! encode byte array + uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) + { + memcpy(buf, bufArray, size); + buf += size; + return buf; + } + //! encode std::string + uint8_t *encode(uint8_t *buf, std::string value) + { + // add the \0 for size + buf = encode(buf, value.size()+1); + buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); + return buf; + } + //! Mutex for protection + std::mutex _mtx; + //! Logger + Logger *_logger; + //! Configure + Configure *_configure; + //! NiFi server Name + std::string _serverName; + //! NiFi server port + int64_t _serverPort; + //! Serial Number + uint8_t _serialNumber[8]; + //! socket to server + int _socket; + //! report interal in msec + int64_t _reportInterval; + //! whether it was registered to the NiFi server + bool _registered; + //! seq number + uint32_t _seqNumber; + //! FlowController + FlowController *_controller; + //! report Blob + char *_reportBlob; + //! report Blob len; + int _reportBlobLen; + //! thread + std::thread *_thread; + //! whether it is running + bool _running; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + FlowControlProtocol(const FlowControlProtocol &parent); + FlowControlProtocol &operator=(const FlowControlProtocol &parent); + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h new file mode 100644 index 0000000..0d758df --- /dev/null +++ b/libminifi/include/FlowController.h @@ -0,0 +1,248 @@ +/** + * @file FlowController.h + * FlowController class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __FLOW_CONTROLLER_H__ +#define __FLOW_CONTROLLER_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <algorithm> +#include <set> +#include <libxml/parser.h> +#include <libxml/tree.h> +#include <yaml-cpp/yaml.h> + +#include "Configure.h" +#include "Property.h" +#include "Relationship.h" +#include "FlowFileRecord.h" +#include "Connection.h" +#include "Processor.h" +#include "ProcessContext.h" +#include "ProcessSession.h" +#include "ProcessGroup.h" +#include "GenerateFlowFile.h" +#include "LogAttribute.h" +#include "RealTimeDataCollector.h" +#include "TimerDrivenSchedulingAgent.h" +#include "FlowControlProtocol.h" +#include "RemoteProcessorGroupPort.h" +#include "GetFile.h" +#include "TailFile.h" +#include "ListenSyslog.h" +#include "ExecuteProcess.h" + +//! Default NiFi Root Group Name +#define DEFAULT_ROOT_GROUP_NAME "" +#define DEFAULT_FLOW_XML_FILE_NAME "conf/flow.xml" +#define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml" +#define CONFIG_YAML_PROCESSORS_KEY "Processors" + +enum class ConfigFormat { XML, YAML }; + +struct ProcessorConfig { + std::string name; + std::string javaClass; + std::string maxConcurrentTasks; + std::string schedulingStrategy; + std::string schedulingPeriod; + std::string penalizationPeriod; + std::string yieldPeriod; + std::string runDurationNanos; + std::vector<std::string> autoTerminatedRelationships; + std::vector<Property> properties; +}; + +//! FlowController Class +class FlowController +{ +public: + static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; + static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; + //! Constructor + /*! + * Create a new Flow Controller + */ + FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME); + //! Destructor + virtual ~FlowController(); + //! Set FlowController Name + void setName(std::string name) { + _name = name; + } + //! Get Flow Controller Name + std::string getName(void) { + return (_name); + } + //! Set UUID + void setUUID(uuid_t uuid) { + uuid_copy(_uuid, uuid); + } + //! Get UUID + bool getUUID(uuid_t uuid) { + if (uuid) + { + uuid_copy(uuid, _uuid); + return true; + } + else + return false; + } + //! Set MAX TimerDrivenThreads + void setMaxTimerDrivenThreads(int number) + { + _maxTimerDrivenThreads = number; + } + //! Get MAX TimerDrivenThreads + int getMaxTimerDrivenThreads() + { + return _maxTimerDrivenThreads; + } + //! Set MAX EventDrivenThreads + void setMaxEventDrivenThreads(int number) + { + _maxEventDrivenThreads = number; + } + //! Get MAX EventDrivenThreads + int getMaxEventDrivenThreads() + { + return _maxEventDrivenThreads; + } + //! Create FlowFile Repository + bool createFlowFileRepository(); + //! Create Content Repository + bool createContentRepository(); + + //! Life Cycle related function + //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows + void load(ConfigFormat format); + //! Whether the Flow Controller is start running + bool isRunning(); + //! Whether the Flow Controller has already been initialized (loaded flow XML) + bool isInitialized(); + //! Start to run the Flow Controller which internally start the root process group and all its children + bool start(); + //! Stop to run the Flow Controller which internally stop the root process group and all its children + void stop(bool force); + //! Unload the current flow xml, clean the root process group and all its children + void unload(); + //! Load new xml + void reload(std::string xmlFile); + //! update property value + void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) + { + if (_root) + _root->updatePropertyValue(processorName, propertyName, propertyValue); + } + + //! Create Processor (Node/Input/Output Port) based on the name + Processor *createProcessor(std::string name, uuid_t uuid); + //! Create Root Processor Group + ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid); + //! Create Remote Processor Group + ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); + //! Create Connection + Connection *createConnection(std::string name, uuid_t uuid); + //! set 8 bytes SerialNumber + void setSerialNumber(uint8_t *number) + { + _protocol->setSerialNumber(number); + } + +protected: + + //! A global unique identifier + uuid_t _uuid; + //! FlowController Name + std::string _name; + //! Configuration File Name + std::string _configurationFileName; + //! NiFi property File Name + std::string _propertiesFileName; + //! Root Process Group + ProcessGroup *_root; + //! MAX Timer Driven Threads + int _maxTimerDrivenThreads; + //! MAX Event Driven Threads + int _maxEventDrivenThreads; + //! Config + //! FlowFile Repo + //! Provenance Repo + //! Flow Engines + //! Flow Scheduler + TimerDrivenSchedulingAgent _timerScheduler; + //! Controller Service + //! Config + //! Site to Site Server Listener + //! Heart Beat + //! FlowControl Protocol + FlowControlProtocol *_protocol; + +private: + + //! Mutex for protection + std::mutex _mtx; + //! Logger + Logger *_logger; + //! Configure + Configure *_configure; + //! Whether it is running + std::atomic<bool> _running; + //! Whether it has already been initialized (load the flow XML already) + std::atomic<bool> _initialized; + //! Process Processor Node XML + void parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent); + //! Process Port XML + void parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction); + //! Process Root Processor Group XML + void parseRootProcessGroup(xmlDoc *doc, xmlNode *node); + //! Process Property XML + void parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor); + //! Process connection XML + void parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); + //! Process Remote Process Group + void parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); + + //! Process Processor Node YAML + void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent); + //! Process Port YAML + void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction); + //! Process Root Processor Group YAML + void parseRootProcessGroupYaml(YAML::Node rootNode); + //! Process Property YAML + void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, Processor *processor); + //! Process connection YAML + void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent); + //! Process Remote Process Group YAML + void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup *parent); + //! Parse Properties Node YAML for a processor + void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor); + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + FlowController(const FlowController &parent); + FlowController &operator=(const FlowController &parent); + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/FlowFileRecord.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h new file mode 100644 index 0000000..8b7362f --- /dev/null +++ b/libminifi/include/FlowFileRecord.h @@ -0,0 +1,220 @@ +/** + * @file FlowFileRecord.h + * Flow file record class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __FLOW_FILE_RECORD_H__ +#define __FLOW_FILE_RECORD_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <iostream> +#include <sstream> +#include <fstream> +#include <set> + +#include "TimeUtil.h" +#include "Logger.h" +#include "ResourceClaim.h" + +class ProcessSession; +class Connection; + +#define DEFAULT_FLOWFILE_PATH "." + +//! FlowFile Attribute +enum FlowAttribute +{ + //! The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename + PATH = 0, + //! The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename + ABSOLUTE_PATH, + //! The filename of the FlowFile. The filename should not contain any directory structure. + FILENAME, + //! A unique UUID assigned to this FlowFile. + UUID, + //! A numeric value indicating the FlowFile priority + priority, + //! The MIME Type of this FlowFile + MIME_TYPE, + //! Specifies the reason that a FlowFile is being discarded + DISCARD_REASON, + //! Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile. + ALTERNATE_IDENTIFIER, + MAX_FLOW_ATTRIBUTES +}; + +//! FlowFile Attribute Key +static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = +{ + "path", + "absolute.path", + "filename", + "uuid", + "priority", + "mime.type", + "discard.reason", + "alternate.identifier" +}; + +//! FlowFile Attribute Enum to Key +inline const char *FlowAttributeKey(FlowAttribute attribute) +{ + if (attribute < MAX_FLOW_ATTRIBUTES) + return FlowAttributeKeyArray[attribute]; + else + return NULL; +} + +//! FlowFile IO Callback functions for input and output +//! throw exception for error +class InputStreamCallback +{ +public: + virtual void process(std::ifstream *stream) = 0; +}; +class OutputStreamCallback +{ +public: + virtual void process(std::ofstream *stream) = 0; +}; + + +//! FlowFile Record Class +class FlowFileRecord +{ + friend class ProcessSession; +public: + //! Constructor + /*! + * Create a new flow record + */ + FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL); + //! Destructor + virtual ~FlowFileRecord(); + //! addAttribute key is enum + bool addAttribute(FlowAttribute key, std::string value); + //! addAttribute key is string + bool addAttribute(std::string key, std::string value); + //! removeAttribute key is enum + bool removeAttribute(FlowAttribute key); + //! removeAttribute key is string + bool removeAttribute(std::string key); + //! updateAttribute key is enum + bool updateAttribute(FlowAttribute key, std::string value); + //! updateAttribute key is string + bool updateAttribute(std::string key, std::string value); + //! getAttribute key is enum + bool getAttribute(FlowAttribute key, std::string &value); + //! getAttribute key is string + bool getAttribute(std::string key, std::string &value); + //! setAttribute, if attribute already there, update it, else, add it + void setAttribute(std::string key, std::string value) { + _attributes[key] = value; + } + //! Get the UUID as string + std::string getUUIDStr() { + return _uuidStr; + } + //! Get Attributes + std::map<std::string, std::string> getAttributes() { + return _attributes; + } + //! Check whether it is still being penalized + bool isPenalized() { + return (_penaltyExpirationMs > 0 ? _penaltyExpirationMs > getTimeMillis() : false); + } + //! Get Size + uint64_t getSize() { + return _size; + } + // ! Get Offset + uint64_t getOffset() { + return _offset; + } + // ! Get Entry Date + uint64_t getEntryDate() { + return _entryDate; + } + // ! Get Lineage Start Date + uint64_t getlineageStartDate() { + return _lineageStartDate; + } + // ! Set Original connection + void setOriginalConnection (Connection *connection) { + _orginalConnection = connection; + } + //! Get Resource Claim + ResourceClaim *getResourceClaim() { + return _claim; + } + +protected: + + //! Date at which the flow file entered the flow + uint64_t _entryDate; + //! Date at which the origin of this flow file entered the flow + uint64_t _lineageStartDate; + //! Date at which the flow file was queued + uint64_t _lastQueueDate; + //! Size in bytes of the data corresponding to this flow file + uint64_t _size; + //! A global unique identifier + uuid_t _uuid; + //! A local unique identifier + uint64_t _id; + //! Offset to the content + uint64_t _offset; + //! Penalty expiration + uint64_t _penaltyExpirationMs; + //! Attributes key/values pairs for the flow record + std::map<std::string, std::string> _attributes; + //! Pointer to the associated content resource claim + ResourceClaim *_claim; + //! UUID string + std::string _uuidStr; + //! UUID string for all parents + std::set<std::string> _lineageIdentifiers; + //! duplicate the original flow file + void duplicate(FlowFileRecord *original); + +private: + + //! Local flow sequence ID + static std::atomic<uint64_t> _localFlowSeqNumber; + //! Mark for deletion + bool _markedDelete; + //! Connection queue that this flow file will be transfer or current in + Connection *_connection; + //! Orginal connection queue that this flow file was dequeued from + Connection *_orginalConnection; + //! Logger + Logger *_logger; + //! Snapshot flow record for session rollback + bool _snapshot; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + FlowFileRecord(const FlowFileRecord &parent); + FlowFileRecord &operator=(const FlowFileRecord &parent); + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/GenerateFlowFile.h b/libminifi/include/GenerateFlowFile.h new file mode 100644 index 0000000..27aa43b --- /dev/null +++ b/libminifi/include/GenerateFlowFile.h @@ -0,0 +1,87 @@ +/** + * @file GenerateFlowFile.h + * GenerateFlowFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __GENERATE_FLOW_FILE_H__ +#define __GENERATE_FLOW_FILE_H__ + +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" + +//! GenerateFlowFile Class +class GenerateFlowFile : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + GenerateFlowFile(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _data = NULL; + _dataSize = 0; + } + //! Destructor + virtual ~GenerateFlowFile() + { + if (_data) + delete[] _data; + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property FileSize; + static Property BatchSize; + static Property DataFormat; + static Property UniqueFlowFiles; + static const char *DATA_FORMAT_BINARY; + static const char *DATA_FORMAT_TEXT; + //! Supported Relationships + static Relationship Success; + //! Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback + { + public: + WriteCallback(char *data, uint64_t size) + : _data(data), _dataSize(size) {} + char *_data; + uint64_t _dataSize; + void process(std::ofstream *stream) { + if (_data && _dataSize > 0) + stream->write(_data, _dataSize); + } + }; + +public: + //! OnTrigger method, implemented by NiFi GenerateFlowFile + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! Initialize, over write by NiFi GenerateFlowFile + virtual void initialize(void); + +protected: + +private: + //! Generated data + char * _data; + //! Size of the generate data + uint64_t _dataSize; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/GetFile.h b/libminifi/include/GetFile.h new file mode 100644 index 0000000..eb975fd --- /dev/null +++ b/libminifi/include/GetFile.h @@ -0,0 +1,117 @@ +/** + * @file GetFile.h + * GetFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __GET_FILE_H__ +#define __GET_FILE_H__ + +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" + +//! GetFile Class +class GetFile : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + GetFile(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _logger = Logger::getLogger(); + _directory = "."; + _recursive = true; + _keepSourceFile = false; + _minAge = 0; + _maxAge = 0; + _minSize = 0; + _maxSize = 0; + _ignoreHiddenFile = true; + _pollInterval = 0; + _batchSize = 10; + _lastDirectoryListingTime = getTimeMillis(); + _fileFilter = "[^\\.].*"; + } + //! Destructor + virtual ~GetFile() + { + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property Directory; + static Property Recurse; + static Property KeepSourceFile; + static Property MinAge; + static Property MaxAge; + static Property MinSize; + static Property MaxSize; + static Property IgnoreHiddenFile; + static Property PollInterval; + static Property BatchSize; + static Property FileFilter; + //! Supported Relationships + static Relationship Success; + +public: + //! OnTrigger method, implemented by NiFi GetFile + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! Initialize, over write by NiFi GetFile + virtual void initialize(void); + //! perform directory listing + void performListing(std::string dir); + +protected: + +private: + //! Logger + Logger *_logger; + //! Queue for store directory list + std::queue<std::string> _dirList; + //! Get Listing size + uint64_t getListingSize() { + std::lock_guard<std::mutex> lock(_mtx); + return _dirList.size(); + } + //! Whether the directory listing is empty + bool isListingEmpty(); + //! Put full path file name into directory listing + void putListing(std::string fileName); + //! Poll directory listing for files + void pollListing(std::queue<std::string> &list, int maxSize); + //! Check whether file can be added to the directory listing + bool acceptFile(std::string fileName); + //! Mutex for protection of the directory listing + std::mutex _mtx; + std::string _directory; + bool _recursive; + bool _keepSourceFile; + int64_t _minAge; + int64_t _maxAge; + int64_t _minSize; + int64_t _maxSize; + bool _ignoreHiddenFile; + int64_t _pollInterval; + int64_t _batchSize; + uint64_t _lastDirectoryListingTime; + std::string _fileFilter; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ListenSyslog.h b/libminifi/include/ListenSyslog.h new file mode 100644 index 0000000..81bc92c --- /dev/null +++ b/libminifi/include/ListenSyslog.h @@ -0,0 +1,209 @@ +/** + * @file ListenSyslog.h + * ListenSyslog class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LISTEN_SYSLOG_H__ +#define __LISTEN_SYSLOG_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <errno.h> +#include <sys/select.h> +#include <sys/time.h> +#include <sys/types.h> +#include <chrono> +#include <thread> +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" + +//! SyslogEvent +typedef struct { + uint8_t *payload; + uint64_t len; +} SysLogEvent; + +//! ListenSyslog Class +class ListenSyslog : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + ListenSyslog(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _logger = Logger::getLogger(); + _eventQueueByteSize = 0; + _serverSocket = 0; + _recvBufSize = 65507; + _maxSocketBufSize = 1024*1024; + _maxConnections = 2; + _maxBatchSize = 1; + _messageDelimiter = "\n"; + _protocol = "UDP"; + _port = 514; + _parseMessages = false; + _serverSocket = 0; + _maxFds = 0; + FD_ZERO(&_readfds); + _thread = NULL; + _resetServerSocket = false; + _serverTheadRunning = false; + } + //! Destructor + virtual ~ListenSyslog() + { + _serverTheadRunning = false; + if (this->_thread) + delete this->_thread; + // need to reset the socket + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) + { + int clientSocket = *it; + close(clientSocket); + } + _clientSockets.clear(); + if (_serverSocket > 0) + { + _logger->log_info("ListenSysLog Server socket %d close", _serverSocket); + close(_serverSocket); + _serverSocket = 0; + } + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property RecvBufSize; + static Property MaxSocketBufSize; + static Property MaxConnections; + static Property MaxBatchSize; + static Property MessageDelimiter; + static Property ParseMessages; + static Property Protocol; + static Property Port; + //! Supported Relationships + static Relationship Success; + static Relationship Invalid; + //! Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback + { + public: + WriteCallback(char *data, uint64_t size) + : _data(data), _dataSize(size) {} + char *_data; + uint64_t _dataSize; + void process(std::ofstream *stream) { + if (_data && _dataSize > 0) + stream->write(_data, _dataSize); + } + }; + +public: + //! OnTrigger method, implemented by NiFi ListenSyslog + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! Initialize, over write by NiFi ListenSyslog + virtual void initialize(void); + +protected: + +private: + //! Logger + Logger *_logger; + //! Run function for the thread + static void run(ListenSyslog *process); + //! Run Thread + void runThread(); + //! Queue for store syslog event + std::queue<SysLogEvent> _eventQueue; + //! Size of Event queue in bytes + uint64_t _eventQueueByteSize; + //! Get event queue size + uint64_t getEventQueueSize() { + std::lock_guard<std::mutex> lock(_mtx); + return _eventQueue.size(); + } + //! Get event queue byte size + uint64_t getEventQueueByteSize() { + std::lock_guard<std::mutex> lock(_mtx); + return _eventQueueByteSize; + } + //! Whether the event queue is empty + bool isEventQueueEmpty() + { + std::lock_guard<std::mutex> lock(_mtx); + return _eventQueue.empty(); + } + //! Put event into directory listing + void putEvent(uint8_t *payload, uint64_t len) + { + std::lock_guard<std::mutex> lock(_mtx); + SysLogEvent event; + event.payload = payload; + event.len = len; + _eventQueue.push(event); + _eventQueueByteSize += len; + } + //! Read \n terminated line from TCP socket + int readline( int fd, char *bufptr, size_t len ); + //! start server socket and handling client socket + void startSocketThread(); + //! Poll event + void pollEvent(std::queue<SysLogEvent> &list, int maxSize) + { + std::lock_guard<std::mutex> lock(_mtx); + + while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) + { + SysLogEvent event = _eventQueue.front(); + _eventQueue.pop(); + _eventQueueByteSize -= event.len; + list.push(event); + } + return; + } + //! Mutex for protection of the directory listing + std::mutex _mtx; + int64_t _recvBufSize; + int64_t _maxSocketBufSize; + int64_t _maxConnections; + int64_t _maxBatchSize; + std::string _messageDelimiter; + std::string _protocol; + int64_t _port; + bool _parseMessages; + int _serverSocket; + std::vector<int> _clientSockets; + int _maxFds; + fd_set _readfds; + //! thread + std::thread *_thread; + //! whether to reset the server socket + bool _resetServerSocket; + bool _serverTheadRunning; + //! buffer for read socket + uint8_t _buffer[2048]; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/LogAttribute.h b/libminifi/include/LogAttribute.h new file mode 100644 index 0000000..125ebf3 --- /dev/null +++ b/libminifi/include/LogAttribute.h @@ -0,0 +1,128 @@ +/** + * @file LogAttribute.h + * LogAttribute class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LOG_ATTRIBUTE_H__ +#define __LOG_ATTRIBUTE_H__ + +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" + +//! LogAttribute Class +class LogAttribute : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + LogAttribute(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _logger = Logger::getLogger(); + } + //! Destructor + virtual ~LogAttribute() + { + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property LogLevel; + static Property AttributesToLog; + static Property AttributesToIgnore; + static Property LogPayload; + static Property LogPrefix; + //! Supported Relationships + static Relationship Success; + enum LogAttrLevel { + LogAttrLevelTrace, LogAttrLevelDebug, LogAttrLevelInfo, LogAttrLevelWarn, LogAttrLevelError + }; + //! Convert log level from string to enum + bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) + { + if (logStr == "trace") + { + level = LogAttrLevelTrace; + return true; + } + else if (logStr == "debug") + { + level = LogAttrLevelDebug; + return true; + } + else if (logStr == "info") + { + level = LogAttrLevelInfo; + return true; + } + else if (logStr == "warn") + { + level = LogAttrLevelWarn; + return true; + } + else if (logStr == "error") + { + level = LogAttrLevelError; + return true; + } + else + return false; + } + //! Nest Callback Class for read stream + class ReadCallback : public InputStreamCallback + { + public: + ReadCallback(uint64_t size) + { + _bufferSize = size; + _buffer = new char[_bufferSize]; + } + ~ReadCallback() + { + if (_buffer) + delete[] _buffer; + } + void process(std::ifstream *stream) { + + stream->read(_buffer, _bufferSize); + if (!stream) + _readSize = stream->gcount(); + else + _readSize = _bufferSize; + } + char *_buffer; + uint64_t _bufferSize; + uint64_t _readSize; + }; + +public: + //! OnTrigger method, implemented by NiFi LogAttribute + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! Initialize, over write by NiFi LogAttribute + virtual void initialize(void); + +protected: + +private: + //! Logger + Logger *_logger; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Logger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Logger.h b/libminifi/include/Logger.h new file mode 100644 index 0000000..3edad9d --- /dev/null +++ b/libminifi/include/Logger.h @@ -0,0 +1,154 @@ +/** + * @file Logger.h + * Logger class declaration + * This is a C++ wrapper for spdlog, a lightweight C++ logging library + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LOGGER_H__ +#define __LOGGER_H__ + +#include "spdlog/spdlog.h" + +using spdlog::stdout_logger_mt; +using spdlog::rotating_logger_mt; +using spdlog::logger; + +#define LOG_BUFFER_SIZE 1024 +#define FILL_BUFFER char buffer[LOG_BUFFER_SIZE]; \ + va_list args; \ + va_start(args, format); \ + vsnprintf(buffer, LOG_BUFFER_SIZE,format, args); \ + va_end(args); + +//! 5M default log file size +#define DEFAULT_LOG_FILE_SIZE (5*1024*1024) +//! 3 log files rotation +#define DEFAULT_LOG_FILE_NUMBER 3 +#define LOG_NAME "minifi log" +#define LOG_FILE_NAME "minifi-app.log" + +typedef enum +{ + trace = 0, + debug = 1, + info = 2, + notice = 3, + warn = 4, + err = 5, + critical = 6, + alert = 7, + emerg = 8, + off = 9 +} LOG_LEVEL_E; + +//! Logger Class +class Logger { + +public: + + //! Get the singleton logger instance + static Logger * getLogger() { + if (!_logger) + _logger = new Logger(); + return _logger; + } + void setLogLevel(LOG_LEVEL_E level) { + if (_spdlog == NULL) + return; + _spdlog->set_level((spdlog::level::level_enum) level); + } + //! Destructor + ~Logger() {} + /** + * @brief Log error message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_error(const char *const format, ...) { + if(_spdlog == NULL) + return; + FILL_BUFFER + _spdlog->error(buffer); + } + /** + * @brief Log warn message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_warn(const char *const format, ...) { + if(_spdlog == NULL) + return; + FILL_BUFFER + _spdlog->warn(buffer); + } + /** + * @brief Log info message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_info(const char *const format, ...) { + if(_spdlog == NULL) + return; + FILL_BUFFER + _spdlog->info(buffer); + } + /** + * @brief Log debug message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_debug(const char *const format, ...) { + if(_spdlog == NULL) + return; + FILL_BUFFER + _spdlog->debug(buffer); + } + /** + * @brief Log trace message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_trace(const char *const format, ...) { + if(_spdlog == NULL) + return; + FILL_BUFFER + _spdlog->trace(buffer); + } + +protected: + +private: + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Logger(const Logger &parent); + Logger &operator=(const Logger &parent); + //! Constructor + /*! + * Create a logger + * */ + Logger(const std::string logger_name = LOG_NAME, const std::string filename = LOG_FILE_NAME, size_t max_file_size = DEFAULT_LOG_FILE_SIZE, size_t max_files = DEFAULT_LOG_FILE_NUMBER, bool force_flush = true) { + _spdlog = rotating_logger_mt(logger_name, filename, max_file_size, max_files, force_flush); + _spdlog->set_level((spdlog::level::level_enum) debug); + } + //! spdlog + std::shared_ptr<logger> _spdlog; + + //! Singleton logger instance + static Logger *_logger; +}; + +#endif