// http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Property.h
// ----------------------------------------------------------------------
// diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
// new file mode 100644
// index 0000000..c681449
// --- /dev/null
// +++ b/libminifi/include/core/Property.h
// @@ -0,0 +1,264 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef __PROPERTY_H__
#define __PROPERTY_H__

#include <algorithm>
#include <sstream>
#include <string>
#include <vector>
#include <queue>
#include <map>
#include <mutex>
#include <atomic>
#include <functional>
#include <set>
#include <stdlib.h>
#include <math.h>
#include <cctype>
#include <cstdint>
#include <utility>

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {

// Time Unit
enum TimeUnit {
  DAY,
  HOUR,
  MINUTE,
  SECOND,
  MILLISECOND,
  NANOSECOND
};

/**
 * Property Class
 *
 * A named, described string value, plus static helpers that parse
 * time and size strings.
 */
class Property {

 public:
  /**
   * Create a new property
   * @param name property name
   * @param description human readable description
   * @param value initial value
   */
  Property(std::string name, std::string description, std::string value)
      : name_(std::move(name)),
        description_(std::move(description)),
        value_(std::move(value)) {
  }
  // Create an empty property (all three strings empty)
  Property() {
  }
  // Destructor
  virtual ~Property() {
  }
  // Get Name for the property
  std::string getName() const;
  // Get Description for the property
  std::string getDescription();
  // Get value for the property
  std::string getValue() const;
  // Set value for the property
  void setValue(std::string value);
  const Property &operator=(const Property &other);
  // Compare
  bool operator <(const Property & right) const;

  /**
   * Convert a duration expressed in the given unit to milliseconds.
   * @param input duration, expressed in unit
   * @param unit unit of input
   * @param out receives the duration in milliseconds (nanoseconds are
   *        truncated by integer division); untouched on failure
   * @return true if unit is a known TimeUnit, false otherwise
   */
  static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t &out) {
    switch (unit) {
      case MILLISECOND:
        out = input;
        return true;
      case SECOND:
        out = input * 1000;
        return true;
      case MINUTE:
        out = input * 60 * 1000;
        return true;
      case HOUR:
        out = input * 60 * 60 * 1000;
        return true;
      case DAY:
        // The input must scale the result; a bare 24h constant is wrong.
        out = input * 24 * 60 * 60 * 1000;
        return true;
      case NANOSECOND:
        out = input / 1000 / 1000;
        return true;
      default:
        return false;
    }
  }

  /**
   * Convert a duration expressed in the given unit to nanoseconds.
   * @param input duration, expressed in unit
   * @param unit unit of input
   * @param out receives the duration in nanoseconds; untouched on failure
   * @return true if unit is a known TimeUnit, false otherwise
   */
  static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t &out) {
    switch (unit) {
      case MILLISECOND:
        out = input * 1000 * 1000;
        return true;
      case SECOND:
        out = input * 1000 * 1000 * 1000;
        return true;
      case MINUTE:
        out = input * 60 * 1000 * 1000 * 1000;
        return true;
      case HOUR:
        out = input * 60 * 60 * 1000 * 1000 * 1000;
        return true;
      case DAY:
        // DAY is a valid TimeUnit and must be convertible here as well.
        out = input * 24 * 60 * 60 * 1000 * 1000 * 1000;
        return true;
      case NANOSECOND:
        out = input;
        return true;
      default:
        return false;
    }
  }

  /**
   * Parse a string of the form "<integer>[spaces]<unit>", e.g. "10 sec".
   * The integer is parsed with base auto-detection (strtoll base 0).
   * @param input string to parse
   * @param output receives the integer part; untouched on failure
   * @param timeunit receives the parsed unit; untouched on failure
   * @return true on success; false if the string is empty, has no digits,
   *         has no unit, or the unit is not recognised
   */
  static bool StringToTime(std::string input, int64_t &output,
                           TimeUnit &timeunit) {
    if (input.empty()) {
      return false;
    }

    const char *cvalue = input.c_str();
    char *pEnd = nullptr;
    // strtoll keeps the full 64-bit range even where long is 32 bits.
    const long long ival = strtoll(cvalue, &pEnd, 0);

    if (pEnd == cvalue) {
      // No digits were consumed: not a number at all.
      return false;
    }
    if (pEnd[0] == '\0') {
      // A bare number without a unit is not a time value.
      return false;
    }

    while (*pEnd == ' ') {
      // Skip the space
      pEnd++;
    }

    const std::string unit(pEnd);
    TimeUnit parsed;

    if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds"
        || unit == "secs") {
      parsed = SECOND;
    } else if (unit == "min" || unit == "m" || unit == "mins"
        || unit == "minute" || unit == "minutes") {
      parsed = MINUTE;
    } else if (unit == "ns" || unit == "nano" || unit == "nanos"
        || unit == "nanoseconds") {
      parsed = NANOSECOND;
    } else if (unit == "ms" || unit == "milli" || unit == "millis"
        || unit == "milliseconds") {
      parsed = MILLISECOND;
    } else if (unit == "h" || unit == "hr" || unit == "hour" || unit == "hrs"
        || unit == "hours") {
      parsed = HOUR;
    } else if (unit == "d" || unit == "day" || unit == "days") {
      parsed = DAY;
    } else {
      return false;
    }

    timeunit = parsed;
    output = ival;
    return true;
  }

  /**
   * Parse an integer with an optional size suffix.
   * Suffixes K, M, G, T, P (case-insensitive) multiply by powers of 1000;
   * the same letters followed by 'b' or 'B' multiply by powers of 1024.
   * The integer is parsed with base auto-detection (strtoll base 0).
   * @param input string to parse
   * @param output receives the scaled value; untouched on failure
   * @return true on success; false if the string is empty, has no digits,
   *         or carries an unrecognised suffix
   */
  static bool StringToInt(std::string input, int64_t &output) {
    if (input.empty()) {
      return false;
    }

    const char *cvalue = input.c_str();
    char *pEnd = nullptr;
    const long long ival = strtoll(cvalue, &pEnd, 0);

    if (pEnd == cvalue) {
      // No digits were consumed: not a number at all.
      return false;
    }
    if (pEnd[0] == '\0') {
      output = ival;
      return true;
    }

    while (*pEnd == ' ') {
      // Skip the space
      pEnd++;
    }

    // The suffix letter selects the power; cast avoids UB for negative chars.
    int exponent = 0;
    switch (std::toupper(static_cast<unsigned char>(pEnd[0]))) {
      case 'K':
        exponent = 1;
        break;
      case 'M':
        exponent = 2;
        break;
      case 'G':
        exponent = 3;
        break;
      case 'T':
        exponent = 4;
        break;
      case 'P':
        exponent = 5;
        break;
      default:
        return false;
    }

    // "K" style suffixes are decimal, "KB" style suffixes are binary.
    uint64_t base = 0;
    if (pEnd[1] == '\0') {
      base = 1000;
    } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && (pEnd[2] == '\0')) {
      base = 1024;
    } else {
      return false;
    }

    uint64_t multiplier = 1;
    for (int i = 0; i < exponent; ++i) {
      multiplier *= base;
    }

    // Multiply as unsigned so that overflow wraps instead of being UB.
    output = static_cast<int64_t>(static_cast<uint64_t>(ival) * multiplier);
    return true;
  }

 protected:
  // Name
  std::string name_;
  // Description
  std::string description_;
  // Value
  std::string value_;

 private:

};

} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */

