This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ccccbef805e7fb11e926ede35af181c6ca61fd2a
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Feb 12 17:11:45 2025 +0100

    MINIFICPP-2525 Handle errors when enabling controllers
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    Closes #1933
---
 controller/tests/ControllerTests.cpp               |   2 +-
 .../controllerservices/CouchbaseClusterService.h   |   4 +-
 .../tests/SmbConnectionControllerServiceTests.cpp  |   2 +-
 libminifi/include/core/FlowConfiguration.h         |   3 +-
 .../ForwardingControllerServiceProvider.h          |   4 +-
 .../controller/StandardControllerServiceNode.h     |  26 +--
 .../controller/StandardControllerServiceProvider.h |  74 +++-----
 libminifi/src/core/FlowConfiguration.cpp           |  18 +-
 .../core/controller/ControllerServiceNodeMap.cpp   |   1 +
 .../controller/StandardControllerServiceNode.cpp   |  51 ++++--
 .../StandardControllerServiceProvider.cpp          | 128 +++++++++++++
 .../src/core/flow/StructuredConfiguration.cpp      |   3 +-
 .../integration/C2ControllerEnableFailureTest.cpp  | 197 +++++++++++++++++++++
 libminifi/test/libtest/unit/ProvenanceTestHelper.h |  54 ------
 libminifi/test/libtest/unit/TestBase.cpp           |   2 +-
 .../test/resources/TestC2InvalidController.yml     |  41 +++++
 libminifi/test/resources/TestC2ValidController.yml |  41 +++++
 libminifi/test/unit/SchedulingAgentTests.cpp       |   1 -
 .../core/controller/ControllerServiceProvider.h    |   2 +-
 utils/include/core/ProcessContext.h                |  16 +-
 20 files changed, 502 insertions(+), 168 deletions(-)

diff --git a/controller/tests/ControllerTests.cpp b/controller/tests/ControllerTests.cpp
index adbe80ca6..ab9debb68 100644
--- a/controller/tests/ControllerTests.cpp
+++ b/controller/tests/ControllerTests.cpp
@@ -212,7 +212,7 @@ class TestControllerServiceProvider : public core::controller::ControllerService
     return is_ssl_ ? ssl_context_service_ : nullptr;
   }
 
-  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string&, const std::string&, const 
std::string&, bool) override {
+  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string&, const std::string&) override {
     return nullptr;
   }
   void clearControllerServices() override {
diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
index a6bfc30b1..0da173c91 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
@@ -138,11 +138,11 @@ class CouchbaseClusterService : public core::controller::ControllerServiceImpl {
   void initialize() override;
 
   void yield() override {
-  };
+  }
 
   bool isWorkAvailable() override {
     return false;
-  };
+  }
 
   bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
