fgerlits commented on code in PR #1503:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1503#discussion_r1143282678


##########
controller/Controller.h:
##########
@@ -18,346 +18,61 @@
 #pragma once
 
 #include <memory>
+#include <string>
 
-#include "core/RepositoryFactory.h"
-#include "core/ConfigurationFactory.h"
-#include "core/extension/ExtensionManager.h"
 #include "io/ClientSocket.h"
-#include "c2/ControllerSocketProtocol.h"
-#include "utils/gsl.h"
-#include "Exception.h"
-#include "FlowController.h"
-#include "core/repository/VolatileContentRepository.h"
-#include "core/repository/VolatileFlowFileRepository.h"
-#include "core/state/MetricsPublisherFactory.h"
-#include "core/state/MetricsPublisherStore.h"
+
+namespace org::apache::nifi::minifi::controller {
 
 /**
  * Sends a single argument comment
  * @param socket socket unique ptr.
  * @param op operation to perform
  * @param value value to send
  */
-bool sendSingleCommand(std::unique_ptr<org::apache::nifi::minifi::io::Socket> 
socket, uint8_t op, const std::string& value) {
-  socket->initialize();
-  std::vector<uint8_t> data;
-  org::apache::nifi::minifi::io::BufferStream stream;
-  stream.write(&op, 1);
-  stream.write(value);
-  return socket->write(stream.getBuffer()) == stream.size();
-}
+bool sendSingleCommand(std::unique_ptr<io::Socket> socket, uint8_t op, const 
std::string& value);
 
 /**
  * Stops a stopped component
  * @param socket socket unique ptr.
  * @param op operation to perform
  */
-bool stopComponent(std::unique_ptr<org::apache::nifi::minifi::io::Socket> 
socket, const std::string& component) {
-  return sendSingleCommand(std::move(socket), 
org::apache::nifi::minifi::c2::Operation::STOP, component);
-}
+bool stopComponent(std::unique_ptr<io::Socket> socket, const std::string& 
component);
 
 /**
  * Starts a previously stopped component.
  * @param socket socket unique ptr.
  * @param op operation to perform
  */
-bool startComponent(std::unique_ptr<org::apache::nifi::minifi::io::Socket> 
socket, const std::string& component) {
-  return sendSingleCommand(std::move(socket), 
org::apache::nifi::minifi::c2::Operation::START, component);
-}
+bool startComponent(std::unique_ptr<io::Socket> socket, const std::string& 
component);
 
 /**
  * Clears a connection queue.
  * @param socket socket unique ptr.
  * @param op operation to perform
  */
-bool clearConnection(std::unique_ptr<org::apache::nifi::minifi::io::Socket> 
socket, const std::string& connection) {
-  return sendSingleCommand(std::move(socket), 
org::apache::nifi::minifi::c2::Operation::CLEAR, connection);
-}
+bool clearConnection(std::unique_ptr<io::Socket> socket, const std::string& 
connection);
 
 /**
  * Updates the flow to the provided file
  */
-int updateFlow(std::unique_ptr<org::apache::nifi::minifi::io::Socket> socket, 
std::ostream &out, const std::string& file) {
-  socket->initialize();
-  std::vector<uint8_t> data;
-  uint8_t op = org::apache::nifi::minifi::c2::Operation::UPDATE;
-  org::apache::nifi::minifi::io::BufferStream stream;
-  stream.write(&op, 1);
-  stream.write("flow");
-  stream.write(file);
-  if 
(org::apache::nifi::minifi::io::isError(socket->write(stream.getBuffer()))) {
-    return -1;
-  }
-  // read the response
-  uint8_t resp = 0;
-  socket->read(resp);
-  if (resp == org::apache::nifi::minifi::c2::Operation::DESCRIBE) {
-    uint16_t connections = 0;
-    socket->read(connections);
-    out << connections << " are full" << std::endl;
-    for (int i = 0; i < connections; i++) {
-      std::string fullcomponent;
-      socket->read(fullcomponent);
-      out << fullcomponent << " is full" << std::endl;
-    }
-  }
-  return 0;
-}
+int updateFlow(std::unique_ptr<io::Socket> socket, std::ostream &out, const 
std::string& file);
 
 /**
  * Lists connections which are full
  * @param socket socket ptr
  */
-int getFullConnections(std::unique_ptr<org::apache::nifi::minifi::io::Socket> 
socket, std::ostream &out) {
-  socket->initialize();
-  std::vector<uint8_t> data;
-  uint8_t op = org::apache::nifi::minifi::c2::Operation::DESCRIBE;
-  org::apache::nifi::minifi::io::BufferStream stream;
-  stream.write(&op, 1);
-  stream.write("getfull");
-  if 
(org::apache::nifi::minifi::io::isError(socket->write(stream.getBuffer()))) {
-    return -1;
-  }
-  // read the response
-  uint8_t resp = 0;
-  socket->read(resp);
-  if (resp == org::apache::nifi::minifi::c2::Operation::DESCRIBE) {
-    uint16_t connections = 0;
-    socket->read(connections);
-    out << connections << " are full" << std::endl;
-    for (int i = 0; i < connections; i++) {
-      std::string fullcomponent;
-      socket->read(fullcomponent);
-      out << fullcomponent << " is full" << std::endl;
-    }
-  }
-  return 0;
-}
-
-int getJstacks(std::unique_ptr<org::apache::nifi::minifi::io::Socket> socket, 
std::ostream &out) {

Review Comment:
   Why did you remove this?  I don't know if it works, but as long as `C2Agent` 
supports `DescribeOperand::JSTACK`, we might as well support it in the 
controller, too.



##########
controller/MiNiFiController.cpp:
##########
@@ -15,57 +15,52 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <fcntl.h>
-#include <cstdio>
-#include <semaphore.h>
-#include <csignal>
 #include <vector>
-#include <queue>
-#include <map>
 #include <iostream>
 
-#include "core/Core.h"
-
-#include "core/FlowConfiguration.h"
-#include "core/ConfigurationFactory.h"
-#include "core/RepositoryFactory.h"
-#include "FlowController.h"
 #include "MainHelper.h"
 #include "properties/Configure.h"
 #include "Controller.h"
 #include "c2/ControllerSocketProtocol.h"
+#include "core/controller/ControllerService.h"
+#include "core/extension/ExtensionManager.h"
+#include "io/StreamFactory.h"
+#include "core/ConfigurationFactory.h"
 
 #include "cxxopts.hpp"
 
 namespace minifi = org::apache::nifi::minifi;
 
-int main(int argc, char **argv) {
-  const auto logger = 
minifi::core::logging::LoggerConfiguration::getConfiguration().getLogger("controller");
+std::shared_ptr<minifi::core::controller::ControllerService> 
getControllerService(const std::shared_ptr<minifi::Configure> &configuration,
+    const std::string &service_name, const std::string& minifi_home) {
+  std::string nifi_configuration_class_name = "yamlconfiguration";

Review Comment:
   We could change this default to `"adaptiveconfiguration"`, and support JSON 
like we already do in `MiNiFiMain.cpp`.  But this can also be done separately 
later.



##########
encrypt-config/tests/ConfigFileEncryptorTests.cpp:
##########
@@ -77,7 +77,7 @@ TEST_CASE("ConfigFileEncryptor can encrypt the sensitive 
properties", "[encrypt-
     uint32_t num_properties_encrypted = 
encryptSensitivePropertiesInFile(test_file, KEY);
 
     REQUIRE(num_properties_encrypted == 1);
-    REQUIRE(test_file.size() == 107);
+    REQUIRE(test_file.size() == 110);

Review Comment:
   why did this change?



##########
libminifi/src/c2/ControllerSocketMetricsPublisher.cpp:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "c2/ControllerSocketMetricsPublisher.h"
+#include "utils/gsl.h"
+#include "core/Resource.h"
+#include "c2/C2Agent.h"
+
+namespace org::apache::nifi::minifi::c2 {
+
+std::unordered_map<std::string, ControllerSocketReporter::QueueSize> 
ControllerSocketMetricsPublisher::getQueueSizes() {
+  std::lock_guard<std::recursive_mutex> guard(queue_metrics_node_mutex_);
+  std::unordered_map<std::string, QueueSize> sizes;
+  if (!queue_metrics_node_) {
+    return sizes;
+  }
+  for (const auto& metric : queue_metrics_node_->calculateMetrics()) {
+    gsl_Expects(metric.labels.contains("connection_name"));
+    if (metric.name == "queue_size") {
+      sizes[metric.labels.at("connection_name")].queue_size = 
static_cast<uint32_t>(metric.value);
+    } else if (metric.name == "queue_size_max") {
+      sizes[metric.labels.at("connection_name")].queue_size_max = 
static_cast<uint32_t>(metric.value);
+    }
+  }
+  return sizes;
+}
+
+std::unordered_set<std::string> 
ControllerSocketMetricsPublisher::getFullConnections() {
+  std::lock_guard<std::recursive_mutex> guard(queue_metrics_node_mutex_);
+  std::unordered_set<std::string> full_connections;
+  if (!queue_metrics_node_) {
+    return full_connections;
+  }

Review Comment:
   I don't think this check is necessary, since if `queue_metrics_node_` is 
null, then `queues` will be empty.
   
   If we remove this check, then I think we can remove the lock, as well, and 
then make `queue_metrics_node_mutex_` non-recursive.



##########
controller/tests/ControllerTests.cpp:
##########
@@ -0,0 +1,484 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include <filesystem>
+#include "TestBase.h"
+#include "Catch.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "Controller.h"
+#include "c2/ControllerSocketProtocol.h"
+#include "utils/IntegrationTestUtils.h"
+#include "c2/ControllerSocketMetricsPublisher.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "controllers/SSLContextService.h"
+
+#include "state/UpdateController.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class TestStateController : public minifi::state::StateController {
+ public:
+  TestStateController()
+    : is_running(false) {
+  }
+
+  std::string getComponentName() const override {
+    return "TestStateController";
+  }
+
+  minifi::utils::Identifier getComponentUUID() const override {
+    static auto dummyUUID = 
minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+    return dummyUUID;
+  }
+
+  int16_t start() override {
+    is_running = true;
+    return 0;
+  }
+
+  int16_t stop() override {
+    is_running = false;
+    return 0;
+  }
+
+  bool isRunning() const override {
+    return is_running;
+  }
+
+  int16_t pause() override {
+    return 0;
+  }
+
+  int16_t resume() override {
+    return 0;
+  }
+
+  std::atomic<bool> is_running;
+};
+
+class TestUpdateSink : public minifi::state::StateMonitor {
+ public:
+  explicit TestUpdateSink(std::shared_ptr<StateController> controller)
+    : is_running(true),
+      clear_calls(0),
+      controller(std::move(controller)),
+      update_calls(0) {
+  }
+
+  void executeOnComponent(const std::string&, 
std::function<void(minifi::state::StateController&)> func) override {
+    func(*controller);
+  }
+
+  void 
executeOnAllComponents(std::function<void(minifi::state::StateController&)> 
func) override {
+    func(*controller);
+  }
+
+  std::string getComponentName() const override {
+    return "TestUpdateSink";
+  }
+
+  minifi::utils::Identifier getComponentUUID() const override {
+    static auto dummyUUID = 
minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+    return dummyUUID;
+  }
+
+  int16_t start() override {
+    is_running = true;
+    return 0;
+  }
+
+  int16_t stop() override {
+    is_running = false;
+    return 0;
+  }
+
+  bool isRunning() const override {
+    return is_running;
+  }
+
+  int16_t pause() override {
+    return 0;
+  }
+
+  int16_t resume() override {
+    return 0;
+  }
+  std::vector<BackTrace> getTraces() override {
+    std::vector<BackTrace> traces;
+    return traces;
+  }
+
+  int16_t drainRepositories() override {
+    return 0;
+  }
+
+  std::map<std::string, std::unique_ptr<minifi::io::InputStream>> 
getDebugInfo() override {
+    return {};
+  }
+
+  int16_t clearConnection(const std::string& /*connection*/) override {
+    clear_calls++;
+    return 0;
+  }
+
+  std::vector<std::string> getSupportedConfigurationFormats() const override {
+    return {};
+  }
+
+  int16_t applyUpdate(const std::string& /*source*/, const std::string& 
/*configuration*/, bool /*persist*/ = false, const std::optional<std::string>& 
/*flow_id*/ = std::nullopt) override {
+    update_calls++;
+    return 0;
+  }
+
+  int16_t applyUpdate(const std::string& /*source*/, const 
std::shared_ptr<minifi::state::Update>& /*updateController*/) override {
+    update_calls++;
+    return 0;
+  }
+
+  uint64_t getUptime() override {
+    return 8765309;
+  }
+
+  std::atomic<bool> is_running;
+  std::atomic<uint32_t> clear_calls;
+  std::shared_ptr<StateController> controller;
+  std::atomic<uint32_t> update_calls;
+};
+
+class TestControllerSocketReporter : public c2::ControllerSocketReporter {
+  std::unordered_map<std::string, ControllerSocketReporter::QueueSize> 
getQueueSizes() override {
+    return {
+      {"con1", {1, 2}},
+      {"con2", {3, 3}}
+    };
+  }
+
+  std::unordered_set<std::string> getFullConnections() override {
+    return {"con2"};
+  }
+
+  std::unordered_set<std::string> getConnections() override {
+    return {"con1", "con2"};
+  }
+
+  std::string getAgentManifest() override {
+    return "testAgentManifest";
+  }
+};
+
+class TestControllerServiceProvider : public 
core::controller::ControllerServiceProvider {
+ public:
+  explicit 
TestControllerServiceProvider(std::shared_ptr<controllers::SSLContextService> 
ssl_context_service)
+    : 
core::controller::ControllerServiceProvider("TestControllerServiceProvider"),
+      ssl_context_service_(std::move(ssl_context_service)) {
+  }
+  std::shared_ptr<core::controller::ControllerService> 
getControllerService(const std::string&) const override {
+    return ssl_context_service_;
+  }
+
+  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string&, const std::string&, const 
std::string&, bool) override {
+    return nullptr;
+  }
+  void clearControllerServices() override {
+  }
+  void enableAllControllerServices() override {
+  }
+  void disableAllControllerServices() override {
+  }
+
+ private:
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+};
+
+class ControllerTestFixture {
+ public:
+  enum class ConnectionType {
+    UNSECURE,
+    SSL_FROM_SERVICE_PROVIDER,
+    SSL_FROM_CONFIGURATION
+  };
+
+  ControllerTestFixture()
+    : configuration_(std::make_shared<minifi::Configure>()),
+      controller_(std::make_shared<TestStateController>()),
+      update_sink_(std::make_unique<TestUpdateSink>(controller_)),
+      stream_factory_(minifi::io::StreamFactory::getInstance(configuration_)) {
+    configuration_->set(minifi::Configure::controller_socket_host, 
"localhost");
+    configuration_->set(minifi::Configure::controller_socket_port, "9997");
+    configuration_->set(minifi::Configure::nifi_security_client_certificate, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"minifi-cpp-flow.crt").string());
+    configuration_->set(minifi::Configure::nifi_security_client_private_key, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"minifi-cpp-flow.key").string());
+    configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, 
"abcdefgh");
+    
configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"root-ca.pem").string());
+    configuration_->set(minifi::Configure::controller_ssl_context_service, 
"SSLContextService");
+    ssl_context_service_ = 
std::make_shared<controllers::SSLContextService>("SSLContextService", 
configuration_);
+    ssl_context_service_->onEnable();
+    controller_service_provider_ = 
std::make_unique<TestControllerServiceProvider>(ssl_context_service_);
+  }
+
+  void initalizeControllerSocket(const 
std::shared_ptr<c2::ControllerSocketReporter>& reporter = nullptr) {
+    if (connection_type_ == ConnectionType::SSL_FROM_CONFIGURATION) {
+      configuration_->set(minifi::Configure::nifi_remote_input_secure, "true");
+    }
+    controller_socket_protocol_ = 
std::make_unique<minifi::c2::ControllerSocketProtocol>(
+      connection_type_ == ConnectionType::SSL_FROM_SERVICE_PROVIDER ? 
controller_service_provider_.get() : nullptr, update_sink_.get(), 
configuration_, reporter);
+    controller_socket_protocol_->initialize();
+  }
+
+  std::unique_ptr<minifi::io::Socket> createSocket() {
+    if (connection_type_ == ConnectionType::UNSECURE) {
+      return stream_factory_->createSocket("localhost", 9997);
+    } else {
+      return stream_factory_->createSecureSocket("localhost", 9997, 
ssl_context_service_);
+    }
+  }
+
+  void setConnectionType(ConnectionType connection_type) {
+    connection_type_ = connection_type;
+  }
+
+ protected:
+  ConnectionType connection_type_ = ConnectionType::UNSECURE;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<TestStateController> controller_;
+  std::unique_ptr<TestUpdateSink> update_sink_;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+  std::unique_ptr<minifi::c2::ControllerSocketProtocol> 
controller_socket_protocol_;
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+  std::unique_ptr<TestControllerServiceProvider> controller_service_provider_;
+};
+
+#ifdef WIN32
+TEST_CASE("TestWindows", "[test1]") {
+  std::cout << "Controller Tests are not supported on windows";
+}
+#else
+TEST_CASE_METHOD(ControllerTestFixture, "TestGet", "[test1]") {
+  SECTION("With SSL from service provider") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
+  }
+
+  SECTION("With SSL from properties") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
+  }
+
+  SECTION("Without SSL") {
+    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+  }
+
+  initalizeControllerSocket();
+
+  auto socket = createSocket();
+
+  minifi::controller::startComponent(std::move(socket), "TestStateController");
+
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
controller_->isRunning(); }, 20ms));
+
+  socket = createSocket();
+
+  minifi::controller::stopComponent(std::move(socket), "TestStateController");
+
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
!controller_->isRunning(); }, 20ms));
+
+  socket = createSocket();
+  std::stringstream ss;
+  minifi::controller::listComponents(std::move(socket), ss);
+
+  REQUIRE(ss.str().find("TestStateController") != std::string::npos);