#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Relationship.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Relationship.h b/libminifi/include/core/Relationship.h new file mode 100644 index 0000000..416ede6 --- /dev/null +++ b/libminifi/include/core/Relationship.h @@ -0,0 +1,96 @@ +/** + * @file Relationship.h + * Relationship class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __RELATIONSHIP_H__ +#define __RELATIONSHIP_H__ + +#include <string> +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// undefined relationship for remote process group outgoing port and root process group incoming port +#define UNDEFINED_RELATIONSHIP "undefined" + +inline bool isRelationshipNameUndefined(std::string name) { + if (name == UNDEFINED_RELATIONSHIP) + return true; + else + return false; +} + +// Relationship Class +class Relationship { + + public: + /* + * Create a new relationship + */ + Relationship(const std::string name, const std::string description) + : name_(name), + description_(description) { + } + Relationship() + : name_(UNDEFINED_RELATIONSHIP) { + } + // Destructor + virtual ~Relationship() { + } + // Get Name for the relationship + std::string getName() const { + return name_; + } + // Get Description for the relationship + std::string getDescription() const { + return description_; + } + // Compare + bool operator <(const Relationship & right) const { + return name_ < right.name_; + } + // Whether it is a undefined relationship + bool isRelationshipUndefined() { + return isRelationshipNameUndefined(name_); + } + + protected: + + // Name + std::string name_; + // Description + std::string description_; + + private: +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h new file mode 100644 index 0000000..a668df5 --- /dev/null +++ b/libminifi/include/core/Repository.h @@ -0,0 +1,153 @@ +/** + * @file Repository + * Repository 
class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __REPOSITORY_H__ +#define __REPOSITORY_H__ + +#include <ftw.h> +#include <uuid/uuid.h> +#include <atomic> +#include <cstdint> +#include <cstring> +#include <iostream> +#include <map> +#include <set> +#include <string> +#include <thread> +#include <vector> + +#include "properties/Configure.h" +#include "core/logging/Logger.h" +#include "core/Property.h" +#include "ResourceClaim.h" +#include "io/Serializable.h" +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +class Repository : public CoreComponent { + public: + /* + * Constructor for the repository + */ + Repository(std::string repo_name, std::string directory, + int64_t maxPartitionMillis, int64_t maxPartitionBytes, + uint64_t purgePeriod) + : CoreComponent(repo_name), + thread_() { + directory_ = directory; + max_partition_millis_ = maxPartitionMillis; + max_partition_bytes_ = maxPartitionBytes; + purge_period_ = purgePeriod; + configure_ = Configure::getConfigure(); + running_ = false; + repo_full_ = false; + } + + // Destructor + virtual ~Repository() { + stop(); 
+ } + + // initialize + virtual bool initialize(){ + return true; + } + // Put + virtual bool Put(std::string key, uint8_t *buf, int bufLen){ + return true; + } + // Delete + virtual bool Delete(std::string key){ + return true; + } + + virtual bool Get(std::string key, std::string &value) { + return true; + } + + // Run function for the thread + virtual void run(){ + // no op + } + // Start the repository monitor thread + virtual void start(); + // Stop the repository monitor thread + virtual void stop(); + // whether the repo is full + virtual bool isFull() { + return repo_full_; + } + // whether the repo is enable + virtual bool isRunning() { + return running_; + } + uint64_t incrementSize(const char *fpath, const struct stat *sb, + int typeflag) { + return (repo_size_ += sb->st_size); + } + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Repository(const Repository &parent) = delete; + Repository &operator=(const Repository &parent) = delete; + + protected: + // Mutex for protection + std::mutex mutex_; + // repository directory + std::string directory_; + // Configure + Configure *configure_; + // max db entry life time + int64_t max_partition_millis_; + // max db size + int64_t max_partition_bytes_; + // purge period + uint64_t purge_period_; + // thread + std::thread thread_; + // whether the monitoring thread is running for the repo while it was enabled + bool running_; + // whether stop accepting provenace event + std::atomic<bool> repo_full_; + // repoSize + uint64_t repoSize(); + // size of the directory + std::atomic<uint64_t> repo_size_; + + private: + // Run function for the thread + void threadExecutor(){ + run(); + } +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/RepositoryFactory.h 
---------------------------------------------------------------------- diff --git a/libminifi/include/core/RepositoryFactory.h b/libminifi/include/core/RepositoryFactory.h new file mode 100644 index 0000000..03ed524 --- /dev/null +++ b/libminifi/include/core/RepositoryFactory.h @@ -0,0 +1,44 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ + + +#include "core/Repository.h" +#include "core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +namespace core { + + std::shared_ptr<core::Repository> createRepository( + const std::string configuration_class_name, bool fail_safe = false); + + + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Scheduling.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Scheduling.h b/libminifi/include/core/Scheduling.h new file mode 100644 index 0000000..0c983df --- /dev/null +++ b/libminifi/include/core/Scheduling.h @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// */
#ifndef LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_
#define LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {

/*
 * Scheduling state of an entity: whether it may be scheduled to run
 * and whether it currently is.
 */
enum ScheduledState {
  // Entity cannot be scheduled to run
  DISABLED = 0,
  // Entity can be scheduled to run but currently is not
  STOPPED = 1,
  // Entity is currently scheduled to run
  RUNNING = 2
};

/*
 * Scheduling strategy: what causes an entity to be run.
 */