diff --git a/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp b/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp
index c9693dc46..217122fa3 100644
--- a/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp
+++ b/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp
@@ -35,7 +35,7 @@ struct SmbConnectionControllerServiceFixture {
 
 
 TEST_CASE_METHOD(SmbConnectionControllerServiceFixture, 
"SmbConnectionControllerService onEnable throws when empty") {
-  REQUIRE_THROWS(plan_->finalize());
+  
REQUIRE_THROWS(smb_connection_node_->getControllerServiceImplementation()->onEnable());
 }
 
 TEST_CASE_METHOD(SmbConnectionControllerServiceFixture, 
"SmbConnectionControllerService anonymous connection") {
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index a4066f969..a85af2c61 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -88,8 +88,7 @@ class FlowConfiguration : public CoreComponentImpl {
   static std::unique_ptr<core::ProcessGroup> createSimpleProcessGroup(const 
std::string &name, const utils::Identifier &uuid, int version);
   static std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(const 
std::string &name, const utils::Identifier &uuid);
 
-  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string &class_name, const std::string 
&full_class_name, const std::string &name,
-      const utils::Identifier &uuid);
+  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string &class_name, const std::string &name, 
const utils::Identifier &uuid);
 
   // Create Connection
   [[nodiscard]] std::unique_ptr<minifi::Connection> createConnection(const 
std::string &name, const utils::Identifier &uuid) const;
diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
index f31629f1c..09d91b00f 100644
--- a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
@@ -31,8 +31,8 @@ class ForwardingControllerServiceProvider : public ControllerServiceProviderImpl
  public:
   using ControllerServiceProviderImpl::ControllerServiceProviderImpl;
 
-  std::shared_ptr<ControllerServiceNode> createControllerService(const 
std::string &type, const std::string &longType, const std::string &id, bool 
firstTimeAdded) override {
-    return controller_service_provider_impl_->createControllerService(type, 
longType, id, firstTimeAdded);
+  std::shared_ptr<ControllerServiceNode> createControllerService(const 
std::string &type, const std::string &id) override {
+    return controller_service_provider_impl_->createControllerService(type, 
id);
   }
 
   ControllerServiceNode* getControllerServiceNode(const std::string &id) const 
override {
diff --git a/libminifi/include/core/controller/StandardControllerServiceNode.h b/libminifi/include/core/controller/StandardControllerServiceNode.h
index 1e9b85d16..fc4dc7bf0 100644
--- a/libminifi/include/core/controller/StandardControllerServiceNode.h
+++ b/libminifi/include/core/controller/StandardControllerServiceNode.h
@@ -46,39 +46,17 @@ class StandardControllerServiceNode : public ControllerServiceNodeImpl {
   StandardControllerServiceNode(const StandardControllerServiceNode &other) = 
delete;
   StandardControllerServiceNode &operator=(const StandardControllerServiceNode 
&parent) = delete;
 
-  /**
-   * Initializes the controller service node.
-   */
   void initialize() override {
     ControllerServiceNodeImpl::initialize();
     active = false;
   }
 
-  bool canEnable() override {
-    if (!active.load()) {
-      for (auto linked_service : linked_controller_services_) {
-        if (!linked_service->canEnable()) {
-          return false;
-        }
-      }
-      return true;
-    } else {
-      return false;
-    }
-  }
-
+  bool canEnable() override;
   bool enable() override;
-
-  bool disable() override {
-    controller_service_->setState(DISABLED);
-    active = false;
-    return true;
-  }
+  bool disable() override;
 
  protected:
-  // controller service provider.
   std::shared_ptr<ControllerServiceProvider> provider;
-
   std::mutex mutex_;
 
  private:
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 9f1dfbe47..54eae7115 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -15,19 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #pragma once
 
 #include <string>
 #include <utility>
- #include <memory>
-#include <vector>
-#include "core/ProcessGroup.h"
-#include "core/ClassLoader.h"
+#include <memory>
+#include <unordered_set>
+#include <thread>
 #include "core/controller/ControllerService.h"
 #include "ControllerServiceNodeMap.h"
-#include "ControllerServiceNode.h"
-#include "StandardControllerServiceNode.h"
 #include "ControllerServiceProvider.h"
 #include "core/logging/LoggerFactory.h"
 
@@ -39,6 +35,7 @@ class StandardControllerServiceProvider : public ControllerServiceProviderImpl
       : ControllerServiceProviderImpl(std::move(services)),
         extension_loader_(loader),
         configuration_(std::move(configuration)),
+        admin_yield_duration_(readAdministrativeYieldDuration()),
         
logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) 
{
   }
 
@@ -47,64 +44,35 @@ class StandardControllerServiceProvider : public ControllerServiceProviderImpl
 
   StandardControllerServiceProvider& operator=(const 
StandardControllerServiceProvider &other) = delete;
   StandardControllerServiceProvider& 
operator=(StandardControllerServiceProvider &&other) = delete;
-
-  std::shared_ptr<ControllerServiceNode> createControllerService(const 
std::string& type, const std::string&, const std::string& id, bool) override {
-    std::shared_ptr<ControllerService> new_controller_service = 
extension_loader_.instantiate<ControllerService>(type, id);
-
-    if (!new_controller_service) {
-      return nullptr;
-    }
-
-    std::shared_ptr<ControllerServiceNode> new_service_node = 
std::make_shared<StandardControllerServiceNode>(new_controller_service,
-                                                                               
                               sharedFromThis<ControllerServiceProvider>(), id,
-                                                                               
                               configuration_);
-
-    controller_map_->put(id, new_service_node);
-    return new_service_node;
-  }
-
-  void enableAllControllerServices() override {
-    logger_->log_info("Enabling {} controller services", 
controller_map_->getAllControllerServices().size());
-    for (const auto& service : controller_map_->getAllControllerServices()) {
-      logger_->log_info("Enabling {}", service->getName());
-      if (!service->canEnable()) {
-        logger_->log_warn("Service {} cannot be enabled", service->getName());
-        continue;
-      }
-      if (!service->enable()) {
-        logger_->log_warn("Could not enable {}", service->getName());
-      }
-    }
-  }
-
-  void disableAllControllerServices() override {
-    logger_->log_info("Disabling {} controller services", 
controller_map_->getAllControllerServices().size());
-    for (const auto& service : controller_map_->getAllControllerServices()) {
-      logger_->log_info("Disabling {}", service->getName());
-      if (!service->enabled()) {
-        logger_->log_warn("Service {} is not enabled", service->getName());
-        continue;
-      }
-      if (!service->disable()) {
-        logger_->log_warn("Could not disable {}", service->getName());
-      }
-    }
+  ~StandardControllerServiceProvider() override {
+    stopEnableRetryThread();
   }
 
-  void clearControllerServices() override {
-    controller_map_->clear();
-  }
+  std::shared_ptr<ControllerServiceNode> createControllerService(const 
std::string& type, const std::string& id) override;
+  void enableAllControllerServices() override;
+  void disableAllControllerServices() override;
+  void clearControllerServices() override;
 
  protected:
+  void stopEnableRetryThread();
+  void startEnableRetryThread();
+
   bool canEdit() override {
     return false;
   }
 
   ClassLoader &extension_loader_;
-
   std::shared_ptr<Configure> configuration_;
 
  private:
+  std::chrono::milliseconds readAdministrativeYieldDuration() const;
+
+  std::thread controller_service_enable_retry_thread_;
+  std::atomic_bool enable_retry_thread_running_{false};
+  std::mutex enable_retry_mutex_;
+  std::condition_variable enable_retry_condition_;
+  std::unordered_set<std::shared_ptr<ControllerServiceNode>> 
controller_services_to_enable_;
+  std::chrono::milliseconds admin_yield_duration_;
   std::shared_ptr<logging::Logger> logger_;
 };
 
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 302595af3..807102c96 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -99,7 +99,14 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s
   auto old_parameter_providers = std::move(parameter_providers_);
   service_provider_ = 
std::make_shared<core::controller::StandardControllerServiceProvider>(std::make_unique<core::controller::ControllerServiceNodeMap>(),
 configuration_);
   auto payload = getRootFromPayload(yamlConfigPayload);
-  if (!url.empty() && payload != nullptr) {
+  if (!payload) {
+    service_provider_ = old_provider;
+    parameter_contexts_ = std::move(old_parameter_contexts);
+    parameter_providers_ = std::move(old_parameter_providers);
+    return nullptr;
+  }
+
+  if (!url.empty()) {
     std::string payload_flow_id;
     std::string bucket_id;
     auto path_split = utils::string::split(url, "/");
@@ -111,11 +118,8 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s
       }
     }
     flow_version_->setFlowVersion(url, bucket_id, flow_id ? *flow_id : 
payload_flow_id);
-  } else {
-    service_provider_ = old_provider;
-    parameter_contexts_ = std::move(old_parameter_contexts);
-    parameter_providers_ = std::move(old_parameter_providers);
   }
+
   return payload;
 }
 
@@ -174,9 +178,9 @@ std::unique_ptr<minifi::Connection> FlowConfiguration::createConnection(const st
   return std::make_unique<minifi::ConnectionImpl>(flow_file_repo_, 
content_repo_, name, uuid);
 }
 
-std::shared_ptr<core::controller::ControllerServiceNode> 
FlowConfiguration::createControllerService(const std::string &class_name, const 
std::string &full_class_name, const std::string &name,
+std::shared_ptr<core::controller::ControllerServiceNode> 
FlowConfiguration::createControllerService(const std::string &class_name, const 
std::string &name,
     const utils::Identifier& uuid) {
-  std::shared_ptr<core::controller::ControllerServiceNode> 
controllerServicesNode = service_provider_->createControllerService(class_name, 
full_class_name, name, true);
+  std::shared_ptr<core::controller::ControllerServiceNode> 
controllerServicesNode = service_provider_->createControllerService(class_name, 
name);
   if (nullptr != controllerServicesNode)
     controllerServicesNode->setUUID(uuid);
   return controllerServicesNode;
diff --git a/libminifi/src/core/controller/ControllerServiceNodeMap.cpp b/libminifi/src/core/controller/ControllerServiceNodeMap.cpp
index dded7150c..4ad00dd4f 100644
--- a/libminifi/src/core/controller/ControllerServiceNodeMap.cpp
+++ b/libminifi/src/core/controller/ControllerServiceNodeMap.cpp
@@ -81,6 +81,7 @@ void ControllerServiceNodeMap::clear() {
     node->disable();
   }
   controller_service_nodes_.clear();
+  process_groups_.clear();
 }
 
 std::vector<std::shared_ptr<ControllerServiceNode>> 
ControllerServiceNodeMap::getAllControllerServices() const {
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 69eb33709..e4529fefb 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -19,9 +19,20 @@
 #include "core/controller/StandardControllerServiceNode.h"
 #include <memory>
 #include <mutex>
+#include <algorithm>
 
 namespace org::apache::nifi::minifi::core::controller {
 
+bool StandardControllerServiceNode::canEnable() {
+  if (active) {
+    return false;
+  }
+
+  return std::all_of(linked_controller_services_.begin(), 
linked_controller_services_.end(), [](auto linked_service) {
+    return linked_service->canEnable();
+  });
+}
+
 bool StandardControllerServiceNode::enable() {
   logger_->log_trace("Enabling CSN {}", getName());
   if (active) {
@@ -38,24 +49,42 @@ bool StandardControllerServiceNode::enable() {
     }
   }
   std::shared_ptr<ControllerService> impl = 
getControllerServiceImplementation();
-  if (nullptr != impl) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    std::vector<std::shared_ptr<ControllerService>> services;
-    std::vector<ControllerServiceNode*> service_nodes;
-    services.reserve(linked_controller_services_.size());
-    for (const auto& service : linked_controller_services_) {
-      services.push_back(service->getControllerServiceImplementation());
-      if (!service->enable()) {
-        logger_->log_debug("Linked Service '{}' could not be enabled", 
service->getName());
-        return false;
-      }
+  if (nullptr == impl) {
+    logger_->log_warn("Service '{}' service implementation could not be 
found", controller_service_->getName());
+    controller_service_->setState(ENABLING);
+    return false;
+  }
+
+  std::lock_guard<std::mutex> lock(mutex_);
+  std::vector<std::shared_ptr<ControllerService>> services;
+  std::vector<ControllerServiceNode*> service_nodes;
+  services.reserve(linked_controller_services_.size());
+  for (const auto& service : linked_controller_services_) {
+    services.push_back(service->getControllerServiceImplementation());
+    if (!service->enable()) {
+      logger_->log_warn("Linked Service '{}' could not be enabled", 
service->getName());
+      return false;
     }
+  }
+
+  try {
     impl->setLinkedControllerServices(services);
     impl->onEnable();
+  } catch(const std::exception& e) {
+    logger_->log_warn("Service '{}' failed to enable: {}", getName(), 
e.what());
+    controller_service_->setState(ENABLING);
+    return false;
   }
+
   active = true;
   controller_service_->setState(ENABLED);
   return true;
 }
 
+bool StandardControllerServiceNode::disable() {
+  controller_service_->setState(DISABLED);
+  active = false;
+  return true;
+}
+
 }  // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/src/core/controller/StandardControllerServiceProvider.cpp b/libminifi/src/core/controller/StandardControllerServiceProvider.cpp
new file mode 100644
index 000000000..28cb21cde
--- /dev/null
+++ b/libminifi/src/core/controller/StandardControllerServiceProvider.cpp
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/controller/StandardControllerServiceProvider.h"
+
+#include "core/controller/StandardControllerServiceNode.h"
+#include "core/TypedValues.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core::controller {
+
+std::shared_ptr<ControllerServiceNode> 
StandardControllerServiceProvider::createControllerService(const std::string& 
type, const std::string& id) {
+  std::shared_ptr<ControllerService> new_controller_service = 
extension_loader_.instantiate<ControllerService>(type, id);
+
+  if (!new_controller_service) {
+    return nullptr;
+  }
+
+  std::shared_ptr<ControllerServiceNode> new_service_node = 
std::make_shared<StandardControllerServiceNode>(new_controller_service,
+                                                                               
                             sharedFromThis<ControllerServiceProvider>(), id,
+                                                                               
                             configuration_);
+
+  controller_map_->put(id, new_service_node);
+  return new_service_node;
+}
+
+void StandardControllerServiceProvider::enableAllControllerServices() {
+  gsl_Expects(!enable_retry_thread_running_);
+  {
+    std::lock_guard<std::mutex> lock(enable_retry_mutex_);
+    logger_->log_info("Enabling {} controller services", 
controller_map_->getAllControllerServices().size());
+    for (const auto& service : controller_map_->getAllControllerServices()) {
+      logger_->log_info("Enabling {}", service->getName());
+      if (!service->canEnable()) {
+        logger_->log_warn("Service {} cannot be enabled", service->getName());
+        continue;
+      }
+      if (!service->enable()) {
+        logger_->log_warn("Could not enable {}", service->getName());
+        controller_services_to_enable_.insert(service);
+      }
+    }
+  }
+  startEnableRetryThread();
+}
+
+void StandardControllerServiceProvider::disableAllControllerServices() {
+  stopEnableRetryThread();
+  logger_->log_info("Disabling {} controller services", 
controller_map_->getAllControllerServices().size());
+  for (const auto& service : controller_map_->getAllControllerServices()) {
+    logger_->log_info("Disabling {}", service->getName());
+    if (!service->disable()) {
+      logger_->log_warn("Could not disable {}", service->getName());
+    }
+  }
+}
+
+void StandardControllerServiceProvider::clearControllerServices() {
+  stopEnableRetryThread();
+  controller_map_->clear();
+}
+
+void StandardControllerServiceProvider::stopEnableRetryThread() {
+  enable_retry_thread_running_ = false;
+  enable_retry_condition_.notify_all();
+  if (controller_service_enable_retry_thread_.joinable()) {
+    controller_service_enable_retry_thread_.join();
+  }
+}
+
+void StandardControllerServiceProvider::startEnableRetryThread() {
+  enable_retry_thread_running_ = true;
+  controller_service_enable_retry_thread_ = std::thread([this]() {
+    if (controller_services_to_enable_.empty()) {
+      return;
+    }
+    std::unique_lock<std::mutex> lock(enable_retry_mutex_);
+    enable_retry_condition_.wait_for(lock, admin_yield_duration_, [this]() {
+      return !enable_retry_thread_running_;
+    });
+    while (enable_retry_thread_running_) {
+      for (auto it = controller_services_to_enable_.begin(); it != 
controller_services_to_enable_.end();) {
+        if ((*it)->enable()) {
+          it = controller_services_to_enable_.erase(it);
+        } else {
+          ++it;
+        }
+      }
+      if (controller_services_to_enable_.empty()) {
+        break;
+      }
+      enable_retry_condition_.wait_for(lock, admin_yield_duration_, [this]() {
+        return !enable_retry_thread_running_;
+      });
+    }
+    controller_services_to_enable_.clear();
+  });
+}
+
+std::chrono::milliseconds 
StandardControllerServiceProvider::readAdministrativeYieldDuration() const {
+  std::chrono::milliseconds admin_yield_duration = 30s;
+  std::string yield_value_str;
+
+  if (configuration_->get(Configure::nifi_administrative_yield_duration, 
yield_value_str)) {
+    std::optional<core::TimePeriodValue> value = 
core::TimePeriodValue::fromString(yield_value_str);
+    if (value) {
+      admin_yield_duration = value->getMilliseconds();
+    }
+  }
+  return admin_yield_duration;
+}
+
+}  // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 7b706bb6a..9e873032b 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -631,7 +631,6 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
     auto type = getRequiredField(service_node, schema_.type);
     logger_->log_debug("Using type {} for controller service node", type);
 
-    std::string fullType = type;
     type = utils::string::partAfterLastOccurrenceOf(type, '.');
 
     auto name = service_node[schema_.name].getString().value();
@@ -639,7 +638,7 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
 
     utils::Identifier uuid;
     uuid = id;
-    std::shared_ptr<core::controller::ControllerServiceNode> 
controller_service_node = createControllerService(type, fullType, name, uuid);
+    std::shared_ptr<core::controller::ControllerServiceNode> 
controller_service_node = createControllerService(type, name, uuid);
     if (nullptr != controller_service_node) {
       logger_->log_debug("Created Controller Service with UUID {} and name 
{}", id, name);
       controller_service_node->initialize();
diff --git a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
new file mode 100644
index 000000000..79267b20b
--- /dev/null
+++ b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
@@ -0,0 +1,197 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "unit/TestBase.h"
+#include "integration/HTTPIntegrationBase.h"
+#include "integration/HTTPHandlers.h"
+#include "unit/Catch.h"
+#include "core/Processor.h"
+#include "core/controller/ControllerService.h"
+#include "core/Resource.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class DummyController : public core::controller::ControllerServiceImpl {
+ public:
+  explicit DummyController(std::string_view name, const 
minifi::utils::Identifier &uuid = {}) : ControllerServiceImpl(name, uuid) {}
+
+  static constexpr const char* Description = "Dummy Controller";
+
+  static constexpr auto DummyControllerProperty = 
core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Property")
+      .withDescription("Dummy Controller Property")
+      .build();
+
+  static constexpr auto Properties = 
std::to_array<core::PropertyReference>({DummyControllerProperty});
+  static constexpr bool SupportsDynamicProperties = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+  void initialize() override {
+    setSupportedProperties(Properties);
+  }
+
+  void yield() override {
+  }
+
+  bool isWorkAvailable() override {
+    return false;
+  }
+
+  bool isRunning() const override {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  void onEnable() override {
+    auto dummy_controller_property = getProperty(DummyControllerProperty.name);
+    if (!dummy_controller_property || dummy_controller_property->empty()) {
+      throw 
minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing 
dummy property");
+    }
+  }
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<DummyController>::getLogger(uuid_);
+};
+
+REGISTER_RESOURCE(DummyController, ControllerService);
+
+class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl {
+  using minifi::core::ProcessorImpl::ProcessorImpl;
+
+ public:
+  DummmyControllerUserProcessor(std::string_view name, const 
minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {}
+  explicit DummmyControllerUserProcessor(std::string_view name) : 
ProcessorImpl(name) {}
+  static constexpr auto DummyControllerService = 
core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Service")
+    .withDescription("Dummy Controller Service")
+    .withAllowedTypes<DummyController>()
+    .build();
+
+  void initialize() override {
+    setSupportedProperties(Properties);
+  }
+
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
/*session_factory*/) override {
+    if (auto controller_service = 
context.getProperty(DummmyControllerUserProcessor::DummyControllerService)) {
+      if 
(!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service,
 uuid_))) {
+        throw 
minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid 
controller service");
+      }
+    } else {
+      throw 
minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing 
controller service");
+    }
+    logger_->log_debug("DummyControllerUserProcessor::onSchedule successful");
+  }
+
+  static constexpr const char* Description = "A processor that uses 
controller.";
+  static constexpr auto Properties = std::array<core::PropertyReference, 
1>{DummyControllerService};
+  static constexpr auto Relationships = 
std::array<core::RelationshipDefinition, 0>{};
+  static constexpr bool SupportsDynamicProperties = false;
+  static constexpr bool SupportsDynamicRelationships = false;
+  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
+  static constexpr bool IsSingleThreaded = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<DummmyControllerUserProcessor>::getLogger(uuid_);
+};
+
+REGISTER_RESOURCE(DummmyControllerUserProcessor, Processor);
+
+class VerifyC2ControllerUpdate : public VerifyC2Base {
+ public:
+  explicit VerifyC2ControllerUpdate(const std::atomic_bool& 
flow_updated_successfully) : 
flow_updated_successfully_(flow_updated_successfully) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<DummmyControllerUserProcessor>();
+    LogTestController::getInstance().setDebug<DummyController>();
+    
LogTestController::getInstance().setDebug<core::controller::StandardControllerServiceProvider>();
+    VerifyC2Base::testSetup();
+  }
+
+  void runAssertions() override {
+    using 
org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime;
+    REQUIRE(verifyEventHappenedInPollTime(40s, [&] { return 
flow_updated_successfully_.load(); }, 1s));
+  }
+
+ private:
+  const std::atomic_bool& flow_updated_successfully_;
+};
+
+class ControllerUpdateHandler: public HeartbeatHandler {
+ public:
+  explicit ControllerUpdateHandler(std::atomic_bool& 
flow_updated_successfully, std::shared_ptr<minifi::Configure> configuration, 
const std::filesystem::path& replacement_config_path)
+    : HeartbeatHandler(std::move(configuration)),
+      flow_updated_successfully_(flow_updated_successfully),
+      
replacement_config_(minifi::utils::file::get_content(replacement_config_path.string()))
 {
+  }
+
+  void handleHeartbeat(const rapidjson::Document& /*root*/, struct 
mg_connection* conn) override {
+    switch (test_state_) {
+      case TestState::VERIFY_INITIAL_METRICS: {
+        sendEmptyHeartbeatResponse(conn);
+        REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, 
"Could not enable DummyController"));
+        REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, "(DummmyControllerUserProcessor): Process Schedule Operation: Invalid controller service"));
+        test_state_ = TestState::SEND_NEW_CONFIG;
+        break;
+      }
+      case TestState::SEND_NEW_CONFIG: {
+        sendHeartbeatResponse("UPDATE", "configuration", "889349", conn, 
{{"configuration_data", minifi::c2::C2Value{replacement_config_}}});
+        test_state_ = TestState::VERIFY_UPDATED_METRICS;
+        break;
+      }
+      case TestState::VERIFY_UPDATED_METRICS: {
+        sendEmptyHeartbeatResponse(conn);
+        if (minifi::test::utils::verifyLogLinePresenceInPollTime(0s, 
"DummyControllerUserProcessor::onSchedule successful")) {
+          flow_updated_successfully_ = true;
+        }
+        break;
+      }
+    }
+  }
+
+ private:
+  enum class TestState {
+    VERIFY_INITIAL_METRICS,
+    SEND_NEW_CONFIG,
+    VERIFY_UPDATED_METRICS
+  };
+
+  static void sendEmptyHeartbeatResponse(struct mg_connection* conn) {
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+  }
+
+  std::atomic_bool& flow_updated_successfully_;
+  TestState test_state_ = TestState::VERIFY_INITIAL_METRICS;
+  std::string replacement_config_;
+};
+
+TEST_CASE("C2ControllerEnableFailureTest", "[c2test]") {
+  std::atomic_bool flow_updated_successfully{false};
+  VerifyC2ControllerUpdate harness(flow_updated_successfully);
+  const auto test_file_path = std::filesystem::path(TEST_RESOURCES) / 
"TestC2InvalidController.yml";
+  auto replacement_path = test_file_path.string();
+  minifi::utils::string::replaceAll(replacement_path, 
"TestC2InvalidController", "TestC2ValidController");
+  ControllerUpdateHandler handler(flow_updated_successfully, 
harness.getConfiguration(), replacement_path);
+  harness.setUrl("https://localhost:0/api/heartbeat", &handler);
+  harness.run(test_file_path);
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h 
b/libminifi/test/libtest/unit/ProvenanceTestHelper.h
index 36704bc71..db61c837b 100644
--- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h
@@ -240,57 +240,3 @@ class TestFlowRepository : public 
org::apache::nifi::minifi::core::ThreadedRepos
   std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository> 
content_repo_;
   std::thread thread_;
 };
