http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/repository/VolatileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index be23a0b..0f25b7f 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -126,6 +126,10 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr virtual void start(); + virtual uint64_t getRepoSize() { + return current_size_; + } + protected: virtual void emplace(RepoValue<T> &old_value) { @@ -254,7 +258,7 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) { } updated = value_vector_.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size); - logger_->log_debug("Set repo value at %ll out of %ll updated %ll current_size %ll, adding %ll to %ll", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load()); + logger_->log_debug("Set repo value at %u out of %u updated %u current_size %u, adding %u to %u", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load()); if (updated && reclaimed_size > 0) { std::lock_guard<std::mutex> lock(mutex_); emplace(old_value); @@ -273,7 +277,7 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) { } while (!updated); current_size_ += size; - logger_->log_debug("VolatileRepository -- put %ll %ll", current_size_.load(), current_index_.load()); + logger_->log_debug("VolatileRepository -- put %u %u", current_size_.load(), current_index_.load()); return true; } /** @@ -330,6 +334,8 @@ bool VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::Serial store.push_back(newComponent); + current_size_ -= repo_value.getBufferSize(); + if (max_size++ >= requested_batch) { break; } @@ -344,7 +350,7 @@ bool VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::Serial template<typename T> bool VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { - logger_->log_debug("VolatileRepository -- DeSerialize %ll", current_size_.load()); + logger_->log_debug("VolatileRepository -- DeSerialize %u", current_size_.load()); max_size = 0; for (auto ent : value_vector_) { // let the destructor do the cleanup @@ -353,6 +359,7 @@ bool VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::Serial if (ent->getValue(repo_value)) { // we've taken ownership of this repo value store.at(max_size)->DeSerialize(repo_value.getBuffer(), repo_value.getBufferSize()); + current_size_ -= repo_value.getBufferSize(); if (max_size++ >= store.size()) { break; } @@ -386,7 +393,6 @@ void VolatileRepository<T>::start() { #pragma GCC diagnostic pop #endif - } /* namespace repository */ } /* namespace core */ } /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/StateManager.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/StateManager.h b/libminifi/include/core/state/StateManager.h index 2eee1ae..983ed0b 100644 --- a/libminifi/include/core/state/StateManager.h +++ b/libminifi/include/core/state/StateManager.h @@ -22,12 +22,12 @@ #include <atomic> #include <algorithm> -#include "core/state/metrics/MetricsBase.h" -#include "core/state/metrics/MetricsListener.h" #include "UpdateController.h" #include "io/validation.h" #include "utils/ThreadPool.h" #include "core/Core.h" +#include "nodes/MetricsBase.h" +#include "nodes/TreeUpdateListener.h" namespace org { namespace apache { @@ -40,7 +40,7 @@ namespace state { * the sink for external updates, and encapsulates the thread pool that runs the listeners for various update operations * that can be performed. */ -class StateManager : public metrics::MetricsReporter, public metrics::MetricsSink, public StateMonitor, public std::enable_shared_from_this<StateManager> { +class StateManager : public response::NodeReporter, public response::ResponseNodeSink, public StateMonitor, public std::enable_shared_from_this<StateManager> { public: StateManager() @@ -77,20 +77,31 @@ class StateManager : public metrics::MetricsReporter, public metrics::MetricsSin * Passes metrics to the update controllers if they are a metrics sink. * @param metrics metric to pass through */ - int16_t setMetrics(const std::shared_ptr<metrics::Metrics> &metrics); + int16_t setResponseNodes(const std::shared_ptr<response::ResponseNode> &metrics); /** * Metrics operations */ - virtual int16_t getMetrics(std::vector<std::shared_ptr<metrics::Metrics>> &metric_vector, uint16_t metricsClass); + virtual int16_t getResponseNodes(std::vector<std::shared_ptr<response::ResponseNode>> &metric_vector, uint16_t metricsClass); + + virtual std::string getVersion(){ + return ""; + } protected: + void shutdownState(){ + listener_thread_pool_.shutdown(); + metrics_maps_.clear(); + updateControllers.clear(); + } + /** * Function to apply updates for a given update controller. + * @param source source identifier * @param updateController update controller mechanism. */ - virtual int16_t applyUpdate(const std::shared_ptr<Update> &updateController) = 0; + virtual int16_t applyUpdate(const std::string &source, const std::shared_ptr<Update> &updateController) = 0; /** * Registers and update controller @@ -108,16 +119,18 @@ class StateManager : public metrics::MetricsReporter, public metrics::MetricsSin std::timed_mutex mutex_; - std::map<std::string, std::shared_ptr<metrics::Metrics>> metrics_maps_; + std::map<std::string, std::shared_ptr<response::ResponseNode>> metrics_maps_; std::vector<std::shared_ptr<UpdateController> > updateControllers; - std::unique_ptr<state::metrics::MetricsListener> metrics_listener_; + std::unique_ptr<state::response::TreeUpdateListener> metrics_listener_; utils::ThreadPool<Update> listener_thread_pool_; }; + + } /* namespace state */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/UpdateController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h index fb5f887..d74143a 100644 --- a/libminifi/include/core/state/UpdateController.h +++ b/libminifi/include/core/state/UpdateController.h @@ -217,13 +217,13 @@ class StateMonitor : public StateController { * < 0 is an error code * 0 is success */ - virtual int16_t applyUpdate(const std::string &configuration) = 0; + virtual int16_t applyUpdate(const std::string & source, const std::string &configuration) = 0; /** * Apply an update that the agent must decode. This is useful for certain operations * that can't be encapsulated within these definitions. */ - virtual int16_t applyUpdate(const std::shared_ptr<Update> &updateController) = 0; + virtual int16_t applyUpdate(const std::string &source, const std::shared_ptr<Update> &updateController) = 0; /** * Returns uptime for this module. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/Value.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/Value.h b/libminifi/include/core/state/Value.h new file mode 100644 index 0000000..5abe9a3 --- /dev/null +++ b/libminifi/include/core/state/Value.h @@ -0,0 +1,214 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_VALUE_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_VALUE_H_ + +#include <memory> +#include <string> +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +/** + * Purpose: Represents an AST value + * Contains an embedded string representation to be used for a toString analog. + */ +class Value { + public: + + explicit Value(const std::string &value) + : string_value(value) { + + } + + virtual ~Value() { + + } + std::string getStringValue() const { + return string_value; + } + + protected: + std::string string_value; + +}; + +class IntValue : public Value { + public: + explicit IntValue(int value) + : Value(std::to_string(value)), + value(value) { + + } + int getValue() { + return value; + } + protected: + int value; +}; + +class BoolValue : public Value { + public: + explicit BoolValue(bool value) + : Value(value ? "true" : "false"), + value(value) { + + } + bool getValue() { + return value; + } + protected: + bool value; +}; + +class Int64Value : public Value { + public: + explicit Int64Value(uint64_t value) + : Value(std::to_string(value)), + value(value) { + + } + uint64_t getValue() { + return value; + } + protected: + uint64_t value; +}; + + +static inline std::shared_ptr<Value> createValue( + const bool &object) { + return std::make_shared<BoolValue>(object); +} + +static inline std::shared_ptr<Value> createValue( + const char *object) { + return std::make_shared<Value>(object); +} + +static inline std::shared_ptr<Value> createValue( + char *object) { + return std::make_shared<Value>(std::string(object)); +} + +static inline std::shared_ptr<Value> createValue( + const std::string &object) { + return std::make_shared<Value>(object); +} + + +static inline std::shared_ptr<Value> createValue( + const uint32_t &object) { + return std::make_shared<Int64Value>(object); +} +static inline std::shared_ptr<Value> createValue( + const uint64_t &object) { + return std::make_shared<Int64Value>(object); +} + +static inline std::shared_ptr<Value> createValue( + const int &object) { + return std::make_shared<IntValue>(object); +} + + +/** + * Purpose: ValueNode is the AST container for a value + */ +class ValueNode { + public: + ValueNode() + : value_(nullptr) { + + } + + /** + * Define the representations and eventual storage relationships through + * createValue + */ + template<typename T> + auto operator=( + const T ref) -> typename std::enable_if<std::is_same<T, int >::value || + std::is_same<T, uint32_t >::value || + std::is_same<T, uint64_t >::value || + std::is_same<T, bool >::value || + std::is_same<T, char* >::value || + std::is_same<T, const char* >::value || + std::is_same<T, std::string>::value,ValueNode&>::type { + value_ = createValue(ref); + return *this; + } + + + ValueNode &operator=(const ValueNode &ref) { + value_ = ref.value_; + return *this; + } + + inline bool operator==(const ValueNode &rhs) const { + return to_string() == rhs.to_string(); + } + + inline bool operator==(const char*rhs) const { + return to_string() == rhs; + } + + friend bool operator==(const char *lhs, const ValueNode& rhs) { + return lhs == rhs.to_string(); + } + + std::string to_string() const { + return value_ ? value_->getStringValue() : ""; + } + + std::shared_ptr<Value> getValue() const { + return value_; + } + + bool empty() const { + return value_ == nullptr; + } + + protected: + std::shared_ptr<Value> value_; +}; + +struct SerializedResponseNode { + std::string name; + ValueNode value; + + std::vector<SerializedResponseNode> children; + SerializedResponseNode &operator=(const SerializedResponseNode &other) { + name = other.name; + value = other.value; + children = other.children; + return *this; + } +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_VALUE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/metrics/DeviceInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/DeviceInformation.h b/libminifi/include/core/state/metrics/DeviceInformation.h deleted file mode 100644 index 20e29b3..0000000 --- a/libminifi/include/core/state/metrics/DeviceInformation.h +++ /dev/null @@ -1,313 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_ - -#include "core/Resource.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <functional> -#include <sys/ioctl.h> -#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) -#include <net/if_dl.h> -#include <net/if_types.h> -#endif -#include <ifaddrs.h> -#include <net/if.h> -#include <unistd.h> -#include <netinet/in.h> -#include <string.h> -#include <sys/socket.h> -#include <netdb.h> -#include <ifaddrs.h> -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <sstream> -#include <map> -#include "MetricsBase.h" -#include "Connection.h" -#include "io/ClientSocket.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace metrics { - -class Device { - public: - Device() { - initialize(); - } - void initialize() { - addrinfo hints = { sizeof(addrinfo) }; - memset(&hints, 0, sizeof hints); // make sure the struct is empty - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_CANONNAME; - hints.ai_protocol = 0; /* any protocol */ - - char hostname[1024]; - hostname[1023] = '\0'; - gethostname(hostname, 1023); - - std::ifstream device_id_file(".device_id"); - if (device_id_file) { - std::string line; - while (device_id_file) { - if (std::getline(device_id_file, line)) - device_id_ += line; - } - device_id_file.close(); - } else { - device_id_ = getDeviceId(); - - std::ofstream outputFile(".device_id"); - if (outputFile) { - outputFile.write(device_id_.c_str(), device_id_.length()); - } - outputFile.close(); - } - - canonical_hostname_ = hostname; - - std::stringstream ips; - for (auto ip : getIpAddresses()) { - if (ip.find("127") == 0 || ip.find("192") == 0) - continue; - ip_ = ip; - break; - } - - } - - std::string canonical_hostname_; - std::string ip_; - std::string device_id_; - protected: - - std::vector<std::string> getIpAddresses() { - std::vector<std::string> ips; - struct ifaddrs *ifaddr, *ifa; - if (getifaddrs(&ifaddr) == -1) { - perror("getifaddrs"); - exit(EXIT_FAILURE); - } - - for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { - if ((strcmp("lo", ifa->ifa_name) == 0) || !(ifa->ifa_flags & (IFF_RUNNING))) - continue; - if ((ifa->ifa_addr != NULL) && (ifa->ifa_addr->sa_family == AF_INET)) { - ips.push_back(inet_ntoa(((struct sockaddr_in *) ifa->ifa_addr)->sin_addr)); - } - } - - freeifaddrs(ifaddr); - return ips; - } - -#if __linux__ - std::string getDeviceId() { - - std::hash<std::string> hash_fn; - std::string macs; - struct ifaddrs *ifaddr, *ifa; - int family, s, n; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) == -1) { - exit(EXIT_FAILURE); - } - - /* Walk through linked list, maintaining head pointer so we - can free list later */ - for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) { - if (ifa->ifa_addr == NULL) - continue; - - family = ifa->ifa_addr->sa_family; - - /* Display interface name and family (including symbolic - form of the latter for the common families) */ - - /* For an AF_INET* interface address, display the address */ - - if (family == AF_INET || family == AF_INET6) { - s = getnameinfo(ifa->ifa_addr, - (family == AF_INET) ? sizeof(struct sockaddr_in) : - sizeof(struct sockaddr_in6), - host, NI_MAXHOST, - NULL, 0, NI_NUMERICHOST); - if (s != 0) { - printf("getnameinfo() failed: %s\n", gai_strerror(s)); - exit(EXIT_FAILURE); - } - - } - } - - freeifaddrs(ifaddr); - - int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP); - struct ifreq ifr; - struct ifconf ifc; - char buf[1024]; - ifc.ifc_len = sizeof(buf); - ifc.ifc_buf = buf; - if (ioctl(sock, SIOCGIFCONF, &ifc) == -1) { /* handle error */} - - struct ifreq* it = ifc.ifc_req; - const struct ifreq* const end = it + (ifc.ifc_len / sizeof(struct ifreq)); - - for (; it != end; ++it) { - strcpy(ifr.ifr_name, it->ifr_name); - if (ioctl(sock, SIOCGIFFLAGS, &ifr) == 0) { - if (! (ifr.ifr_flags & IFF_LOOPBACK)) { // don't count loopback - if (ioctl(sock, SIOCGIFHWADDR, &ifr) == 0) { - unsigned char mac[6]; - - memcpy(mac, ifr.ifr_hwaddr.sa_data, 6); - - char mac_add[13]; - snprintf(mac_add,13,"%02X%02X%02X%02X%02X%02X",mac[0], mac[1], mac[2], mac[3], mac[4], mac[5] ); - - macs+= mac_add; - } - } - - } - else { /* handle error */} - } - - close(sock); - - return std::to_string(hash_fn(macs)); - } -#elif( defined(__unix__) || defined(__APPLE__) || defined(__MACH__) || defined(BSD)) // should work on bsd variants as well - std::string getDeviceId() { - ifaddrs* iflist; - std::hash<std::string> hash_fn; - std::set<std::string> macs; - - if (getifaddrs(&iflist) == 0) { - for (ifaddrs* cur = iflist; cur; cur = cur->ifa_next) { - if (cur->ifa_addr && (cur->ifa_addr->sa_family == AF_LINK) && ((sockaddr_dl*) cur->ifa_addr)->sdl_alen) { - sockaddr_dl* sdl = (sockaddr_dl*) cur->ifa_addr; - - if (sdl->sdl_type != IFT_ETHER) { - continue; - } else { - } - char mac[32]; - memcpy(mac, LLADDR(sdl), sdl->sdl_alen); - char mac_add[13]; - snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); - ///macs += mac_add; - macs.insert(mac_add); - } - } - - freeifaddrs(iflist); - } - std::string macstr; - for (auto &mac : macs) { - macstr += mac; - } - return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309"; - } -#else - std::string getDeviceId() { - return "NaD"; - } -#endif - - // connection information - int32_t socket_file_descriptor_; - - addrinfo *addr_info_; -}; - -/** - * Justification and Purpose: Provides Device Information - */ -class DeviceInformation : public DeviceMetric { - public: - - DeviceInformation(std::string name, uuid_t uuid) - : DeviceMetric(name, uuid) { - static Device device; - hostname_ = device.canonical_hostname_; - ip_ = device.ip_; - device_id_ = device.device_id_; - } - - DeviceInformation(const std::string &name) - : DeviceMetric(name, 0) { - static Device device; - hostname_ = device.canonical_hostname_; - ip_ = device.ip_; - device_id_ = device.device_id_; - } - - std::string getName() const{ - return "NetworkInfo"; - } - - std::vector<MetricResponse> serialize() { - std::vector<MetricResponse> serialized; - - MetricResponse hostname; - hostname.name = "hostname"; - hostname.value = hostname_; - - MetricResponse ip; - ip.name = "ip"; - ip.value = ip_; - - serialized.push_back(hostname); - serialized.push_back(ip); - - MetricResponse device_id; - device_id.name = "deviceid"; - device_id.value = device_id_; - - serialized.push_back(device_id); - - return serialized; - } - - protected: - - std::string hostname_; - std::string ip_; - std::string device_id_; -}; - -REGISTER_RESOURCE(DeviceInformation); - -} /* namespace metrics */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_DEVICEINFORMATION_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/metrics/MetricsBase.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/MetricsBase.h b/libminifi/include/core/state/metrics/MetricsBase.h deleted file mode 100644 index b33ad9a..0000000 --- a/libminifi/include/core/state/metrics/MetricsBase.h +++ /dev/null @@ -1,161 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_ -#define LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_ - -#include <vector> -#include <memory> -#include <string> -#include "core/Core.h" -#include "core/Connectable.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace metrics { - -struct MetricResponse { - std::string name; - std::string value; - std::vector<MetricResponse> children; - MetricResponse &operator=(const MetricResponse &other) { - name = other.name; - value = other.value; - children = other.children; - return *this; - } -}; - -/** - * Purpose: Defines a metric. Serialization is intended to be thread safe. - */ -class Metrics : public core::Connectable { - public: - Metrics() - : core::Connectable("metric", 0) { - } - - Metrics(std::string name, uuid_t uuid) - : core::Connectable(name, uuid) { - } - virtual ~Metrics() { - - } - virtual std::string getName() const = 0; - - virtual std::vector<MetricResponse> serialize() = 0; - - virtual void yield() { - } - virtual bool isRunning() { - return true; - } - virtual bool isWorkAvailable() { - return true; - } - -}; - -/** - * Purpose: Defines a metric that - */ -class DeviceMetric : public Metrics { - public: - DeviceMetric(std::string name, uuid_t uuid) - : Metrics(name, uuid) { - } -}; - -/** - * Purpose: Retrieves Metrics from the defined class. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics. - * - */ -class MetricsSource { - public: - - MetricsSource() { - - } - - virtual ~MetricsSource() { - } - - /** - * Retrieves all metrics from this source. - * @param metric_vector -- metrics will be placed in this vector. - * @return result of the get operation. - * 0 Success - * 1 No error condition, but cannot obtain lock in timely manner. - * -1 failure - */ - virtual int16_t getMetrics(std::vector<std::shared_ptr<Metrics>> &metric_vector) = 0; - -}; - -class MetricsReporter { - public: - - MetricsReporter() { - - } - - virtual ~MetricsReporter() { - } - - /** - * Retrieves all metrics from this source. - * @param metric_vector -- metrics will be placed in this vector. - * @return result of the get operation. - * 0 Success - * 1 No error condition, but cannot obtain lock in timely manner. - * -1 failure - */ - virtual int16_t getMetrics(std::vector<std::shared_ptr<Metrics>> &metric_vector, uint16_t metricsClass) = 0; - -}; - -/** - * Purpose: Sink interface for all metrics. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics. - * - */ -class MetricsSink { - public: - - virtual ~MetricsSink() { - } - /** - * Setter for metrics in this sink. - * @param metrics metrics to insert into the current sink. - * @return result of the set operation. - * 0 Success - * 1 No error condition, but cannot obtain lock in timely manner. - * -1 failure - */ - virtual int16_t setMetrics(const std::shared_ptr<Metrics> &metrics) = 0; -}; - -} /* namespace metrics */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_C2_METRICS_METRICSBASE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/metrics/MetricsListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/MetricsListener.h b/libminifi/include/core/state/metrics/MetricsListener.h deleted file mode 100644 index c471f60..0000000 --- a/libminifi/include/core/state/metrics/MetricsListener.h +++ /dev/null @@ -1,128 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_C2_METRICS_H_ -#define LIBMINIFI_INCLUDE_C2_METRICS_H_ - -#include <vector> - -#include "MetricsBase.h" -#include "core/state/UpdateController.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace metrics { - -/** - * Purpose: Class that will represent the metrics updates, which can be performed asynchronously. - */ -class MetricsUpdate : public Update { - public: - MetricsUpdate(UpdateStatus status) - : Update(status) { - - } - virtual bool validate() { - return true; - } -}; - -class MetricsWatcher : public utils::AfterExecute<Update> { - public: - explicit MetricsWatcher(std::atomic<bool> *running) - : running_(running) { - } - - explicit MetricsWatcher(MetricsWatcher && other) - : running_(std::move(other.running_)) { - - } - - ~MetricsWatcher() { - - } - - virtual bool isFinished(const Update &result) { - if (result.getStatus().getState() == UpdateState::READ_COMPLETE && running_) { - return false; - } else { - return true; - } - } - virtual bool isCancelled(const Update &result) { - return false; - } - - protected: - std::atomic<bool> *running_; - -}; - -class MetricsListener { - public: - MetricsListener(const std::shared_ptr<metrics::MetricsReporter> &source, const std::shared_ptr<metrics::MetricsSink> &sink) - : running_(true), - source_(source), - sink_(sink){ - - function_ = [&]() { - while(running_) { - std::vector<std::shared_ptr<metrics::Metrics>> metric_vector; - // simple pass through for the metrics - if (nullptr != source_ && nullptr != sink_) { - source_->getMetrics(metric_vector,0); - for(auto metric : metric_vector) { - sink_->setMetrics(metric); - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - return MetricsUpdate(UpdateState::READ_COMPLETE); - }; - } - - void stop() { - running_ = false; - } - - std::function<Update()> &getFunction() { - return function_; - } - - std::future<Update> &getFuture() { - return future_; - } - - private: - std::function<Update()> function_; - std::future<Update> future_; - std::atomic<bool> running_; - std::shared_ptr<metrics::MetricsReporter> source_; - std::shared_ptr<metrics::MetricsSink> sink_; -}; - -} /* namespace metrics */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_C2_METRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/metrics/ProcessMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/ProcessMetrics.h b/libminifi/include/core/state/metrics/ProcessMetrics.h deleted file mode 100644 index f3b6d23..0000000 --- a/libminifi/include/core/state/metrics/ProcessMetrics.h +++ /dev/null @@ -1,102 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_ - -#include "core/Resource.h" -#include <sstream> -#include <map> -#include <sys/time.h> -#include <sys/resource.h> -#include "MetricsBase.h" -#include "Connection.h" -#include "DeviceInformation.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace metrics { - -/** - * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the - * C2 server. - * - */ -class ProcessMetrics : public Metrics { - public: - - ProcessMetrics(const std::string &name, uuid_t uuid) - : Metrics(name, uuid) { - } - - ProcessMetrics(const std::string &name) - : Metrics(name, 0) { - } - - ProcessMetrics() { - } - - virtual std::string getName() const { - return "ProcessMetrics"; - } - - std::vector<MetricResponse> serialize() { - std::vector<MetricResponse> serialized; - - struct rusage my_usage; - getrusage(RUSAGE_SELF, &my_usage); - - MetricResponse memory; - memory.name = "MemoryMetrics"; - - MetricResponse maxrss; - maxrss.name = "maxrss"; - - maxrss.value = std::to_string(my_usage.ru_maxrss); - - memory.children.push_back(maxrss); - serialized.push_back(memory); - - MetricResponse cpu; - cpu.name = "CpuMetrics"; - MetricResponse ics; - ics.name = "involcs"; - - ics.value = std::to_string(my_usage.ru_nivcsw); - - cpu.children.push_back(ics); - serialized.push_back(cpu); - - return serialized; - } - - protected: - -}; - -REGISTER_RESOURCE(ProcessMetrics); - -} /* namespace metrics */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/metrics/QueueMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/QueueMetrics.h b/libminifi/include/core/state/metrics/QueueMetrics.h deleted file mode 100644 index b55f164..0000000 --- a/libminifi/include/core/state/metrics/QueueMetrics.h +++ /dev/null @@ -1,106 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ - -#include <sstream> -#include <map> - -#include "MetricsBase.h" -#include "Connection.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace metrics { - -/** - * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the - * C2 server. - * - */ -class QueueMetrics : public Metrics { - public: - - QueueMetrics(const std::string &name, uuid_t uuid) - : Metrics(name, uuid) { - } - - QueueMetrics(const std::string &name) - : Metrics(name, 0) { - } - - QueueMetrics() - : Metrics("QueueMetrics", 0) { - } - - virtual std::string getName() const{ - return "QueueMetrics"; - } - - void addConnection(const std::shared_ptr<minifi::Connection> &connection) { - if (nullptr != connection) { - connections.insert(std::make_pair(connection->getName(), connection)); - } - } - - std::vector<MetricResponse> serialize() { - std::vector<MetricResponse> serialized; - for (auto conn : connections) { - auto connection = conn.second; - MetricResponse parent; - parent.name = connection->getName(); - MetricResponse datasize; - datasize.name = "datasize"; - datasize.value = std::to_string(connection->getQueueDataSize()); - - MetricResponse datasizemax; - datasizemax.name = "datasizemax"; - datasizemax.value = std::to_string(connection->getMaxQueueDataSize()); - - MetricResponse queuesize; - queuesize.name = "queued"; - queuesize.value = std::to_string(connection->getQueueSize()); - - MetricResponse queuesizemax; - queuesizemax.name = "queuedmax"; - queuesizemax.value = std::to_string(connection->getMaxQueueSize()); - - parent.children.push_back(datasize); - parent.children.push_back(datasizemax); - parent.children.push_back(queuesize); - parent.children.push_back(queuesizemax); - - serialized.push_back(parent); - } - return serialized; - } - - protected: - std::map<std::string, std::shared_ptr<minifi::Connection>> connections; -}; - -} /* namespace metrics */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/metrics/RepositoryMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/RepositoryMetrics.h b/libminifi/include/core/state/metrics/RepositoryMetrics.h deleted file mode 100644 index 8592257..0000000 --- a/libminifi/include/core/state/metrics/RepositoryMetrics.h +++ /dev/null @@ -1,101 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_REPOSITORYMETRICS_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_REPOSITORYMETRICS_H_ - -#include <sstream> -#include <map> - -#include "MetricsBase.h" -#include "Connection.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace metrics { - -/** - * Justification and Purpose: Provides repository metrics. Provides critical information to the - * C2 server. - * - */ -class RepositoryMetrics : public Metrics { - public: - - RepositoryMetrics(const std::string &name, uuid_t uuid) - : Metrics(name, uuid) { - } - - RepositoryMetrics(const std::string &name) - : Metrics(name, 0) { - } - - RepositoryMetrics() - : Metrics("RepositoryMetrics", 0) { - } - - virtual std::string getName() const { - return "RepositoryMetrics"; - } - - void addRepository(const std::shared_ptr<core::Repository> &repo) { - if (nullptr != repo) { - repositories.insert(std::make_pair(repo->getName(), repo)); - } - } - - std::vector<MetricResponse> serialize() { - std::vector<MetricResponse> serialized; - for (auto conn : repositories) { - auto repo = conn.second; - MetricResponse parent; - parent.name = repo->getName(); - MetricResponse datasize; - datasize.name = "running"; - datasize.value = std::to_string(repo->isRunning()); - - MetricResponse datasizemax; - datasizemax.name = "full"; - datasizemax.value = std::to_string(repo->isFull()); - - MetricResponse queuesize; - queuesize.name = "size"; - queuesize.value = std::to_string(repo->getRepoSize()); - - parent.children.push_back(datasize); - parent.children.push_back(datasizemax); - parent.children.push_back(queuesize); - - serialized.push_back(parent); - } - return serialized; - } - - protected: - std::map<std::string, std::shared_ptr<core::Repository>> repositories; -}; - -} /* namespace metrics */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_RepositoryMetrics_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/metrics/SystemMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/metrics/SystemMetrics.h b/libminifi/include/core/state/metrics/SystemMetrics.h deleted file mode 100644 index 8bcf5be..0000000 --- a/libminifi/include/core/state/metrics/SystemMetrics.h +++ /dev/null @@ -1,108 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_ - -#include "core/Resource.h" -#include <sstream> -#include <map> -#ifndef _WIN32 -#include <sys/utsname.h> -#endif -#include "MetricsBase.h" -#include "Connection.h" -#include "DeviceInformation.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace metrics { - -/** - * Justification and Purpose: Provides system information, including critical device information. - * - */ -class SystemInformation : public DeviceInformation { - public: - - SystemInformation(const std::string &name, uuid_t uuid) - : DeviceInformation(name, uuid) { - } - - SystemInformation(const std::string &name) - : DeviceInformation(name, 0) { - } - - SystemInformation() - : DeviceInformation("SystemInformation", 0) { - } - - virtual std::string getName() const{ - return "SystemInformation"; - } - - std::vector<MetricResponse> serialize() { - std::vector<MetricResponse> serialized; - - MetricResponse vcores; - vcores.name = "vcores"; - size_t ncpus = std::thread::hardware_concurrency(); - - vcores.value = std::to_string(ncpus); - - serialized.push_back(vcores); - - MetricResponse mem; - mem.name = "physicalmem"; -#if defined(_SC_PHYS_PAGES) && defined(_SC_PAGESIZE) - size_t mema = (size_t) sysconf( _SC_PHYS_PAGES) * (size_t) sysconf( _SC_PAGESIZE); -#endif - mem.value = std::to_string(mema); - - serialized.push_back(mem); - - MetricResponse arch; - arch.name = "machinearch"; - - utsname buf; - - if (uname(&buf) == -1) { - arch.value = "unknown"; - } else { - arch.value = buf.machine; - } - - serialized.push_back(arch); - - return serialized; - } - - protected: - -}; - -REGISTER_RESOURCE(SystemInformation); -} /* namespace metrics */ -} /* namespace state */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/AgentInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h new file mode 100644 index 0000000..bc3f496 --- /dev/null +++ b/libminifi/include/core/state/nodes/AgentInformation.h @@ -0,0 +1,586 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_AGENTINFORMATION_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_AGENTINFORMATION_H_ + +#include "core/Resource.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <functional> +#include <sys/ioctl.h> +#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) +#include <net/if_dl.h> +#include <net/if_types.h> +#endif +#include <ifaddrs.h> +#include <net/if.h> +#include <unistd.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <netdb.h> +#include <ifaddrs.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sstream> +#include <map> +#include "../nodes/MetricsBase.h" +#include "Connection.h" +#include "io/ClientSocket.h" +#include "agent/agent_version.h" +#include "agent/build_description.h" +#include "core/ClassLoader.h" +#include "../nodes/StateMonitor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +#define GROUP_STR "org::apache::nifi::minifi" + +class Bundles : public DeviceInformation { + public: + Bundles(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid) { + setArray(true); + } + + Bundles(const std::string &name) + : DeviceInformation(name, 0) { + setArray(true); + } + + std::string getName() const { + return "bundles"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + for (auto group : AgentBuild::getExtensions()) { + SerializedResponseNode bundle; + bundle.name = "bundles"; + + SerializedResponseNode bgroup; + bgroup.name = "group"; + bgroup.value = GROUP_STR; + SerializedResponseNode artifact; + artifact.name = "artifact"; + artifact.value = group; + SerializedResponseNode version; + version.name = "version"; + version.value = AgentBuild::VERSION; + + bundle.children.push_back(bgroup); + bundle.children.push_back(artifact); + bundle.children.push_back(version); + serialized.push_back(bundle); + } + + return serialized; + } + +}; + +/** + * Justification and Purpose: Provides available extensions for the agent information block. + */ +class AgentStatus : public StateMonitorNode { + public: + + AgentStatus(std::string name, uuid_t uuid) + : StateMonitorNode(name, uuid) { + + } + + AgentStatus(const std::string &name) + : StateMonitorNode(name, 0) { + } + + std::string getName() const { + return "status"; + } + + void setRepositories(const std::map<std::string, std::shared_ptr<core::Repository>> &repositories) { + repositories_ = repositories; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + SerializedResponseNode uptime; + + uptime.name = "uptime"; + if (nullptr != monitor_) + uptime.value = monitor_->getUptime(); + else { + uptime.value = "0"; + } + + if (!repositories_.empty()) { + SerializedResponseNode repositories; + + repositories.name = "repositories"; + + for (auto &repo : repositories_) { + SerializedResponseNode repoNode; + + repoNode.name = repo.first; + + SerializedResponseNode queuesize; + queuesize.name = "size"; + queuesize.value = repo.second->getRepoSize(); + + repoNode.children.push_back(queuesize); + + repositories.children.push_back(repoNode); + + } + serialized.push_back(repositories); + } + + serialized.push_back(uptime); + + if (nullptr != monitor_) { + auto components = monitor_->getAllComponents(); + SerializedResponseNode componentsNode; + + componentsNode.name = "components"; + + for (auto component : components) { + SerializedResponseNode componentNode; + + componentNode.name = component->getComponentName(); + + SerializedResponseNode componentStatusNode; + componentStatusNode.name = "running"; + componentStatusNode.value = component->isRunning(); + + componentNode.children.push_back(componentStatusNode); + + componentsNode.children.push_back(componentNode); + } + serialized.push_back(componentsNode); + } + + return serialized; + } + protected: + std::map<std::string, std::shared_ptr<core::Repository>> repositories_; +}; + +class AgentIdentifier { + public: + + AgentIdentifier(){ + + } + + void setIdentifier(const std::string &identifier) { + identifier_ = identifier; + } + + void setAgentClass(const std::string &agentClass){ + agent_class_ = agentClass; + } + + protected: + std::string identifier_; + std::string agent_class_; +}; + +class AgentMonitor { + public: + + AgentMonitor() + : monitor_(nullptr) { + + } + void addRepository(const std::shared_ptr<core::Repository> &repo) { + if (nullptr != repo) { + repositories_.insert(std::make_pair(repo->getName(), repo)); + } + } + + void setStateMonitor(const std::shared_ptr<state::StateMonitor> &monitor) { + monitor_ = monitor; + } + + protected: + std::map<std::string, std::shared_ptr<core::Repository>> repositories_; + std::shared_ptr<state::StateMonitor> monitor_; +}; + +class ComponentManifest : public DeviceInformation { + public: + ComponentManifest(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid) { + } + + ComponentManifest(const std::string &name) + : DeviceInformation(name, 0) { + } + + std::string getName() const { + return "componentManifest"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + struct Components group = BuildDescription::getClassDescriptions(); + serializeClassDescription(group.processors_, "processors", serialized); + serializeClassDescription(group.controller_services_, "controllserServices", serialized); + return serialized; + } + protected: + + void serializeClassDescription(const std::vector<ClassDescription> &descriptions, const std::string name, std::vector<SerializedResponseNode> &response) { + SerializedResponseNode resp; + resp.name = " resp"; + if (!descriptions.empty()) { + + SerializedResponseNode type; + type.name = name; + + for (auto group : descriptions) { + + SerializedResponseNode desc; + desc.name = group.class_name_; + + SerializedResponseNode bgroup; + bgroup.name = "group"; + bgroup.value = GROUP_STR; + SerializedResponseNode artifact; + artifact.name = "artifact"; + artifact.value = group.class_name_; + SerializedResponseNode version; + version.name = "version"; + version.value = AgentBuild::VERSION; + + if (!group.class_properties_.empty()) { + SerializedResponseNode props; + props.name = "propertyDescriptors"; + for (auto && prop : group.class_properties_) { + + SerializedResponseNode child; + child.name = prop.first; + + SerializedResponseNode descriptorName; + descriptorName.name = "name"; + descriptorName.value = prop.first; + + SerializedResponseNode descriptorDescription; + descriptorDescription.name = "description"; + descriptorDescription.value = prop.second; + + child.children.push_back(descriptorName); + child.children.push_back(descriptorDescription); + + props.children.push_back(child); + } + + desc.children.push_back(props); + } + + SerializedResponseNode dyn_prop; + dyn_prop.name = "supportsDynamicProperties"; + dyn_prop.value = group.support_dynamic_; + + desc.children.push_back(dyn_prop); + + desc.children.push_back(bgroup); + desc.children.push_back(artifact); + desc.children.push_back(version); + + SerializedResponseNode buildInfo; + buildInfo.name = "buildInfo"; + + SerializedResponseNode build_version; + build_version.name = "version"; + build_version.value = AgentBuild::VERSION; + + SerializedResponseNode build_rev; + build_rev.name = "revision"; + build_rev.value = AgentBuild::BUILD_REV; + + SerializedResponseNode build_date; + build_date.name = "timestamp"; + build_date.value = (uint64_t)std::stoull(AgentBuild::BUILD_DATE); + + SerializedResponseNode compiler_command; + compiler_command.name = "compiler"; + compiler_command.value = AgentBuild::COMPILER; + + SerializedResponseNode compiler_flags; + compiler_flags.name = "flags"; + compiler_flags.value = AgentBuild::COMPILER_FLAGS; + + buildInfo.children.push_back(compiler_flags); + buildInfo.children.push_back(compiler_command); + + buildInfo.children.push_back(build_version); + buildInfo.children.push_back(build_rev); + buildInfo.children.push_back(build_date); + desc.children.push_back(buildInfo); + type.children.push_back(desc); + } + resp.children.push_back(type); + } + response.push_back(resp); + + } +}; + +/** + * Justification and Purpose: Provides available extensions for the agent information block. + */ +class AgentManifest : public DeviceInformation { + public: + + AgentManifest(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid) { + //setArray(true); + } + + AgentManifest(const std::string &name) + : DeviceInformation(name, 0) { + // setArray(true); + } + + std::string getName() const { + return "agentManifest"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + SerializedResponseNode ident; + + ident.name = "identifier"; + ident.value = AgentBuild::BUILD_IDENTIFIER; + + SerializedResponseNode type; + + type.name = "agentType"; + type.value = "cpp"; + + SerializedResponseNode version; + + version.name = "version"; + version.value = AgentBuild::VERSION; + + SerializedResponseNode buildInfo; + buildInfo.name = "buildInfo"; + + SerializedResponseNode build_version; + build_version.name = "version"; + build_version.value = AgentBuild::VERSION; + + SerializedResponseNode build_rev; + build_rev.name = "revision"; + build_rev.value = AgentBuild::BUILD_REV; + + SerializedResponseNode build_date; + build_date.name = "timestamp"; + build_date.value = (uint64_t)std::stoull(AgentBuild::BUILD_DATE); + + SerializedResponseNode compiler_command; + compiler_command.name = "compiler"; + compiler_command.value = AgentBuild::COMPILER; + + SerializedResponseNode compiler_flags; + compiler_flags.name = "flags"; + compiler_flags.value = AgentBuild::COMPILER_FLAGS; + + buildInfo.children.push_back(compiler_flags); + buildInfo.children.push_back(compiler_command); + + buildInfo.children.push_back(build_version); + buildInfo.children.push_back(build_rev); + buildInfo.children.push_back(build_date); + + Bundles bundles("bundles", nullptr); + + serialized.push_back(ident); + serialized.push_back(type); + serialized.push_back(buildInfo); + // serialize the bundle information. + for (auto bundle : bundles.serialize()) { + serialized.push_back(bundle); + } + + ComponentManifest compMan("componentManifest", nullptr); + // serialize the component information. + for (auto component : compMan.serialize()) { + serialized.push_back(component); + } + + return serialized; + } +}; + +/** + * Purpose and Justification: Prints classes along with their properties for the current agent. + */ +class AgentInformation : public DeviceInformation, public AgentMonitor, public AgentIdentifier { + public: + + AgentInformation(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid) { + } + + AgentInformation(const std::string &name) + : DeviceInformation(name, 0) { + } + + std::string getName() const { + return "agentInfo"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + SerializedResponseNode ident; + + ident.name = "identifier"; + ident.value = identifier_; + + SerializedResponseNode agentClass; + agentClass.name = "agentClass"; + agentClass.value = agent_class_; + + AgentManifest manifest("manifest", nullptr); + + SerializedResponseNode agentManifest; + agentManifest.name = "agentManifest"; + for (auto &ser : manifest.serialize()) { + agentManifest.children.push_back(std::move(ser)); + } + + AgentStatus status("status"); + status.setRepositories(repositories_); + status.setStateMonitor(monitor_); + + SerializedResponseNode agentStatus; + agentStatus.name = "status"; + for (auto &ser : status.serialize()) { + agentStatus.children.push_back(std::move(ser)); + } + + serialized.push_back(ident); + serialized.push_back(agentClass); + serialized.push_back(agentManifest); + serialized.push_back(agentStatus); + return serialized; + } + + protected: + + void serializeClass(const std::vector<ClassDescription> &processors, const std::vector<ClassDescription> &controller_services, const std::vector<ClassDescription> &other_components, + std::vector<SerializedResponseNode> &response) { + SerializedResponseNode resp; + resp.name = " resp"; + if (!processors.empty()) { + SerializedResponseNode type; + type.name = "Processors"; + + for (auto group : processors) { + SerializedResponseNode desc; + + desc.name = group.class_name_; + + if (!group.class_properties_.empty()) { + SerializedResponseNode props; + props.name = "properties"; + for (auto && prop : group.class_properties_) { + SerializedResponseNode child; + child.name = prop.first; + child.value = prop.second; + props.children.push_back(child); + } + + desc.children.push_back(props); + } + + SerializedResponseNode dyn_prop; + dyn_prop.name = "supportsDynamicProperties"; + dyn_prop.value = group.support_dynamic_; + + desc.children.push_back(dyn_prop); + + type.children.push_back(desc); + } + + resp.children.push_back(type); + + } + + if (!controller_services.empty()) { + SerializedResponseNode type; + type.name = "ControllerServices"; + + for (auto group : controller_services) { + SerializedResponseNode desc; + + desc.name = group.class_name_; + + if (!group.class_properties_.empty()) { + SerializedResponseNode props; + props.name = "properties"; + for (auto && prop : group.class_properties_) { + SerializedResponseNode child; + child.name = prop.first; + child.value = prop.second; + props.children.push_back(child); + } + + desc.children.push_back(props); + } + + SerializedResponseNode dyn_prop; + dyn_prop.name = "supportsDynamicProperties"; + dyn_prop.value = group.support_dynamic_; + + desc.children.push_back(dyn_prop); + + type.children.push_back(desc); + } + + resp.children.push_back(type); + + } + response.push_back(resp); + + } +}; + +REGISTER_RESOURCE(AgentInformation); + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_NODES_AGENTINFORMATION_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/BuildInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/BuildInformation.h b/libminifi/include/core/state/nodes/BuildInformation.h new file mode 100644 index 0000000..bd7e08c --- /dev/null +++ b/libminifi/include/core/state/nodes/BuildInformation.h @@ -0,0 +1,135 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_BuildInformation_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_BuildInformation_H_ + +#include "core/Resource.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <functional> +#include <sys/ioctl.h> +#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) +#include <net/if_dl.h> +#include <net/if_types.h> +#endif +#include <ifaddrs.h> +#include <net/if.h> +#include <unistd.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <netdb.h> +#include <ifaddrs.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sstream> +#include <map> +#include "../nodes/MetricsBase.h" +#include "Connection.h" +#include "io/ClientSocket.h" +#include "../../../agent/agent_version.h" +#include "core/ClassLoader.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +/** + * Justification and Purpose: Provides build information + * for this agent. + */ +class BuildInformation : public DeviceInformation { + public: + + BuildInformation(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid) { + } + + BuildInformation(const std::string &name) + : DeviceInformation(name, 0) { + } + + std::string getName() const { + return "BuildInformation"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + SerializedResponseNode build_version; + build_version.name = "build_version"; + build_version.value = AgentBuild::VERSION; + + SerializedResponseNode build_rev; + build_rev.name = "build_rev"; + build_rev.value = AgentBuild::BUILD_REV; + + SerializedResponseNode build_date; + build_date.name = "build_date"; + build_date.value = AgentBuild::BUILD_DATE; + + SerializedResponseNode compiler; + compiler.name = "compiler"; + { + + SerializedResponseNode compiler_command; + compiler_command.name = "compiler_command"; + compiler_command.value = AgentBuild::COMPILER; + + SerializedResponseNode compiler_version; + compiler_version.name = "compiler_version"; + compiler_version.value = AgentBuild::COMPILER_VERSION; + + SerializedResponseNode compiler_flags; + compiler_flags.name = "compiler_flags"; + compiler_flags.value = AgentBuild::COMPILER_FLAGS; + + compiler.children.push_back(compiler_command); + compiler.children.push_back(compiler_version); + compiler.children.push_back(compiler_flags); + + } + SerializedResponseNode device_id; + device_id.name = "device_id"; + device_id.value = AgentBuild::BUILD_IDENTIFIER; + + serialized.push_back(build_version); + serialized.push_back(build_rev); + serialized.push_back(build_date); + serialized.push_back(compiler); + serialized.push_back(device_id); + + return serialized; + } +}; + +REGISTER_RESOURCE(BuildInformation); + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_BuildInformation_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/DeviceInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/DeviceInformation.h b/libminifi/include/core/state/nodes/DeviceInformation.h new file mode 100644 index 0000000..dfeb265 --- /dev/null +++ b/libminifi/include/core/state/nodes/DeviceInformation.h @@ -0,0 +1,355 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_DEVICEINFORMATION_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_DEVICEINFORMATION_H_ + +#include "core/Resource.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <functional> +#include <sys/ioctl.h> +#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) +#include <net/if_dl.h> +#include <net/if_types.h> +#endif +#ifndef _WIN32 +#include <sys/utsname.h> +#endif +#include <ifaddrs.h> +#include <net/if.h> +#include <unistd.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <netdb.h> +#include <ifaddrs.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sstream> +#include <map> +#include "../nodes/MetricsBase.h" +#include "Connection.h" +#include "io/ClientSocket.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +class Device { + public: + Device() { + initialize(); + } + void initialize() { + addrinfo hints = { sizeof(addrinfo) }; + memset(&hints, 0, sizeof hints); // make sure the struct is empty + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + hints.ai_protocol = 0; /* any protocol */ + + char hostname[1024]; + hostname[1023] = '\0'; + gethostname(hostname, 1023); + + std::ifstream device_id_file(".device_id"); + if (device_id_file) { + std::string line; + while (device_id_file) { + if (std::getline(device_id_file, line)) + device_id_ += line; + } + device_id_file.close(); + } else { + device_id_ = getDeviceId(); + + std::ofstream outputFile(".device_id"); + if (outputFile) { + outputFile.write(device_id_.c_str(), device_id_.length()); + } + outputFile.close(); + } + + canonical_hostname_ = hostname; + + std::stringstream ips; + for (auto ip : getIpAddresses()) { + if (ip.find("127") == 0 || ip.find("192") == 0) + continue; + ip_ = ip; + break; + } + + } + + std::string canonical_hostname_; + std::string ip_; + std::string device_id_; + protected: + + std::vector<std::string> getIpAddresses() { + std::vector<std::string> ips; + struct ifaddrs *ifaddr, *ifa; + if (getifaddrs(&ifaddr) == -1) { + perror("getifaddrs"); + exit(EXIT_FAILURE); + } + + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { + if ((strcmp("lo", ifa->ifa_name) == 0) || !(ifa->ifa_flags & (IFF_RUNNING))) + continue; + if ((ifa->ifa_addr != NULL) && (ifa->ifa_addr->sa_family == AF_INET)) { + ips.push_back(inet_ntoa(((struct sockaddr_in *) ifa->ifa_addr)->sin_addr)); + } + } + + freeifaddrs(ifaddr); + return ips; + } + +#if __linux__ + std::string getDeviceId() { + + std::hash<std::string> hash_fn; + std::string macs; + struct ifaddrs *ifaddr, *ifa; + int family, s, n; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) == -1) { + exit(EXIT_FAILURE); + } + + /* Walk through linked list, maintaining head pointer so we + can free list later */ + for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) { + if (ifa->ifa_addr == NULL) + continue; + + family = ifa->ifa_addr->sa_family; + + /* Display interface name and family (including symbolic + form of the latter for the common families) */ + + /* For an AF_INET* interface address, display the address */ + + if (family == AF_INET || family == AF_INET6) { + s = getnameinfo(ifa->ifa_addr, + (family == AF_INET) ? sizeof(struct sockaddr_in) : + sizeof(struct sockaddr_in6), + host, NI_MAXHOST, + NULL, 0, NI_NUMERICHOST); + if (s != 0) { + printf("getnameinfo() failed: %s\n", gai_strerror(s)); + exit(EXIT_FAILURE); + } + + } + } + + freeifaddrs(ifaddr); + + int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP); + struct ifreq ifr; + struct ifconf ifc; + char buf[1024]; + ifc.ifc_len = sizeof(buf); + ifc.ifc_buf = buf; + if (ioctl(sock, SIOCGIFCONF, &ifc) == -1) { /* handle error */} + + struct ifreq* it = ifc.ifc_req; + const struct ifreq* const end = it + (ifc.ifc_len / sizeof(struct ifreq)); + + for (; it != end; ++it) { + strcpy(ifr.ifr_name, it->ifr_name); + if (ioctl(sock, SIOCGIFFLAGS, &ifr) == 0) { + if (! (ifr.ifr_flags & IFF_LOOPBACK)) { // don't count loopback + if (ioctl(sock, SIOCGIFHWADDR, &ifr) == 0) { + unsigned char mac[6]; + + memcpy(mac, ifr.ifr_hwaddr.sa_data, 6); + + char mac_add[13]; + snprintf(mac_add,13,"%02X%02X%02X%02X%02X%02X",mac[0], mac[1], mac[2], mac[3], mac[4], mac[5] ); + + macs+= mac_add; + } + } + + } + else { /* handle error */} + } + + close(sock); + + return std::to_string(hash_fn(macs)); + } +#elif( defined(__unix__) || defined(__APPLE__) || defined(__MACH__) || defined(BSD)) // should work on bsd variants as well + std::string getDeviceId() { + ifaddrs* iflist; + std::hash<std::string> hash_fn; + std::set<std::string> macs; + + if (getifaddrs(&iflist) == 0) { + for (ifaddrs* cur = iflist; cur; cur = cur->ifa_next) { + if (cur->ifa_addr && (cur->ifa_addr->sa_family == AF_LINK) && ((sockaddr_dl*) cur->ifa_addr)->sdl_alen) { + sockaddr_dl* sdl = (sockaddr_dl*) cur->ifa_addr; + + if (sdl->sdl_type != IFT_ETHER) { + continue; + } else { + } + char mac[32]; + memcpy(mac, LLADDR(sdl), sdl->sdl_alen); + char mac_add[13]; + snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); + ///macs += mac_add; + macs.insert(mac_add); + } + } + + freeifaddrs(iflist); + } + std::string macstr; + for (auto &mac : macs) { + macstr += mac; + } + return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309"; + } +#else + std::string getDeviceId() { + return "NaD"; + } +#endif + + // connection information + int32_t socket_file_descriptor_; + + addrinfo *addr_info_; +}; + +/** + * Justification and Purpose: Provides Device Information + */ +class DeviceInfoNode : public DeviceInformation { + public: + + DeviceInfoNode(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid) { + static Device device; + hostname_ = device.canonical_hostname_; + ip_ = device.ip_; + device_id_ = device.device_id_; + } + + DeviceInfoNode(const std::string &name) + : DeviceInformation(name, 0) { + static Device device; + hostname_ = device.canonical_hostname_; + ip_ = device.ip_; + device_id_ = device.device_id_; + } + + std::string getName() const { + return "deviceInfo"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + SerializedResponseNode identifier; + identifier.name = "identifier"; + identifier.value = device_id_; + + SerializedResponseNode systemInfo; + systemInfo.name = "systemInfo"; + + SerializedResponseNode vcores; + vcores.name = "vCores"; + size_t ncpus = std::thread::hardware_concurrency(); + + vcores.value = std::to_string(ncpus); + + systemInfo.children.push_back(vcores); + + SerializedResponseNode mem; + mem.name = "physicalMem"; +#if defined(_SC_PHYS_PAGES) && defined(_SC_PAGESIZE) + size_t mema = (size_t) sysconf( _SC_PHYS_PAGES) * (size_t) sysconf( _SC_PAGESIZE); +#endif + mem.value = std::to_string(mema); + + systemInfo.children.push_back(mem); + + SerializedResponseNode arch; + arch.name = "machinearch"; + + utsname buf; + + if (uname(&buf) == -1) { + arch.value = "unknown"; + } else { + arch.value = buf.machine; + } + + systemInfo.children.push_back(arch); + + serialized.push_back(identifier); + serialized.push_back(systemInfo); + + SerializedResponseNode networkinfo; + networkinfo.name = "networkInfo"; + + SerializedResponseNode hostname; + hostname.name = "hostname"; + hostname.value = hostname_; + + SerializedResponseNode ip; + ip.name = "ipAddress"; + ip.value = ip_; + + networkinfo.children.push_back(hostname); + networkinfo.children.push_back(ip); + + serialized.push_back(networkinfo); + + return serialized; + } + + protected: + + std::string hostname_; + std::string ip_; + std::string device_id_; +}; + +REGISTER_RESOURCE(DeviceInfoNode); + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_NODES_DEVICEINFORMATION_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/FlowInformation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h new file mode 100644 index 0000000..9c9874a --- /dev/null +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -0,0 +1,273 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_FLOWINFORMATION_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_FLOWINFORMATION_H_ + +#include "core/Resource.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <functional> +#include <sys/ioctl.h> +#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) +#include <net/if_dl.h> +#include <net/if_types.h> +#endif +#include <ifaddrs.h> +#include <net/if.h> +#include <unistd.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <netdb.h> +#include <ifaddrs.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sstream> +#include <map> +#include "../nodes/MetricsBase.h" +#include "Connection.h" +#include "io/ClientSocket.h" +#include "../nodes/StateMonitor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +class FlowVersion : public DeviceInformation { + public: + + explicit FlowVersion() + : DeviceInformation("FlowVersion", nullptr) { + } + + explicit FlowVersion(const std::string ®istry_url, const std::string &bucket_id, const std::string &flow_id) + : DeviceInformation("FlowVersion", nullptr), + registry_url_(registry_url), + bucket_id_(bucket_id), + flow_id_(flow_id) { + setFlowVersion(registry_url_, bucket_id_, flow_id_); + } + + explicit FlowVersion(FlowVersion &&fv) + : DeviceInformation("FlowVersion", nullptr), + registry_url_(std::move(fv.registry_url_)), + bucket_id_(std::move(fv.bucket_id_)), + flow_id_(std::move(fv.flow_id_)) { + setFlowVersion(registry_url_, bucket_id_, flow_id_); + } + + std::string getName() const { + return "FlowVersion"; + } + + std::string getRegistryUrl() const { + return registry_url_; + } + + std::string getBucketId() const { + return bucket_id_; + } + + std::string getFlowId() const { + return flow_id_.empty() ? getUUIDStr() : flow_id_; + } + + void setFlowVersion(const std::string &url, const std::string &bucket_id, const std::string &flow_id) { + registry_url_ = url; + bucket_id_ = bucket_id; + flow_id_ = flow_id; + } + + std::vector<SerializedResponseNode> serialize() { + std::lock_guard<std::mutex> lock_guard(guard); + + std::vector<SerializedResponseNode> serialized; + SerializedResponseNode ru; + ru.name = "registryUrl"; + ru.value = registry_url_; + + SerializedResponseNode bucketid; + bucketid.name = "bucketId"; + bucketid.value = bucket_id_; + + SerializedResponseNode flowId; + flowId.name = "flowId"; + flowId.value = getFlowId(); + + serialized.push_back(ru); + serialized.push_back(bucketid); + serialized.push_back(flowId); + return serialized; + } + + FlowVersion &operator=(const FlowVersion &&fv) { + registry_url_ = (std::move(fv.registry_url_)); + bucket_id_ = (std::move(fv.bucket_id_)); + flow_id_ = (std::move(fv.flow_id_)); + setFlowVersion(registry_url_, bucket_id_, flow_id_); + return *this; + } + protected: + + std::mutex guard; + + std::string registry_url_; + std::string bucket_id_; + std::string flow_id_; +}; + +class FlowMonitor : public StateMonitorNode { + public: + + FlowMonitor(std::string name, uuid_t uuid) + : StateMonitorNode(name, uuid) { + } + + FlowMonitor(const std::string &name) + : StateMonitorNode(name, 0) { + } + + void addConnection(const std::shared_ptr<minifi::Connection> &connection) { + if (nullptr != connection) { + connections_.insert(std::make_pair(connection->getName(), connection)); + } + } + + void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) { + flow_version_ = flow_version; + } + protected: + + std::shared_ptr<state::response::FlowVersion> flow_version_; + std::map<std::string, std::shared_ptr<minifi::Connection>> connections_; +}; + +/** + * Justification and Purpose: Provides flow version Information + */ +class FlowInformation : public FlowMonitor { + public: + + FlowInformation(std::string name, uuid_t uuid) + : FlowMonitor(name, uuid) { + } + + FlowInformation(const std::string &name) + : FlowMonitor(name, 0) { + } + + std::string getName() const { + return "flowInfo"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + SerializedResponseNode fv; + fv.name = "flowId"; + fv.value = flow_version_->getFlowId(); + + SerializedResponseNode uri; + uri.name = "versionedFlowSnapshotURI"; + for (auto &entry : flow_version_->serialize()) { + uri.children.push_back(entry); + } + + serialized.push_back(fv); + serialized.push_back(uri); + + if (!connections_.empty()) { + SerializedResponseNode queues; + + queues.name = "queues"; + + for (auto &queue : connections_) { + SerializedResponseNode repoNode; + repoNode.name = queue.first; + + SerializedResponseNode queuesize; + queuesize.name = "size"; + queuesize.value = queue.second->getQueueSize(); + + SerializedResponseNode queuesizemax; + queuesizemax.name = "sizeMax"; + queuesizemax.value = queue.second->getMaxQueueSize(); + + SerializedResponseNode datasize; + datasize.name = "dataSize"; + datasize.value = queue.second->getQueueDataSize(); + SerializedResponseNode datasizemax; + + datasizemax.name = "dataSizeMax"; + datasizemax.value = queue.second->getMaxQueueDataSize(); + + repoNode.children.push_back(queuesize); + repoNode.children.push_back(queuesizemax); + repoNode.children.push_back(datasize); + repoNode.children.push_back(datasizemax); + + queues.children.push_back(repoNode); + + } + serialized.push_back(queues); + } + + if (nullptr != monitor_) { + auto components = monitor_->getAllComponents(); + SerializedResponseNode componentsNode; + + componentsNode.name = "components"; + + for (auto component : components) { + SerializedResponseNode componentNode; + + componentNode.name = component->getComponentName(); + + SerializedResponseNode componentStatusNode; + componentStatusNode.name = "running"; + componentStatusNode.value = component->isRunning(); + + componentNode.children.push_back(componentStatusNode); + + componentsNode.children.push_back(componentNode); + } + serialized.push_back(componentsNode); + } + + return serialized; + } + + protected: + +}; + +REGISTER_RESOURCE(FlowInformation); + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_NODES_FLOWINFORMATION_H_ */