enum SchedulingStrategy {
  EVENT_DRIVEN = 0,  // Event driven
  TIMER_DRIVEN = 1,  // Timer driven
  CRON_DRIVEN = 2    // Cron Driven
};

} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_ */
// http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/core.h
// ----------------------------------------------------------------------
// diff --git a/libminifi/include/core/core.h b/libminifi/include/core/core.h
// new file mode 100644
// index 0000000..9f86100
// --- /dev/null
// +++ b/libminifi/include/core/core.h
// @@ -0,0 +1,177 @@
// /**
//  *
//  * Licensed to the Apache Software Foundation (ASF) under one or more
//  * contributor license agreements.  See the NOTICE file distributed with
//  * this work for additional information regarding copyright ownership.
//  * The ASF licenses this file to You under the Apache License, Version 2.0
//  * (the "License"); you may not use this file except in compliance with
//  * the License.  You may obtain a copy of the License at
//  *
//  *     http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_CORE_H_ +#define LIBMINIFI_INCLUDE_CORE_CORE_H_ + +#include <uuid/uuid.h> +#include <cxxabi.h> +#include "core/logging/Logger.h" +/** + * namespace aliasing + */ +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { +} +namespace processors { +} +namespace provenance { + +} +namespace core { + +template<typename T> +static inline std::string getClassName() { + char *b = abi::__cxa_demangle(typeid(T).name(), 0, 0, 0); + std::string name = b; + delete [] b; + return name; +} + +template<typename T> +struct class_operations { + + template<typename Q=T> + static std::true_type canDestruct(decltype(std::declval<Q>().~Q()) *) { + return std::true_type(); + } + + + template<typename Q=T> + static std::false_type canDestruct(...) { + return std::false_type(); + } + + typedef decltype(canDestruct<T>(0)) type; + + static const bool value = type::value; /* Which is it? */ +}; + + +template<typename T> +typename std::enable_if<!class_operations<T>::value, T*>::type instantiate() { + throw std::runtime_error("Cannot instantiate class"); +} + +template<typename T> +typename std::enable_if<class_operations<T>::value, T*>::type instantiate() { + return new T(); +} + +/** + * Base component within MiNiFi + * Purpose: Many objects store a name and UUID, therefore + * the functionality is localized here to avoid duplication + */ +class CoreComponent { + + public: + + /** + * Constructor that sets the name and uuid. + */ + explicit CoreComponent(const std::string name, uuid_t uuid = 0) + : logger_(logging::Logger::getLogger()), + name_(name) { + if (!uuid) + // Generate the global UUID for the flow record + uuid_generate(uuid_); + else + uuid_copy(uuid_, uuid); + + char uuidStr[37]; + uuid_unparse_lower(uuid_, uuidStr); + uuidStr_ = uuidStr; + } + + /** + * Move Constructor. 
+ */ + explicit CoreComponent(const CoreComponent &&other) + : name_(std::move(other.name_)), + logger_(logging::Logger::getLogger()) { + uuid_copy(uuid_, other.uuid_); + } + + // Get component name Name + std::string getName(); + + /** + * Set name. + * @param name + */ + void setName(const std::string name); + + /** + * Set UUID in this instance + * @param uuid uuid to apply to the internal representation. + */ + void setUUID(uuid_t uuid); + + /** + * Returns the UUID through the provided object. + * @param uuid uuid struct to which we will copy the memory + * @return success of request + */ + bool getUUID(uuid_t uuid); + + unsigned const char *getUUID(); + /** + * Return the UUID string + * @param constant reference to the UUID str + */ + const std::string & getUUIDStr() { + return uuidStr_; + } + + protected: + // A global unique identifier + uuid_t uuid_; + // UUID string + std::string uuidStr_; + + // logger shared ptr + std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_; + + // Connectable's name + std::string name_; +}; + +namespace logging { +} +} +} +} +} +} + +namespace minifi = org::apache::nifi::minifi; + +namespace core = org::apache::nifi::minifi::core; + +namespace processors = org::apache::nifi::minifi::processors; + +namespace logging = org::apache::nifi::minifi::core::logging; + +namespace utils = org::apache::nifi::minifi::utils; + +namespace provenance = org::apache::nifi::minifi::provenance; + +#endif /* LIBMINIFI_INCLUDE_CORE_CORE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/logging/BaseLogger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/BaseLogger.h b/libminifi/include/core/logging/BaseLogger.h new file mode 100644 index 0000000..bfdf26f --- /dev/null +++ b/libminifi/include/core/logging/BaseLogger.h @@ -0,0 +1,224 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_BASELOGGER_H_ +#define LIBMINIFI_INCLUDE_BASELOGGER_H_ + +#include <string> +#include <memory> +#include "spdlog/spdlog.h" +#include <iostream> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { + +// 5M default log file size +#define DEFAULT_LOG_FILE_SIZE (5*1024*1024) +// 3 log files rotation +#define DEFAULT_LOG_FILE_NUMBER 3 +#define LOG_NAME "minifi log" +#define LOG_FILE_NAME "minifi-app.log" + +/** + * Log level enumeration. + */ +typedef enum { + trace = 0, + debug = 1, + info = 2, + warn = 3, + err = 4, + critical = 5, + off = 6 +} LOG_LEVEL_E; + +#define LOG_BUFFER_SIZE 1024 +#define FILL_BUFFER char buffer[LOG_BUFFER_SIZE]; \ + va_list args; \ + va_start(args, format); \ + std::vsnprintf(buffer, LOG_BUFFER_SIZE,format, args); \ + va_end(args); + +/** + * Base class that represents a logger configuration. + */ +class BaseLogger { + + public: + static const char *nifi_log_level; + static const char *nifi_log_appender; + + /** + * Base Constructor + */ + BaseLogger() { + setLogLevel("info"); + logger_ = nullptr; + stderr_ = nullptr; + } + + /** + * Logger configuration constructorthat will set the base log level. 
+ * @param config incoming configuration. + */ + BaseLogger(std::string log_level, std::shared_ptr<spdlog::logger> logger) + : logger_(logger) { + setLogLevel(log_level); + + } + + virtual ~BaseLogger() { + + } + + /** + * Move constructor that will atomically swap configuration + * shared pointers. + */ + BaseLogger(const BaseLogger &&other) + : configured_level_(other.configured_level_.load()) { + // must atomically exchange the pointers + logger_ = std::move(other.logger_); + set_error_logger(other.stderr_); + + } + + /** + * Returns the log level for this instance. + */ + virtual LOG_LEVEL_E getLogLevel() const { + return configured_level_; + } + + /** + * @brief Log error message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + virtual void log_error(const char * const format, ...); + /** + * @brief Log warn message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + virtual void log_warn(const char * const format, ...); + /** + * @brief Log info message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + virtual void log_info(const char * const format, ...); + /** + * @brief Log debug message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + virtual void log_debug(const char * const format, ...); + /** + * @brief Log trace message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. 
Caller must ensure parameters and format string lengths match + */ + virtual void log_trace(const char * const format, ...); + + /** + * @brief Log error message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + virtual void log_str(LOG_LEVEL_E level, const std::string &buffer); + + /** + * Sets the log level for this instance based on the string + * @param level desired log leve. + * @param defaultLevel default level if we cannot match level. + */ + virtual void setLogLevel(const std::string &level, LOG_LEVEL_E defaultLevel = + info); + + /** + * Sets the log level atomic and sets it + * within logger if it can + * @param level desired log level. + */ + virtual void setLogLevel(LOG_LEVEL_E level) { + configured_level_ = level; + setLogLevel(); + } + + bool shouldLog(LOG_LEVEL_E level) { + return level >= configured_level_.load(std::memory_order_relaxed); + } + + /** + * Move operator overload + */ + BaseLogger &operator=(const BaseLogger &&other) { + configured_level_ = (other.configured_level_.load()); + // must atomically exchange the pointers + logger_ = std::move(other.logger_); + set_error_logger(other.stderr_); + return *this; + } + + protected: + + /** + * Logger configuration constructorthat will set the base log level. + * @param config incoming configuration. + */ + BaseLogger(std::string log_level) + : logger_(nullptr) { + setLogLevel(log_level); + } + + void setLogger(std::shared_ptr<spdlog::logger> logger) { + logger_ = logger; + } + + /** + * Since a thread may be using stderr and it can be null, + * we must atomically exchange the shared pointers. + * @param other other shared pointer. can be null ptr + */ + void set_error_logger(std::shared_ptr<spdlog::logger> other); + + /** + * Sets the log level on the spdlogger if it is not null. 
+ */ + void setLogLevel() { + if (logger_ != nullptr) + logger_->set_level((spdlog::level::level_enum) configured_level_.load()); + + } + + std::atomic<LOG_LEVEL_E> configured_level_; + std::shared_ptr<spdlog::logger> logger_; + std::shared_ptr<spdlog::logger> stderr_; +}; + +} /* namespace logging */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* LIBMINIFI_INCLUDE_BASELOGGER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/logging/LogAppenders.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/LogAppenders.h b/libminifi/include/core/logging/LogAppenders.h new file mode 100644 index 0000000..7bdc3be --- /dev/null +++ b/libminifi/include/core/logging/LogAppenders.h @@ -0,0 +1,301 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_LOGAPPENDERS_H_ +#define LIBMINIFI_INCLUDE_LOGAPPENDERS_H_ + +#include "BaseLogger.h" +#include "spdlog/sinks/null_sink.h" +#include "spdlog/sinks/ostream_sink.h" +#include <cxxabi.h> +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { + +template<typename T> +static std::string getUniqueName() { + std::string name = LOG_NAME; + name += " -- "; + name += abi::__cxa_demangle(typeid(T).name(), 0, 0, 0); + spdlog::drop(name); + return name; +} + +/** + * Null appender sets a null sink, thereby performing no logging. + */ +class NullAppender : public BaseLogger { + public: + /** + * Base constructor that creates the null sink. + */ + explicit NullAppender() + : BaseLogger("off") { + auto null_sink = std::make_shared<spdlog::sinks::null_sink_st>(); + std::string unique_name = getUniqueName<NullAppender>(); + logger_ = std::make_shared<spdlog::logger>(unique_name, null_sink); + configured_level_ = off; + setLogLevel(); + } + + /** + * Move constructor for the null appender. + */ + explicit NullAppender(const NullAppender &&other) + : BaseLogger(std::move(other)) { + + } + +}; + +/** + * Basic output stream configuration that uses a supplied ostream + * + * Design : extends LoggerConfiguration using the logger and log level + * encapsulated within the base configuration class. + */ +class OutputStreamAppender : public BaseLogger { + + public: + + static const char *nifi_log_output_stream_error_stderr; + + /** + * Output stream move constructor. + */ + explicit OutputStreamAppender(const OutputStreamAppender &&other) + : BaseLogger(std::move(other)) { + + } + + /** + * Base constructor. Creates a ostream sink. + * @param stream incoming stream reference. + * @param config configuration. 
+ */ + explicit OutputStreamAppender(Configure *config) + : BaseLogger("info") { + auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>( + std::cout); + + std::string unique_name = getUniqueName<OutputStreamAppender>(); + logger_ = std::make_shared<spdlog::logger>(unique_name, ostream_sink); + + std::string use_std_err; + + if (NULL != config + && config->get(nifi_log_output_stream_error_stderr, use_std_err)) { + + std::transform(use_std_err.begin(), use_std_err.end(), + use_std_err.begin(), ::tolower); + + if (use_std_err == "true") { + std::string err_unique_name = getUniqueName<OutputStreamAppender>(); + auto error_ostream_sink = std::make_shared< + spdlog::sinks::ostream_sink_mt>(std::cerr); + stderr_ = std::make_shared<spdlog::logger>(err_unique_name, + error_ostream_sink); + } + } else { + stderr_ = nullptr; + } + + std::string log_level; + if (NULL != config && config->get(BaseLogger::nifi_log_level, log_level)) { + setLogLevel(log_level); + } else { + setLogLevel("info"); + } + + } + + /** + * Base constructor. Creates a ostream sink. + * @param stream incoming stream reference. + * @param config configuration. + */ + OutputStreamAppender(std::ostream &stream, Configure *config) + : BaseLogger("info") { + auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>( + stream); + std::string unique_name = getUniqueName<OutputStreamAppender>(); + logger_ = std::make_shared<spdlog::logger>(unique_name, ostream_sink); + + stderr_ = nullptr; + + std::string log_level; + if (NULL != config && config->get(BaseLogger::nifi_log_level, log_level)) { + setLogLevel(log_level); + } else { + setLogLevel("info"); + } + + } + + protected: + +}; + +/** + * Rolling configuration + * Design : extends LoggerConfiguration using the logger and log level + * encapsulated within the base configuration class. 
+ */ +class RollingAppender : public BaseLogger { + public: + static const char *nifi_log_rolling_apender_file; + static const char *nifi_log_rolling_appender_max_files; + static const char *nifi_log_rolling_appender_max_file_size; + + /** + * RollingAppenderConfiguration move constructor. + */ + explicit RollingAppender(const RollingAppender&& other) + : BaseLogger(std::move(other)), + max_files_(std::move(other.max_files_)), + file_name_(std::move(other.file_name_)), + max_file_size_(std::move(other.max_file_size_)) { + } + /** + * Base Constructor. + * @param config pointer to the configuration for this instance. + */ + explicit RollingAppender(Configure * config = 0) + : BaseLogger("info") { + std::string file_name = ""; + if (NULL != config + && config->get(nifi_log_rolling_apender_file, file_name)) { + file_name_ = file_name; + } else { + file_name_ = LOG_FILE_NAME; + } + + std::string max_files = ""; + if (NULL != config + && config->get(nifi_log_rolling_appender_max_files, max_files)) { + try { + max_files_ = std::stoi(max_files); + } catch (const std::invalid_argument &ia) { + max_files_ = DEFAULT_LOG_FILE_NUMBER; + } catch (const std::out_of_range &oor) { + max_files_ = DEFAULT_LOG_FILE_NUMBER; + } + } else { + max_files_ = DEFAULT_LOG_FILE_NUMBER; + } + + std::string max_file_size = ""; + if (NULL != config + && config->get(nifi_log_rolling_appender_max_file_size, + max_file_size)) { + try { + max_file_size_ = std::stoi(max_file_size); + } catch (const std::invalid_argument &ia) { + max_file_size_ = DEFAULT_LOG_FILE_SIZE; + } catch (const std::out_of_range &oor) { + max_file_size_ = DEFAULT_LOG_FILE_SIZE; + } + } else { + max_file_size_ = DEFAULT_LOG_FILE_SIZE; + } + + std::string unique_name = getUniqueName<OutputStreamAppender>(); + logger_ = spdlog::rotating_logger_mt(unique_name, file_name_, + max_file_size_, max_files_); + + std::string log_level; + if (NULL != config && config->get(BaseLogger::nifi_log_level, log_level)) { + setLogLevel(log_level); 
+ } + } + + /** + * To maintain current functionality we will flush on write. + */ + void log_str(LOG_LEVEL_E level, const std::string &buffer) { + BaseLogger::log_str(level, buffer); + logger_->flush(); + } + + protected: + + /** + * file name. + */ + std::string file_name_; + /** + * maximum number of files to keep in the rotation. + */ + size_t max_files_; + /** + * Maximum file size per rotated file. + */ + size_t max_file_size_; + +}; + +class LogInstance { + public: + /** + * Returns a logger configuration based on + * the configuration within this instance. + * @param config configuration for this instance. + */ + static std::unique_ptr<BaseLogger> getConfiguredLogger(Configure *config) { + std::string appender = ""; + + if (config->get(BaseLogger::nifi_log_appender, appender)) { + std::transform(appender.begin(), appender.end(), appender.begin(), + ::tolower); + + if ("nullappender" == appender || "null appender" == appender + || "null" == appender) { + + return std::move(std::unique_ptr<BaseLogger>(new NullAppender())); + + } else if ("rollingappender" == appender || "rolling appender" == appender + || "rolling" == appender) { + + return std::move( + std::unique_ptr<BaseLogger>(new RollingAppender(config))); + + } else if ("outputstream" == appender + || "outputstreamappender" == appender + || "outputstream appender" == appender) { + + return std::move( + std::unique_ptr<BaseLogger>(new OutputStreamAppender(config))); + + } + } + return nullptr; + + } +}; + +} /* namespace logging */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* LIBMINIFI_INCLUDE_LOGAPPENDERS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/logging/Logger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h new file mode 100644 index 
0000000..08ef702 --- /dev/null +++ b/libminifi/include/core/logging/Logger.h @@ -0,0 +1,214 @@ +/** + * @file Logger.h + * Logger class declaration + * This is a C++ wrapper for spdlog, a lightweight C++ logging library + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LOGGER_H__ +#define __LOGGER_H__ + +#include <string> +#include <atomic> +#include <memory> +#include <utility> +#include <algorithm> +#include <cstdio> +#include <iostream> + +#include "BaseLogger.h" +#include "spdlog/spdlog.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { + +/** + * Logger class + * Design: Extends BaseLogger, leaving this class to be the facade to the underlying + * logging mechanism. Is a facade to BaseLogger's underlying log stream. This allows + * the underlying implementation to be replaced real time. + */ +class Logger : public BaseLogger { + protected: + struct singleton; + public: + + /** + * Returns a shared pointer to the logger instance. + * Note that while there is no synchronization this is expected + * to be called and initialized first + * @returns shared pointer to the base logger. 
+ */ + static std::shared_ptr<Logger> getLogger() { + + if (singleton_logger_ == nullptr) + singleton_logger_ = std::make_shared<Logger>(singleton { 0 }); + return singleton_logger_; + } + + /** + * Returns the log level for this instance. + */ + LOG_LEVEL_E getLogLevel() const { + return current_logger_.load()->getLogLevel(); + } + + /** + * Sets the log level atomic and sets it + * within logger if it can + * @param level desired log level. + */ + void setLogLevel(LOG_LEVEL_E level) { + current_logger_.load()->setLogLevel(level); + } + + /** + * Sets the log level for this instance based on the string + * @param level desired log leve. + * @param defaultLevel default level if we cannot match level. + */ + void setLogLevel(const std::string &level, LOG_LEVEL_E defaultLevel = info) { + current_logger_.load()->setLogLevel(level, info); + } + + void updateLogger(std::unique_ptr<BaseLogger> logger) { + + if (logger == nullptr) + return; + current_logger_.store(logger.release()); + } + + /** + * @brief Log error message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_error(const char * const format, ...) { + if (!current_logger_.load()->shouldLog(err)) + return; + FILL_BUFFER + current_logger_.load()->log_str(err, buffer); + } + /** + * @brief Log warn message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_warn(const char * const format, ...) { + if (!current_logger_.load()->shouldLog(warn)) + return; + FILL_BUFFER + current_logger_.load()->log_str(warn, buffer); + } + /** + * @brief Log info message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. 
Caller must ensure parameters and format string lengths match + */ + void log_info(const char * const format, ...) { + if (!current_logger_.load()->shouldLog(info)) + return; + FILL_BUFFER + current_logger_.load()->log_str(info, buffer); + } + /** + * @brief Log debug message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_debug(const char * const format, ...) { + + if (!current_logger_.load()->shouldLog(debug)) + return; + FILL_BUFFER + current_logger_.load()->log_str(debug, buffer); + } + /** + * @brief Log trace message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ + void log_trace(const char * const format, ...) { + + if (!current_logger_.load()->shouldLog(trace)) + return; + FILL_BUFFER + current_logger_.load()->log_str(trace, buffer); + } + + /** + * @brief Log message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. 
Caller must ensure parameters and format string lengths match + */ + virtual void log_str(LOG_LEVEL_E level, const std::string &buffer) { + current_logger_.load()->log_str(level, buffer); + } + + // Destructor + ~Logger() { + } + + explicit Logger(const singleton &a) { + + /** + * flush on info to maintain current functionality + */ + std::shared_ptr<spdlog::logger> defaultsink = spdlog::rotating_logger_mt( + LOG_NAME, + LOG_FILE_NAME, + DEFAULT_LOG_FILE_SIZE, DEFAULT_LOG_FILE_NUMBER); + defaultsink->flush_on(spdlog::level::level_enum::info); + + std::unique_ptr<BaseLogger> new_logger_ = std::unique_ptr<BaseLogger>( + new BaseLogger("info", defaultsink)); + + new_logger_->setLogLevel(info); + current_logger_.store(new_logger_.release()); + } + + Logger(const Logger &parent) = delete; + Logger &operator=(const Logger &parent) = delete; + + protected: + + /** + * Allows for a null constructor above so that we can have a public constructor that + * effectively limits us to being a singleton by having a protected argument in the constructor + */ + struct singleton { + explicit singleton(int) { + } + }; + + std::atomic<BaseLogger*> current_logger_; + +// Singleton logger instance + static std::shared_ptr<Logger> singleton_logger_; +}; + +} /* namespace logging */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h new file mode 100644 index 0000000..31e655a --- /dev/null +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -0,0 +1,169 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ + +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" +#include "core/Repository.h" +#include "core/core.h" +#include "Connection.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + + + +#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" +#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M +#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute +#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec + +/** + * Flow File repository + * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. 
+ */ +class FlowFileRepository : public core::Repository, public std::enable_shared_from_this<FlowFileRepository> { + public: + // Constructor + + + + FlowFileRepository(std::string directory, int64_t maxPartitionMillis, + int64_t maxPartitionBytes, uint64_t purgePeriod) + : Repository(core::getClassName<FlowFileRepository>(), directory, + maxPartitionMillis, maxPartitionBytes, purgePeriod) + + { + db_ = NULL; + } + + FlowFileRepository() : FlowFileRepository(FLOWFILE_REPOSITORY_DIRECTORY, + MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, FLOWFILE_REPOSITORY_PURGE_PERIOD) + { + } + + // Destructor + ~FlowFileRepository() { + if (db_) + delete db_; + } + + // initialize + virtual bool initialize() { + std::string value; + + if (configure_->get(Configure::nifi_flowfile_repository_directory_default, + value)) { + directory_ = value; + } + logger_->log_info("NiFi FlowFile Repository Directory %s", + directory_.c_str()); + if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, + value)) { + Property::StringToInt(value, max_partition_bytes_); + } + logger_->log_info("NiFi FlowFile Max Partition Bytes %d", + max_partition_bytes_); + if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, + value)) { + TimeUnit unit; + if (Property::StringToTime(value, max_partition_millis_, unit) + && Property::ConvertTimeUnitToMS(max_partition_millis_, unit, + max_partition_millis_)) { + } + } + logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", + max_partition_millis_); + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), + &db_); + if (status.ok()) { + logger_->log_info("NiFi FlowFile Repository database open %s success", + directory_.c_str()); + } else { + logger_->log_error("NiFi FlowFile Repository database open %s fail", + directory_.c_str()); + return false; + } + return true; + } + + virtual void run(); + + virtual bool 
Put(std::string key, uint8_t *buf, int bufLen) + { + + // persistent to the DB + leveldb::Slice value((const char *) buf, bufLen); + leveldb::Status status; + status = db_->Put(leveldb::WriteOptions(), key, value); + if (status.ok()) + return true; + else + return false; + } + /** + * + * Deletes the key + * @return status of the delete operation + */ + virtual bool Delete(std::string key) + { + leveldb::Status status; + status = db_->Delete(leveldb::WriteOptions(), key); + if (status.ok()) + return true; + else + return false; + } + /** + * Sets the value from the provided key + * @return status of the get operation. + */ + virtual bool Get(std::string key, std::string &value) + { + leveldb::Status status; + status = db_->Get(leveldb::ReadOptions(), key, &value); + if (status.ok()) + return true; + else + return false; + } + + void loadFlowFileToConnections(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap); + + private: + leveldb::DB* db_; +}; + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h new file mode 100644 index 0000000..0ca9190 --- /dev/null +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_YAMLCONFIGURATION_H_ +#define LIBMINIFI_INCLUDE_CORE_YAMLCONFIGURATION_H_ + +#include "core/ProcessorConfig.h" +#include "yaml-cpp/yaml.h" +#include "../FlowConfiguration.h" +#include "Site2SiteClientProtocol.h" +#include <string> +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +#define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml" +#define CONFIG_YAML_PROCESSORS_KEY "Processors" + +class YamlConfiguration : public FlowConfiguration { + + public: + YamlConfiguration(std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, + const std::string path = DEFAULT_FLOW_YAML_FILE_NAME) + : FlowConfiguration(repo, flow_file_repo, path) { + if (IsNullOrEmpty(config_path_)) { + config_path_ = DEFAULT_FLOW_YAML_FILE_NAME; + } + } + + virtual ~YamlConfiguration() { + + } + + std::unique_ptr<core::ProcessGroup> getRoot(const std::string &from_config) { + + YAML::Node flow = YAML::LoadFile(from_config); + + YAML::Node flowControllerNode = flow["Flow Controller"]; + YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; + YAML::Node connectionsNode = flow["Connections"]; + YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"]; + + // Create the root process group + core::ProcessGroup * root = 
parseRootProcessGroupYaml(flowControllerNode); + parseProcessorNodeYaml(processorsNode, root); + parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, root); + parseConnectionYaml(&connectionsNode, root); + + return std::unique_ptr<core::ProcessGroup>(root); + + } + protected: + // Process Processor Node YAML + void parseProcessorNodeYaml(YAML::Node processorNode, + core::ProcessGroup * parent); + // Process Port YAML + void parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, + TransferDirection direction); + // Process Root Processor Group YAML + core::ProcessGroup *parseRootProcessGroupYaml(YAML::Node rootNode); + // Process Property YAML + void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, + std::shared_ptr<core::Processor> processor); + // Process connection YAML + void parseConnectionYaml(YAML::Node *node, core::ProcessGroup * parent); + // Process Remote Process Group YAML + void parseRemoteProcessGroupYaml(YAML::Node *node, + core::ProcessGroup * parent); + // Parse Properties Node YAML for a processor + void parsePropertiesNodeYaml(YAML::Node *propertiesNode, + std::shared_ptr<core::Processor> processor); +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_YAMLCONFIGURATION_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/BaseStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h index c3ebe42..b0b3589 100644 --- a/libminifi/include/io/BaseStream.h +++ b/libminifi/include/io/BaseStream.h @@ -19,132 +19,144 @@ #ifndef LIBMINIFI_INCLUDE_IO_BASESTREAM_H_ #define LIBMINIFI_INCLUDE_IO_BASESTREAM_H_ - #include <cstdint> #include "EndianCheck.h" #include "DataStream.h" #include "Serializable.h" -class BaseStream: public DataStream, public Serializable { - -public: - 
BaseStream() { - - } - virtual ~BaseStream() { - - } - /** - * write 4 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - virtual int write(uint32_t base_value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * write 2 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - virtual int write(uint16_t base_value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * write valueto stream - * @param value non encoded value - * @param len length of value - * @param strema output stream - * @return resulting write size - **/ - virtual int write(uint8_t *value, int len); - - /** - * write 8 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - virtual int write(uint64_t base_value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * write bool to stream - * @param value non encoded value - * @return resulting write size - **/ - virtual int write(bool value); - - /** - * write UTF string to stream - * @param str string to write - * @return resulting write size - **/ - virtual int writeUTF(std::string str, bool widen = false); - - /** - * reads a byte from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - virtual int read(uint8_t &value); - - /** - * reads two bytes from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - virtual int read(uint16_t &base_value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * reads a byte from 
the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - virtual int read(char &value); - - /** - * reads a byte array from the stream - * @param value reference in which will set the result - * @param len length to read - * @param stream stream from which we will read - * @return resulting read size - **/ - virtual int read(uint8_t *value, int len); - - /** - * reads four bytes from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - virtual int read(uint32_t &value, - bool is_little_endian = EndiannessCheck::IS_LITTLE); - - /** - * reads eight byte from the stream - * @param value reference in which will set the result - * @param stream stream from which we will read - * @return resulting read size - **/ - virtual int read(uint64_t &value, - bool is_little_endian = EndiannessCheck::IS_LITTLE); - - /** - * read UTF from stream - * @param str reference string - * @param stream stream from which we will read - * @return resulting read size - **/ - virtual int readUTF(std::string &str, bool widen = false); + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + + +class BaseStream : public DataStream, public Serializable { + + public: + BaseStream() { + + } + virtual ~BaseStream() { + + } + /** + * write 4 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + virtual int write(uint32_t base_value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write 2 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + virtual int write(uint16_t base_value, bool 
is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write valueto stream + * @param value non encoded value + * @param len length of value + * @param strema output stream + * @return resulting write size + **/ + virtual int write(uint8_t *value, int len); + + /** + * write 8 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + virtual int write(uint64_t base_value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write bool to stream + * @param value non encoded value + * @return resulting write size + **/ + virtual int write(bool value); + + /** + * write UTF string to stream + * @param str string to write + * @return resulting write size + **/ + virtual int writeUTF(std::string str, bool widen = false); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint8_t &value); + + /** + * reads two bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint16_t &base_value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(char &value); + + /** + * reads a byte array from the stream + * @param value reference in which will set the result + * @param len length to read + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint8_t *value, int len); + + /** + * reads four bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we 
will read + * @return resulting read size + **/ + virtual int read(uint32_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * reads eight byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint64_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * read UTF from stream + * @param str reference string + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int readUTF(std::string &str, bool widen = false); }; +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif /* LIBMINIFI_INCLUDE_IO_BASESTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/CRCStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h index 01b6199..99fdfc3 100644 --- a/libminifi/include/io/CRCStream.h +++ b/libminifi/include/io/CRCStream.h @@ -25,282 +25,279 @@ #include "BaseStream.h" #include "Serializable.h" -#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32)) +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { +#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32)) template<typename T> -class CRCStream: public BaseStream { -public: - /** - * Raw pointer because the caller guarantees that - * it will exceed our lifetime. 
- */ - explicit CRCStream(T *stream); - - explicit CRCStream( CRCStream<T> &&move ); - - virtual ~CRCStream() { - - } - - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(std::vector<uint8_t> &buf, int buflen); - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(uint8_t *buf, int buflen); - - /** - * Write value to the stream using std::vector - * @param buf incoming buffer - * @param buflen buffer to write - * - */ - virtual int writeData(std::vector<uint8_t> &buf, int buflen); - - /** - * writes value to stream - * @param value value to write - * @param size size of value - */ - virtual int writeData(uint8_t *value, int size); - - /** - * write 4 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - virtual int write(uint32_t base_value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - /** - * write 2 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - virtual int write(uint16_t base_value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - - /** - * write 8 bytes to stream - * @param base_value non encoded value - * @param stream output stream - * @param is_little_endian endianness determination - * @return resulting write size - **/ - virtual int write(uint64_t base_value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - - - /** - * Reads a system word - * @param value value to write - */ - virtual int read(uint64_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Reads a uint32_t - * @param value value to write - */ - virtual int read(uint32_t &value, bool is_little_endian = - 
EndiannessCheck::IS_LITTLE); - - /** - * Reads a system short - * @param value value to write - */ - virtual int read(uint16_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - - virtual short initialize() { - child_stream_->initialize(); - reset(); - return 0; - } - - - void updateCRC(uint8_t *buffer, uint32_t length); - - uint64_t getCRC() { - return crc_; - } - - void reset(); -protected: - +class CRCStream : public BaseStream { + public: /** - * Creates a vector and returns the vector using the provided - * type name. - * @param t incoming object - * @returns vector. - */ - template<typename K> - std::vector<uint8_t> readBuffer(const K& t){ - std::vector<uint8_t> buf; - buf.resize(sizeof t); - readData((uint8_t*) &buf[0], sizeof(t)); - return buf; - } - - - uint64_t crc_; - T *child_stream_; -}; + * Raw pointer because the caller guarantees that + * it will exceed our lifetime. + */ + explicit CRCStream(T *stream); + + explicit CRCStream(CRCStream<T> &&move); + + virtual ~CRCStream() { + + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * write 4 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + virtual int write(uint32_t base_value, bool is_little_endian = + 
EndiannessCheck::IS_LITTLE); + /** + * write 2 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + virtual int write(uint16_t base_value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write 8 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + virtual int write(uint64_t base_value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + /** + * Reads a system word + * @param value value to write + */ + virtual int read(uint64_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Reads a uint32_t + * @param value value to write + */ + virtual int read(uint32_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Reads a system short + * @param value value to write + */ + virtual int read(uint16_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + virtual short initialize() { + child_stream_->initialize(); + reset(); + return 0; + } + + void updateCRC(uint8_t *buffer, uint32_t length); + + uint64_t getCRC() { + return crc_; + } + + void reset(); + protected: + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. 
+ */ + template<typename K> + std::vector<uint8_t> readBuffer(const K& t) { + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData((uint8_t*) &buf[0], sizeof(t)); + return buf; + } + + uint64_t crc_; + T *child_stream_; +}; template<typename T> -CRCStream<T>::CRCStream(T *other) : - child_stream_(other) { - crc_ = crc32(0L, Z_NULL, 0); +CRCStream<T>::CRCStream(T *other) + : child_stream_(other) { + crc_ = crc32(0L, Z_NULL, 0); } template<typename T> -CRCStream<T>::CRCStream(CRCStream<T> &&move) : - crc_(std::move(move.crc_)), child_stream_(std::move(move.child_stream_)) { +CRCStream<T>::CRCStream(CRCStream<T> &&move) + : crc_(std::move(move.crc_)), + child_stream_(std::move(move.child_stream_)) { } template<typename T> int CRCStream<T>::readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) - buf.resize(buflen); - return readData((uint8_t*) &buf[0], buflen); + if (buf.capacity() < buflen) + buf.resize(buflen); + return readData((uint8_t*) &buf[0], buflen); } template<typename T> int CRCStream<T>::readData(uint8_t *buf, int buflen) { - int ret = child_stream_->read(buf, buflen); - crc_ = crc32(crc_, buf, buflen); - return ret; + int ret = child_stream_->read(buf, buflen); + crc_ = crc32(crc_, buf, buflen); + return ret; } template<typename T> int CRCStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) - buf.resize(buflen); - return writeData((uint8_t*) &buf[0], buflen); + if (buf.capacity() < buflen) + buf.resize(buflen); + return writeData((uint8_t*) &buf[0], buflen); } template<typename T> int CRCStream<T>::writeData(uint8_t *value, int size) { - int ret = child_stream_->write(value, size); - crc_ = crc32(crc_, value, size); - return ret; + int ret = child_stream_->write(value, size); + crc_ = crc32(crc_, value, size); + return ret; } template<typename T> void CRCStream<T>::reset() { - crc_ = crc32(0L, Z_NULL, 0); + crc_ = crc32(0L, Z_NULL, 0); } template<typename T> void 
CRCStream<T>::updateCRC(uint8_t *buffer, uint32_t length) { - crc_ = crc32(crc_, buffer, length); + crc_ = crc32(crc_, buffer, length); } template<typename T> -int CRCStream<T>::write(uint64_t base_value, bool is_little_endian){ - - const uint64_t value = - is_little_endian == 1 ? htonll_r(base_value) : base_value; - uint8_t bytes[sizeof value]; - std::copy(static_cast<const char*>(static_cast<const void*>(&value)), - static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, - bytes); - return writeData(bytes,sizeof value); -} +int CRCStream<T>::write(uint64_t base_value, bool is_little_endian) { + const uint64_t value = + is_little_endian == 1 ? htonll_r(base_value) : base_value; + uint8_t bytes[sizeof value]; + std::copy( + static_cast<const char*>(static_cast<const void*>(&value)), + static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, + bytes); + return writeData(bytes, sizeof value); +} template<typename T> -int CRCStream<T>::write(uint32_t base_value, bool is_little_endian){ - const uint32_t value = is_little_endian ? htonl(base_value) : base_value; - uint8_t bytes[sizeof value]; - std::copy(static_cast<const char*>(static_cast<const void*>(&value)), - static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, - bytes); - return writeData(bytes,sizeof value); +int CRCStream<T>::write(uint32_t base_value, bool is_little_endian) { + const uint32_t value = is_little_endian ? htonl(base_value) : base_value; + uint8_t bytes[sizeof value]; + std::copy( + static_cast<const char*>(static_cast<const void*>(&value)), + static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, + bytes); + return writeData(bytes, sizeof value); } template<typename T> -int CRCStream<T>::write(uint16_t base_value, bool is_little_endian){ - const uint16_t value = - is_little_endian == 1 ? 
htons(base_value) : base_value; +int CRCStream<T>::write(uint16_t base_value, bool is_little_endian) { + const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value; uint8_t bytes[sizeof value]; - std::copy(static_cast<const char*>(static_cast<const void*>(&value)), - static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, - bytes); - return writeData(bytes,sizeof value); + std::copy( + static_cast<const char*>(static_cast<const void*>(&value)), + static_cast<const char*>(static_cast<const void*>(&value)) + sizeof value, + bytes); + return writeData(bytes, sizeof value); } - template<typename T> int CRCStream<T>::read(uint64_t &value, bool is_little_endian) { - auto buf = readBuffer(value); - - if (is_little_endian) { - value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) - | ((uint64_t) (buf[2] & 255) << 40) - | ((uint64_t) (buf[3] & 255) << 32) - | ((uint64_t) (buf[4] & 255) << 24) - | ((uint64_t) (buf[5] & 255) << 16) - | ((uint64_t) (buf[6] & 255) << 8) - | ((uint64_t) (buf[7] & 255) << 0); - } else { - value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) - | ((uint64_t) (buf[2] & 255) << 16) - | ((uint64_t) (buf[3] & 255) << 24) - | ((uint64_t) (buf[4] & 255) << 32) - | ((uint64_t) (buf[5] & 255) << 40) - | ((uint64_t) (buf[6] & 255) << 48) - | ((uint64_t) (buf[7] & 255) << 56); - } - return sizeof(value); + auto buf = readBuffer(value); + + if (is_little_endian) { + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) + | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) + | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16) + | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); + } else { + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) + | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) + | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40) + | ((uint64_t) 
(buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); + } + return sizeof(value); } template<typename T> int CRCStream<T>::read(uint32_t &value, bool is_little_endian) { - auto buf = readBuffer(value); + auto buf = readBuffer(value); - if (is_little_endian) { - value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; - } else { - value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; + if (is_little_endian) { + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + } else { + value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; - } + } - return sizeof(value); + return sizeof(value); } template<typename T> int CRCStream<T>::read(uint16_t &value, bool is_little_endian) { - auto buf = readBuffer(value); + auto buf = readBuffer(value); - if (is_little_endian) { - value = (buf[0] << 8) | buf[1]; - } else { - value = buf[0] | buf[1] << 8; + if (is_little_endian) { + value = (buf[0] << 8) | buf[1]; + } else { + value = buf[0] | buf[1] << 8; - } - return sizeof(value); + } + return sizeof(value); } - +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif /* LIBMINIFI_INCLUDE_IO_CRCSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index 3f8aae1..97cace2 100644 --- a/libminifi/include/io/ClientSocket.h +++ b/libminifi/include/io/ClientSocket.h @@ -26,10 +26,17 @@ #include <mutex> #include <atomic> #include "io/BaseStream.h" -#include "Logger.h" +#include "core/core.h" +#include "core/logging/Logger.h" #include "io/validation.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + /** * Socket class. 
* Purpose: Provides a general purpose socket interface that abstracts @@ -39,207 +46,209 @@ * * */ -class Socket: public BaseStream { -public: - /** - * Constructor that accepts host name, port and listeners. With this - * contructor we will be creating a server socket - * @param hostname our host name - * @param port connecting port - * @param listeners number of listeners in the queue - */ - explicit Socket(const std::string &hostname, const uint16_t port, - const uint16_t listeners); - - /** - * Constructor that creates a client socket. - * @param hostname hostname we are connecting to. - * @param port port we are connecting to. - */ - explicit Socket(const std::string &hostname, const uint16_t port); - - /** - * Move constructor. - */ - explicit Socket(const Socket &&); - - static std::string HOSTNAME; - - /** - * Static function to return the current machine's host name - */ - static std::string getMyHostName(std::string *str = &HOSTNAME) { - if (__builtin_expect(!IsNullOrEmpty(str), 0)) - return *str; - else { - char hostname[1024]; - gethostname(hostname, 1024); - Socket mySock(hostname, 0); - mySock.initialize(); - return mySock.getHostname(); - } - } - - /** - * Destructor - */ - - virtual ~Socket(); - - virtual void closeStream(); - /** - * Initializes the socket - * @return result of the creation operation. 
- */ - virtual short initialize(); - - std::string getHostname() const; - - /** - * Return the port for this socket - * @returns port - */ - uint16_t getPort(); - - // data stream extensions - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(std::vector<uint8_t> &buf, int buflen); - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(uint8_t *buf, int buflen); - - /** - * Write value to the stream using std::vector - * @param buf incoming buffer - * @param buflen buffer to write - * - */ - virtual int writeData(std::vector<uint8_t> &buf, int buflen); - - /** - * writes value to stream - * @param value value to write - * @param size size of value - */ - virtual int writeData(uint8_t *value, int size); - - - - /** - * Writes a system word - * @param value value to write - */ - virtual int write(uint64_t value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Writes a uint32_t - * @param value value to write - */ - virtual int write(uint32_t value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Writes a system short - * @param value value to write - */ - virtual int write(uint16_t value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - - /** - * Reads a system word - * @param value value to write - */ - virtual int read(uint64_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Reads a uint32_t - * @param value value to write - */ - virtual int read(uint32_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Reads a system short - * @param value value to write - */ - virtual int read(uint16_t &value, bool is_little_endian = - EndiannessCheck::IS_LITTLE); - - /** - * Returns the underlying buffer - * @return vector's array - **/ - const uint8_t *getBuffer() const { - return ::DataStream::getBuffer(); - } - - 
/** - * Retrieve size of data stream - * @return size of data stream - **/ - const uint32_t getSize() const { - return ::DataStream::getSize(); - } - -protected: - - /** - * Creates a vector and returns the vector using the provided - * type name. - * @param t incoming object - * @returns vector. - */ - template<typename T> - std::vector<uint8_t> readBuffer(const T&); - - /** - * Creates a connection using the address info object. - * @param p addrinfo structure. - * @returns fd. - */ - virtual int8_t createConnection(const addrinfo *p,in_addr_t &addr); - - /** - * Sets socket options depending on the instance. - * @param sock socket file descriptor. - */ - virtual short setSocketOptions(const int sock); - - /** - * Attempt to select the socket file descriptor - * @param msec timeout interval to wait - * @returns file descriptor - */ - virtual short select_descriptor(const uint16_t msec); - - std::shared_ptr<Logger> logger_; - - addrinfo *addr_info_; - - std::recursive_mutex selection_mutex_; - - std::string requested_hostname_; - std::string canonical_hostname_; - uint16_t port_; - - // connection information - int32_t socket_file_descriptor_; - - fd_set total_list_; - fd_set read_fds_; - std::atomic<uint16_t> socket_max_; - uint16_t listeners_; +class Socket : public BaseStream { + public: + /** + * Constructor that accepts host name, port and listeners. With this + * contructor we will be creating a server socket + * @param hostname our host name + * @param port connecting port + * @param listeners number of listeners in the queue + */ + explicit Socket(const std::string &hostname, const uint16_t port, + const uint16_t listeners); + + /** + * Constructor that creates a client socket. + * @param hostname hostname we are connecting to. + * @param port port we are connecting to. + */ + explicit Socket(const std::string &hostname, const uint16_t port); + + /** + * Move constructor. 
+ */ + explicit Socket(const Socket &&); + + static std::string HOSTNAME; + + /** + * Static function to return the current machine's host name + */ + static std::string getMyHostName(std::string *str = &HOSTNAME) { + if (__builtin_expect(!IsNullOrEmpty(str), 0)) + return *str; + else { + char hostname[1024]; + gethostname(hostname, 1024); + Socket mySock(hostname, 0); + mySock.initialize(); + return mySock.getHostname(); + } + } + + /** + * Destructor + */ + + virtual ~Socket(); + + virtual void closeStream(); + /** + * Initializes the socket + * @return result of the creation operation. + */ + virtual short initialize(); + + std::string getHostname() const; + + /** + * Return the port for this socket + * @returns port + */ + uint16_t getPort(); + + // data stream extensions + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * Writes a system word + * @param value value to write + */ + virtual int write(uint64_t value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Writes a uint32_t + * @param value value to write + */ + virtual int write(uint32_t value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Writes a system short + * @param value value to write + */ + virtual int write(uint16_t value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Reads a system word + * @param 
value value to write + */ + virtual int read(uint64_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Reads a uint32_t + * @param value value to write + */ + virtual int read(uint32_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Reads a system short + * @param value value to write + */ + virtual int read(uint16_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const { + return DataStream::getBuffer(); + } + + /** + * Retrieve size of data stream + * @return size of data stream + **/ + const uint32_t getSize() const { + return DataStream::getSize(); + } + + protected: + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. + */ + template<typename T> + std::vector<uint8_t> readBuffer(const T&); + + /** + * Creates a connection using the address info object. + * @param p addrinfo structure. + * @returns fd. + */ + virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr); + + /** + * Sets socket options depending on the instance. + * @param sock socket file descriptor. 
+ */ + virtual short setSocketOptions(const int sock); + + /** + * Attempt to select the socket file descriptor + * @param msec timeout interval to wait + * @returns file descriptor + */ + virtual short select_descriptor(const uint16_t msec); + + std::shared_ptr<logging::Logger> logger_; + + addrinfo *addr_info_; + + std::recursive_mutex selection_mutex_; + + std::string requested_hostname_; + std::string canonical_hostname_; + uint16_t port_; + + // connection information + int32_t socket_file_descriptor_; + + fd_set total_list_; + fd_set read_fds_; + std::atomic<uint16_t> socket_max_; + uint16_t listeners_; }; +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ #endif /* LIBMINIFI_INCLUDE_IO_CLIENTSOCKET_H_ */