-
-class TestFlowController : public org::apache::nifi::minifi::FlowController {
- public:
-  
TestFlowController(std::shared_ptr<org::apache::nifi::minifi::core::Repository> 
repo, std::shared_ptr<org::apache::nifi::minifi::core::Repository> 
flow_file_repo,
-      const 
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& 
/*content_repo*/)
-      :org::apache::nifi::minifi::FlowController(repo, flow_file_repo, 
org::apache::nifi::minifi::Configure::create(), nullptr,
-          
std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>())
 {
-  }
-
-  ~TestFlowController() override = default;
-
-  void load(bool /*reload*/ = false) override {
-  }
-
-  int16_t start() override {
-    return 0;
-  }
-
-  int16_t stop() override {
-    return 0;
-  }
-
-  void waitUnload(const std::chrono::milliseconds /*time_to_wait*/) override {
-    stop();
-  }
-
-  int16_t pause() override {
-    return -1;
-  }
-
-  int16_t resume() override {
-    return -1;
-  }
-
-  bool isRunning() const override {
-    return true;
-  }
-
-  std::shared_ptr<org::apache::nifi::minifi::core::Processor> 
createProcessor(const std::string& /*name*/, const 
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
-    return nullptr;
-  }
-
-  org::apache::nifi::minifi::core::ProcessGroup *createRootProcessGroup(const 
std::string& /*name*/, const org::apache::nifi::minifi::utils::Identifier& 
/*uuid*/) {
-    return nullptr;
-  }
-
-  org::apache::nifi::minifi::core::ProcessGroup 
*createRemoteProcessGroup(const std::string& /*name*/, const 
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
-    return nullptr;
-  }
-
-  std::shared_ptr<org::apache::nifi::minifi::Connection> 
createConnection(const std::string& /*name*/, const 
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
-    return nullptr;
-  }
-};
diff --git a/libminifi/test/libtest/unit/TestBase.cpp 
b/libminifi/test/libtest/unit/TestBase.cpp
index f933465f3..1f00083f8 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -366,7 +366,7 @@ 
std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addCo
   minifi::utils::Identifier uuid = 