Review Comment:
   Why don't we check the whole returned value?



##########
controller/tests/ControllerTests.cpp:
##########
@@ -0,0 +1,484 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include <filesystem>
+#include "TestBase.h"
+#include "Catch.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "Controller.h"
+#include "c2/ControllerSocketProtocol.h"
+#include "utils/IntegrationTestUtils.h"
+#include "c2/ControllerSocketMetricsPublisher.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "controllers/SSLContextService.h"
+
+#include "state/UpdateController.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class TestStateController : public minifi::state::StateController {
+ public:
+  TestStateController()
+    : is_running(false) {
+  }
+
+  std::string getComponentName() const override {
+    return "TestStateController";
+  }
+
+  minifi::utils::Identifier getComponentUUID() const override {
+    static auto dummyUUID = 
minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+    return dummyUUID;
+  }
+
+  int16_t start() override {
+    is_running = true;
+    return 0;
+  }
+
+  int16_t stop() override {
+    is_running = false;
+    return 0;
+  }
+
+  bool isRunning() const override {
+    return is_running;
+  }
+
+  int16_t pause() override {
+    return 0;
+  }
+
+  int16_t resume() override {
+    return 0;
+  }
+
+  std::atomic<bool> is_running;
+};
+
+class TestUpdateSink : public minifi::state::StateMonitor {
+ public:
+  explicit TestUpdateSink(std::shared_ptr<StateController> controller)
+    : is_running(true),
+      clear_calls(0),
+      controller(std::move(controller)),
+      update_calls(0) {
+  }
+
+  void executeOnComponent(const std::string&, 
std::function<void(minifi::state::StateController&)> func) override {
+    func(*controller);
+  }
+
+  void 
executeOnAllComponents(std::function<void(minifi::state::StateController&)> 
func) override {
+    func(*controller);
+  }
+
+  std::string getComponentName() const override {
+    return "TestUpdateSink";
+  }
+
+  minifi::utils::Identifier getComponentUUID() const override {
+    static auto dummyUUID = 
minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+    return dummyUUID;
+  }
+
+  int16_t start() override {
+    is_running = true;
+    return 0;
+  }
+
+  int16_t stop() override {
+    is_running = false;
+    return 0;
+  }
+
+  bool isRunning() const override {
+    return is_running;
+  }
+
+  int16_t pause() override {
+    return 0;
+  }
+
+  int16_t resume() override {
+    return 0;
+  }
+  std::vector<BackTrace> getTraces() override {
+    std::vector<BackTrace> traces;
+    return traces;
+  }
+
+  int16_t drainRepositories() override {
+    return 0;
+  }
+
+  std::map<std::string, std::unique_ptr<minifi::io::InputStream>> 
getDebugInfo() override {
+    return {};
+  }
+
+  int16_t clearConnection(const std::string& /*connection*/) override {
+    clear_calls++;
+    return 0;
+  }
+
+  std::vector<std::string> getSupportedConfigurationFormats() const override {
+    return {};
+  }
+
+  int16_t applyUpdate(const std::string& /*source*/, const std::string& 
/*configuration*/, bool /*persist*/ = false, const std::optional<std::string>& 
/*flow_id*/ = std::nullopt) override {
+    update_calls++;
+    return 0;
+  }
+
+  int16_t applyUpdate(const std::string& /*source*/, const 
std::shared_ptr<minifi::state::Update>& /*updateController*/) override {
+    update_calls++;
+    return 0;
+  }
+
+  uint64_t getUptime() override {
+    return 8765309;
+  }
+
+  std::atomic<bool> is_running;
+  std::atomic<uint32_t> clear_calls;
+  std::shared_ptr<StateController> controller;
+  std::atomic<uint32_t> update_calls;
+};
+
+class TestControllerSocketReporter : public c2::ControllerSocketReporter {
+  std::unordered_map<std::string, ControllerSocketReporter::QueueSize> 
getQueueSizes() override {
+    return {
+      {"con1", {1, 2}},
+      {"con2", {3, 3}}
+    };
+  }
+
+  std::unordered_set<std::string> getFullConnections() override {
+    return {"con2"};
+  }
+
+  std::unordered_set<std::string> getConnections() override {
+    return {"con1", "con2"};
+  }
+
+  std::string getAgentManifest() override {
+    return "testAgentManifest";
+  }
+};
+
+class TestControllerServiceProvider : public 
core::controller::ControllerServiceProvider {
+ public:
+  explicit 
TestControllerServiceProvider(std::shared_ptr<controllers::SSLContextService> 
ssl_context_service)
+    : 
core::controller::ControllerServiceProvider("TestControllerServiceProvider"),
+      ssl_context_service_(std::move(ssl_context_service)) {
+  }
+  std::shared_ptr<core::controller::ControllerService> 
getControllerService(const std::string&) const override {
+    return ssl_context_service_;
+  }
+
+  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string&, const std::string&, const 
std::string&, bool) override {
+    return nullptr;
+  }
+  void clearControllerServices() override {
+  }
+  void enableAllControllerServices() override {
+  }
+  void disableAllControllerServices() override {
+  }
+
+ private:
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+};
+
+class ControllerTestFixture {
+ public:
+  enum class ConnectionType {
+    UNSECURE,
+    SSL_FROM_SERVICE_PROVIDER,
+    SSL_FROM_CONFIGURATION
+  };
+
+  ControllerTestFixture()
+    : configuration_(std::make_shared<minifi::Configure>()),
+      controller_(std::make_shared<TestStateController>()),
+      update_sink_(std::make_unique<TestUpdateSink>(controller_)),
+      stream_factory_(minifi::io::StreamFactory::getInstance(configuration_)) {
+    configuration_->set(minifi::Configure::controller_socket_host, 
"localhost");
+    configuration_->set(minifi::Configure::controller_socket_port, "9997");
+    configuration_->set(minifi::Configure::nifi_security_client_certificate, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"minifi-cpp-flow.crt").string());
+    configuration_->set(minifi::Configure::nifi_security_client_private_key, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"minifi-cpp-flow.key").string());
+    configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, 
"abcdefgh");
+    
configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"root-ca.pem").string());
+    configuration_->set(minifi::Configure::controller_ssl_context_service, 
"SSLContextService");
+    ssl_context_service_ = 
std::make_shared<controllers::SSLContextService>("SSLContextService", 
configuration_);
+    ssl_context_service_->onEnable();
+    controller_service_provider_ = 
std::make_unique<TestControllerServiceProvider>(ssl_context_service_);
+  }
+
+  void initalizeControllerSocket(const 
std::shared_ptr<c2::ControllerSocketReporter>& reporter = nullptr) {
+    if (connection_type_ == ConnectionType::SSL_FROM_CONFIGURATION) {
+      configuration_->set(minifi::Configure::nifi_remote_input_secure, "true");
+    }
+    controller_socket_protocol_ = 
std::make_unique<minifi::c2::ControllerSocketProtocol>(
+      connection_type_ == ConnectionType::SSL_FROM_SERVICE_PROVIDER ? 
controller_service_provider_.get() : nullptr, update_sink_.get(), 
configuration_, reporter);
+    controller_socket_protocol_->initialize();
+  }
+
+  std::unique_ptr<minifi::io::Socket> createSocket() {
+    if (connection_type_ == ConnectionType::UNSECURE) {
+      return stream_factory_->createSocket("localhost", 9997);
+    } else {
+      return stream_factory_->createSecureSocket("localhost", 9997, 
ssl_context_service_);
+    }
+  }
+
+  void setConnectionType(ConnectionType connection_type) {
+    connection_type_ = connection_type;
+  }
+
+ protected:
+  ConnectionType connection_type_ = ConnectionType::UNSECURE;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<TestStateController> controller_;
+  std::unique_ptr<TestUpdateSink> update_sink_;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+  std::unique_ptr<minifi::c2::ControllerSocketProtocol> 
controller_socket_protocol_;
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+  std::unique_ptr<TestControllerServiceProvider> controller_service_provider_;
+};
+
+#ifdef WIN32
+TEST_CASE("TestWindows", "[test1]") {
+  std::cout << "Controller Tests are not supported on windows";
+}
+#else
+TEST_CASE_METHOD(ControllerTestFixture, "TestGet", "[test1]") {
+  SECTION("With SSL from service provider") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
+  }
+
+  SECTION("With SSL from properties") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
+  }
+
+  SECTION("Without SSL") {
+    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+  }
+
+  initalizeControllerSocket();
+
+  auto socket = createSocket();
+
+  minifi::controller::startComponent(std::move(socket), "TestStateController");
+
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
controller_->isRunning(); }, 20ms));
+
+  socket = createSocket();
+
+  minifi::controller::stopComponent(std::move(socket), "TestStateController");
+
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
!controller_->isRunning(); }, 20ms));
+
+  socket = createSocket();
+  std::stringstream ss;
+  minifi::controller::listComponents(std::move(socket), ss);
+
+  REQUIRE(ss.str().find("TestStateController") != std::string::npos);
+}
+
+TEST_CASE_METHOD(ControllerTestFixture, "TestClear", "[test1]") {
+  SECTION("With SSL from service provider") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
+  }
+
+  SECTION("With SSL from properties") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
+  }
+
+  SECTION("Without SSL") {
+    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+  }
+
+  initalizeControllerSocket();
+
+  auto socket = createSocket();
+
+  minifi::controller::startComponent(std::move(socket), "TestStateController");
+
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
controller_->isRunning(); }, 20ms));
+
+  socket = createSocket();
+
+  minifi::controller::clearConnection(std::move(socket), "connection");
+
+  socket = createSocket();
+
+  minifi::controller::clearConnection(std::move(socket), "connection");
+
+  socket = createSocket();
+
+  minifi::controller::clearConnection(std::move(socket), "connection");
+
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 3 == 
update_sink_->clear_calls; }, 20ms));
+}
+
+TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", "[test1]") {
+  SECTION("With SSL from service provider") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
+  }
+
+  SECTION("With SSL from properties") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
+  }
+
+  SECTION("Without SSL") {
+    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+  }
+
+  initalizeControllerSocket();
+
+  auto socket = createSocket();
+
+  minifi::controller::startComponent(std::move(socket), "TestStateController");
+
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
controller_->isRunning(); }, 20ms));
+
+  std::stringstream ss;
+
+  socket = createSocket();
+
+  minifi::controller::updateFlow(std::move(socket), ss, "connection");
+
+  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 1 == 
update_sink_->update_calls; }, 20ms));
+  REQUIRE(0 == update_sink_->clear_calls);
+}
+
+TEST_CASE_METHOD(ControllerTestFixture, "Test connection getters on empty 
flow", "[test1]") {
+  SECTION("With SSL from service provider") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
+  }
+
+  SECTION("With SSL from properties") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
+  }
+
+  SECTION("Without SSL") {
+    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+  }
+
+  initalizeControllerSocket();
+
+  auto socket = createSocket();
+
+  std::stringstream connection_stream;
+  minifi::controller::getConnectionSize(std::move(socket), connection_stream, 
"con1");
+  CHECK(connection_stream.str() == "Size/Max of con1 not found\n");
+
+  connection_stream.str(std::string());
+  socket = createSocket();
+  minifi::controller::getFullConnections(std::move(socket), connection_stream);
+  CHECK(connection_stream.str() == "0 are full\n");
+
+  connection_stream.str(std::string());
+  socket = createSocket();
+  minifi::controller::listConnections(std::move(socket), connection_stream);
+  CHECK(connection_stream.str() == "Connection Names:\n");
+
+  connection_stream.str(std::string());
+  socket = createSocket();
+  minifi::controller::listConnections(std::move(socket), connection_stream, 
false);
+  CHECK(connection_stream.str().empty());
+}
+
+TEST_CASE_METHOD(ControllerTestFixture, "Test connection getters", "[test1]") {
+  SECTION("With SSL from service provider") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
+  }
+
+  SECTION("With SSL from properties") {
+    
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
+  }
+
+  SECTION("Without SSL") {
+    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
+  }
+
+  auto reporter = std::make_shared<TestControllerSocketReporter>();
+  initalizeControllerSocket(reporter);
+
+  std::stringstream connection_stream;
+  auto socket = createSocket();
+  minifi::controller::getConnectionSize(std::move(socket), connection_stream, 
"conn");
+  CHECK(connection_stream.str() == "Size/Max of conn not found\n");
+
+  connection_stream.str(std::string());
+  socket = createSocket();
+  minifi::controller::getConnectionSize(std::move(socket), connection_stream, 
"con1");
+  CHECK(connection_stream.str() == "Size/Max of con1 1 / 2\n");
+
+  connection_stream.str(std::string());
+  socket = createSocket();
+  minifi::controller::getFullConnections(std::move(socket), connection_stream);
+  CHECK(connection_stream.str() == "1 are full\ncon2 is full\n");
+
+  connection_stream.str(std::string());
+  socket = createSocket();
+  minifi::controller::listConnections(std::move(socket), connection_stream);
+  CHECK(connection_stream.str() == "Connection Names:\ncon2\ncon1\n");

