This is an automated email from the ASF dual-hosted git repository. martinzink pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 2ec34d7f127b05ad22b6547504939f3408006d20 Author: Ferenc Gerlits <[email protected]> AuthorDate: Mon Feb 14 10:06:21 2022 +0100 MINIFICPP-1734 Make TailFile capable of collecting logs from Kubernetes Closes #1257 Signed-off-by: Martin Zink <[email protected]> --- CMakeLists.txt | 2 +- CONTROLLERS.md | 20 ++ PROCESSORS.md | 3 +- README.md | 7 +- bootstrap.sh | 4 +- bstrp_functions.sh | 6 +- examples/kubernetes_tailfile_config.yml | 63 ++++++ extensions/kubernetes/CMakeLists.txt | 9 +- extensions/kubernetes/KubernetesClientPOC.cpp | 123 ------------ .../KubernetesControllerService.cpp | 212 +++++++++++++++++++++ .../KubernetesControllerService.h | 56 ++++++ .../standard-processors/processors/TailFile.cpp | 128 +++++++++---- .../standard-processors/processors/TailFile.h | 29 +-- .../tests/unit/TailFileTests.cpp | 72 ++++++- .../include/controllers/AttributeProviderService.h | 40 ++++ 15 files changed, 590 insertions(+), 184 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f887c1a..43b68db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ # under the License. # -cmake_minimum_required(VERSION 3.16) +cmake_minimum_required(VERSION 3.17) cmake_policy(SET CMP0096 NEW) # policy to preserve the leading zeros in PROJECT_VERSION_{MAJOR,MINOR,PATCH,TWEAK} cmake_policy(SET CMP0065 OLD) # default export policy, required for self-dlopen project(nifi-minifi-cpp VERSION 0.12.0) diff --git a/CONTROLLERS.md b/CONTROLLERS.md index 42ab33e..3f864eb 100644 --- a/CONTROLLERS.md +++ b/CONTROLLERS.md @@ -19,6 +19,8 @@ - [AzureStorageCredentialsService](#azureStorageCredentialsService) - [AWSCredentialsService](#awsCredentialsService) +- [KubernetesControllerService](#kubernetesControllerService) + ## AWSCredentialsService @@ -40,6 +42,7 @@ properties (not in bold) are considered optional. |Secret Key|||Specifies the AWS Secret Key| |Credentials File|||Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey| + ## AzureStorageCredentialsService ### Description @@ -60,3 +63,20 @@ properties (not in bold) are considered optional. |Common Storage Account Endpoint Suffix|||Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).| |Connection String|||Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.| |**Use Managed Identity Credentials**|false||Connection string used to connect to Azure Storage service. This overrides all other set credential properties.| + + +## KubernetesControllerService + +### Description + +Controller service that provides access to the Kubernetes API. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. + +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +|Namespace Filter|default||Limit the output to pods in namespaces which match this regular expression| +|Pod Name Filter|||If present, limit the output to pods the name of which matches this regular expression| +|Container Name Filter|||If present, limit the output to containers the name of which matches this regular expression| diff --git a/PROCESSORS.md b/PROCESSORS.md index 966f4f7..7d2df44 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -1862,11 +1862,12 @@ In the list below, the names of required properties appear in bold. Any other pr | Name | Default Value | Allowable Values | Description | | - | - | - | - | +|Attribute Provider Service|||Provides a list of key-value pair records which can be used in the Base Directory property using Expression Language. Requires Multiple file mode.| |File to Tail|||Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode| |**Initial Start Position**|Beginning of File|Beginning of Time<br>Beginning of File<br>Current Time|When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from a file, the Processor will continue from the last point from which it has received data.<br>Beginning of Time: Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail.<br>Beginning of [...] |Input Delimiter|||Specifies the character that should be used for delimiting the data being tailedfrom the incoming file.If none is specified, data will be ingested as it becomes available.| |State File|TailFileState||Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off| -|tail-base-directory|||| +|tail-base-directory|||Base directory used to look for files to tail. This property is required when using Multiple file mode. Can contain expression language placeholders if Attribute Provider Service is set.<br/>**Supports Expression Language: true**| |**tail-mode**|Single file|Single file<br>Multiple file<br>|Specifies the tail file mode. In 'Single file' mode only a single file will be watched. In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, those files will be tailed.| ### Relationships diff --git a/README.md b/README.md index 6011698..bd49da3 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte Through JNI extensions you can run NiFi processors using NARs. The JNI extension set allows you to run these Java processors. MiNiFi C++ will favor C++ implementations over Java implements. In the case where a processor is implemented in either language, the one in C++ will be selected; however, will remain transparent to the consumer. -| Extension Set | Processors | CMAKE Flag | +| Extension Set | Processors and Controller Services | CMAKE Flag | | ------------- |:-------------| :-----| | Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON | | AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON | @@ -82,6 +82,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON | | GPS | GetGPS | -DENABLE_GPS=ON | | Kafka | [PublishKafka](PROCESSORS.md#publishkafka) | -DENABLE_LIBRDKAFKA=ON | +| Kubernetes | [KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService) | -DENABLE_KUBERNETES=ON | | JNI | **NiFi Processors** | -DENABLE_JNI=ON | | MQTT | [ConsumeMQTT](PROCESSORS.md#consumeMQTT)<br/>[PublishMQTT](PROCESSORS.md#publishMQTT) | -DENABLE_MQTT=ON | | OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor) | -DENABLE_OPC=ON | @@ -113,8 +114,8 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension ### To build #### Utilities -* CMake 3.16 or greater -* gcc 8 or greater +* CMake 3.17 or greater +* gcc 10 or greater * bison 3.0.x+ (3.2 has been shown to fail builds) * flex 2.6 or greater diff --git a/bootstrap.sh b/bootstrap.sh index 7d54da0..6494ae3 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -19,7 +19,7 @@ script_directory="$(cd "$(dirname "$0")" && pwd)" CMAKE_GLOBAL_MIN_VERSION_MAJOR=3 -CMAKE_GLOBAL_MIN_VERSION_MINOR=16 +CMAKE_GLOBAL_MIN_VERSION_MINOR=17 CMAKE_GLOBAL_MIN_VERSION_REVISION=0 export RED='\033[0;101m' @@ -290,6 +290,8 @@ add_disabled_option AWS_ENABLED ${FALSE} "ENABLE_AWS" add_disabled_option KAFKA_ENABLED ${FALSE} "ENABLE_LIBRDKAFKA" +add_disabled_option KUBERNETES_ENABLED ${FALSE} "ENABLE_KUBERNETES" + add_disabled_option MQTT_ENABLED ${FALSE} "ENABLE_MQTT" add_disabled_option PYTHON_ENABLED ${FALSE} "ENABLE_PYTHON" diff --git a/bstrp_functions.sh b/bstrp_functions.sh index cccfde0..93e4c7d 100755 --- a/bstrp_functions.sh +++ b/bstrp_functions.sh @@ -392,6 +392,7 @@ show_supported_features() { echo "Y. Systemd Support .............$(print_feature_status SYSTEMD_ENABLED)" echo "Z. NanoFi Support ..............$(print_feature_status NANOFI_ENABLED)" echo "AA. Splunk Support .............$(print_feature_status SPLUNK_ENABLED)" + echo "AB. Kubernetes Support .........$(print_feature_status KUBERNETES_ENABLED)" echo "****************************************" echo " Build Options." echo "****************************************" @@ -414,7 +415,7 @@ show_supported_features() { read_feature_options(){ local choice - echo -n "Enter choice [ A - Z or AA or 1-7] " + echo -n "Enter choice [A-Z or AA-AB or 1-7] " read -r choice choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]') case $choice in @@ -447,6 +448,7 @@ read_feature_options(){ y) ToggleFeature SYSTEMD_ENABLED ;; z) ToggleFeature NANOFI_ENABLED ;; aa) ToggleFeature SPLUNK_ENABLED ;; + ab) ToggleFeature KUBERNETES_ENABLED ;; 1) ToggleFeature TESTS_ENABLED ;; 2) EnableAllFeatures ;; 3) ToggleFeature JNI_ENABLED;; @@ -465,7 +467,7 @@ read_feature_options(){ fi ;; q) exit 0;; - *) echo -e "${RED}Please enter an option A-Z or AA or 1-7...${NO_COLOR}" && sleep 2 + *) echo -e "${RED}Please enter an option A-Z or AA-AB or 1-7...${NO_COLOR}" && sleep 2 esac } diff --git a/examples/kubernetes_tailfile_config.yml b/examples/kubernetes_tailfile_config.yml new file mode 100644 index 0000000..14bde35 --- /dev/null +++ b/examples/kubernetes_tailfile_config.yml @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +MiNiFi Config Version: 3 +Flow Controller: + name: Read Kubernetes pod logs and publish them to Kafka +Processors: +- name: Tail Kubernetes log files + id: 891efd7f-2814-4068-9efd-5f258f4990d5 + class: org.apache.nifi.minifi.processors.TailFile + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + Properties: + tail-mode: Multiple file + tail-base-directory: /var/log/pods/${namespace}_${pod}_${uid}/${container} + File to Tail: '.*\.log' + Attribute Provider Service: KubernetesControllerService + Lookup frequency: 10 min + Recursive lookup: 'false' + Initial Start Position: Beginning of File + Input Delimiter: \n + Rolling Filename Pattern: '${filename}.log.*' +- name: Publish messages to Kafka topic test + id: fb880b73-bff7-4775-a854-e048ae09e07e + class: org.apache.nifi.processors.standard.PublishKafka + scheduling strategy: EVENT_DRIVEN + auto-terminated relationships list: + - success + - failure + Properties: + Batch Size: '10' + Client Name: test-client + Compress Codec: none + Delivery Guarantee: '1' + Known Brokers: kafka-broker:9092 + Message Timeout: 12 sec + Request Timeout: 10 sec + Topic Name: test +Connections: +- name: TailFile/success/PublishKafka + id: d6675f90-62a6-4f98-b67f-00efeab78e5e + source id: 891efd7f-2814-4068-9efd-5f258f4990d5 + source relationship name: success + destination id: fb880b73-bff7-4775-a854-e048ae09e07e +Controller Services: +- name: KubernetesControllerService + id: 568559dc-3c81-4b01-bc08-760fadb953b0 + type: org.apache.nifi.minifi.controllers.KubernetesControllerService + Properties: + Namespace Filter: default +Remote Process Groups: [] diff --git a/extensions/kubernetes/CMakeLists.txt b/extensions/kubernetes/CMakeLists.txt index 5cd2b76..27c2762 100644 --- a/extensions/kubernetes/CMakeLists.txt +++ b/extensions/kubernetes/CMakeLists.txt @@ -17,9 +17,10 @@ include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) -file(GLOB SOURCES "*.cpp") -add_executable(minifi-kubernetes ${SOURCES}) -target_link_libraries(minifi-kubernetes ${LIBMINIFI} kubernetes CURL::libcurl) +file(GLOB SOURCES "*.cpp" "controllerservice/*.cpp") +add_library(minifi-kubernetes-extensions SHARED ${SOURCES}) +target_link_libraries(minifi-kubernetes-extensions ${LIBMINIFI} kubernetes CURL::libcurl) -register_extension(minifi-kubernetes) +set(KUBERNETES-EXTENSIONS minifi-kubernetes-extensions PARENT_SCOPE) +register_extension(minifi-kubernetes-extensions) register_extension_linter(minifi-kubernetes-extensions-linter) diff --git a/extensions/kubernetes/KubernetesClientPOC.cpp b/extensions/kubernetes/KubernetesClientPOC.cpp deleted file mode 100644 index 4a24ff0..0000000 --- a/extensions/kubernetes/KubernetesClientPOC.cpp +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <errno.h> -#include <malloc.h> -#include <stdio.h> -#include <string> - -extern "C" { -#include "config/incluster_config.h" -#include "config/kube_config.h" -#include "include/apiClient.h" -#include "api/CoreV1API.h" -} - -void list_pod(apiClient_t *apiClient) { - v1_pod_list_t *pod_list = NULL; - std::string name_space = "default"; - std::string field_selector = "spec.nodeName=kind-control-plane"; - pod_list = CoreV1API_listNamespacedPod(apiClient, - name_space.data(), - NULL, /* pretty */ - 0, /* allowWatchBookmarks */ - NULL, /* continue */ - field_selector.data(), - NULL, /* labelSelector */ - 0, /* limit */ - NULL, /* resourceVersion */ - NULL, /* resourceVersionMatch */ - 0, /* timeoutSeconds */ - 0 /* watch */); - printf("The return code of HTTP request=%ld\n", apiClient->response_code); - if (pod_list) { - printf("Get pod list:\n"); - listEntry_t *listEntry = NULL; - v1_pod_t *pod = NULL; - list_ForEach(listEntry, pod_list->items) { - pod = static_cast<v1_pod_t *>(listEntry->data); - printf("\tThe pod name: %s\n", pod->metadata->name); - v1_container_t *container = NULL; - listEntry_t *containerEntry = NULL; - list_ForEach(containerEntry, pod->spec->containers) { - container = static_cast<v1_container_t *>(containerEntry->data); - printf("\tThe container name: %s\n", container->name); - } - } - v1_pod_list_free(pod_list); - pod_list = NULL; - } else { - printf("Cannot get any pod.\n"); - } -} - -void list_namespaces(apiClient_t *apiClient) { - v1_namespace_list_t *namespace_list = NULL; - namespace_list = CoreV1API_listNamespace(apiClient, - NULL /*pretty*/, - 0 /*allowWatchBookmarks*/, - NULL /*_continue*/, - NULL /*fieldSelector */, - NULL /*labelSelector*/, - 0 /*limit*/ , - NULL /*resourceVersion*/, - NULL /*resourceVersionMatch*/, - 0 /*timeoutSeconds*/, - 0 /*watch*/); - printf("The return code of HTTP request=%ld\n", apiClient->response_code); - if (namespace_list) { - printf("Get namespaces list:\n"); - listEntry_t *listEntry = NULL; - v1_namespace_t *ns = NULL; - list_ForEach(listEntry, namespace_list->items) { - ns = static_cast<v1_namespace_t *>(listEntry->data); - printf("\tThe namespace name: %s\n", ns->metadata->name); - } - v1_namespace_list_free(namespace_list); - namespace_list = NULL; - } else { - printf("Cannot get any pod.\n"); - } -} - -int main() { - char *basePath = NULL; - sslConfig_t *sslConfig = NULL; - list_t *apiKeys = NULL; - int rc = load_incluster_config(&basePath, &sslConfig, &apiKeys); - if (rc != 0) { - printf("Cannot load kubernetes configuration in cluster.\n"); - return -1; - } - apiClient_t *apiClient = apiClient_create_with_base_path(basePath, sslConfig, apiKeys); - if (!apiClient) { - printf("Cannot create a kubernetes client.\n"); - return -1; - } - list_pod(apiClient); - list_namespaces(apiClient); - - apiClient_free(apiClient); - apiClient = NULL; - free_client_config(basePath, sslConfig, apiKeys); - basePath = NULL; - sslConfig = NULL; - apiKeys = NULL; - apiClient_unsetupGlobalEnv(); - - return 0; -} diff --git a/extensions/kubernetes/controllerservice/KubernetesControllerService.cpp b/extensions/kubernetes/controllerservice/KubernetesControllerService.cpp new file mode 100644 index 0000000..3322db0 --- /dev/null +++ b/extensions/kubernetes/controllerservice/KubernetesControllerService.cpp @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "KubernetesControllerService.h" + +#include <vector> + +extern "C" { +#include "config/incluster_config.h" +#include "config/kube_config.h" +#include "include/apiClient.h" +#include "api/CoreV1API.h" +} + +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "Exception.h" +#include "utils/gsl.h" +#include "utils/StringUtils.h" + +namespace org::apache::nifi::minifi::controllers { + +class KubernetesControllerService::APIClient { + public: + APIClient(); + ~APIClient() noexcept; + + APIClient(APIClient&&) = delete; + APIClient(const APIClient&) = delete; + APIClient& operator=(APIClient&&) = delete; + APIClient& operator=(const APIClient&) = delete; + + [[nodiscard]] gsl::not_null<apiClient_t*> getClient() const noexcept { return api_client_; } + + private: + char* base_path_ = nullptr; + sslConfig_t* ssl_config_ = nullptr; + list_t* api_keys_ = nullptr; + gsl::not_null<apiClient_t*> api_client_; +}; + +KubernetesControllerService::APIClient::APIClient() + : api_client_([this] { + int rc = load_incluster_config(&base_path_, &ssl_config_, &api_keys_); + if (rc != 0) { + throw std::runtime_error(utils::StringUtils::join_pack("load_incluster_config() failed with error code ", std::to_string(rc))); + } + const auto api_client = apiClient_create_with_base_path(base_path_, ssl_config_, api_keys_); + if (!api_client) { + throw std::runtime_error("apiClient_create_with_base_path() failed"); + } + return gsl::make_not_null(api_client); + }()) { +} + +KubernetesControllerService::APIClient::~APIClient() noexcept { + apiClient_free(api_client_); + free_client_config(base_path_, ssl_config_, api_keys_); + apiClient_unsetupGlobalEnv(); +} + +const core::Property KubernetesControllerService::NamespaceFilter{ + core::PropertyBuilder::createProperty("Namespace Filter") + ->withDescription("Limit the output to pods in namespaces which match this regular expression") + ->withDefaultValue<std::string>("default") + ->build()}; +const core::Property KubernetesControllerService::PodNameFilter{ + core::PropertyBuilder::createProperty("Pod Name Filter") + ->withDescription("If present, limit the output to pods the name of which matches this regular expression") + ->build()}; +const core::Property KubernetesControllerService::ContainerNameFilter{ + core::PropertyBuilder::createProperty("Container Name Filter") + ->withDescription("If present, limit the output to containers the name of which matches this regular expression") + ->build()}; + +KubernetesControllerService::KubernetesControllerService(const std::string& name, const utils::Identifier& uuid) + : AttributeProviderService(name, uuid), + logger_{core::logging::LoggerFactory<KubernetesControllerService>::getLogger()} { +} + +KubernetesControllerService::KubernetesControllerService(const std::string& name, const std::shared_ptr<Configure>& configuration) + : KubernetesControllerService{name} { + setConfiguration(configuration); + initialize(); +} + +void KubernetesControllerService::initialize() { + std::lock_guard<std::mutex> lock(initialization_mutex_); + if (initialized_) { return; } + + ControllerService::initialize(); + setSupportedProperties({NamespaceFilter, PodNameFilter, ContainerNameFilter}); + initialized_ = true; +} + +void KubernetesControllerService::onEnable() { + try { + api_client_ = std::make_unique<APIClient>(); + } catch (const std::runtime_error& ex) { + logger_->log_error("Could not create the API client in the Kubernetes Controller Service: %s", ex.what()); + } + + std::string namespace_filter; + if (getProperty(NamespaceFilter.getName(), namespace_filter) && !namespace_filter.empty()) { + namespace_filter_ = std::regex{namespace_filter}; + } + + std::string pod_name_filter; + if (getProperty(PodNameFilter.getName(), pod_name_filter) && !pod_name_filter.empty()) { + pod_name_filter_ = std::regex{pod_name_filter}; + } + + std::string container_name_filter; + if (getProperty(ContainerNameFilter.getName(), container_name_filter) && !container_name_filter.empty()) { + container_name_filter_ = std::regex{container_name_filter}; + } +} + +namespace { + +struct v1_pod_list_t_deleter { + void operator()(v1_pod_list_t* ptr) const noexcept { v1_pod_list_free(ptr); } +}; +using v1_pod_list_unique_ptr = std::unique_ptr<v1_pod_list_t, v1_pod_list_t_deleter>; + +v1_pod_list_unique_ptr getPods(gsl::not_null<apiClient_t*> api_client, core::logging::Logger& logger) { + logger.log_info("Calling Kubernetes API listPodForAllNamespaces..."); + v1_pod_list_unique_ptr pod_list{CoreV1API_listPodForAllNamespaces(api_client, + 0, // allowWatchBookmarks + nullptr, // continue + nullptr, // fieldSelector + nullptr, // labelSelector + 0, // limit + nullptr, // pretty + nullptr, // resourceVersion + nullptr, // resourceVersionMatch + 0, // timeoutSeconds + 0)}; // watch + logger.log_info("The return code of the Kubernetes API listPodForAllNamespaces call: %ld", api_client->response_code); + return pod_list; +} + +} // namespace + +std::optional<std::vector<KubernetesControllerService::AttributeMap>> KubernetesControllerService::getAttributes() { + if (!api_client_) { + logger_->log_warn("The Kubernetes client is not valid, unable to call the Kubernetes API"); + return std::nullopt; + } + + const auto pod_list = getPods(api_client_->getClient(), *logger_); + if (!pod_list) { + logger_->log_warn("Could not find any Kubernetes pods"); + return std::nullopt; + } + + std::vector<AttributeMap> container_attribute_maps; + + listEntry_t* pod_entry = nullptr; + list_ForEach(pod_entry, pod_list->items) { + const auto pod = static_cast<v1_pod_t*>(pod_entry->data); + + std::string name_space{pod->metadata->_namespace}; + std::string pod_name{pod->metadata->name}; + std::string uid{pod->metadata->uid}; + + listEntry_t* container_entry = nullptr; + list_ForEach(container_entry, pod->spec->containers) { + auto container = static_cast<v1_container_t*>(container_entry->data); + + std::string container_name{container->name}; + + if (matchesRegexFilters(name_space, pod_name, container_name)) { + container_attribute_maps.push_back(AttributeMap{ + {"namespace", name_space}, + {"pod", pod_name}, + {"uid", uid}, + {"container", container_name}}); + } + } + } + + logger_->log_info("Found %zu containers (after regex filtering) in %ld Kubernetes pods (unfiltered)", container_attribute_maps.size(), pod_list->items->count); + return container_attribute_maps; +} + +bool KubernetesControllerService::matchesRegexFilters(const std::string& name_space, const std::string& pod_name, const std::string& container_name) const { + static constexpr auto matchesFilter = [](const std::string& target, const std::optional<std::regex>& filter) { + return !filter || std::regex_match(target, *filter); + }; + return matchesFilter(name_space, namespace_filter_) && + matchesFilter(pod_name, pod_name_filter_) && + matchesFilter(container_name, container_name_filter_); +} + +REGISTER_RESOURCE(KubernetesControllerService, "Controller service that provides access to the Kubernetes API"); + +} // namespace org::apache::nifi::minifi::controllers diff --git a/extensions/kubernetes/controllerservice/KubernetesControllerService.h b/extensions/kubernetes/controllerservice/KubernetesControllerService.h new file mode 100644 index 0000000..b088e3b --- /dev/null +++ b/extensions/kubernetes/controllerservice/KubernetesControllerService.h @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <memory> +#include <string> +#include <vector> + +#include "controllers/AttributeProviderService.h" +#include "core/logging/Logger.h" +#include "core/Property.h" + +namespace org::apache::nifi::minifi::controllers { + +class KubernetesControllerService : public AttributeProviderService { + public: + EXTENSIONAPI static const core::Property NamespaceFilter; + EXTENSIONAPI static const core::Property PodNameFilter; + EXTENSIONAPI static const core::Property ContainerNameFilter; + + explicit KubernetesControllerService(const std::string& name, const utils::Identifier& uuid = {}); + KubernetesControllerService(const std::string& name, const std::shared_ptr<Configure>& configuration); + + void initialize() final; + void onEnable() override; + std::optional<std::vector<AttributeMap>> getAttributes() override; + + private: + class APIClient; + + bool matchesRegexFilters(const std::string& name_space, const std::string& pod_name, const std::string& container_name) const; + + std::mutex initialization_mutex_; + bool initialized_ = false; + std::optional<std::regex> namespace_filter_; + std::optional<std::regex> pod_name_filter_; + std::optional<std::regex> container_name_filter_; + std::shared_ptr<core::logging::Logger> logger_; + std::unique_ptr<APIClient> api_client_; +}; + +} // namespace org::apache::nifi::minifi::controllers diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index d9a6a89..eac0d07 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -33,7 +33,10 @@ #include <regex> #include "range/v3/action/sort.hpp" +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/transform.hpp" +#include "FlowFileRecord.h" #include "io/CRCStream.h" #include "utils/file/FileUtils.h" #include "utils/file/PathUtils.h" @@ -53,20 +56,20 @@ namespace nifi { namespace minifi { namespace processors { -core::Property TailFile::FileName( +const core::Property TailFile::FileName( core::PropertyBuilder::createProperty("File to Tail") ->withDescription("Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode") ->isRequired(true) ->build()); -core::Property TailFile::StateFile( +const core::Property TailFile::StateFile( core::PropertyBuilder::createProperty("State File") ->withDescription("DEPRECATED. Only use it for state migration from the legacy state file.") ->isRequired(false) ->withDefaultValue<std::string>("TailFileState") ->build()); -core::Property TailFile::Delimiter( +const core::Property TailFile::Delimiter( core::PropertyBuilder::createProperty("Input Delimiter") ->withDescription("Specifies the character that should be used for delimiting the data being tailed" "from the incoming file. If none is specified, data will be ingested as it becomes available.") @@ -74,7 +77,7 @@ core::Property TailFile::Delimiter( ->withDefaultValue<std::string>("\\n") ->build()); -core::Property TailFile::TailMode( +const core::Property TailFile::TailMode( core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode") ->withDescription("Specifies the tail file mode. In 'Single file' mode only a single file will be watched. " "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. " @@ -82,13 +85,15 @@ core::Property TailFile::TailMode( ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple file")->withDefaultValue("Single file") ->build()); -core::Property TailFile::BaseDirectory( +const core::Property TailFile::BaseDirectory( core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory") - ->withDescription("Base directory used to look for files to tail. This property is required when using Multiple file mode.") + ->withDescription("Base directory used to look for files to tail. This property is required when using Multiple file mode. " + "Can contain expression language placeholders if Attribute Provider Service is set.") ->isRequired(false) + ->supportsExpressionLanguage(true) ->build()); -core::Property TailFile::RecursiveLookup( +const core::Property TailFile::RecursiveLookup( core::PropertyBuilder::createProperty("Recursive lookup") ->withDescription("When using Multiple file mode, this property determines whether files are tailed in " "child directories of the Base Directory or not.") @@ -96,7 +101,7 @@ core::Property TailFile::RecursiveLookup( ->withDefaultValue<bool>(false) ->build()); -core::Property TailFile::LookupFrequency( +const core::Property TailFile::LookupFrequency( core::PropertyBuilder::createProperty("Lookup frequency") ->withDescription("When using Multiple file mode, this property specifies the minimum duration " "the processor will wait between looking for new files to tail in the Base Directory.") @@ -104,7 +109,7 @@ core::Property TailFile::LookupFrequency( ->withDefaultValue<core::TimePeriodValue>("10 min") ->build()); -core::Property TailFile::RollingFilenamePattern( +const core::Property TailFile::RollingFilenamePattern( core::PropertyBuilder::createProperty("Rolling Filename Pattern") ->withDescription("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to " "identify files that have rolled over so MiNiFi can read the remaining of the rolled-over file and then continue with the new log file. " @@ -114,7 +119,7 @@ core::Property TailFile::RollingFilenamePattern( ->withDefaultValue<std::string>("${filename}.*") ->build()); -core::Property TailFile::InitialStartPosition( +const core::Property TailFile::InitialStartPosition( core::PropertyBuilder::createProperty("Initial Start Position") ->withDescription("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. " "Once data has been ingested from a file, the Processor will continue from the last point from which it has received data.\n" @@ -127,7 +132,14 @@ core::Property TailFile::InitialStartPosition( ->withAllowableValues(InitialStartPositions::values()) ->build()); -core::Relationship TailFile::Success("success", "All files are routed to success"); +const core::Property TailFile::AttributeProviderService( + core::PropertyBuilder::createProperty("Attribute Provider Service") + ->withDescription("Provides a list of key-value pair records which can be used in the Base Directory property using Expression Language. " + "Requires Multiple file mode.") + ->asType<minifi::controllers::AttributeProviderService>() + ->build()); + +const core::Relationship TailFile::Success("success", "All files are routed to success"); const char *TailFile::CURRENT_STR = "CURRENT."; const char *TailFile::POSITION_STR = "POSITION."; @@ -315,25 +327,23 @@ class WholeFileReaderCallback : public OutputStreamCallback { } // namespace void TailFile::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(FileName); - properties.insert(StateFile); - properties.insert(Delimiter); - properties.insert(TailMode); - properties.insert(BaseDirectory); - properties.insert(RecursiveLookup); - properties.insert(LookupFrequency); - properties.insert(RollingFilenamePattern); - properties.insert(InitialStartPosition); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); + setSupportedProperties({ + FileName, + StateFile, + Delimiter, + TailMode, + BaseDirectory, + RecursiveLookup, + LookupFrequency, + RollingFilenamePattern, + InitialStartPosition, + AttributeProviderService}); + setSupportedRelationships({Success}); } void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) { + gsl_Expects(context); + tail_states_.clear(); state_manager_ = context->getStateManager(); @@ -355,11 +365,13 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, if (mode == "Multiple file") { tail_mode_ = Mode::MULTIPLE; + parseAttributeProviderServiceProperty(*context); + if (!context->getProperty(BaseDirectory.getName(), base_dir_)) { throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode."); } - if (!utils::file::is_directory(base_dir_)) { + if (!attribute_provider_service_ && !utils::file::is_directory(base_dir_)) { throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory does not exist or is not a directory"); } @@ -376,7 +388,7 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, recoverState(context); - doMultifileLookup(); + doMultifileLookup(*context); } else { tail_mode_ = Mode::SINGLE; @@ -398,6 +410,24 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, initial_start_position_ = InitialStartPositions{utils::parsePropertyWithAllowableValuesOrThrow(*context, InitialStartPosition.getName(), InitialStartPositions::values())}; } +void TailFile::parseAttributeProviderServiceProperty(core::ProcessContext& context) { + const auto attribute_provider_service_name = context.getProperty(AttributeProviderService); + if (!attribute_provider_service_name || attribute_provider_service_name->empty()) { + return; + } + + std::shared_ptr<core::controller::ControllerService> controller_service = context.getControllerService(*attribute_provider_service_name); + if (!controller_service) { + throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::StringUtils::join_pack("Controller service '", *attribute_provider_service_name, "' not found")}; + } + + // we drop ownership of the service here -- in the long term, getControllerService() should return a non-owning pointer or optional reference + attribute_provider_service_ = dynamic_cast<minifi::controllers::AttributeProviderService*>(controller_service.get()); + if (!attribute_provider_service_) { + throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::StringUtils::join_pack("Controller service '", *attribute_provider_service_name, "' is not an AttributeProviderService")}; + } +} + void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const { char *line = buf; @@ -667,11 +697,13 @@ std::vector<TailState> TailFile::sortAndSkipMainFilePrefix(const TailState &stat return matched_files; } -void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const std::shared_ptr<core::ProcessSession> &session) { +void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + if (tail_mode_ == Mode::MULTIPLE) { if (last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now()) { logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing new multifile lookup", int64_t{lookup_frequency_.count()}); - doMultifileLookup(); + doMultifileLookup(*context); } else { logger_->log_trace("Skipping multifile lookup"); } @@ -812,9 +844,9 @@ void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t c state.checksum_ = checksum; } -void TailFile::doMultifileLookup() { +void TailFile::doMultifileLookup(core::ProcessContext& context) { checkForRemovedFiles(); - checkForNewFiles(); + checkForNewFiles(context); last_multifile_lookup_ = std::chrono::steady_clock::now(); } @@ -836,7 +868,7 @@ void TailFile::checkForRemovedFiles() { } } -void TailFile::checkForNewFiles() { +void TailFile::checkForNewFiles(core::ProcessContext& context) { auto add_new_files_callback = [&](const std::string &path, const std::string &file_name) -> bool { std::string full_file_name = path + utils::file::get_separator() + file_name; std::regex file_to_tail_regex(file_to_tail_); @@ -846,7 +878,33 @@ void TailFile::checkForNewFiles() { return true; }; - utils::file::list_dir(base_dir_, add_new_files_callback, logger_, recursive_lookup_); + if (attribute_provider_service_) { + for (const auto& base_dir : getBaseDirectories(context)) { + utils::file::list_dir(base_dir, add_new_files_callback, logger_, recursive_lookup_); + } + } else { + utils::file::list_dir(base_dir_, add_new_files_callback, logger_, recursive_lookup_); + } +} + +std::vector<std::string> TailFile::getBaseDirectories(core::ProcessContext& context) const { + gsl_Expects(attribute_provider_service_); + + const auto attribute_maps = attribute_provider_service_->getAttributes(); + if (!attribute_maps) { + logger_->log_error("Could not get attributes from the Attribute Provider Service"); + return {}; + } + + return attribute_maps.value() | + ranges::views::transform([&context](const auto& attribute_map) { + auto flow_file = std::make_shared<FlowFileRecord>(); + for (const auto& [key, value] : attribute_map) { + flow_file->setAttribute(key, value); + } + return context.getProperty(BaseDirectory, flow_file).value(); + }) | + ranges::to<std::vector<std::string>>(); } std::chrono::milliseconds TailFile::getLookupFrequency() const { diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h index ba5fd49..91003ba 100644 --- a/extensions/standard-processors/processors/TailFile.h +++ b/extensions/standard-processors/processors/TailFile.h @@ -27,6 +27,7 @@ #include <vector> #include <set> +#include "controllers/AttributeProviderService.h" #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" @@ -91,18 +92,19 @@ class TailFile : public core::Processor { EXTENSIONAPI static constexpr char const* ProcessorName = "TailFile"; // Supported Properties - EXTENSIONAPI static core::Property FileName; - EXTENSIONAPI static core::Property StateFile; - EXTENSIONAPI static core::Property Delimiter; - EXTENSIONAPI static core::Property TailMode; - EXTENSIONAPI static core::Property BaseDirectory; - EXTENSIONAPI static core::Property RecursiveLookup; - EXTENSIONAPI static core::Property LookupFrequency; - EXTENSIONAPI static core::Property RollingFilenamePattern; - EXTENSIONAPI static core::Property InitialStartPosition; + EXTENSIONAPI static const core::Property FileName; + EXTENSIONAPI static const core::Property StateFile; + EXTENSIONAPI static const core::Property Delimiter; + EXTENSIONAPI static const core::Property TailMode; + EXTENSIONAPI static const core::Property BaseDirectory; + EXTENSIONAPI static const core::Property RecursiveLookup; + EXTENSIONAPI static const core::Property LookupFrequency; + EXTENSIONAPI static const core::Property RollingFilenamePattern; + EXTENSIONAPI static const core::Property InitialStartPosition; + EXTENSIONAPI static const core::Property AttributeProviderService; // Supported Relationships - EXTENSIONAPI static core::Relationship Success; + EXTENSIONAPI static const core::Relationship Success; /** * Function that's executed when the processor is scheduled. @@ -143,6 +145,7 @@ class TailFile : public core::Processor { return true; } + void parseAttributeProviderServiceProperty(core::ProcessContext& context); void parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const; void processAllRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state); void processRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state, std::vector<TailState> &rotated_file_states); @@ -160,9 +163,10 @@ class TailFile : public core::Processor { bool getStateFromStateManager(std::map<std::string, TailState> &state) const; bool getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context, std::map<std::string, TailState> &new_tail_states) const; - void doMultifileLookup(); + void doMultifileLookup(core::ProcessContext& context); void checkForRemovedFiles(); - void checkForNewFiles(); + void checkForNewFiles(core::ProcessContext& context); + std::vector<std::string> getBaseDirectories(core::ProcessContext& context) const; void updateFlowFileAttributes(const std::string &full_file_name, const TailState &state, const std::string &fileName, const std::string &baseName, const std::string &extension, std::shared_ptr<core::FlowFile> &flow_file) const; @@ -185,6 +189,7 @@ class TailFile : public core::Processor { std::string rolling_filename_pattern_; InitialStartPositions initial_start_position_; bool first_trigger_{true}; + controllers::AttributeProviderService* attribute_provider_service_ = nullptr; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger(); }; diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp index 362b789..b19ff05 100644 --- a/extensions/standard-processors/tests/unit/TailFileTests.cpp +++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp @@ -38,6 +38,7 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/ProcessorNode.h" +#include "core/Resource.h" #include "TailFile.h" #include "LogAttribute.h" #include "utils/TestUtils.h" @@ -55,9 +56,12 @@ static const std::string NEW_TAIL_DATA = "newdata\n"; static const std::string ADDITIONALY_CREATED_FILE_CONTENT = "additional file data\n"; namespace { -std::string createTempFile(const std::string &directory, const std::string &file_name, const std::string &contents, +std::string createTempFile(const std::filesystem::path& directory, const std::filesystem::path& file_name, const std::string& contents, std::ios_base::openmode open_mode = std::ios::out | std::ios::binary) { - std::string full_file_name = directory + utils::file::get_separator() + file_name; + if (!utils::file::exists(directory.string())) { + std::filesystem::create_directories(directory); + } + std::string full_file_name = (directory / file_name).string(); std::ofstream tmpfile{full_file_name, open_mode}; tmpfile << contents; return full_file_name; @@ -1747,3 +1751,67 @@ TEST_CASE("Initial Start Position is set to invalid or empty value", "[initialSt REQUIRE_THROWS_AS(testController.runSession(plan), minifi::Exception); } + +TEST_CASE("TailFile onSchedule throws if an invalid Attribute Provider Service is found", "[configuration][AttributeProviderService]") { + TestController testController; + LogTestController::getInstance().setDebug<minifi::processors::TailFile>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> tail_file = plan->addProcessor("TailFile", "tailfileProc"); + plan->setProperty(tail_file, minifi::processors::TailFile::TailMode.getName(), "Multiple file"); + plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory.getName(), "/var/logs"); + plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), "minifi.log"); + plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::AttributeProviderService.getName(), "this AttributeProviderService does not exist"); + + REQUIRE_THROWS_AS(plan->runNextProcessor(), minifi::Exception); +} + +namespace { + +class TestAttributeProviderService : public minifi::controllers::AttributeProviderService { + using AttributeProviderService::AttributeProviderService; + void initialize() override {}; + void onEnable() override {}; + std::optional<std::vector<AttributeMap>> getAttributes() override { + return std::vector<AttributeMap>{AttributeMap{{"color", "red"}, {"fruit", "apple"}, {"uid", "001"}, {"animal", "dog"}}, + AttributeMap{{"color", "yellow"}, {"fruit", "banana"}, {"uid", "004"}, {"animal", "dolphin"}}}; + } +}; +REGISTER_RESOURCE(TestAttributeProviderService, "An attribute provider service which provides a constant set of records."); + +} // namespace + +TEST_CASE("TailFile can use an AttributeProviderService", "[AttributeProviderService]") { + TestController testController; + LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::filesystem::path temp_directory{testController.createTempDirectory()}; + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + plan->addController("TestAttributeProviderService", "attribute_provider_service"); + std::shared_ptr<core::Processor> tail_file = plan->addProcessor("TailFile", "tail_file"); + plan->setProperty(tail_file, minifi::processors::TailFile::TailMode.getName(), "Multiple file"); + plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory.getName(), (temp_directory / "my_${color}_${fruit}_${uid}" / "${animal}").string()); + plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency.getName(), "0 sec"); + plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), ".*\\.log"); + plan->setProperty(tail_file, minifi::processors::TailFile::AttributeProviderService.getName(), "attribute_provider_service"); + std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", core::Relationship("success", ""), true); + plan->setProperty(log_attribute, minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0"); + + createTempFile(temp_directory / "my_red_apple_001" / "dog", "0.log", "Idared\n"); + createTempFile(temp_directory / "my_red_apple_001" / "dog", "1.log", "Jonagold\n"); + createTempFile(temp_directory / "my_red_strawberry_002" / "elephant", "0.log", "red strawberry\n"); + createTempFile(temp_directory / "my_yellow_apple_003" / "horse", "0.log", "yellow apple\n"); + createTempFile(temp_directory / "my_yellow_banana_004" / "dolphin", "0.log", "yellow banana\n"); + + testController.runSession(plan); + + CHECK(LogTestController::getInstance().contains("Logged 3 flow files")); + CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_red_apple_001" / "dog" / "0.log").string())); + CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_red_apple_001" / "dog" / "1.log").string())); + CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_yellow_banana_004" / "dolphin" / "0.log").string())); + + LogTestController::getInstance().reset(); +} diff --git a/libminifi/include/controllers/AttributeProviderService.h b/libminifi/include/controllers/AttributeProviderService.h new file mode 100644 index 0000000..0c127e4 --- /dev/null +++ b/libminifi/include/controllers/AttributeProviderService.h @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> +#include <optional> +#include <unordered_map> +#include <vector> + +#include "core/controller/ControllerService.h" + +namespace org::apache::nifi::minifi::controllers { + +class AttributeProviderService : public core::controller::ControllerService { + public: + using ControllerService::ControllerService; + + void yield() override {} + bool isRunning() override { return getState() == core::controller::ControllerServiceState::ENABLED; } + bool isWorkAvailable() override { return false; } + + using AttributeMap = std::unordered_map<std::string, std::string>; + virtual std::optional<std::vector<AttributeMap>> getAttributes() = 0; +}; + +} // namespace org::apache::nifi::minifi::controllers
