This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 18e79b65ac25b849fa4937159546f41d4c08d568 Author: Martin Zink <[email protected]> AuthorDate: Mon Aug 2 14:50:36 2021 +0200 MINIFICPP-1573 Make AppendHostInfo platform independent This closes #1116 Signed-off-by: Marton Szasz <[email protected]> --- CPPLINT.cfg | 2 +- PROCESSORS.md | 7 +- extensions/pdh/PerformanceDataMonitor.cpp | 11 +- .../processors/AppendHostInfo.cpp | 120 ++++++++------- .../processors/AppendHostInfo.h | 43 +++--- .../tests/unit/AppendHostInfoTests.cpp | 75 ++++++++++ .../include/core/state/nodes/DeviceInformation.h | 54 +------ libminifi/include/utils/NetworkInterfaceInfo.h | 78 ++++++++++ .../include/utils/OpenTelemetryLogDataModelUtils.h | 82 ++++++++++ libminifi/include/utils/OsUtils.h | 6 + libminifi/src/io/ClientSocket.cpp | 30 +--- libminifi/src/utils/NetworkInterfaceInfo.cpp | 165 +++++++++++++++++++++ libminifi/src/utils/OsUtils.cpp | 42 +++++- libminifi/test/unit/NetworkInterfaceInfoTests.cpp | 42 ++++++ .../test/unit/OpenTelemetryLogDataModelTests.cpp | 32 ++++ 15 files changed, 634 insertions(+), 155 deletions(-) diff --git a/CPPLINT.cfg b/CPPLINT.cfg index a89734e..ecf6bbc 100644 --- a/CPPLINT.cfg +++ b/CPPLINT.cfg @@ -1,2 +1,2 @@ set noparent -filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon +filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon,-build/namespaces_literals diff --git a/PROCESSORS.md b/PROCESSORS.md index c110348..2d18f6c 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -63,9 +63,10 @@ In the list below, the names of required properties appear in bold. 
Any other pr | Name | Default Value | Allowable Values | Description | | - | - | - | - | -|Hostname Attribute|source.hostname||Flowfile attribute to used to record the agent's hostname| -|IP Attribute|source.ipv4||Flowfile attribute to used to record the agent's IP address| -|Network Interface Name|eth0||Network interface from which to read an IP v4 address| +|Hostname Attribute|source.hostname||Flowfile attribute used to record the agent's hostname| +|IP Attribute|source.ipv4||Flowfile attribute used to record the agent's IP address| +|Network Interface Filter|||A regular expression to filter ip addresses based on the name of the network interface| +|Refresh Policy|On schedule|On schedule<br>On every trigger|When to recalculate the host info| ### Relationships | Name | Description | diff --git a/extensions/pdh/PerformanceDataMonitor.cpp b/extensions/pdh/PerformanceDataMonitor.cpp index 191bf71..e1a8112 100644 --- a/extensions/pdh/PerformanceDataMonitor.cpp +++ b/extensions/pdh/PerformanceDataMonitor.cpp @@ -25,6 +25,7 @@ #include "MemoryConsumptionCounter.h" #include "utils/StringUtils.h" #include "utils/JsonCallback.h" +#include "utils/OpenTelemetryLogDataModelUtils.h" namespace org { namespace apache { @@ -136,13 +137,9 @@ void PerformanceDataMonitor::initialize() { rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document& root) { switch (output_format_) { case OutputFormat::OPENTELEMETRY: { - root.AddMember("Name", "PerformanceData", root.GetAllocator()); - root.AddMember("Timestamp", std::time(0), root.GetAllocator()); - std::string hostname = io::Socket::getMyHostName(); - rapidjson::Value hostname_value; - hostname_value.SetString(hostname.c_str(), hostname.length(), root.GetAllocator()); - root.AddMember("Hostname", hostname_value, root.GetAllocator()); - root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType }, root.GetAllocator()); + utils::OpenTelemetryLogDataModel::appendEventInformation(root, "PerformanceData"); + 
utils::OpenTelemetryLogDataModel::appendHostInformation(root); + utils::OpenTelemetryLogDataModel::appendBody(root); return root["Body"]; } case OutputFormat::JSON: diff --git a/extensions/standard-processors/processors/AppendHostInfo.cpp b/extensions/standard-processors/processors/AppendHostInfo.cpp index e9fdb69..fed4ad3 100644 --- a/extensions/standard-processors/processors/AppendHostInfo.cpp +++ b/extensions/standard-processors/processors/AppendHostInfo.cpp @@ -23,90 +23,102 @@ #define __USE_POSIX #endif /* __USE_POSIX */ -#include <limits.h> -#include <string.h> #include <memory> #include <string> -#include <set> +#include <regex> +#include <algorithm> #include "core/ProcessContext.h" #include "core/Property.h" #include "core/ProcessSession.h" #include "core/FlowFile.h" #include "io/ClientSocket.h" +#include "utils/NetworkInterfaceInfo.h" + namespace org { namespace apache { namespace nifi { namespace minifi { namespace processors { -#ifndef WIN32 -#include <netdb.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <sys/ioctl.h> -#include <net/if.h> -#include <arpa/inet.h> -#endif - -#ifndef HOST_NAME_MAX -#define HOST_NAME_MAX 255 -#endif +core::Property AppendHostInfo::InterfaceNameFilter("Network Interface Filter", "A regular expression to filter ip addresses based on the name of the network interface", ""); +core::Property AppendHostInfo::HostAttribute("Hostname Attribute", "Flowfile attribute used to record the agent's hostname", "source.hostname"); +core::Property AppendHostInfo::IPAttribute("IP Attribute", "Flowfile attribute used to record the agent's IP addresses in a comma separated list", "source.ipv4"); +core::Property AppendHostInfo::RefreshPolicy(core::PropertyBuilder::createProperty("Refresh Policy") + ->withDescription("When to recalculate the host info") + ->withAllowableValues<std::string>({ REFRESH_POLICY_ON_SCHEDULE, REFRESH_POLICY_ON_TRIGGER }) + ->withDefaultValue(REFRESH_POLICY_ON_SCHEDULE)->build()); -core::Property 
AppendHostInfo::InterfaceName("Network Interface Name", "Network interface from which to read an IP v4 address", "eth0"); -core::Property AppendHostInfo::HostAttribute("Hostname Attribute", "Flowfile attribute to used to record the agent's hostname", "source.hostname"); -core::Property AppendHostInfo::IPAttribute("IP Attribute", "Flowfile attribute to used to record the agent's IP address", "source.ipv4"); core::Relationship AppendHostInfo::Success("success", "success operational on the flow record"); void AppendHostInfo::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(InterfaceName); - properties.insert(HostAttribute); - properties.insert(IPAttribute); - setSupportedProperties(properties); + setSupportedProperties({InterfaceNameFilter, HostAttribute, IPAttribute, RefreshPolicy}); + setSupportedRelationships({Success}); +} - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); +void AppendHostInfo::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + std::unique_lock unique_lock(shared_mutex_); + context->getProperty(HostAttribute.getName(), hostname_attribute_name_); + context->getProperty(IPAttribute.getName(), ipaddress_attribute_name_); + std::string interface_name_filter_str; + if (context->getProperty(InterfaceNameFilter.getName(), interface_name_filter_str) && !interface_name_filter_str.empty()) + interface_name_filter_.emplace(interface_name_filter_str); + else + interface_name_filter_ = std::nullopt; + + std::string refresh_policy; + context->getProperty(RefreshPolicy.getName(), refresh_policy); + if (refresh_policy == REFRESH_POLICY_ON_TRIGGER) + refresh_on_trigger_ = true; + else + refreshHostInfo(); } -void AppendHostInfo::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { +void 
AppendHostInfo::onTrigger(core::ProcessContext*, core::ProcessSession* session) { std::shared_ptr<core::FlowFile> flow = session->get(); if (!flow) return; - // Get Hostname - - std::string hostAttribute = ""; - context->getProperty(HostAttribute.getName(), hostAttribute); - flow->addAttribute(hostAttribute.c_str(), org::apache::nifi::minifi::io::Socket::getMyHostName()); - - // Get IP address for the specified interface - std::string iface; - context->getProperty(InterfaceName.getName(), iface); - // Confirm the specified interface name exists on this device -#ifndef WIN32 - if (if_nametoindex(iface.c_str()) != 0) { - struct ifreq ifr; - int fd = socket(AF_INET, SOCK_DGRAM, 0); - // Type of address to retrieve - IPv4 IP address - ifr.ifr_addr.sa_family = AF_INET; - // Copy the interface name in the ifreq structure - strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1); - ioctl(fd, SIOCGIFADDR, &ifr); - close(fd); + { + std::shared_lock shared_lock(shared_mutex_); + if (refresh_on_trigger_) { + shared_lock.unlock(); + std::unique_lock unique_lock(shared_mutex_); + refreshHostInfo(); + } + } - std::string ipAttribute; - context->getProperty(IPAttribute.getName(), ipAttribute); - flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr)); + { + std::shared_lock shared_lock(shared_mutex_); + flow->addAttribute(hostname_attribute_name_, hostname_); + if (ipaddresses_.has_value()) { + flow->addAttribute(ipaddress_attribute_name_, ipaddresses_.value()); + } } -#endif - // Transfer to the relationship session->transfer(flow, Success); } +void AppendHostInfo::refreshHostInfo() { + hostname_ = org::apache::nifi::minifi::io::Socket::getMyHostName(); + auto filter = [this](const utils::NetworkInterfaceInfo& interface_info) -> bool { + bool has_ipv4_address = interface_info.hasIpV4Address(); + bool matches_regex_or_empty_regex = (!interface_name_filter_.has_value()) || std::regex_match(interface_info.getName(), 
interface_name_filter_.value()); + return has_ipv4_address && matches_regex_or_empty_regex; + }; + auto network_interface_infos = utils::NetworkInterfaceInfo::getNetworkInterfaceInfos(filter); + std::ostringstream oss; + if (network_interface_infos.size() == 0) { + ipaddresses_ = std::nullopt; + } else { + for (auto& network_interface_info : network_interface_infos) { + auto& ip_v4_addresses = network_interface_info.getIpV4Addresses(); + std::copy(std::begin(ip_v4_addresses), std::end(ip_v4_addresses), std::ostream_iterator<std::string>(oss, ",")); + } + ipaddresses_ = oss.str(); + ipaddresses_.value().pop_back(); // to remove trailing comma + } +} + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ diff --git a/extensions/standard-processors/processors/AppendHostInfo.h b/extensions/standard-processors/processors/AppendHostInfo.h index e319ac6..5ccb253 100644 --- a/extensions/standard-processors/processors/AppendHostInfo.h +++ b/extensions/standard-processors/processors/AppendHostInfo.h @@ -22,6 +22,9 @@ #include <memory> #include <string> +#include <optional> +#include <regex> +#include <shared_mutex> #include "core/Property.h" #include "FlowFileRecord.h" @@ -37,38 +40,44 @@ namespace nifi { namespace minifi { namespace processors { -// AppendHostInfo Class class AppendHostInfo : public core::Processor { public: - // Constructor - /*! 
- * Create a new processor - */ - AppendHostInfo(const std::string& name, const utils::Identifier& uuid = {}) // NOLINT + static constexpr const char* REFRESH_POLICY_ON_TRIGGER = "On every trigger"; + static constexpr const char* REFRESH_POLICY_ON_SCHEDULE = "On schedule"; + + explicit AppendHostInfo(const std::string& name, const utils::Identifier& uuid = {}) : core::Processor(name, uuid), - logger_(logging::LoggerFactory<AppendHostInfo>::getLogger()) { + logger_(logging::LoggerFactory<AppendHostInfo>::getLogger()), + refresh_on_trigger_(false) { } - // Destructor virtual ~AppendHostInfo() = default; - // Processor Name static constexpr char const* ProcessorName = "AppendHostInfo"; - // Supported Properties - static core::Property InterfaceName; + + static core::Property InterfaceNameFilter; static core::Property HostAttribute; static core::Property IPAttribute; + static core::Property RefreshPolicy; - // Supported Relationships static core::Relationship Success; public: - // OnTrigger method, implemented by NiFi AppendHostInfo - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); - // Initialize, over write by NiFi AppendHostInfo - virtual void initialize(void); + void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override; + void onTrigger(core::ProcessContext* context, core::ProcessSession* session) override; + void initialize() override; + + protected: + virtual void refreshHostInfo(); private: - // Logger + std::shared_mutex shared_mutex_; std::shared_ptr<logging::Logger> logger_; + std::string hostname_attribute_name_; + std::string ipaddress_attribute_name_; + std::optional<std::regex> interface_name_filter_; + bool refresh_on_trigger_; + + std::string hostname_; + std::optional<std::string> ipaddresses_; }; REGISTER_RESOURCE(AppendHostInfo, "Appends host information such as IP address and hostname as an attribute to incoming 
flowfiles."); diff --git a/extensions/standard-processors/tests/unit/AppendHostInfoTests.cpp b/extensions/standard-processors/tests/unit/AppendHostInfoTests.cpp new file mode 100644 index 0000000..97014e0 --- /dev/null +++ b/extensions/standard-processors/tests/unit/AppendHostInfoTests.cpp @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "TestBase.h" +#include "AppendHostInfo.h" +#include "LogAttribute.h" + +using AppendHostInfo = org::apache::nifi::minifi::processors::AppendHostInfo; +using LogAttribute = org::apache::nifi::minifi::processors::LogAttribute; + +TEST_CASE("AppendHostInfoTest", "[appendhostinfotest]") { + TestController testController; + std::shared_ptr<TestPlan> plan = testController.createPlan(); + LogTestController::getInstance().setTrace<processors::AppendHostInfo>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file"); + std::shared_ptr<core::Processor> append_host_info = plan->addProcessor("AppendHostInfo", "append_host_info", core::Relationship("success", "description"), true); + std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attributes", core::Relationship("success", "description"), true); + + testController.runSession(plan); + + REQUIRE(LogTestController::getInstance().contains("source.hostname")); + REQUIRE(LogTestController::getInstance().contains("source.ipv4")); +} + +TEST_CASE("AppendHostInfoTestWithUnmatchableRegex", "[appendhostinfotestunmatchableregex]") { + TestController testController; + std::shared_ptr<TestPlan> plan = testController.createPlan(); + LogTestController::getInstance().setTrace<processors::AppendHostInfo>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file"); + std::shared_ptr<core::Processor> append_host_info = plan->addProcessor("AppendHostInfo", "append_host_info", core::Relationship("success", "description"), true); + std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attributes", core::Relationship("success", "description"), true); + + plan->setProperty(append_host_info, 
AppendHostInfo::InterfaceNameFilter.getName(), "\b"); + + testController.runSession(plan); + + using namespace std::literals; + REQUIRE(LogTestController::getInstance().contains("source.hostname", 0s, 0ms)); + REQUIRE_FALSE(LogTestController::getInstance().contains("source.ipv4", 0s, 0ms)); +} + +TEST_CASE("AppendHostInfoTestCanFilterOutLoopbackInterfacesWithRegex", "[appendhostinfotestfilterloopback]") { + TestController testController; + std::shared_ptr<TestPlan> plan = testController.createPlan(); + LogTestController::getInstance().setTrace<processors::AppendHostInfo>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file"); + std::shared_ptr<core::Processor> append_host_info = plan->addProcessor("AppendHostInfo", "append_host_info", core::Relationship("success", "description"), true); + std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attributes", core::Relationship("success", "description"), true); + + plan->setProperty(append_host_info, AppendHostInfo::InterfaceNameFilter.getName(), "(?!Loopback|lo).*?"); // set up the regex to accept everything except interfaces starting with Loopback or lo + + testController.runSession(plan); + + using namespace std::literals; + REQUIRE(LogTestController::getInstance().contains("source.hostname", 0s, 0ms)); + REQUIRE_FALSE(LogTestController::getInstance().contains("127.0.0.1", 0s, 0ms)); +} diff --git a/libminifi/include/core/state/nodes/DeviceInformation.h b/libminifi/include/core/state/nodes/DeviceInformation.h index 581850b..c336b4d 100644 --- a/libminifi/include/core/state/nodes/DeviceInformation.h +++ b/libminifi/include/core/state/nodes/DeviceInformation.h @@ -58,6 +58,7 @@ #include "Connection.h" #include "io/ClientSocket.h" #include "utils/OsUtils.h" +#include "utils/NetworkInterfaceInfo.h" #include "utils/SystemCpuUsageTracker.h" namespace 
org { @@ -122,52 +123,13 @@ class Device { std::vector<std::string> getIpAddresses() { static std::vector<std::string> ips; if (ips.empty()) { -#ifndef WIN32 - struct ifaddrs *ifaddr, *ifa; - if (getifaddrs(&ifaddr) == -1) { - perror("getifaddrs"); - exit(EXIT_FAILURE); - } - - for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { - if ((strcmp("lo", ifa->ifa_name) == 0) || !(ifa->ifa_flags & (IFF_RUNNING))) - continue; - if ((ifa->ifa_addr != NULL) && (ifa->ifa_addr->sa_family == AF_INET)) { - ips.push_back(inet_ntoa(((struct sockaddr_in *) ifa->ifa_addr)->sin_addr)); - } - } - - freeifaddrs(ifaddr); -#else - PIP_ADAPTER_INFO adapterPtr; - PIP_ADAPTER_INFO adapter = NULL; - - DWORD dwRetVal = 0; - - ULONG adapterLen = sizeof(IP_ADAPTER_INFO); - adapterPtr = reinterpret_cast<IP_ADAPTER_INFO*>(malloc(sizeof(IP_ADAPTER_INFO))); - if (adapterPtr == NULL) { - return ips; - } - if (GetAdaptersInfo(adapterPtr, &adapterLen) == ERROR_BUFFER_OVERFLOW) { - free(adapterPtr); - adapterPtr = reinterpret_cast<IP_ADAPTER_INFO*>(malloc(adapterLen)); - if (adapterPtr == NULL) { - return ips; - } - } - - if ((dwRetVal = GetAdaptersInfo(adapterPtr, &adapterLen)) == NO_ERROR) { - adapter = adapterPtr; - while (adapter) { - ips.emplace_back(adapter->IpAddressList.IpAddress.String); - adapter = adapter->Next; - } - } - - if (adapterPtr) - free(adapterPtr); -#endif + const auto filter = [](const utils::NetworkInterfaceInfo& interface_info) { + return !interface_info.isLoopback() && interface_info.isRunning(); + }; + auto network_interface_infos = utils::NetworkInterfaceInfo::getNetworkInterfaceInfos(filter); + for (const auto& network_interface_info : network_interface_infos) + for (const auto& ip_v4_address : network_interface_info.getIpV4Addresses()) + ips.push_back(ip_v4_address); } return ips; } diff --git a/libminifi/include/utils/NetworkInterfaceInfo.h b/libminifi/include/utils/NetworkInterfaceInfo.h new file mode 100644 index 0000000..12ac062 --- /dev/null +++ 
b/libminifi/include/utils/NetworkInterfaceInfo.h @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <vector> +#include <memory> +#include <unordered_map> +#include <optional> +#include "core/logging/Logger.h" + +#ifdef WIN32 +struct _IP_ADAPTER_ADDRESSES_LH; +typedef _IP_ADAPTER_ADDRESSES_LH IP_ADAPTER_ADDRESSES_LH; +typedef IP_ADAPTER_ADDRESSES_LH IP_ADAPTER_ADDRESSES; +#else +struct ifaddrs; +#endif + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +namespace utils { +class NetworkInterfaceInfo { + public: + NetworkInterfaceInfo(NetworkInterfaceInfo&& src) noexcept = default; +#ifdef WIN32 + // Creates NetworkInterfaceInfo from IP_ADAPTER_ADDRESSES struct (it should contain all ip addresses from an adapter) + explicit NetworkInterfaceInfo(const IP_ADAPTER_ADDRESSES* adapter); +#else + // Creates NetworkInterfaceInfo from ifaddrs struct (it will only contain a single ip address from an adapter, it should be merged together based on name_) + explicit NetworkInterfaceInfo(const struct ifaddrs* ifa); +#endif + NetworkInterfaceInfo& operator=(NetworkInterfaceInfo&& other) noexcept = default; + const std::string& getName() const 
noexcept { return name_; } + bool hasIpV4Address() const noexcept { return !ip_v4_addresses_.empty(); } + bool hasIpV6Address() const noexcept { return !ip_v6_addresses_.empty(); } + bool isRunning() const noexcept { return running_; } + bool isLoopback() const noexcept { return loopback_; } + const std::vector<std::string>& getIpV4Addresses() const noexcept { return ip_v4_addresses_; } + const std::vector<std::string>& getIpV6Addresses() const noexcept { return ip_v6_addresses_; } + + // Traverses the ip addresses and merges them together based on the interface name + static std::vector<NetworkInterfaceInfo> getNetworkInterfaceInfos(std::function<bool(const NetworkInterfaceInfo&)> filter = { [](const NetworkInterfaceInfo&) { return true; } }, + const std::optional<uint32_t> max_interfaces = std::nullopt); + + private: + void moveAddressesInto(NetworkInterfaceInfo& destination); + + std::string name_; + std::vector<std::string> ip_v4_addresses_; + std::vector<std::string> ip_v6_addresses_; + bool running_; + bool loopback_; + static std::shared_ptr<core::logging::Logger> logger_; +}; +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/include/utils/OpenTelemetryLogDataModelUtils.h b/libminifi/include/utils/OpenTelemetryLogDataModelUtils.h new file mode 100644 index 0000000..3594886 --- /dev/null +++ b/libminifi/include/utils/OpenTelemetryLogDataModelUtils.h @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> + +#include "rapidjson/document.h" +#include "NetworkInterfaceInfo.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +class OpenTelemetryLogDataModel { + public: + static void appendEventInformation(rapidjson::Document& root, const std::string& event_identifier) { + rapidjson::Value name; + name.SetString(event_identifier.c_str(), event_identifier.length(), root.GetAllocator()); + root.AddMember("Name", name, root.GetAllocator()); + root.AddMember("Timestamp", rapidjson::Value().SetInt64(std::time(0)), root.GetAllocator()); + } + + static void appendHostInformation(rapidjson::Document& root) { + root.AddMember("Resource", rapidjson::Value{ rapidjson::kObjectType }, root.GetAllocator()); + appendHostName(root["Resource"], root.GetAllocator()); + appendIPAddresses(root["Resource"], root.GetAllocator()); + } + + static void appendBody(rapidjson::Document& root) { + root.AddMember("Body", rapidjson::Value{ rapidjson::kObjectType }, root.GetAllocator()); + } + + private: + static void appendHostName(rapidjson::Value& resource, rapidjson::Document::AllocatorType& allocator) { + std::string hostname = io::Socket::getMyHostName(); + rapidjson::Value hostname_value; + hostname_value.SetString(hostname.c_str(), hostname.length(), allocator); + resource.AddMember("host.hostname", hostname_value, allocator); + } + + static void appendIPAddresses(rapidjson::Value& resource, rapidjson::Document::AllocatorType& alloc) { + resource.AddMember("host.ip", rapidjson::Value{ 
rapidjson::kObjectType }, alloc); + rapidjson::Value& ip = resource["host.ip"]; + auto network_interface_infos = utils::NetworkInterfaceInfo::getNetworkInterfaceInfos(); + for (const auto& network_interface_info : network_interface_infos) { + rapidjson::Value interface_name(network_interface_info.getName().c_str(), network_interface_info.getName().length(), alloc); + rapidjson::Value interface_address_array(rapidjson::kArrayType); + for (auto& ip_v4_address : network_interface_info.getIpV4Addresses()) { + rapidjson::Value address_value(ip_v4_address.c_str(), ip_v4_address.length(), alloc); + interface_address_array.PushBack(address_value.Move(), alloc); + } + for (auto& ip_v6_address : network_interface_info.getIpV6Addresses()) { + rapidjson::Value address_value(ip_v6_address.c_str(), ip_v6_address.length(), alloc); + interface_address_array.PushBack(address_value.Move(), alloc); + } + ip.AddMember(interface_name, interface_address_array, alloc); + } + } +}; + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/include/utils/OsUtils.h b/libminifi/include/utils/OsUtils.h index 82c69fc..bf3615a 100644 --- a/libminifi/include/utils/OsUtils.h +++ b/libminifi/include/utils/OsUtils.h @@ -19,6 +19,8 @@ #include <string> +struct sockaddr; + namespace org { namespace apache { namespace nifi { @@ -26,6 +28,7 @@ namespace minifi { namespace utils { namespace OsUtils { + /// Resolves a user ID to a username extern std::string userIdToUsername(const std::string &uid); @@ -50,6 +53,9 @@ std::string getMachineArchitecture(); /// Resolves common identifiers extern std::string resolve_common_identifiers(const std::string &id); #endif + +std::string sockaddr_ntop(const sockaddr* const sa); + } /* namespace OsUtils */ } /* namespace utils */ } /* namespace minifi */ diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index d26a01b..430f019 100644 --- 
a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -48,6 +48,7 @@ #include "utils/file/FileUtils.h" #include "utils/GeneralUtils.h" #include "utils/gsl.h" +#include "utils/OsUtils.h" namespace util = org::apache::nifi::minifi::utils; namespace mio = org::apache::nifi::minifi::io; @@ -62,29 +63,6 @@ std::string get_last_getaddrinfo_err_str(int getaddrinfo_result) { #endif /* WIN32 */ } -std::string sockaddr_ntop(const sockaddr* const sa) { - std::string result; - if (sa->sa_family == AF_INET) { - sockaddr_in sa_in{}; - std::memcpy(reinterpret_cast<void*>(&sa_in), sa, sizeof(sockaddr_in)); - result.resize(INET_ADDRSTRLEN); - if (inet_ntop(AF_INET, &sa_in.sin_addr, &result[0], INET_ADDRSTRLEN) == nullptr) { - throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, mio::get_last_socket_error_message() }; - } - } else if (sa->sa_family == AF_INET6) { - sockaddr_in6 sa_in6{}; - std::memcpy(reinterpret_cast<void*>(&sa_in6), sa, sizeof(sockaddr_in6)); - result.resize(INET6_ADDRSTRLEN); - if (inet_ntop(AF_INET6, &sa_in6.sin6_addr, &result[0], INET6_ADDRSTRLEN) == nullptr) { - throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, mio::get_last_socket_error_message() }; - } - } else { - throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, "sockaddr_ntop: unknown address family" }; - } - result.resize(strlen(result.c_str())); // discard remaining null bytes at the end - return result; -} - template<typename T, typename Pred, typename Adv> auto find_if_custom_linked_list(T* const list, const Adv advance_func, const Pred predicate) -> typename std::enable_if<std::is_convertible<decltype(advance_func(std::declval<T*>())), T*>::value && std::is_convertible<decltype(predicate(std::declval<T*>())), bool>::value, T*>::type @@ -273,7 +251,7 @@ int8_t Socket::createConnection(const addrinfo* const destination_addresses) { continue; } - logger_->log_info("Listening on %s:%" PRIu16 " with backlog %" PRIu16, 
sockaddr_ntop(current_addr->ai_addr), port_, listeners_); + logger_->log_info("Listening on %s:%" PRIu16 " with backlog %" PRIu16, utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_, listeners_); } else { // client socket #ifndef WIN32 @@ -286,12 +264,12 @@ int8_t Socket::createConnection(const addrinfo* const destination_addresses) { const auto connect_result = connect(socket_file_descriptor_, current_addr->ai_addr, current_addr->ai_addrlen); if (connect_result == SOCKET_ERROR) { - logger_->log_warn("Couldn't connect to %s:%" PRIu16 ": %s", sockaddr_ntop(current_addr->ai_addr), port_, get_last_socket_error_message()); + logger_->log_warn("Couldn't connect to %s:%" PRIu16 ": %s", utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_, get_last_socket_error_message()); close(); continue; } - logger_->log_info("Connected to %s:%" PRIu16, sockaddr_ntop(current_addr->ai_addr), port_); + logger_->log_info("Connected to %s:%" PRIu16, utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_); } FD_SET(socket_file_descriptor_, &total_list_); diff --git a/libminifi/src/utils/NetworkInterfaceInfo.cpp b/libminifi/src/utils/NetworkInterfaceInfo.cpp new file mode 100644 index 0000000..82cf763 --- /dev/null +++ b/libminifi/src/utils/NetworkInterfaceInfo.cpp @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/NetworkInterfaceInfo.h"
+
+#include <algorithm>
+#include <cerrno>
+#include <cstring>
+#include <iterator>
+#include <utility>
+
+#include "utils/OsUtils.h"
+#include "utils/gsl.h"
+#include "core/logging/LoggerConfiguration.h"
+#ifdef WIN32
+#include <Windows.h>
+#include <winsock2.h>
+#include <iphlpapi.h>
+#include <WS2tcpip.h>
+#pragma comment(lib, "IPHLPAPI.lib")
+#else
+#include <unistd.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <net/if.h>
+#include <arpa/inet.h>
+#include <ifaddrs.h>
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+std::shared_ptr<core::logging::Logger> NetworkInterfaceInfo::logger_ = core::logging::LoggerFactory<NetworkInterfaceInfo>::getLogger();
+
+#ifdef WIN32
+namespace {
+/**
+ * Converts a wide (UTF-16) Windows string to UTF-8.
+ * The source length is passed explicitly to WideCharToMultiByte, so the terminating null character
+ * is not counted and does not end up as an embedded '\0' at the end of the result.
+ * Returns an empty string if the input is empty or the conversion fails.
+ */
+std::string utf8_encode(const std::wstring& wstr) {
+  if (wstr.empty())
+    return std::string();
+  const int wide_length = static_cast<int>(wstr.size());
+  const int size_needed = WideCharToMultiByte(CP_UTF8, 0, wstr.c_str(), wide_length, nullptr, 0, nullptr, nullptr);
+  if (size_needed <= 0)
+    return std::string();
+  std::string result_string(size_needed, '\0');
+  WideCharToMultiByte(CP_UTF8, 0, wstr.c_str(), wide_length, result_string.data(), size_needed, nullptr, nullptr);
+  return result_string;
+}
+}  // namespace
+
+/**
+ * Describes one Windows adapter: its friendly name (UTF-8), its IPv4 and IPv6 unicast addresses,
+ * whether it is operationally up, and whether it is a software loopback interface.
+ */
+NetworkInterfaceInfo::NetworkInterfaceInfo(const IP_ADAPTER_ADDRESSES* adapter) {
+  name_ = utf8_encode(adapter->FriendlyName);
+  for (auto unicast_address = adapter->FirstUnicastAddress; unicast_address != nullptr; unicast_address = unicast_address->Next) {
+    if (unicast_address->Address.lpSockaddr->sa_family == AF_INET) {
+      ip_v4_addresses_.push_back(OsUtils::sockaddr_ntop(unicast_address->Address.lpSockaddr));
+    } else if (unicast_address->Address.lpSockaddr->sa_family == AF_INET6) {
+      ip_v6_addresses_.push_back(OsUtils::sockaddr_ntop(unicast_address->Address.lpSockaddr));
+    }
+  }
+  running_ = adapter->OperStatus == IfOperStatusUp;
+  loopback_ = adapter->IfType == IF_TYPE_SOFTWARE_LOOPBACK;
+}
+#else
+/**
+ * Describes a single getifaddrs() entry. An entry carries at most one address, so the IPv4/IPv6
+ * lists hold zero or one element here; entries of any other address family contribute only the
+ * name and the running/loopback flags. Entries with the same name are merged by getNetworkInterfaceInfos().
+ * The caller must ensure that ifa->ifa_addr is not null.
+ */
+NetworkInterfaceInfo::NetworkInterfaceInfo(const struct ifaddrs* ifa) {
+  name_ = ifa->ifa_name;
+  if (ifa->ifa_addr->sa_family == AF_INET) {
+    ip_v4_addresses_.push_back(OsUtils::sockaddr_ntop(ifa->ifa_addr));
+  } else if (ifa->ifa_addr->sa_family == AF_INET6) {
+    ip_v6_addresses_.push_back(OsUtils::sockaddr_ntop(ifa->ifa_addr));
+  }
+  running_ = (ifa->ifa_flags & IFF_RUNNING) != 0;
+  loopback_ = (ifa->ifa_flags & IFF_LOOPBACK) != 0;
+}
+#endif
+
+namespace {
+/// Predicate matching a NetworkInterfaceInfo by name. Holds a reference, so it must not outlive `name`.
+struct HasName {
+  explicit HasName(const std::string& name) : name_(name) {}
+  bool operator()(const NetworkInterfaceInfo& network_interface_info) const {
+    return network_interface_info.getName() == name_;
+  }
+  const std::string& name_;
+};
+}  // namespace
+
+/**
+ * Enumerates the network interfaces of the host.
+ * OS entries with the same interface name are merged into a single NetworkInterfaceInfo.
+ * @param filter only interfaces for which this returns true are collected
+ * @param max_interfaces if set, at most this many distinct interfaces are returned; the whole OS list is
+ *        still scanned so that the addresses of the returned interfaces are complete
+ * @return the matching interfaces; empty if querying the OS fails (the error is logged)
+ */
+std::vector<NetworkInterfaceInfo> NetworkInterfaceInfo::getNetworkInterfaceInfos(std::function<bool(const NetworkInterfaceInfo&)> filter,
+    const std::optional<uint32_t> max_interfaces) {
+  std::vector<NetworkInterfaceInfo> network_adapters;
+  // Merges into an already collected interface of the same name, or appends a new one while below the limit.
+  // The scan must not stop once the limit is reached: getifaddrs() reports each address of an interface as a
+  // separate entry, so returning early would yield interfaces with missing (or no) addresses.
+  const auto add_or_merge = [&network_adapters, &max_interfaces](NetworkInterfaceInfo&& interface_info) {
+    const auto it = std::find_if(network_adapters.begin(), network_adapters.end(), HasName(interface_info.getName()));
+    if (it != network_adapters.end()) {
+      interface_info.moveAddressesInto(*it);
+    } else if (!max_interfaces.has_value() || network_adapters.size() < max_interfaces.value()) {
+      network_adapters.emplace_back(std::move(interface_info));
+    }
+  };
+#ifdef WIN32
+  ULONG buffer_length = sizeof(IP_ADAPTER_ADDRESSES);
+  auto get_adapters_err = GetAdaptersAddresses(0, 0, nullptr, nullptr, &buffer_length);
+  if (ERROR_BUFFER_OVERFLOW != get_adapters_err) {
+    logger_->log_error("GetAdaptersAddresses failed: %lu", get_adapters_err);
+    return network_adapters;
+  }
+  std::vector<char> bytes(buffer_length, 0);
+  IP_ADAPTER_ADDRESSES* adapter = reinterpret_cast<IP_ADAPTER_ADDRESSES*>(bytes.data());
+  get_adapters_err = GetAdaptersAddresses(0, 0, nullptr, adapter, &buffer_length);
+  if (NO_ERROR != get_adapters_err) {
+    logger_->log_error("GetAdaptersAddresses failed: %lu", get_adapters_err);
+    return network_adapters;
+  }
+  for (; adapter != nullptr; adapter = adapter->Next) {
+    NetworkInterfaceInfo interface_info(adapter);
+    if (filter(interface_info))
+      add_or_merge(std::move(interface_info));
+  }
+#else
+  struct ifaddrs* interface_addresses = nullptr;
+  if (getifaddrs(&interface_addresses) == -1) {
+    logger_->log_error("getifaddrs failed: %s", std::strerror(errno));
+    return network_adapters;
+  }
+  // Registered only after getifaddrs() succeeded: the lambda captures the pointer by value, so creating it
+  // earlier would capture the initial nullptr and leak the list.
+  auto cleanup = gsl::finally([interface_addresses] { freeifaddrs(interface_addresses); });
+
+  for (struct ifaddrs* ifa = interface_addresses; ifa != nullptr; ifa = ifa->ifa_next) {
+    if (!ifa->ifa_addr)
+      continue;
+    NetworkInterfaceInfo interface_info(ifa);
+    if (filter(interface_info))
+      add_or_merge(std::move(interface_info));
+  }
+#endif
+  return network_adapters;
+}
+
+namespace {
+/// Appends all elements of source to destination by moving them, then clears source.
+void move_append(std::vector<std::string> &&source, std::vector<std::string> &destination) {
+  destination.reserve(destination.size() + source.size());
+  std::move(std::begin(source), std::end(source), std::back_inserter(destination));
+  source.clear();
+}
+}  // namespace
+
+/// Moves this interface's IPv4 and IPv6 addresses to the end of the corresponding lists of destination.
+void NetworkInterfaceInfo::moveAddressesInto(NetworkInterfaceInfo& destination) {
+  move_append(std::move(ip_v4_addresses_), destination.ip_v4_addresses_);
+  move_append(std::move(ip_v6_addresses_), destination.ip_v6_addresses_);
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/utils/OsUtils.cpp b/libminifi/src/utils/OsUtils.cpp index 5d38279..38ab30c 100644 --- a/libminifi/src/utils/OsUtils.cpp +++ b/libminifi/src/utils/OsUtils.cpp @@ -22,6 +22,7 @@ #include <map> #include "utils/gsl.h" +#include "Exception.h" #ifdef __linux__ #include <sys/sysinfo.h> @@ -36,16 +37,19 @@ #include <Windows.h> #include <sddl.h> #include
<psapi.h> +#include <winsock2.h> #include <vector> #include <algorithm> +#include <WS2tcpip.h> #pragma comment(lib, "Ws2_32.lib") #else #include <sys/utsname.h> #include <pwd.h> #include <sys/types.h> +#include <arpa/inet.h> #include <unistd.h> #include <fstream> - +#include <cstring> #endif #ifdef __APPLE__
@@ -312,8 +316,55 @@ std::string OsUtils::getMachineArchitecture() {
   else
     return buf.machine;
 #endif
+  return "unknown";
 }
 
+
+namespace {
+/// Describes the last socket-API error (WSAGetLastError() on Windows, errno elsewhere).
+std::string get_last_socket_error_message() {
+#ifdef WIN32
+  const auto error_code = WSAGetLastError();
+#else
+  const auto error_code = errno;
+#endif /* WIN32 */
+  return std::system_category().message(error_code);
+}
+}  // namespace
+
+/**
+ * Formats the address of an IPv4 or IPv6 socket address as text using inet_ntop (the port is not included).
+ * @param sa socket address whose sa_family is AF_INET or AF_INET6
+ * @return the textual address, without trailing null characters
+ * @throws minifi::Exception if sa is null, its address family is neither AF_INET nor AF_INET6, or inet_ntop fails
+ */
+std::string OsUtils::sockaddr_ntop(const sockaddr* const sa) {
+  if (sa == nullptr) {
+    throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, "sockaddr_ntop: null address" };
+  }
+  std::string result;
+  if (sa->sa_family == AF_INET) {
+    sockaddr_in sa_in{};
+    // copy into a correctly typed struct instead of casting the pointer (avoids aliasing/alignment issues)
+    std::memcpy(reinterpret_cast<void*>(&sa_in), sa, sizeof(sockaddr_in));
+    result.resize(INET_ADDRSTRLEN);
+    if (inet_ntop(AF_INET, &sa_in.sin_addr, &result[0], INET_ADDRSTRLEN) == nullptr) {
+      throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, get_last_socket_error_message() };
+    }
+  } else if (sa->sa_family == AF_INET6) {
+    sockaddr_in6 sa_in6{};
+    std::memcpy(reinterpret_cast<void*>(&sa_in6), sa, sizeof(sockaddr_in6));
+    result.resize(INET6_ADDRSTRLEN);
+    if (inet_ntop(AF_INET6, &sa_in6.sin6_addr, &result[0], INET6_ADDRSTRLEN) == nullptr) {
+      throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, get_last_socket_error_message() };
+    }
+  } else {
+    throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, "sockaddr_ntop: unknown address family" };
+  }
+  result.resize(std::strlen(result.c_str()));  // discard remaining null bytes at the end
+  return result;
+}
+
 } // namespace utils
 } // namespace minifi
 } // namespace nifi