Review Comment:
   Is the order of the connections deterministic?  If not, can we split the 
output into lines and verify it without assuming an order?



##########
controller/MiNiFiController.cpp:
##########
@@ -15,57 +15,52 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <fcntl.h>
-#include <cstdio>
-#include <semaphore.h>
-#include <csignal>
 #include <vector>
-#include <queue>
-#include <map>
 #include <iostream>
 
-#include "core/Core.h"
-
-#include "core/FlowConfiguration.h"
-#include "core/ConfigurationFactory.h"
-#include "core/RepositoryFactory.h"
-#include "FlowController.h"
 #include "MainHelper.h"
 #include "properties/Configure.h"
 #include "Controller.h"
 #include "c2/ControllerSocketProtocol.h"
+#include "core/controller/ControllerService.h"
+#include "core/extension/ExtensionManager.h"
+#include "io/StreamFactory.h"
+#include "core/ConfigurationFactory.h"
 
 #include "cxxopts.hpp"
 
 namespace minifi = org::apache::nifi::minifi;
 
-int main(int argc, char **argv) {
-  const auto logger = 
minifi::core::logging::LoggerConfiguration::getConfiguration().getLogger("controller");
+std::shared_ptr<minifi::core::controller::ControllerService> 
getControllerService(const std::shared_ptr<minifi::Configure> &configuration,
+    const std::string &service_name, const std::string& minifi_home) {
+  std::string nifi_configuration_class_name = "yamlconfiguration";
 
-  const std::string minifiHome = determineMinifiHome(logger);
-  if (minifiHome.empty()) {
-    // determineMinifiHome already logged everything we need
-    return -1;
-  }
+  minifi::core::extension::ExtensionManager::get().initialize(configuration);
 
-  const auto configuration = std::make_shared<minifi::Configure>();
-  configuration->setHome(minifiHome);
-  configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
+  configuration->get(minifi::Configure::nifi_configuration_class_name, 
nifi_configuration_class_name);
+  const auto stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
+  auto flow_configuration = minifi::core::createFlowConfiguration(
+    minifi::core::ConfigurationContext{nullptr, nullptr, stream_factory, 
configuration, std::filesystem::path(minifi_home) / "conf" / "config.yml"}, 
nifi_configuration_class_name);
 
-  const auto log_properties = 
std::make_shared<minifi::core::logging::LoggerProperties>();
-  log_properties->setHome(minifiHome);
-  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
-  
minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
+  auto root = flow_configuration->getRoot();
+  if (!root) {
+    return nullptr;
+  }
+  auto controller = root->findControllerService(service_name);
+  if (!controller) {
+    return nullptr;
+  }
+  return controller->getControllerServiceImplementation();
+}
 
+std::shared_ptr<minifi::controllers::SSLContextService> 
getSSLContextService(const std::shared_ptr<minifi::Configure>& configuration, 
const std::string& minifi_home) {
+  std::shared_ptr<minifi::controllers::SSLContextService> secure_context;
   std::string context_name;
-
-  std::shared_ptr<minifi::controllers::SSLContextService> secure_context = 
nullptr;
-
   // if the user wishes to use a controller service we need to instantiate the 
flow
-  if (configuration->get("controller.ssl.context.service", context_name)) {
-    const auto service = getControllerService(configuration, context_name);
+  if (configuration->get(minifi::Configure::controller_ssl_context_service, 
context_name)) {
+    const auto service = getControllerService(configuration, context_name, 
minifi_home);
     if (nullptr != service) {
-      secure_context = 
std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+      secure_context = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service);

Review Comment:
   I can see this is the same logic as before, but is it a good thing that if 
`controller.ssl.context.service` is set, but there is no such controller 
service defined, or the service is not an `SSLContextService`, then we silently 
fall back to the SSL service in `nifi.remote.input.secure`?
   
   If `controller.ssl.context.service` is not set, then falling back to 
`nifi.remote.input.secure` is good, but if it is set, but incorrectly, then I 
would prefer to print an error message and terminate.



##########
controller/tests/ControllerTests.cpp:
##########
@@ -0,0 +1,484 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include <filesystem>
+#include "TestBase.h"
+#include "Catch.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "Controller.h"
+#include "c2/ControllerSocketProtocol.h"
+#include "utils/IntegrationTestUtils.h"
+#include "c2/ControllerSocketMetricsPublisher.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "controllers/SSLContextService.h"
+
+#include "state/UpdateController.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class TestStateController : public minifi::state::StateController {
+ public:
+  TestStateController()
+    : is_running(false) {
+  }
+
+  std::string getComponentName() const override {
+    return "TestStateController";
+  }
+
+  minifi::utils::Identifier getComponentUUID() const override {
+    static auto dummyUUID = 
minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+    return dummyUUID;
+  }
+
+  int16_t start() override {
+    is_running = true;
+    return 0;
+  }
+
+  int16_t stop() override {
+    is_running = false;
+    return 0;
+  }
+
+  bool isRunning() const override {
+    return is_running;
+  }
+
+  int16_t pause() override {
+    return 0;
+  }
+
+  int16_t resume() override {
+    return 0;
+  }
+
+  std::atomic<bool> is_running;
+};
+
+class TestUpdateSink : public minifi::state::StateMonitor {
+ public:
+  explicit TestUpdateSink(std::shared_ptr<StateController> controller)
+    : is_running(true),
+      clear_calls(0),
+      controller(std::move(controller)),
+      update_calls(0) {
+  }
+
+  void executeOnComponent(const std::string&, 
std::function<void(minifi::state::StateController&)> func) override {
+    func(*controller);
+  }
+
+  void 
executeOnAllComponents(std::function<void(minifi::state::StateController&)> 
func) override {
+    func(*controller);
+  }
+
+  std::string getComponentName() const override {
+    return "TestUpdateSink";
+  }
+
+  minifi::utils::Identifier getComponentUUID() const override {
+    static auto dummyUUID = 
minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+    return dummyUUID;
+  }
+
+  int16_t start() override {
+    is_running = true;
+    return 0;
+  }
+
+  int16_t stop() override {
+    is_running = false;
+    return 0;
+  }
+
+  bool isRunning() const override {
+    return is_running;
+  }
+
+  int16_t pause() override {
+    return 0;
+  }
+
+  int16_t resume() override {
+    return 0;
+  }
+  std::vector<BackTrace> getTraces() override {
+    std::vector<BackTrace> traces;
+    return traces;
+  }
+
+  int16_t drainRepositories() override {
+    return 0;
+  }
+
+  std::map<std::string, std::unique_ptr<minifi::io::InputStream>> 
getDebugInfo() override {
+    return {};
+  }
+
+  int16_t clearConnection(const std::string& /*connection*/) override {
+    clear_calls++;
+    return 0;
+  }
+
+  std::vector<std::string> getSupportedConfigurationFormats() const override {
+    return {};
+  }
+
+  int16_t applyUpdate(const std::string& /*source*/, const std::string& 
/*configuration*/, bool /*persist*/ = false, const std::optional<std::string>& 
/*flow_id*/ = std::nullopt) override {
+    update_calls++;
+    return 0;
+  }
+
+  int16_t applyUpdate(const std::string& /*source*/, const 
std::shared_ptr<minifi::state::Update>& /*updateController*/) override {
+    update_calls++;
+    return 0;
+  }
+
+  uint64_t getUptime() override {
+    return 8765309;
+  }
+
+  std::atomic<bool> is_running;
+  std::atomic<uint32_t> clear_calls;
+  std::shared_ptr<StateController> controller;
+  std::atomic<uint32_t> update_calls;
+};
+
+class TestControllerSocketReporter : public c2::ControllerSocketReporter {
+  std::unordered_map<std::string, ControllerSocketReporter::QueueSize> 
getQueueSizes() override {
+    return {
+      {"con1", {1, 2}},
+      {"con2", {3, 3}}
+    };
+  }
+
+  std::unordered_set<std::string> getFullConnections() override {
+    return {"con2"};
+  }
+
+  std::unordered_set<std::string> getConnections() override {
+    return {"con1", "con2"};
+  }
+
+  std::string getAgentManifest() override {
+    return "testAgentManifest";
+  }
+};
+
+class TestControllerServiceProvider : public 
core::controller::ControllerServiceProvider {
+ public:
+  explicit 
TestControllerServiceProvider(std::shared_ptr<controllers::SSLContextService> 
ssl_context_service)
+    : 
core::controller::ControllerServiceProvider("TestControllerServiceProvider"),
+      ssl_context_service_(std::move(ssl_context_service)) {
+  }
+  std::shared_ptr<core::controller::ControllerService> 
getControllerService(const std::string&) const override {
+    return ssl_context_service_;
+  }
+
+  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string&, const std::string&, const 
std::string&, bool) override {
+    return nullptr;
+  }
+  void clearControllerServices() override {
+  }
+  void enableAllControllerServices() override {
+  }
+  void disableAllControllerServices() override {
+  }
+
+ private:
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+};
+
+class ControllerTestFixture {
+ public:
+  enum class ConnectionType {
+    UNSECURE,
+    SSL_FROM_SERVICE_PROVIDER,
+    SSL_FROM_CONFIGURATION
+  };
+
+  ControllerTestFixture()
+    : configuration_(std::make_shared<minifi::Configure>()),
+      controller_(std::make_shared<TestStateController>()),
+      update_sink_(std::make_unique<TestUpdateSink>(controller_)),
+      stream_factory_(minifi::io::StreamFactory::getInstance(configuration_)) {
+    configuration_->set(minifi::Configure::controller_socket_host, 
"localhost");
+    configuration_->set(minifi::Configure::controller_socket_port, "9997");
+    configuration_->set(minifi::Configure::nifi_security_client_certificate, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"minifi-cpp-flow.crt").string());
+    configuration_->set(minifi::Configure::nifi_security_client_private_key, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"minifi-cpp-flow.key").string());
+    configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, 
"abcdefgh");
+    
configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, 
(minifi::utils::file::FileUtils::get_executable_dir() / "resources" / 
"root-ca.pem").string());
+    configuration_->set(minifi::Configure::controller_ssl_context_service, 
"SSLContextService");
+    ssl_context_service_ = 
std::make_shared<controllers::SSLContextService>("SSLContextService", 
configuration_);
+    ssl_context_service_->onEnable();
+    controller_service_provider_ = 
std::make_unique<TestControllerServiceProvider>(ssl_context_service_);
+  }
+
+  void initalizeControllerSocket(const 
std::shared_ptr<c2::ControllerSocketReporter>& reporter = nullptr) {
+    if (connection_type_ == ConnectionType::SSL_FROM_CONFIGURATION) {
+      configuration_->set(minifi::Configure::nifi_remote_input_secure, "true");
+    }
+    controller_socket_protocol_ = 
std::make_unique<minifi::c2::ControllerSocketProtocol>(
+      connection_type_ == ConnectionType::SSL_FROM_SERVICE_PROVIDER ? 
controller_service_provider_.get() : nullptr, update_sink_.get(), 
configuration_, reporter);
+    controller_socket_protocol_->initialize();
+  }
+
+  std::unique_ptr<minifi::io::Socket> createSocket() {
+    if (connection_type_ == ConnectionType::UNSECURE) {
+      return stream_factory_->createSocket("localhost", 9997);
+    } else {
+      return stream_factory_->createSecureSocket("localhost", 9997, 
ssl_context_service_);
+    }
+  }
+
+  void setConnectionType(ConnectionType connection_type) {
+    connection_type_ = connection_type;
+  }
+
+ protected:
+  ConnectionType connection_type_ = ConnectionType::UNSECURE;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<TestStateController> controller_;
+  std::unique_ptr<TestUpdateSink> update_sink_;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+  std::unique_ptr<minifi::c2::ControllerSocketProtocol> 
controller_socket_protocol_;
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+  std::unique_ptr<TestControllerServiceProvider> controller_service_provider_;
+};
+
+#ifdef WIN32
+TEST_CASE("TestWindows", "[test1]") {
+  std::cout << "Controller Tests are not supported on windows";
+}
+#else
+TEST_CASE_METHOD(ControllerTestFixture, "TestGet", "[test1]") {

Review Comment:
   "Test listComponents" would be a better name for this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to