http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/MetricsBase.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h new file mode 100644 index 0000000..323b6cd --- /dev/null +++ b/libminifi/include/core/state/nodes/MetricsBase.h @@ -0,0 +1,264 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_ +#define LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_ + +#include <vector> +#include <memory> +#include <string> +#include "core/Core.h" +#include "core/Connectable.h" +#include "core/state/Value.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + + + +/** + * Purpose: Defines a metric. Serialization is intended to be thread safe. + */ +class ResponseNode : public core::Connectable { + public: + ResponseNode() + : core::Connectable("metric", 0), + is_array_(false) { + } + + ResponseNode(std::string name, uuid_t uuid) + : core::Connectable(name, uuid), + is_array_(false) { + } + virtual ~ResponseNode() { + + } + virtual std::string getName() const = 0; + + virtual std::vector<SerializedResponseNode> serialize() = 0; + + virtual void yield() { + } + virtual bool isRunning() { + return true; + } + virtual bool isWorkAvailable() { + return true; + } + + bool isArray() { + return is_array_; + } + + virtual bool isEmpty(){ + return false; + } + + protected: + + bool is_array_; + + void setArray(bool array) { + is_array_ = array; + } + +}; + +/** + * Purpose: Defines a metric that + */ +class DeviceInformation : public ResponseNode { + public: + DeviceInformation(std::string name, uuid_t uuid) + : ResponseNode(name, uuid) { + } +}; + +/** + * Purpose: Defines a metric that + */ +class ObjectNode : public ResponseNode { + public: + ObjectNode(std::string name, uuid_t uuid) + : ResponseNode(name, uuid) { + } + + void add_node(const std::shared_ptr<ResponseNode> &node) { + nodes_.push_back(node); + } + + virtual std::string getName() const { + return Connectable::getName(); + } + + virtual std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; +// SerializedResponseNode outer_node; + // outer_node.name = getName(); + for (auto &node : nodes_) { + SerializedResponseNode inner_node; + inner_node.name = node->getName(); + for (auto &embed : node->serialize()) { + inner_node.children.push_back(std::move(embed)); + } + serialized.push_back(std::move(inner_node)); + } + //serialized.push_back(std::move(outer_node)); + return serialized; + } + + virtual bool isEmpty(){ + return nodes_.empty(); + } + + protected: + std::vector<std::shared_ptr<ResponseNode>> nodes_; + +}; + +/** + * Purpose: Retrieves Metrics from the defined class. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics. + * + */ +class ResponseNodeSource { + public: + + ResponseNodeSource() { + + } + + virtual ~ResponseNodeSource() { + } + + /** + * Retrieves all metrics from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector) = 0; + + virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector) = 0; + +}; + +/** + * Purpose: Retrieves Metrics from the defined class. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics. + * + */ +class MetricsNodeSource : public ResponseNodeSource { + public: + + MetricsNodeSource() { + + } + + virtual ~MetricsNodeSource() { + } + + /** + * Retrieves all metrics from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector) { + return getMetricNodes(metric_vector); + } + + virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector) = 0; + +}; + +class NodeReporter { + public: + + NodeReporter() { + + } + + virtual ~NodeReporter() { + } + + /** + * Retrieves all root response nodes from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0; + + /** + * Retrieves all metrics from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0; + +}; + +/** + * Purpose: Sink interface for all metrics. The current Metric, which is a consumable for any reader of Metrics must have the ability to set metrics. + * + */ +class ResponseNodeSink { + public: + + virtual ~ResponseNodeSink() { + } + /** + * Setter for nodes in this sink. + * @param metrics metrics to insert into the current sink. + * @return result of the set operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t setResponseNodes(const std::shared_ptr<ResponseNode> &metrics) = 0; + + /** + * Setter for metrics nodes in this sink. + * @param metrics metrics to insert into the current sink. + * @return result of the set operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ +// virtual int16_t setMetricsNodes(const std::shared_ptr<ResponseNode> &metrics) = 0; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_METRICS_METRICSBASE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/ProcessMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/ProcessMetrics.h b/libminifi/include/core/state/nodes/ProcessMetrics.h new file mode 100644 index 0000000..ebef0d1 --- /dev/null +++ b/libminifi/include/core/state/nodes/ProcessMetrics.h @@ -0,0 +1,103 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_ + +#include "core/Resource.h" +#include <sstream> +#include <map> +#include <sys/time.h> +#include <sys/resource.h> + +#include "../nodes/DeviceInformation.h" +#include "../nodes/MetricsBase.h" +#include "Connection.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +/** + * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the + * C2 server. + * + */ +class ProcessMetrics : public ResponseNode { + public: + + ProcessMetrics(const std::string &name, uuid_t uuid) + : ResponseNode(name, uuid) { + } + + ProcessMetrics(const std::string &name) + : ResponseNode(name, 0) { + } + + ProcessMetrics() { + } + + virtual std::string getName() const { + return "ProcessMetrics"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + + struct rusage my_usage; + getrusage(RUSAGE_SELF, &my_usage); + + SerializedResponseNode memory; + memory.name = "MemoryMetrics"; + + SerializedResponseNode maxrss; + maxrss.name = "maxrss"; + + maxrss.value = (uint64_t)my_usage.ru_maxrss; + + memory.children.push_back(maxrss); + serialized.push_back(memory); + + SerializedResponseNode cpu; + cpu.name = "CpuMetrics"; + SerializedResponseNode ics; + ics.name = "involcs"; + + ics.value = (uint64_t)my_usage.ru_nivcsw; + + cpu.children.push_back(ics); + serialized.push_back(cpu); + + return serialized; + } + + protected: + +}; + +REGISTER_RESOURCE(ProcessMetrics); + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/QueueMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/QueueMetrics.h b/libminifi/include/core/state/nodes/QueueMetrics.h new file mode 100644 index 0000000..b4c0b64 --- /dev/null +++ b/libminifi/include/core/state/nodes/QueueMetrics.h @@ -0,0 +1,106 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_ + +#include <sstream> +#include <map> + +#include "../nodes/MetricsBase.h" +#include "Connection.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +/** + * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the + * C2 server. + * + */ +class QueueMetrics : public ResponseNode { + public: + + QueueMetrics(const std::string &name, uuid_t uuid) + : ResponseNode(name, uuid) { + } + + QueueMetrics(const std::string &name) + : ResponseNode(name, 0) { + } + + QueueMetrics() + : ResponseNode("QueueMetrics", 0) { + } + + virtual std::string getName() const{ + return "QueueMetrics"; + } + + void addConnection(const std::shared_ptr<minifi::Connection> &connection) { + if (nullptr != connection) { + connections.insert(std::make_pair(connection->getName(), connection)); + } + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + for (auto conn : connections) { + auto connection = conn.second; + SerializedResponseNode parent; + parent.name = connection->getName(); + SerializedResponseNode datasize; + datasize.name = "datasize"; + datasize.value = std::to_string(connection->getQueueDataSize()); + + SerializedResponseNode datasizemax; + datasizemax.name = "datasizemax"; + datasizemax.value = std::to_string(connection->getMaxQueueDataSize()); + + SerializedResponseNode queuesize; + queuesize.name = "queued"; + queuesize.value = std::to_string(connection->getQueueSize()); + + SerializedResponseNode queuesizemax; + queuesizemax.name = "queuedmax"; + queuesizemax.value = std::to_string(connection->getMaxQueueSize()); + + parent.children.push_back(datasize); + parent.children.push_back(datasizemax); + parent.children.push_back(queuesize); + parent.children.push_back(queuesizemax); + + serialized.push_back(parent); + } + return serialized; + } + + protected: + std::map<std::string, std::shared_ptr<minifi::Connection>> connections; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/RepositoryMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h b/libminifi/include/core/state/nodes/RepositoryMetrics.h new file mode 100644 index 0000000..5c2db2a --- /dev/null +++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h @@ -0,0 +1,101 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_REPOSITORYMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_REPOSITORYMETRICS_H_ + +#include <sstream> +#include <map> + +#include "../nodes/MetricsBase.h" +#include "Connection.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +/** + * Justification and Purpose: Provides repository metrics. Provides critical information to the + * C2 server. + * + */ +class RepositoryMetrics : public ResponseNode { + public: + + RepositoryMetrics(const std::string &name, uuid_t uuid) + : ResponseNode(name, uuid) { + } + + RepositoryMetrics(const std::string &name) + : ResponseNode(name, 0) { + } + + RepositoryMetrics() + : ResponseNode("RepositoryMetrics", 0) { + } + + virtual std::string getName() const { + return "RepositoryMetrics"; + } + + void addRepository(const std::shared_ptr<core::Repository> &repo) { + if (nullptr != repo) { + repositories.insert(std::make_pair(repo->getName(), repo)); + } + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + for (auto conn : repositories) { + auto repo = conn.second; + SerializedResponseNode parent; + parent.name = repo->getName(); + SerializedResponseNode datasize; + datasize.name = "running"; + datasize.value = repo->isRunning(); + + SerializedResponseNode datasizemax; + datasizemax.name = "full"; + datasizemax.value = repo->isFull(); + + SerializedResponseNode queuesize; + queuesize.name = "size"; + queuesize.value = std::to_string(repo->getRepoSize()); + + parent.children.push_back(datasize); + parent.children.push_back(datasizemax); + parent.children.push_back(queuesize); + + serialized.push_back(parent); + } + return serialized; + } + + protected: + std::map<std::string, std::shared_ptr<core::Repository>> repositories; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_RepositoryMetrics_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/StateMonitor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/StateMonitor.h b/libminifi/include/core/state/nodes/StateMonitor.h new file mode 100644 index 0000000..1bf3e23 --- /dev/null +++ b/libminifi/include/core/state/nodes/StateMonitor.h @@ -0,0 +1,87 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_ + +#include "core/Resource.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <functional> +#include <sys/ioctl.h> +#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) +#include <net/if_dl.h> +#include <net/if_types.h> +#endif +#include <ifaddrs.h> +#include <net/if.h> +#include <unistd.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <netdb.h> +#include <ifaddrs.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sstream> +#include <map> +#include "../nodes/MetricsBase.h" +#include "Connection.h" +#include "io/ClientSocket.h" +#include "agent/agent_version.h" +#include "agent/build_description.h" +#include "core/ClassLoader.h" +#include "core/state/UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +class StateMonitorNode : public DeviceInformation { + public: + + StateMonitorNode(std::string name, uuid_t uuid) + : DeviceInformation(name, uuid), + monitor_(nullptr) { + + } + + StateMonitorNode(const std::string &name) + : DeviceInformation(name, 0), + monitor_(nullptr) { + } + + void setStateMonitor(const std::shared_ptr<state::StateMonitor> &monitor) { + monitor_ = monitor; + } + protected: + std::shared_ptr<state::StateMonitor> monitor_; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/SystemMetrics.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/SystemMetrics.h b/libminifi/include/core/state/nodes/SystemMetrics.h new file mode 100644 index 0000000..a7a3c9c --- /dev/null +++ b/libminifi/include/core/state/nodes/SystemMetrics.h @@ -0,0 +1,117 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_ + +#include "core/Resource.h" +#include <sstream> +#include <map> +#ifndef _WIN32 +#include <sys/utsname.h> +#endif +#include "../nodes/MetricsBase.h" +#include "Connection.h" +#include "../nodes/DeviceInformation.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +/** + * Justification and Purpose: Provides system information, including critical device information. + * + */ +class SystemInformation : public DeviceInformation { + public: + + SystemInformation(const std::string &name, uuid_t uuid) + : DeviceInformation(name, uuid) { + } + + SystemInformation(const std::string &name) + : DeviceInformation(name, 0) { + } + + SystemInformation() + : DeviceInformation("systeminfo", 0) { + } + + virtual std::string getName() const { + return "systeminfo"; + } + + std::vector<SerializedResponseNode> serialize() { + std::vector<SerializedResponseNode> serialized; + SerializedResponseNode identifier; + identifier.name = "identifier"; + identifier.value = "identifier"; + + SerializedResponseNode systemInfo; + systemInfo.name = "systemInfo"; + + SerializedResponseNode vcores; + vcores.name = "vCores"; + size_t ncpus = std::thread::hardware_concurrency(); + + vcores.value = (uint32_t)ncpus; + + systemInfo.children.push_back(vcores); + + SerializedResponseNode mem; + mem.name = "physicalMem"; +#if defined(_SC_PHYS_PAGES) && defined(_SC_PAGESIZE) + size_t mema = (size_t) sysconf( _SC_PHYS_PAGES) * (size_t) sysconf( _SC_PAGESIZE); +#endif + mem.value = (uint32_t)mema; + + systemInfo.children.push_back(mem); + + SerializedResponseNode arch; + arch.name = "machinearch"; + + utsname buf; + + if (uname(&buf) == -1) { + arch.value = "unknown"; + } else { + arch.value = std::string(buf.machine); + } + + systemInfo.children.push_back(arch); + + serialized.push_back(identifier); + serialized.push_back(systemInfo); + + return serialized; + } + + protected: + +}; + +REGISTER_RESOURCE(SystemInformation); +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/TreeUpdateListener.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/nodes/TreeUpdateListener.h b/libminifi/include/core/state/nodes/TreeUpdateListener.h new file mode 100644 index 0000000..4b7e237 --- /dev/null +++ b/libminifi/include/core/state/nodes/TreeUpdateListener.h @@ -0,0 +1,128 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_METRICS_H_ +#define LIBMINIFI_INCLUDE_C2_METRICS_H_ + +#include <vector> + +#include "../nodes/MetricsBase.h" +#include "core/state/UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { +namespace response { + +/** + * Purpose: Class that will represent the metrics updates, which can be performed asynchronously. + */ +class MetricsUpdate : public Update { + public: + MetricsUpdate(UpdateStatus status) + : Update(status) { + + } + virtual bool validate() { + return true; + } +}; + +class OperationWatcher : public utils::AfterExecute<Update> { + public: + explicit OperationWatcher(std::atomic<bool> *running) + : running_(running) { + } + + explicit OperationWatcher(OperationWatcher && other) + : running_(std::move(other.running_)) { + + } + + ~OperationWatcher() { + + } + + virtual bool isFinished(const Update &result) { + if (result.getStatus().getState() == UpdateState::READ_COMPLETE && running_) { + return false; + } else { + return true; + } + } + virtual bool isCancelled(const Update &result) { + return false; + } + + protected: + std::atomic<bool> *running_; + +}; + +class TreeUpdateListener { + public: + TreeUpdateListener(const std::shared_ptr<response::NodeReporter> &source, const std::shared_ptr<response::ResponseNodeSink> &sink) + : running_(true), + source_(source), + sink_(sink){ + + function_ = [&]() { + while(running_) { + std::vector<std::shared_ptr<response::ResponseNode>> metric_vector; + // simple pass through for the metrics + if (nullptr != source_ && nullptr != sink_) { + source_->getResponseNodes(metric_vector,0); + for(auto metric : metric_vector) { + sink_->setResponseNodes(metric); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + return MetricsUpdate(UpdateState::READ_COMPLETE); + }; + } + + void stop() { + running_ = false; + } + + std::function<Update()> &getFunction() { + return function_; + } + + std::future<Update> &getFuture() { + return future_; + } + + private: + std::function<Update()> function_; + std::future<Update> future_; + std::atomic<bool> running_; + std::shared_ptr<response::NodeReporter> source_; + std::shared_ptr<response::ResponseNodeSink> sink_; +}; + +} /* namespace metrics */ +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_METRICS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/processors/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index 07d3939..a9918c6 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -19,13 +19,14 @@ #define __GET_FILE_H__ #include <atomic> + +#include "../core/state/nodes/MetricsBase.h" #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" #include "core/Resource.h" #include "core/logging/LoggerConfiguration.h" -#include "core/state/metrics/MetricsBase.h" namespace org { namespace apache { @@ -45,17 +46,17 @@ struct GetFileRequest { std::string fileFilter = "[^\\.].*"; }; -class GetFileMetrics : public state::metrics::Metrics { +class GetFileMetrics : public state::response::ResponseNode { public: GetFileMetrics() - : state::metrics::Metrics("GetFileMetrics", 0) { + : state::response::ResponseNode("GetFileMetrics", 0) { iterations_ = 0; accepted_files_ = 0; input_bytes_ = 0; } GetFileMetrics(std::string name, uuid_t uuid) - : state::metrics::Metrics(name, uuid) { + : state::response::ResponseNode(name, uuid) { iterations_ = 0; accepted_files_ = 0; input_bytes_ = 0; @@ -67,24 +68,24 @@ class GetFileMetrics : public state::metrics::Metrics { return core::Connectable::getName(); } - virtual std::vector<state::metrics::MetricResponse> serialize() { - std::vector<state::metrics::MetricResponse> resp; + virtual std::vector<state::response::SerializedResponseNode> serialize() { + std::vector<state::response::SerializedResponseNode> resp; - state::metrics::MetricResponse iter; + state::response::SerializedResponseNode iter; iter.name = "OnTriggerInvocations"; - iter.value = std::to_string(iterations_.load()); + iter.value = (uint32_t)iterations_.load(); resp.push_back(iter); - state::metrics::MetricResponse accepted_files; + state::response::SerializedResponseNode accepted_files; accepted_files.name = "AcceptedFiles"; - accepted_files.value = std::to_string(accepted_files_.load()); + accepted_files.value = (uint32_t)accepted_files_.load(); resp.push_back(accepted_files); - state::metrics::MetricResponse input_bytes; + state::response::SerializedResponseNode input_bytes; input_bytes.name = "InputBytes"; - input_bytes.value = std::to_string(input_bytes_.load()); + input_bytes.value = (uint32_t)input_bytes_.load(); resp.push_back(input_bytes); @@ -101,7 +102,7 @@ class GetFileMetrics : public state::metrics::Metrics { }; // GetFile Class -class GetFile : public core::Processor, public state::metrics::MetricsSource { +class GetFile : public core::Processor, public state::response::MetricsNodeSource { public: // Constructor /*! @@ -156,7 +157,7 @@ class GetFile : public core::Processor, public state::metrics::MetricsSource { */ void performListing(std::string dir, const GetFileRequest &request); - int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector); + int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector); protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/processors/GetTCP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetTCP.h b/libminifi/include/processors/GetTCP.h index 4d331a1..741d190 100644 --- a/libminifi/include/processors/GetTCP.h +++ b/libminifi/include/processors/GetTCP.h @@ -19,6 +19,8 @@ #define __GET_TCP_H__ #include <atomic> + +#include "../core/state/nodes/MetricsBase.h" #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" @@ -27,7 +29,6 @@ #include "concurrentqueue.h" #include "utils/ThreadPool.h" #include "core/logging/LoggerConfiguration.h" -#include "core/state/metrics/MetricsBase.h" #include "controllers/SSLContextService.h" namespace org { @@ -115,17 +116,17 @@ class DataHandler { }; -class GetTCPMetrics : public state::metrics::Metrics { +class GetTCPMetrics : public state::response::ResponseNode { public: GetTCPMetrics() - : state::metrics::Metrics("GetTCPMetrics", 0) { + : state::response::ResponseNode("GetTCPMetrics", 0) { iterations_ = 0; accepted_files_ = 0; input_bytes_ = 0; } GetTCPMetrics(std::string name, uuid_t uuid) - : state::metrics::Metrics(name, uuid) { + : state::response::ResponseNode(name, uuid) { iterations_ = 0; accepted_files_ = 0; input_bytes_ = 0; @@ -137,24 +138,24 @@ class GetTCPMetrics : public state::metrics::Metrics { return core::Connectable::getName(); } - virtual std::vector<state::metrics::MetricResponse> serialize() { - std::vector<state::metrics::MetricResponse> resp; + virtual std::vector<state::response::SerializedResponseNode> serialize() { + std::vector<state::response::SerializedResponseNode> resp; - state::metrics::MetricResponse iter; + state::response::SerializedResponseNode iter; iter.name = "OnTriggerInvocations"; - iter.value = std::to_string(iterations_.load()); + iter.value = (uint32_t)iterations_.load(); resp.push_back(iter); - state::metrics::MetricResponse accepted_files; + state::response::SerializedResponseNode accepted_files; accepted_files.name = "AcceptedFiles"; - accepted_files.value = std::to_string(accepted_files_.load()); + accepted_files.value = (uint32_t)accepted_files_.load(); resp.push_back(accepted_files); - state::metrics::MetricResponse input_bytes; + state::response::SerializedResponseNode input_bytes; input_bytes.name = "InputBytes"; - input_bytes.value = std::to_string(input_bytes_.load()); + input_bytes.value = (uint32_t)input_bytes_.load(); resp.push_back(input_bytes); @@ -171,7 +172,7 @@ class GetTCPMetrics : public state::metrics::Metrics { }; // GetTCP Class -class GetTCP : public core::Processor, public state::metrics::MetricsSource { +class GetTCP : public core::Processor, public state::response::MetricsNodeSource { public: // Constructor /*! @@ -236,7 +237,7 @@ class GetTCP : public core::Processor, public state::metrics::MetricsSource { // Initialize, over write by NiFi GetTCP virtual void initialize(void); - int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector); + int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector); protected: @@ -287,6 +288,7 @@ class GetTCP : public core::Processor, public state::metrics::MetricsSource { REGISTER_RESOURCE(GetTCP); + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 1e17d53..66a9351 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -33,6 +33,7 @@ class Configure : public Properties { static const char *nifi_default_directory; static const char *nifi_c2_enable; static const char *nifi_flow_configuration_file; + static const char *nifi_flow_configuration_file_backup_update; static const char *nifi_flow_engine_threads; static const char *nifi_administrative_yield_duration; static const char *nifi_bored_yield_duration; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/ByteArrayCallback.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h index 9ccca48..653159c 100644 --- a/libminifi/include/utils/ByteArrayCallback.h +++ b/libminifi/include/utils/ByteArrayCallback.h @@ -109,11 +109,11 @@ class ByteOutputCallback : public OutputStreamCallback { virtual int64_t process(std::shared_ptr<io::BaseStream> stream); - const std::vector<char> to_string(); + virtual const std::vector<char> to_string(); - void close(); + virtual void close(); - size_t getSize(); + virtual size_t getSize(); bool waitingOps(); @@ -121,7 +121,7 @@ class ByteOutputCallback : public OutputStreamCallback { size_t readFully(char *buffer, size_t size); - private: + protected: inline void write_and_notify(char *data, size_t size); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/FileOutputCallback.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/FileOutputCallback.h b/libminifi/include/utils/FileOutputCallback.h new file mode 100644 index 0000000..df2637b --- /dev/null +++ b/libminifi/include/utils/FileOutputCallback.h @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_UTILS_FILEOUTPUTCALLBACK_H_ +#define LIBMINIFI_INCLUDE_UTILS_FILEOUTPUTCALLBACK_H_ + +#include <fstream> +#include "concurrentqueue.h" +#include "FlowFileRecord.h" +#include "ByteArrayCallback.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * General vector based uint8_t callback. + * + * While calls are thread safe, the class is intended to have + * a single consumer. + */ +class FileOutputCallback : public ByteOutputCallback { + public: + FileOutputCallback() = delete; + + explicit FileOutputCallback(std::string file, bool wait_on_read=false) + : ByteOutputCallback(INT_MAX), file_(file), file_stream_(file), + logger_(logging::LoggerFactory<FileOutputCallback>::getLogger()) { + } + + virtual ~FileOutputCallback() { + + } + + virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override; + + virtual const std::vector<char> to_string() override; + + virtual void close() override; + + virtual size_t getSize() override; + + virtual void write(char *data, size_t size) override; + + private: + + std::string file_; + + std::ofstream file_stream_; + + std::shared_ptr<logging::Logger> logger_; + +}; + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_UTILS_BYTEARRAYCALLBACK_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/HTTPClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h index 69674be..553dcea 100644 --- a/libminifi/include/utils/HTTPClient.h +++ b/libminifi/include/utils/HTTPClient.h @@ -294,6 +294,7 @@ class BaseHTTPClient { extern std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password); extern void parse_url(std::string *url, std::string *host, int *port, std::string *protocol); +extern void parse_url(std::string *url, std::string *host, int *port, std::string *protocol, std::string *path, std::string *query); } /* namespace utils */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/file/FileUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index fd14e3e..796ee1a 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -79,6 +79,15 @@ class FileUtils { #endif return -1; } + + static int copy_file(const std::string &path_from, const std::string dest_path) { + std::ifstream src(path_from, std::ios::binary); + if (!src.is_open()) + return -1; + std::ofstream dest(dest_path, std::ios::binary); + dest << src.rdbuf(); + return 0; + } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index be35d6b..1d97edd 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -25,6 +25,7 @@ namespace minifi { const char *Configure::nifi_default_directory = "nifi.default.directory"; const char *Configure::nifi_c2_enable = "nifi.c2.enable"; const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; +const char *Configure::nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update"; const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads"; const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index e810e0c..1414765 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -33,11 +33,15 @@ #include <utility> #include <memory> #include <string> -#include "core/state/metrics/QueueMetrics.h" -#include "core/state/metrics/DeviceInformation.h" -#include "core/state/metrics/SystemMetrics.h" -#include "core/state/metrics/ProcessMetrics.h" -#include "core/state/metrics/RepositoryMetrics.h" + +#include "core/state/nodes/AgentInformation.h" +#include "core/state/nodes/BuildInformation.h" +#include "core/state/nodes/DeviceInformation.h" +#include "core/state/nodes/FlowInformation.h" +#include "core/state/nodes/ProcessMetrics.h" +#include "core/state/nodes/QueueMetrics.h" +#include "core/state/nodes/RepositoryMetrics.h" +#include "core/state/nodes/SystemMetrics.h" #include "core/state/ProcessorController.h" #include "yaml-cpp/yaml.h" #include "c2/C2Agent.h" @@ -50,6 +54,7 @@ #include "core/controller/ControllerServiceProvider.h" #include "core/logging/LoggerConfiguration.h" #include "core/Connectable.h" +#include "utils/HTTPClient.h" namespace org { namespace apache { @@ -67,6 +72,8 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo max_timer_driven_threads_(0), max_event_driven_threads_(0), running_(false), + updating_(false), + flow_version_(nullptr), c2_enabled_(true), initialized_(false), provenance_repo_(provenance_repo), @@ -166,13 +173,19 @@ bool FlowController::applyConfiguration(const std::string &configurePayload) { logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName(), newRoot->getVersion()); + updating_ = true; + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); stop(true); waitUnload(30000); this->root_ = std::move(newRoot); loadFlowRepo(); initialized_ = true; - return start(); + bool started = start(); + + updating_ = false; + + return started; } int16_t FlowController::stop(bool force, uint64_t timeToWait) { @@ -332,53 +345,110 @@ void FlowController::initializeC2() { if (!c2_enabled_) { return; } - if (!c2_initialized_) { - std::string c2_enable_str; - - if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) { - bool enable_c2 = true; - utils::StringUtils::StringToBool(c2_enable_str, enable_c2); - c2_enabled_ = enable_c2; - if (!c2_enabled_) { - return; - } - } else { - c2_enabled_ = true; - } - state::StateManager::initialize(); + if (c2_initialized_) + return; - std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()), - configuration_); - registerUpdateListener(agent, agent->getHeartBeatDelay()); + std::string c2_enable_str; - state::StateManager::startMetrics(agent->getHeartBeatDelay()); - } - if (!c2_enabled_) { - return; + if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) { + bool enable_c2 = true; + utils::StringUtils::StringToBool(c2_enable_str, enable_c2); + c2_enabled_ = enable_c2; + if (!c2_enabled_) { + return; + } + } else { + c2_enabled_ = true; } + state::StateManager::initialize(); + + std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()), + configuration_); + registerUpdateListener(agent, agent->getHeartBeatDelay()); + + state::StateManager::startMetrics(agent->getHeartBeatDelay()); c2_initialized_ = true; - metrics_.clear(); + flow_version_ = std::make_shared<state::response::FlowVersion>("", "default", ""); + device_information_.clear(); component_metrics_.clear(); component_metrics_by_id_.clear(); std::string class_csv; if (root_ != nullptr) { - std::shared_ptr<state::metrics::QueueMetrics> queueMetrics = std::make_shared<state::metrics::QueueMetrics>(); + std::shared_ptr<state::response::QueueMetrics> queueMetrics = std::make_shared<state::response::QueueMetrics>(); std::map<std::string, std::shared_ptr<Connection>> connections; root_->getConnections(connections); for (auto con : connections) { queueMetrics->addConnection(con.second); } - metrics_[queueMetrics->getName()] = queueMetrics; + device_information_[queueMetrics->getName()] = queueMetrics; - std::shared_ptr<state::metrics::RepositoryMetrics> repoMetrics = std::make_shared<state::metrics::RepositoryMetrics>(); + std::shared_ptr<state::response::RepositoryMetrics> repoMetrics = std::make_shared<state::response::RepositoryMetrics>(); repoMetrics->addRepository(provenance_repo_); repoMetrics->addRepository(flow_file_repo_); - metrics_[repoMetrics->getName()] = repoMetrics; + device_information_[repoMetrics->getName()] = repoMetrics; + } + + if (configuration_->get("nifi.c2.root.classes", class_csv)) { + std::vector<std::string> classes = utils::StringUtils::split(class_csv, ","); + + for (std::string clazz : classes) { + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz); + + if (nullptr == ptr) { + logger_->log_error("No metric defined for %s", clazz); + continue; + } + + std::shared_ptr<state::response::ResponseNode> processor = std::static_pointer_cast<state::response::ResponseNode>(ptr); + + auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor); + + if (identifier != nullptr) { + std::string identifier_str; + if (configuration_->get("nifi.c2.agent.identifier", identifier_str) && !identifier_str.empty()) { + identifier->setIdentifier(identifier_str); + } else { + // set to the flow controller's identifier + identifier->setIdentifier(uuidStr_); + } + + std::string class_str; + if (configuration_->get("nifi.c2.agent.class", class_str) && !class_str.empty()) { + identifier->setAgentClass(class_str); + } else { + // set to the flow controller's identifier + identifier->setAgentClass("default"); + } + } + + auto monitor = std::dynamic_pointer_cast<state::response::AgentMonitor>(processor); + if (monitor != nullptr) { + monitor->addRepository(provenance_repo_); + monitor->addRepository(flow_file_repo_); + monitor->setStateMonitor(shared_from_this()); + } + + auto flowMonitor = std::dynamic_pointer_cast<state::response::FlowMonitor>(processor); + std::map<std::string, std::shared_ptr<Connection>> connections; + root_->getConnections(connections); + if (flowMonitor != nullptr) { + for (auto con : connections) { + flowMonitor->addConnection(con.second); + } + flowMonitor->setStateMonitor(shared_from_this()); + + flowMonitor->setFlowVersion(flow_version_); + } + + std::lock_guard<std::mutex> lock(metrics_mutex_); + + root_response_nodes_[processor->getName()] = processor; + } } if (configuration_->get("nifi.flow.metrics.classes", class_csv)) { @@ -392,11 +462,11 @@ void FlowController::initializeC2() { continue; } - std::shared_ptr<state::metrics::Metrics> processor = std::static_pointer_cast<state::metrics::Metrics>(ptr); + std::shared_ptr<state::response::ResponseNode> processor = std::static_pointer_cast<state::response::ResponseNode>(ptr); std::lock_guard<std::mutex> lock(metrics_mutex_); - metrics_[processor->getName()] = processor; + device_information_[processor->getName()] = processor; } } @@ -406,11 +476,11 @@ void FlowController::initializeC2() { if (root_ != nullptr) { root_->getAllProcessors(processors); for (const auto &processor : processors) { - auto rep = std::dynamic_pointer_cast<state::metrics::MetricsSource>(processor); + auto rep = std::dynamic_pointer_cast<state::response::ResponseNodeSource>(processor); // we have a metrics source. if (nullptr != rep) { - std::vector<std::shared_ptr<state::metrics::Metrics>> metric_vector; - rep->getMetrics(metric_vector); + std::vector<std::shared_ptr<state::response::ResponseNode>> metric_vector; + rep->getResponseNodes(metric_vector); for (auto metric : metric_vector) { component_metrics_[metric->getName()] = metric; } @@ -434,7 +504,7 @@ void FlowController::initializeC2() { std::lock_guard<std::mutex> lock(metrics_mutex_); auto ret = component_metrics_[clazz]; if (nullptr == ret) { - ret = metrics_[clazz]; + ret = device_information_[clazz]; } if (nullptr == ret) { logger_->log_error("No metric defined for %s", clazz); @@ -448,7 +518,146 @@ void FlowController::initializeC2() { } } } + + loadC2ResponseConfiguration(); +} + +void FlowController::loadC2ResponseConfiguration(const std::string &prefix) { + std::string class_definitions; + + if (configuration_->get(prefix, class_definitions)) { + std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ","); + + for (std::string metricsClass : classes) { + try { + std::stringstream option; + option << prefix << "." << metricsClass; + + std::stringstream classOption; + classOption << option.str() << ".classes"; + + std::stringstream nameOption; + nameOption << option.str() << ".name"; + std::string name; + + if (configuration_->get(nameOption.str(), name)) { + std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr); + + if (configuration_->get(classOption.str(), class_definitions)) { + std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ","); + + for (std::string clazz : classes) { + std::lock_guard<std::mutex> lock(metrics_mutex_); + + // instantiate the object + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz); + + if (nullptr == ptr) { + auto metric = component_metrics_.find(clazz); + if (metric != component_metrics_.end()) { + ptr = metric->second; + } else { + logger_->log_error("No metric defined for %s", clazz); + continue; + } + } + + auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr); + + std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node); + } + + } else { + std::stringstream optionName; + optionName << option.str() << "." << name; + auto node = loadC2ResponseConfiguration(optionName.str(), new_node); +// if (node != nullptr && new_node != node) + // std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node); + } + + root_response_nodes_[name] = new_node; + } + } catch (...) { + logger_->log_error("Could not create metrics class %s", metricsClass); + } + } + } } + +std::shared_ptr<state::response::ResponseNode> FlowController::loadC2ResponseConfiguration(const std::string &prefix, std::shared_ptr<state::response::ResponseNode> prev_node) { + std::string class_definitions; + if (configuration_->get(prefix, class_definitions)) { + std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ","); + + for (std::string metricsClass : classes) { + try { + std::stringstream option; + option << prefix << "." << metricsClass; + + std::stringstream classOption; + classOption << option.str() << ".classes"; + + std::stringstream nameOption; + nameOption << option.str() << ".name"; + std::string name; + + if (configuration_->get(nameOption.str(), name)) { + std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr); + if (name.find(",") != std::string::npos) { + std::vector<std::string> sub_classes = utils::StringUtils::split(name, ","); + for (std::string subClassStr : classes) { + auto node = loadC2ResponseConfiguration(subClassStr, prev_node); + if (node != nullptr) + std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(node); + } + + } else { + if (configuration_->get(classOption.str(), class_definitions)) { + std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ","); + + for (std::string clazz : classes) { + std::lock_guard<std::mutex> lock(metrics_mutex_); + + // instantiate the object + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz); + + if (nullptr == ptr) { + auto metric = component_metrics_.find(clazz); + if (metric != component_metrics_.end()) { + ptr = metric->second; + } else { + logger_->log_error("No metric defined for %s", clazz); + continue; + } + } + + auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr); + + std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node); + } + if (!new_node->isEmpty()) + std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(new_node); + + } else { + std::stringstream optionName; + optionName << option.str() << "." << name; + auto sub_node = loadC2ResponseConfiguration(optionName.str(), new_node); + std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(sub_node); + } + } + } + } catch (...) { + logger_->log_error("Could not create metrics class %s", metricsClass); + } + } + } + return prev_node; +} + +void FlowController::loadC2ResponseConfiguration() { + loadC2ResponseConfiguration("nifi.c2.root.class.definitions"); +} + /** * Controller Service functions * @@ -595,9 +804,37 @@ void FlowController::enableAllControllerServices() { controller_service_provider_->enableAllControllerServices(); } -int16_t FlowController::applyUpdate(const std::string &configuration) { - applyConfiguration(configuration); - return 0; +int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration) { + if (!source.empty()) { + std::string host, protocol, path, query, url = source; + int port; + utils::parse_url(&url, &host, &port, &protocol, &path, &query); + + std::string flow_id, bucket_id; + auto path_split = utils::StringUtils::split(path, "/"); + for (size_t i = 0; i < path_split.size(); i++) { + const std::string &str = path_split.at(i); + if (str == "flows") { + if (i + 1 < path_split.size()) { + flow_id = path_split.at(i + 1); + i++; + } + } + + if (str == "bucket") { + if (i + 1 < path_split.size()) { + bucket_id = path_split.at(i + 1); + i++; + } + } + } + flow_version_->setFlowVersion(url, bucket_id, flow_id); + } + if (applyConfiguration(configuration)) { + return 1; + } else { + return 0; + } } int16_t FlowController::clearConnection(const std::string &connection) { @@ -614,10 +851,20 @@ int16_t FlowController::clearConnection(const std::string &connection) { return -1; } -int16_t FlowController::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint16_t metricsClass) { +int16_t FlowController::getResponseNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass) { + std::lock_guard<std::mutex> lock(metrics_mutex_); + + for (auto metric : root_response_nodes_) { + metric_vector.push_back(metric.second); + } + + return 0; +} + +int16_t FlowController::getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass) { std::lock_guard<std::mutex> lock(metrics_mutex_); if (metricsClass == 0) { - for (auto metric : metrics_) { + for (auto metric : device_information_) { metric_vector.push_back(metric.second); } } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/C2Agent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index a8cc5b2..077aefe 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -8,7 +8,7 @@ * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - *repo + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,7 +28,8 @@ #include "core/state/UpdateController.h" #include "core/logging/Logger.h" #include "core/logging/LoggerConfiguration.h" - +#include "utils/file/FileUtils.h" +#include "utils/file/FileManager.h" namespace org { namespace apache { namespace nifi { @@ -43,6 +44,7 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid heart_beat_period_(3000), max_c2_responses(5), logger_(logging::LoggerFactory<C2Agent>::getLogger()) { + allow_updates_ = true; running_configuration = std::make_shared<Configure>(); @@ -107,6 +109,10 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf if (protocol == nullptr) { protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw("RESTSender", "RESTSender"); + + if (!protocol) { + return; + } logger_->log_info("Class is RESTSender"); } C2Protocol *old_protocol = protocol_.exchange(dynamic_cast<C2Protocol*>(protocol)); @@ -131,6 +137,31 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf heart_beat_period_ = 3000; } + std::string update_settings; + if (configure->get("c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { + // allow the agent to be updated. we then need to get an update command to execute after + } + + if (allow_updates_) { + if (!configure->get("c2.agent.update.command", update_command_)) { + char cwd[1024]; + getcwd(cwd, sizeof(cwd)); + + std::stringstream command; + command << cwd << "/minifi.sh update"; + update_command_ = command.str(); + } + + if (!configure->get("c2.agent.update.temp.location", update_location_)) { + char cwd[1024]; + getcwd(cwd, sizeof(cwd)); + + std::stringstream copy_path; + std::stringstream command; + + copy_path << cwd << "/minifi.update"; + } + } std::string heartbeat_reporters; if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { std::vector<std::string> reporters = utils::StringUtils::split(heartbeat_reporters, ","); @@ -163,7 +194,7 @@ void C2Agent::performHeartBeat() { logger_->log_trace("Performing heartbeat"); - std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_copy; + std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_copy; { std::lock_guard<std::timed_mutex> lock(metrics_mutex_); if (metrics_map_.size() > 0) { @@ -180,7 +211,7 @@ void C2Agent::performHeartBeat() { continue; C2Payload child_metric_payload(Operation::HEARTBEAT); child_metric_payload.setLabel(metric.first); - serializeMetrics(child_metric_payload, metric.first, metric.second->serialize()); + serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray()); metrics.addPayload(std::move(child_metric_payload)); } payload.addPayload(std::move(metrics)); @@ -188,44 +219,31 @@ void C2Agent::performHeartBeat() { if (device_information_.size() > 0) { C2Payload deviceInfo(Operation::HEARTBEAT); - deviceInfo.setLabel("DeviceInfo"); + deviceInfo.setLabel("AgentInformation"); for (auto metric : device_information_) { C2Payload child_metric_payload(Operation::HEARTBEAT); child_metric_payload.setLabel(metric.first); - serializeMetrics(child_metric_payload, metric.first, metric.second->serialize()); + if (metric.second->isArray()) { + child_metric_payload.setContainer(true); + } + serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray()); deviceInfo.addPayload(std::move(child_metric_payload)); } payload.addPayload(std::move(deviceInfo)); } - std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getAllComponents(); - - if (!components.empty()) { - C2ContentResponse component_payload(Operation::HEARTBEAT); - component_payload.name = "Components"; - - for (auto &component : components) { - if (component->isRunning()) { - component_payload.operation_arguments[component->getComponentName()] = "enabled"; - } else { - component_payload.operation_arguments[component->getComponentName()] = "disabled"; + if (!root_response_nodes_.empty()) { + for (auto metric : root_response_nodes_) { + C2Payload child_metric_payload(Operation::HEARTBEAT); + child_metric_payload.setLabel(metric.first); + if (metric.second->isArray()) { + child_metric_payload.setContainer(true); } + serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray()); + payload.addPayload(std::move(child_metric_payload)); } - payload.addContent(std::move(component_payload)); - } - - C2ContentResponse state(Operation::HEARTBEAT); - state.name = "state"; - if (update_sink_->isRunning()) { - state.operation_arguments["running"] = "true"; - } else { - state.operation_arguments["running"] = "false"; } - state.operation_arguments["uptime"] = std::to_string(update_sink_->getUptime()); - - payload.addContent(std::move(state)); - C2Payload && response = protocol_.load()->consumePayload(payload); enqueue_c2_server_response(std::move(response)); @@ -237,20 +255,17 @@ void C2Agent::performHeartBeat() { } } -void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string &name, const std::vector<state::metrics::MetricResponse> &metrics) { +void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string &name, const std::vector<state::response::SerializedResponseNode> &metrics, bool is_container) { for (auto metric : metrics) { if (metric.children.size() > 0) { C2Payload child_metric_payload(metric_payload.getOperation()); child_metric_payload.setLabel(metric.name); - serializeMetrics(child_metric_payload, metric.name, metric.children); - + serializeMetrics(child_metric_payload, metric.name, metric.children, is_container); metric_payload.addPayload(std::move(child_metric_payload)); } else { C2ContentResponse response(metric_payload.getOperation()); response.name = name; - response.operation_arguments[metric.name] = metric.value; - metric_payload.addContent(std::move(response)); } } @@ -323,9 +338,9 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { case Operation::CLEAR: // we've been told to clear something if (resp.name == "connection") { - logger_->log_debug("Clearing connection %s", resp.name); for (auto connection : resp.operation_arguments) { - update_sink_->clearConnection(connection.second); + logger_->log_debug("Clearing connection %s", connection.second.to_string()); + update_sink_->clearConnection(connection.second.to_string()); } C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); @@ -333,7 +348,6 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { update_sink_->drainRepositories(); C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); - } else { logger_->log_debug("Clearing unknown %s", resp.name); } @@ -392,7 +406,7 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { */ void C2Agent::handle_describe(const C2ContentResponse &resp) { if (resp.name == "metrics") { - auto reporter = std::dynamic_pointer_cast<state::metrics::MetricsReporter>(update_sink_); + auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_); if (reporter != nullptr) { auto metricsClass = resp.operation_arguments.find("metricsClass"); @@ -400,15 +414,15 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { if (metricsClass != resp.operation_arguments.end()) { // we have a class try { - metric_class_id = std::stoi(metricsClass->second); + metric_class_id = std::stoi(metricsClass->second.to_string()); } catch (...) { - logger_->log_error("Could not convert %s into an integer", metricsClass->second); + logger_->log_error("Could not convert %s into an integer", metricsClass->second.to_string()); } } - std::vector<std::shared_ptr<state::metrics::Metrics>> metrics_vec; + std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec; - reporter->getMetrics(metrics_vec, metric_class_id); + reporter->getResponseNodes(metrics_vec, metric_class_id); C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); response.setLabel("metrics"); for (auto metric : metrics_vec) { @@ -418,6 +432,26 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { } } else if (resp.name == "configuration") { + auto unsanitized_keys = configuration_->getConfiguredKeys(); + std::vector<std::string> keys; + std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), std::back_inserter(keys), [](std::string key) {return key.find("pass") == std::string::npos;}); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + response.setLabel("configuration_options"); + C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true); + options.setLabel("configuration_options"); + std::string value; + for (auto key : keys) { + C2ContentResponse option(Operation::ACKNOWLEDGE); + option.name = key; + if (configuration_->get(key, value)) { + option.operation_arguments[key] = value; + options.addContent(std::move(option)); + } + } + response.addPayload(std::move(options)); + enqueue_c2_response(std::move(response)); + return; + } else if (resp.name == "manifest") { auto keys = configuration_->getConfiguredKeys(); C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); response.setLabel("configuration_options"); @@ -433,6 +467,23 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { } } response.addPayload(std::move(options)); + + if (device_information_.size() > 0) { + C2Payload deviceInfo(Operation::HEARTBEAT); + deviceInfo.setLabel("AgentInformation"); + + for (auto metric : device_information_) { + C2Payload child_metric_payload(Operation::HEARTBEAT); + child_metric_payload.setLabel(metric.first); + if (metric.second->isArray()) { + child_metric_payload.setContainer(true); + } + serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray()); + deviceInfo.addPayload(std::move(child_metric_payload)); + } + response.addPayload(std::move(deviceInfo)); + } + enqueue_c2_response(std::move(response)); return; } @@ -444,13 +495,53 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { // we've been told to update something if (resp.name == "configuration") { auto url = resp.operation_arguments.find("location"); + + auto persist = resp.operation_arguments.find("persist"); + if (url != resp.operation_arguments.end()) { // just get the raw data. - C2Payload payload(Operation::UPDATE, false, true); + C2Payload payload(Operation::TRANSFER, false, true); + + C2Payload &&response = protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, false); - C2Payload &&response = protocol_.load()->consumePayload(url->second, payload, RECEIVE, false); + auto raw_data = response.getRawData(); + std::string file_path = std::string(raw_data.data(), raw_data.size()); - if (update_sink_->applyUpdate(response.getRawData()) == 0) { + std::ifstream new_conf(file_path); + std::string raw_data_str((std::istreambuf_iterator<char>(new_conf)), std::istreambuf_iterator<char>()); + unlink(file_path.c_str()); + // if we can apply the update, we will acknowledge it and then backup the configuration file. + if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) { + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + + if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) { + // update nifi.flow.configuration.file=./conf/config.yml + std::string config_file; + configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file); + std::stringstream config_file_backup; + config_file_backup << config_file << ".bak"; + // we must be able to successfuly copy the file. + bool persist_config = true; + bool backup_file = false; + std::string backup_config; + + if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) { + if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) { + persist_config = false; + } + } + if (persist_config) { + std::ofstream writer(config_file); + if (writer.is_open()) { + auto output = response.getRawData(); + writer.write(output.data(), output.size()); + } + writer.close(); + } + } + } else { + logger_->log_debug("update failed."); C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); } @@ -458,7 +549,36 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { } else { auto update_text = resp.operation_arguments.find("configuration_data"); if (update_text != resp.operation_arguments.end()) { - update_sink_->applyUpdate(update_text->second); + if (update_sink_->applyUpdate(url->second.to_string(), update_text->second.to_string()) != 0 && persist != resp.operation_arguments.end() + && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) { + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + // update nifi.flow.configuration.file=./conf/config.yml + std::string config_file; + std::stringstream config_file_backup; + config_file_backup << config_file << ".bak"; + + bool persist_config = true; + bool backup_file = false; + std::string backup_config; + + if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) { + if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) { + persist_config = false; + } + } + if (persist_config) { + std::ofstream writer(config_file); + if (writer.is_open()) { + auto output = update_text->second.to_string(); + writer.write(output.c_str(), output.size()); + } + writer.close(); + } + } else { + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + } } } } else if (resp.name == "c2") { @@ -468,23 +588,69 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { running_configuration->clear(); for (auto entry : resp.operation_arguments) { - running_configuration->set(entry.first, entry.second); + running_configuration->set(entry.first, entry.second.to_string()); } if (resp.operation_arguments.size() > 0) configure(running_configuration); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + } else if (resp.name == "agent") { + // we are upgrading the agent. therefore we must be given a location + auto location = resp.operation_arguments.find("location"); + if (location != resp.operation_arguments.end()) { + // we will not have a raw payload + C2Payload payload(Operation::TRANSFER, false, true); + + C2Payload &&response = protocol_.load()->consumePayload(location->second.to_string(), payload, RECEIVE, false); + + auto raw_data = response.getRawData(); + + std::string file_path = std::string(raw_data.data(), raw_data.size()); + + // acknowledge the transfer. For a transfer, the response identifier should be the checksum of the + // file transferred. + C2Payload transfer_response(Operation::ACKNOWLEDGE, response.getIdentifier(), false, true); + + protocol_.load()->consumePayload(std::move(transfer_response)); + + if (allow_updates_) { + utils::file::FileUtils::copy_file(file_path, update_location_); + // remove the downloaded file. + logger_->log_trace("removing command %s", file_path); + unlink(file_path.c_str()); + update_agent(); + } + } } } -int16_t C2Agent::setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric) { +void C2Agent::restart_agent() { + char cwd[1024]; + getcwd(cwd, sizeof(cwd)); + + std::stringstream command; + command << cwd << "/minifi.sh restart"; +} + +void C2Agent::update_agent() { + system(update_command_.c_str()); +} + +int16_t C2Agent::setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric) { auto now = std::chrono::steady_clock::now(); - bool is_device_metric = std::dynamic_pointer_cast<state::metrics::DeviceMetric>(metric) != nullptr; if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) { - if (is_device_metric) { - device_information_[metric->getName()] = metric; - } else { - metrics_map_[metric->getName()] = metric; - } + root_response_nodes_[metric->getName()] = metric; + metrics_mutex_.unlock(); + return 0; + } + return -1; +} + +int16_t C2Agent::setMetricsNodes(const std::shared_ptr<state::response::ResponseNode> &metric) { + auto now = std::chrono::steady_clock::now(); + if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) { + metrics_map_[metric->getName()] = metric; metrics_mutex_.unlock(); return 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/C2Payload.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp index 8a30e2a..3807b12 100644 --- a/libminifi/src/c2/C2Payload.cpp +++ b/libminifi/src/c2/C2Payload.cpp @@ -80,21 +80,24 @@ C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw op_(op), raw_(isRaw), ident_(identifier), - isResponse(resp) { + isResponse(resp), + is_container_(false) { } C2Payload::C2Payload(Operation op, bool resp, bool isRaw) : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)), op_(op), raw_(isRaw), - isResponse(resp) { + isResponse(resp), + is_container_(false) { } C2Payload::C2Payload(Operation op, state::UpdateState state, bool resp, bool isRaw) : state::Update(state::UpdateStatus(state, 0)), op_(op), raw_(isRaw), - isResponse(resp) { + isResponse(resp), + is_container_(false) { } C2Payload::C2Payload(const C2Payload &other) @@ -106,7 +109,8 @@ C2Payload::C2Payload(const C2Payload &other) ident_(other.ident_), raw_data_(other.raw_data_), payloads_(other.payloads_), - content_(other.content_) { + content_(other.content_), + is_container_(other.is_container_) { } C2Payload::C2Payload(const C2Payload &&other) @@ -118,7 +122,8 @@ C2Payload::C2Payload(const C2Payload &&other) ident_(std::move(other.ident_)), raw_data_(std::move(other.raw_data_)), payloads_(std::move(other.payloads_)), - content_(std::move(other.content_)) { + content_(std::move(other.content_)), + is_container_(std::move(other.is_container_)) { } void C2Payload::setIdentifier(const std::string &ident) { @@ -166,14 +171,14 @@ bool C2Payload::isRaw() const { } void C2Payload::setRawData(const std::string &data) { - raw_data_ = data; + raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data)); } void C2Payload::setRawData(const std::vector<char> &data) { - raw_data_ = std::string(data.data(), data.size()); + raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data)); } -std::string C2Payload::getRawData() const { +std::vector<char> C2Payload::getRawData() const { return raw_data_; } @@ -195,6 +200,7 @@ C2Payload &C2Payload::operator=(const C2Payload &&other) { label_ = std::move(other.label_); payloads_ = std::move(other.payloads_); content_ = std::move(other.content_); + is_container_ = std::move(other.is_container_); return *this; } @@ -209,6 +215,7 @@ C2Payload &C2Payload::operator=(const C2Payload &other) { label_ = other.label_; payloads_ = other.payloads_; content_ = other.content_; + is_container_ = other.is_container_; return *this; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/ControllerSocketProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index fb5be6d..ea9cf9b 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -158,7 +158,7 @@ void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller logger_->log_debug("Connection broke"); break; } - update_sink_->applyUpdate(configuration); + update_sink_->applyUpdate("ControllerSocketProtocol", configuration); } } break; @@ -239,7 +239,7 @@ void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse if (payload_content.name == "Components") { for (auto content : payload_content.operation_arguments) { bool is_enabled = false; - minifi::utils::StringUtils::StringToBool(content.second, is_enabled); + minifi::utils::StringUtils::StringToBool(content.second.to_string(), is_enabled); std::lock_guard<std::mutex> lock(controller_mutex_); component_map_[content.first] = is_enabled; } @@ -262,9 +262,9 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) { uint64_t max = 0; for (auto content : payload_content.operation_arguments) { if (content.first == "datasize") { - size = std::stol(content.second); + size = std::stol(content.second.to_string()); } else if (content.first == "datasizemax") { - max = std::stol(content.second); + max = std::stol(content.second.to_string()); } } std::lock_guard<std::mutex> lock(controller_mutex_);