minifi::utils::IdGenerator::getIdGenerator()->generate();
 
   std::shared_ptr<minifi::core::controller::ControllerServiceNode> 
controller_service_node =
-      controller_services_provider_->createControllerService(controller_name, 
controller_name, name, true /*firstTimeAdded*/);
+      controller_services_provider_->createControllerService(controller_name, 
name);
   if (controller_service_node == nullptr) {
     return nullptr;
   }
diff --git a/libminifi/test/resources/TestC2InvalidController.yml 
b/libminifi/test/resources/TestC2InvalidController.yml
new file mode 100644
index 000000000..f29f0146e
--- /dev/null
+++ b/libminifi/test/resources/TestC2InvalidController.yml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1998
+Processors:
+    - name: DummmyControllerUserProcessor
+      id: 2438e3c8-015a-1000-79ca-83af40ec1899
+      class: org.apache.nifi.processors.DummmyControllerUserProcessor
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 100 msec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Dummy Controller Service: DummyController
+
+Controller Services:
+    - name: DummyController
+      id: 2438e3c8-015a-1000-79ca-83af40ec1888
+      class: DummyController
+      Properties:
+        Dummy Controller Property:
diff --git a/libminifi/test/resources/TestC2ValidController.yml 
b/libminifi/test/resources/TestC2ValidController.yml
new file mode 100644
index 000000000..11eb4ec41
--- /dev/null
+++ b/libminifi/test/resources/TestC2ValidController.yml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1998
+Processors:
+    - name: DummmyControllerUserProcessor
+      id: 2438e3c8-015a-1000-79ca-83af40ec1899
+      class: org.apache.nifi.processors.DummmyControllerUserProcessor
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 100 msec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Dummy Controller Service: DummyController
+
+Controller Services:
+    - name: DummyController
+      id: 2438e3c8-015a-1000-79ca-83af40ec1888
+      class: DummyController
+      Properties:
+        Dummy Controller Property: dummy
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp 
b/libminifi/test/unit/SchedulingAgentTests.cpp
index 298d1c60e..2a6a7abc8 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -76,7 +76,6 @@ class SchedulingAgentTestFixture {
  protected:
   std::shared_ptr<core::Repository> test_repo_ = 
std::make_shared<TestThreadedRepository>();
   std::shared_ptr<core::ContentRepository> content_repo_ = 
std::make_shared<core::repository::VolatileContentRepository>();
-  std::shared_ptr<minifi::FlowController> controller_ = 
std::make_shared<TestFlowController>(test_repo_, test_repo_, content_repo_);
 
   TestController test_controller_;
   std::shared_ptr<TestPlan> test_plan = test_controller_.createPlan();
diff --git 
a/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h 
b/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h
index 4d7a1dd35..2c122fe03 100644
--- a/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h
+++ b/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h
@@ -40,7 +40,7 @@ class ControllerServiceProvider : public virtual 
CoreComponent, public virtual C
  public:
   ~ControllerServiceProvider() override = default;
 
-  virtual std::shared_ptr<ControllerServiceNode> createControllerService(const 
std::string &type, const std::string &longType, const std::string &id, bool 
firstTimeAdded) = 0;
+  virtual std::shared_ptr<ControllerServiceNode> createControllerService(const 
std::string &type, const std::string &id) = 0;
   virtual ControllerServiceNode* getControllerServiceNode(const std::string 
&id) const = 0;
   virtual ControllerServiceNode* getControllerServiceNode(const std::string 
&id, const utils::Identifier &processor_or_controller_uuid) const = 0;
   virtual void putControllerServiceNode(const std::string& identifier, const 
std::shared_ptr<ControllerServiceNode>& controller_service_node, ProcessGroup* 
process_group) = 0;
diff --git a/utils/include/core/ProcessContext.h 
b/utils/include/core/ProcessContext.h
index c4c8c7eb8..2606058b3 100644
--- a/utils/include/core/ProcessContext.h
+++ b/utils/include/core/ProcessContext.h
@@ -110,7 +110,11 @@ class ProcessContextImpl : public 
core::VariableRegistryImpl, public virtual Pro
   // controller services
 
   std::shared_ptr<core::controller::ControllerService> 
getControllerService(const std::string &identifier, const utils::Identifier 
&processor_uuid) const override {
-    return controller_service_provider_ == nullptr ? nullptr : 
controller_service_provider_->getControllerService(identifier, processor_uuid);
+    auto controller_service = controller_service_provider_ == nullptr ? 
nullptr : controller_service_provider_->getControllerService(identifier, 
processor_uuid);
+    if (!controller_service || controller_service->getState() != 
core::controller::ControllerServiceState::ENABLED) {
+      return nullptr;
+    }
+    return controller_service;
   }
 
   void initializeContentRepository(const std::string& home) override {
@@ -149,8 +153,8 @@ class ProcessContextImpl : public 
core::VariableRegistryImpl, public virtual Pro
     const auto path = 
configuration->getWithFallback(Configure::nifi_state_storage_local_path, 
Configure::nifi_state_storage_local_path_old);
 
     /* Function to help creating a state storage */
-    auto create_provider = [&](const std::string& type, const std::string& 
longType, const std::unordered_map<std::string, std::string>& extraProperties) 
-> std::shared_ptr<core::StateStorage> {
-      auto new_node = 
controller_service_provider->createControllerService(type, longType, 
DefaultStateStorageName, true /*firstTimeAdded*/);
+    auto create_provider = [&](const std::string& type, const 
std::unordered_map<std::string, std::string>& extraProperties) -> 
std::shared_ptr<core::StateStorage> {
+      auto new_node = 
controller_service_provider->createControllerService(type, 
DefaultStateStorageName);
       if (new_node == nullptr) { return nullptr; }
       new_node->initialize();
       auto storage = new_node->getControllerServiceImplementation();
@@ -169,19 +173,19 @@ class ProcessContextImpl : public 
core::VariableRegistryImpl, public virtual Pro
 
     /* Try to create a RocksDB-backed provider */
     if (preferredType.empty() || preferredType == 
"RocksDbPersistableKeyValueStoreService" || preferredType == 
"RocksDbStateStorage") {
-      auto provider = create_provider("RocksDbStateStorage", 
"org.apache.nifi.minifi.controllers.RocksDbStateStorage", {{"Directory", 
path.value_or("corecomponentstate")}});
+      auto provider = create_provider("RocksDbStateStorage", {{"Directory", 
path.value_or("corecomponentstate")}});
       if (provider != nullptr) { return provider; }
     }
 
     /* Fall back to a locked unordered map-backed provider */
     if (preferredType.empty() || preferredType == 
"UnorderedMapPersistableKeyValueStoreService" || preferredType == 
"PersistentMapStateStorage") {
-      auto provider = create_provider("PersistentMapStateStorage", 
"org.apache.nifi.minifi.controllers.PersistentMapStateStorage", {{"File", 
path.value_or("corecomponentstate.txt")}});
+      auto provider = create_provider("PersistentMapStateStorage", {{"File", 
path.value_or("corecomponentstate.txt")}});
       if (provider != nullptr) { return provider; }
     }
 
     /* Fall back to volatile memory-backed provider */
     if (preferredType.empty() || preferredType == 
"UnorderedMapKeyValueStoreService" || preferredType == 
"VolatileMapStateStorage") {
-      auto provider = create_provider("VolatileMapStateStorage", 
"org.apache.nifi.minifi.controllers.VolatileMapStateStorage", {});
+      auto provider = create_provider("VolatileMapStateStorage", {});
       if (provider != nullptr) { return provider; }
     }
 


Reply via email to