diff --git a/libminifi/test/unit/NetworkInterfaceInfoTests.cpp b/libminifi/test/unit/NetworkInterfaceInfoTests.cpp
new file mode 100644
index 0000000..240204b
--- /dev/null
+++
b/libminifi/test/unit/NetworkInterfaceInfoTests.cpp
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <string>
+
+#include "../TestBase.h"
+#include "utils/NetworkInterfaceInfo.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+TEST_CASE("NetworkInterfaceInfo test", "[testnetworkinterfaceinfo]") {
+  const auto network_interface_infos = utils::NetworkInterfaceInfo::getNetworkInterfaceInfos();
+  REQUIRE_FALSE(network_interface_infos.empty());
+
+  const auto valid_interface_name = network_interface_infos.front().getName();
+  auto filter = [&valid_interface_name](const utils::NetworkInterfaceInfo& interface_info) -> bool { return interface_info.getName() == valid_interface_name; };
+  REQUIRE(utils::NetworkInterfaceInfo::getNetworkInterfaceInfos(filter, 1).size() == 1);
+}
+
+namespace {
+// Only recognizes the canonical IPv4 loopback address, not the whole 127.0.0.0/8 range.
+bool is_localhost(const std::string& ip_address) {
+  return ip_address == "127.0.0.1";
+}
+}  // namespace
+
+TEST_CASE("NetworkInterfaceInfo isLoopback test", "[testnetworkinterfaceinfoloopback]") {
+  auto network_interface_infos = utils::NetworkInterfaceInfo::getNetworkInterfaceInfos([] (const utils::NetworkInterfaceInfo& interface_info) -> bool { return !interface_info.isLoopback();});
+  for (auto& network_interface_info : network_interface_infos) {
+    const auto& ip_v4_addresses = network_interface_info.getIpV4Addresses();
+    REQUIRE(std::none_of(ip_v4_addresses.begin(), ip_v4_addresses.end(), is_localhost));
+  }
+}
diff --git a/libminifi/test/unit/OpenTelemetryLogDataModelTests.cpp b/libminifi/test/unit/OpenTelemetryLogDataModelTests.cpp
new file mode 100644
index 0000000..9e2eccf
--- /dev/null
+++ b/libminifi/test/unit/OpenTelemetryLogDataModelTests.cpp
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "utils/OpenTelemetryLogDataModelUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+TEST_CASE("OpenTelemetryLogDataModel tests", "[testopentelemetrylogdatamodel]") {
+  rapidjson::Document document(rapidjson::kObjectType);
+  utils::OpenTelemetryLogDataModel::appendEventInformation(document, "testevent");
+  utils::OpenTelemetryLogDataModel::appendHostInformation(document);
+  utils::OpenTelemetryLogDataModel::appendBody(document);
+  REQUIRE(document.HasMember("Body"));
+  REQUIRE(document.HasMember("Name"));
+  REQUIRE(document.HasMember("Timestamp"));
+  REQUIRE(document.HasMember("Resource"));
+}
