szaszm commented on a change in pull request #1257:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1257#discussion_r800855548



##########
File path: README.md
##########
@@ -82,6 +82,7 @@ Through JNI extensions you can run NiFi processors using 
NARs. The JNI extension
 | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp)      |    -DDISABLE_CURL=ON  |
 | GPS | GetGPS      |    -DENABLE_GPS=ON  |
 | Kafka | [PublishKafka](PROCESSORS.md#publishkafka)      |    
-DENABLE_LIBRDKAFKA=ON  |
+| Kubernetes | 
[KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService) | 
-DENABLE_KUBERNETES=ON |

Review comment:
       I don't think we need to add controller services here.

##########
File path: extensions/kubernetes/CMakeLists.txt
##########
@@ -17,9 +17,10 @@
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
-file(GLOB SOURCES "*.cpp")
-add_executable(minifi-kubernetes ${SOURCES})
-target_link_libraries(minifi-kubernetes ${LIBMINIFI} kubernetes CURL::libcurl)

Review comment:
       With or without the `-extensions` suffix are both fine IMO. It's not 
consistent, so with systemd I chose without for simplicity.

##########
File path: README.md
##########
@@ -113,8 +114,8 @@ Through JNI extensions you can run NiFi processors using 
NARs. The JNI extension
 ### To build
 
 #### Utilities
-* CMake 3.16 or greater
-* gcc 8 or greater
+* CMake 3.17 or greater
+* gcc 10 or greater

Review comment:
       Thanks, this was already the status quo for some time now.

##########
File path: 
extensions/kubernetes/controllerservice/KubernetesControllerService.cpp
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "KubernetesControllerService.h"
+
+#include <vector>
+
+extern "C" {
+#include "config/incluster_config.h"
+#include "config/kube_config.h"
+#include "include/apiClient.h"
+#include "api/CoreV1API.h"
+}
+
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "Exception.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class KubernetesControllerService::APIClient {
+ public:
+  explicit APIClient(core::logging::Logger& logger);
+  ~APIClient();

Review comment:
       Please either implement or disable copy (and optionally move). Otherwise 
we run the risk of double free.

##########
File path: libminifi/include/AttributeProviderService.h
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "core/controller/ControllerService.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class AttributeProviderService : public core::controller::ControllerService {
+ public:
+  using ControllerService::ControllerService;
+
+  void yield() override {}
+  bool isRunning() override { return getState() == 
core::controller::ControllerServiceState::ENABLED; }
+  bool isWorkAvailable() override { return false; }
+
+  using AttributeMap = std::unordered_map<std::string, std::string>;
+  virtual std::vector<AttributeMap> getAttributes() = 0;

Review comment:
       I find it clearer without the type alias. I had to scroll through the 
diff and check multiple files to know what `AttributeMap` actually meant.

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -846,7 +874,28 @@ void TailFile::checkForNewFiles() {
     return true;
   };
 
-  utils::file::list_dir(base_dir_, add_new_files_callback, logger_, 
recursive_lookup_);
+  if (attribute_provider_service_) {
+    for (const auto& base_dir : getBaseDirectories(context)) {
+      utils::file::list_dir(base_dir, add_new_files_callback, logger_, 
recursive_lookup_);
+    }
+  } else {
+    utils::file::list_dir(base_dir_, add_new_files_callback, logger_, 
recursive_lookup_);
+  }
+}
+
+std::vector<std::string> TailFile::getBaseDirectories(core::ProcessContext& 
context) const {
+  gsl_Expects(attribute_provider_service_);
+
+  const auto attribute_maps = attribute_provider_service_->getAttributes();
+  return attribute_maps |
+      ranges::views::transform([&context](const auto& attribute_map) {
+        auto flow_file = std::make_shared<FlowFileRecord>();
+        for (const auto& [key, value] : attribute_map) {
+          flow_file->setAttribute(key, value);
+        }
+        return context.getProperty(BaseDirectory, flow_file).value();

Review comment:
       What happens if the property doesn't exist? Is it reasonable that it's 
missing for one set of attributes, but not another?

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -398,6 +410,19 @@ void TailFile::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context,
   initial_start_position_ = 
InitialStartPositions{utils::parsePropertyWithAllowableValuesOrThrow(*context, 
InitialStartPosition.getName(), InitialStartPositions::values())};
 }
 
+void TailFile::parseAttributeProviderServiceProperty(core::ProcessContext& 
context) {
+  const auto attribute_provider_service_name = 
context.getProperty(AttributeProviderService);
+  if (attribute_provider_service_name && 
!attribute_provider_service_name->empty()) {
+    std::shared_ptr<core::controller::ControllerService> controller_service = 
context.getControllerService(*attribute_provider_service_name);
+    if (controller_service) {
+      attribute_provider_service_ = 
dynamic_cast<minifi::controllers::AttributeProviderService*>(controller_service.get());
+    }
+    if (!attribute_provider_service_) {
+      throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
utils::StringUtils::join_pack("Could not create AttributeProviderService: ", 
*attribute_provider_service_name)};
+    }

Review comment:
       I would separate the cases where a controller service with the specified 
name doesn't exist and when it exists but is not an AttributeProviderService.

##########
File path: 
extensions/kubernetes/controllerservice/KubernetesControllerService.cpp
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "KubernetesControllerService.h"
+
+#include <vector>
+
+extern "C" {
+#include "config/incluster_config.h"
+#include "config/kube_config.h"
+#include "include/apiClient.h"
+#include "api/CoreV1API.h"
+}
+
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "Exception.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class KubernetesControllerService::APIClient {
+ public:
+  explicit APIClient(core::logging::Logger& logger);
+  ~APIClient();
+  apiClient_t* getClient() { return api_client_; }
+
+ private:
+  char* base_path_ = nullptr;
+  sslConfig_t* ssl_config_ = nullptr;
+  list_t* api_keys_ = nullptr;
+  apiClient_t* api_client_ = nullptr;
+};
+
+KubernetesControllerService::APIClient::APIClient(core::logging::Logger& 
logger) {
+  int rc = load_incluster_config(&base_path_, &ssl_config_, &api_keys_);
+  if (rc != 0) {
+    logger.log_error("Cannot load kubernetes configuration in cluster");
+    return;
+  }
+  api_client_ = apiClient_create_with_base_path(base_path_, ssl_config_, 
api_keys_);
+  if (!api_client_) {
+    logger.log_error("Cannot create a kubernetes client");
+  }
+}
+
+KubernetesControllerService::APIClient::~APIClient() {
+  if (api_client_) {
+    apiClient_free(api_client_);
+    api_client_ = nullptr;
+  }
+
+  free_client_config(base_path_, ssl_config_, api_keys_);
+  base_path_ = nullptr;
+  ssl_config_ = nullptr;
+  api_keys_ = nullptr;
+
+  apiClient_unsetupGlobalEnv();
+}
+
+core::Property KubernetesControllerService::NamespaceFilter{
+    core::PropertyBuilder::createProperty("Namespace Filter")
+        ->withDescription("Limit the output to pods in namespaces which match 
this regular expression")
+        ->withDefaultValue<std::string>("default")
+        ->build()};
+core::Property KubernetesControllerService::PodNameFilter{
+    core::PropertyBuilder::createProperty("Pod Name Filter")
+        ->withDescription("If present, limit the output to pods the name of 
which matches this regular expression")
+        ->build()};
+core::Property KubernetesControllerService::ContainerNameFilter{
+    core::PropertyBuilder::createProperty("Container Name Filter")
+        ->withDescription("If present, limit the output to containers the name 
of which matches this regular expression")
+        ->build()};
+
+KubernetesControllerService::KubernetesControllerService(const std::string& 
name, const utils::Identifier& uuid)
+  : AttributeProviderService(name, uuid),
+    
logger_{core::logging::LoggerFactory<KubernetesControllerService>::getLogger()},
+    api_client_{std::make_unique<APIClient>(*logger_)} {
+}
+
+KubernetesControllerService::KubernetesControllerService(const std::string& 
name, const std::shared_ptr<Configure>& configuration)
+  : KubernetesControllerService{name} {
+    setConfiguration(configuration);
+    initialize();
+}
+
+void KubernetesControllerService::initialize() {
+  if (initialized_) { return; }
+
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  ControllerService::initialize();
+  setSupportedProperties({NamespaceFilter, PodNameFilter, 
ContainerNameFilter});
+  initialized_ = true;
+}
+
+void KubernetesControllerService::onEnable() {
+  std::string namespace_filter;
+  if (getProperty(NamespaceFilter.getName(), namespace_filter) && 
!namespace_filter.empty()) {
+    namespace_filter_ = std::regex{namespace_filter};
+  }
+
+  std::string pod_name_filter;
+  if (getProperty(PodNameFilter.getName(), pod_name_filter) && 
!pod_name_filter.empty()) {
+    pod_name_filter_ = std::regex{pod_name_filter};
+  }
+
+  std::string container_name_filter;
+  if (getProperty(ContainerNameFilter.getName(), container_name_filter) && 
!container_name_filter.empty()) {
+    container_name_filter_ = std::regex{container_name_filter};
+  }
+}
+
+namespace {
+
+struct v1_pod_list_t_deleter {
+  void operator()(v1_pod_list_t* ptr) const noexcept { v1_pod_list_free(ptr); }
+};
+using v1_pod_list_unique_ptr = std::unique_ptr<v1_pod_list_t, 
v1_pod_list_t_deleter>;
+
+v1_pod_list_unique_ptr getPods(apiClient_t* api_client, core::logging::Logger& 
logger) {

Review comment:
       Consider using a reference, `gsl::not_null` or a precondition to ensure 
that `api_client` can't be null.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to