This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ca1e4357daecf56dbfaf78a968c23f7b21bdfef4
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed May 31 15:34:40 2023 +0200

    MINIFICPP-2112 Fix flow update and restart with minifi controller
    
    Closes #1568
    Signed-off-by: Marton Szasz <[email protected]>
---
 cmake/MiNiFiOptions.cmake                          |   2 +
 controller/tests/ControllerTests.cpp               |  12 +--
 .../integration/MiNiFi_integration_test_driver.py  |  33 +++++++
 docker/test/integration/cluster/ContainerStore.py  |   3 +
 .../test/integration/cluster/DockerTestCluster.py  |  45 ++++++++-
 .../cluster/DockerTestDirectoryBindings.py         |   1 +
 .../cluster/MinifiControllerExecutor.py            |  68 +++++++++++++
 .../cluster/containers/MinifiContainer.py          |  12 ++-
 docker/test/integration/features/CMakeLists.txt    |  64 +++++++------
 .../integration/features/minifi_controller.feature |  75 +++++++++++++++
 .../resources/minifi-controller/config.yml         |  30 ++++++
 docker/test/integration/steps/steps.py             | 106 +++++++++++++++++++++
 docker/test/integration/utils.py                   |   9 +-
 libminifi/include/FlowController.h                 |  16 +++-
 libminifi/include/c2/ControllerSocketProtocol.h    |  42 ++++++++
 libminifi/src/FlowController.cpp                   |  32 +++----
 libminifi/src/c2/ControllerSocketProtocol.cpp      |  53 +++++++++--
 libminifi/src/core/ProcessGroup.cpp                |   3 +
 18 files changed, 538 insertions(+), 68 deletions(-)

diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake
index 1da46a8da..9600f34b3 100644
--- a/cmake/MiNiFiOptions.cmake
+++ b/cmake/MiNiFiOptions.cmake
@@ -121,6 +121,8 @@ add_minifi_option(ENABLE_KUBERNETES "Enables the Kubernetes 
extensions." ON)
 add_minifi_option(ENABLE_TEST_PROCESSORS "Enables test processors" OFF)
 add_minifi_option(ENABLE_PROMETHEUS "Enables Prometheus support." ON)
 add_minifi_option(DISABLE_JEMALLOC "Disables jemalloc." OFF)
+add_minifi_option(DISABLE_CURL "Disables curl." OFF)
+add_minifi_option(DISABLE_CONTROLLER "Disables build of MiNiFi controller 
binary." OFF)
 
 set_minifi_cache_variable(CUSTOM_MALLOC OFF "Overwrite malloc implementation.")
 set_property(CACHE CUSTOM_MALLOC PROPERTY STRINGS "jemalloc" "mimalloc" 
"rpmalloc" OFF)
diff --git a/controller/tests/ControllerTests.cpp 
b/controller/tests/ControllerTests.cpp
index c88af54df..82f90e1bc 100644
--- a/controller/tests/ControllerTests.cpp
+++ b/controller/tests/ControllerTests.cpp
@@ -314,14 +314,14 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test 
listComponents", "[controllerTests
   }
 
   using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
-  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
controller_->isRunning(); }, 20ms));
+  REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 
controller_->isRunning(); }, 20ms));
 
   {
     auto socket = createSocket();
     minifi::controller::stopComponent(std::move(socket), 
"TestStateController");
   }
 
-  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
!controller_->isRunning(); }, 20ms));
+  REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 
!controller_->isRunning(); }, 20ms));
 
   {
     auto socket = createSocket();
@@ -353,14 +353,14 @@ TEST_CASE_METHOD(ControllerTestFixture, "TestClear", 
"[controllerTests]") {
   }
 
   using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
-  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
controller_->isRunning(); }, 20ms));
+  REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 
controller_->isRunning(); }, 20ms));
 
   for (auto i = 0; i < 3; ++i) {
     auto socket = createSocket();
     minifi::controller::clearConnection(std::move(socket), "connection");
   }
 
-  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 3 == 
update_sink_->clear_calls; }, 20ms));
+  REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 3 == 
update_sink_->clear_calls; }, 20ms));
 }
 
 TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", "[controllerTests]") {
@@ -384,7 +384,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", 
"[controllerTests]") {
   }
 
   using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
-  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 
controller_->isRunning(); }, 20ms));
+  REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 
controller_->isRunning(); }, 20ms));
 
   std::stringstream ss;
 
@@ -393,7 +393,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", 
"[controllerTests]") {
     minifi::controller::updateFlow(std::move(socket), ss, "connection");
   }
 
-  REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 1 == 
update_sink_->update_calls; }, 20ms));
+  REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 1 == 
update_sink_->update_calls; }, 20ms));
   REQUIRE(0 == update_sink_->clear_calls);
 }
 
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py 
b/docker/test/integration/MiNiFi_integration_test_driver.py
index 981b0802c..bcaddb33b 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -332,3 +332,36 @@ class MiNiFi_integration_test:
 
     def set_yaml_in_minifi(self):
         self.cluster.set_yaml_in_minifi()
+
+    def set_controller_socket_properties_in_minifi(self):
+        self.cluster.set_controller_socket_properties_in_minifi()
+
+    def update_flow_config_through_controller(self, container_name: str):
+        self.cluster.update_flow_config_through_controller(container_name)
+
+    def check_minifi_controller_updated_config_is_persisted(self, 
container_name: str):
+        assert 
self.cluster.check_minifi_controller_updated_config_is_persisted(container_name)
 or self.cluster.log_app_output()
+
+    def stop_component_through_controller(self, component: str, 
container_name: str):
+        self.cluster.stop_component_through_controller(component, 
container_name)
+
+    def start_component_through_controller(self, component: str, 
container_name: str):
+        self.cluster.start_component_through_controller(component, 
container_name)
+
+    def check_component_not_running_through_controller(self, component: str, 
container_name: str):
+        assert 
self.cluster.check_component_not_running_through_controller(component, 
container_name) or self.cluster.log_app_output()
+
+    def check_component_running_through_controller(self, component: str, 
container_name: str):
+        assert 
self.cluster.check_component_running_through_controller(component, 
container_name) or self.cluster.log_app_output()
+
+    def connection_found_through_controller(self, connection: str, 
container_name: str):
+        assert self.cluster.connection_found_through_controller(connection, 
container_name) or self.cluster.log_app_output()
+
+    def check_connections_full_through_controller(self, connection_count: int, 
container_name: str):
+        assert 
self.cluster.check_connections_full_through_controller(connection_count, 
container_name) or self.cluster.log_app_output()
+
+    def check_connection_size_through_controller(self, connection: str, size: 
int, max_size: int, container_name: str):
+        assert 
self.cluster.check_connection_size_through_controller(connection, size, 
max_size, container_name) or self.cluster.log_app_output()
+
+    def manifest_can_be_retrieved_through_minifi_controller(self, 
container_name: str):
+        assert 
self.cluster.manifest_can_be_retrieved_through_minifi_controller(container_name)
 or self.cluster.log_app_output()
diff --git a/docker/test/integration/cluster/ContainerStore.py 
b/docker/test/integration/cluster/ContainerStore.py
index a463a5e0c..7a511b7c5 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -166,6 +166,9 @@ class ContainerStore:
     def set_yaml_in_minifi(self):
         self.minifi_options.config_format = "yaml"
 
+    def set_controller_socket_properties_in_minifi(self):
+        self.minifi_options.enable_controller_socket = True
+
     def get_startup_finished_log_entry(self, container_name):
         return self.containers[container_name].get_startup_finished_log_entry()
 
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index c38b0cfb3..732837245 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -19,6 +19,7 @@ import re
 from .LogSource import LogSource
 from .ContainerStore import ContainerStore
 from .DockerCommunicator import DockerCommunicator
+from .MinifiControllerExecutor import MinifiControllerExecutor
 from .checkers.AwsChecker import AwsChecker
 from .checkers.AzureChecker import AzureChecker
 from .checkers.ElasticSearchChecker import ElasticSearchChecker
@@ -26,7 +27,7 @@ from .checkers.GcsChecker import GcsChecker
 from .checkers.PostgresChecker import PostgresChecker
 from .checkers.PrometheusChecker import PrometheusChecker
 from .checkers.SplunkChecker import SplunkChecker
-from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage
+from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, 
retry_check
 
 
 class DockerTestCluster:
@@ -42,6 +43,7 @@ class DockerTestCluster:
         self.postgres_checker = PostgresChecker(self.container_communicator)
         self.splunk_checker = SplunkChecker(self.container_communicator)
         self.prometheus_checker = PrometheusChecker()
+        self.minifi_controller_executor = 
MinifiControllerExecutor(self.container_communicator)
 
     def cleanup(self):
         self.container_store.cleanup()
@@ -91,6 +93,9 @@ class DockerTestCluster:
     def set_yaml_in_minifi(self):
         self.container_store.set_yaml_in_minifi()
 
+    def set_controller_socket_properties_in_minifi(self):
+        self.container_store.set_controller_socket_properties_in_minifi()
+
     def get_app_log(self, container_name):
         log_source = self.container_store.log_source(container_name)
         if log_source == LogSource.FROM_DOCKER_CONTAINER:
@@ -271,3 +276,41 @@ class DockerTestCluster:
             time.sleep(1)
         logging.warning(f"Memory usage ({current_memory_usage}) is more than 
the maximum asserted memory usage ({max_memory_usage})")
         return False
+
+    def update_flow_config_through_controller(self, container_name: str):
+        self.minifi_controller_executor.update_flow(container_name)
+
+    @retry_check(10, 1)
+    def check_minifi_controller_updated_config_is_persisted(self, 
container_name: str) -> bool:
+        return 
self.minifi_controller_executor.updated_config_is_persisted(container_name)
+
+    def stop_component_through_controller(self, component: str, 
container_name: str):
+        self.minifi_controller_executor.stop_component(component, 
container_name)
+
+    def start_component_through_controller(self, component: str, 
container_name: str):
+        self.minifi_controller_executor.start_component(component, 
container_name)
+
+    @retry_check(10, 1)
+    def check_component_not_running_through_controller(self, component: str, 
container_name: str) -> bool:
+        return not 
self.minifi_controller_executor.is_component_running(component, container_name)
+
+    @retry_check(10, 1)
+    def check_component_running_through_controller(self, component: str, 
container_name: str) -> bool:
+        return self.minifi_controller_executor.is_component_running(component, 
container_name)
+
+    @retry_check(10, 1)
+    def connection_found_through_controller(self, connection: str, 
container_name: str) -> bool:
+        return connection in 
self.minifi_controller_executor.get_connections(container_name)
+
+    @retry_check(10, 1)
+    def check_connections_full_through_controller(self, connection_count: int, 
container_name: str) -> bool:
+        return 
self.minifi_controller_executor.get_full_connection_count(container_name) == 
connection_count
+
+    @retry_check(10, 1)
+    def check_connection_size_through_controller(self, connection: str, size: 
int, max_size: int, container_name: str) -> bool:
+        return self.minifi_controller_executor.get_connection_size(connection, 
container_name) == (size, max_size)
+
+    @retry_check(10, 1000)
+    def manifest_can_be_retrieved_through_minifi_controller(self, 
container_name: str) -> bool:
+        manifest = self.minifi_controller_executor.get_manifest(container_name)
+        return '"agentManifest": {' in manifest and '"componentManifest": {' 
in manifest and '"agentType": "cpp"' in manifest
diff --git a/docker/test/integration/cluster/DockerTestDirectoryBindings.py 
b/docker/test/integration/cluster/DockerTestDirectoryBindings.py
index 228e3837a..259d3a44d 100644
--- a/docker/test/integration/cluster/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/cluster/DockerTestDirectoryBindings.py
@@ -58,6 +58,7 @@ class DockerTestDirectoryBindings:
         shutil.copytree(test_dir + "/resources/elasticsearch/certs", 
self.data_directories[self.test_id]["resources_dir"] + "/elasticsearch")
         shutil.copytree(test_dir + "/resources/opensearch/certs", 
self.data_directories[self.test_id]["resources_dir"] + "/opensearch")
         shutil.copytree(test_dir + "/resources/minifi-c2-server-ssl/certs", 
self.data_directories[self.test_id]["resources_dir"] + "/minifi-c2-server-ssl")
+        shutil.copytree(test_dir + "/resources/minifi-controller", 
self.data_directories[self.test_id]["resources_dir"] + "/minifi-controller")
         shutil.copytree(test_dir + "/resources/minifi", 
self.data_directories[self.test_id]["minifi_config_dir"], dirs_exist_ok=True)
 
     def get_data_directories(self, test_id):
diff --git a/docker/test/integration/cluster/MinifiControllerExecutor.py 
b/docker/test/integration/cluster/MinifiControllerExecutor.py
new file mode 100644
index 000000000..d7290df89
--- /dev/null
+++ b/docker/test/integration/cluster/MinifiControllerExecutor.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from .DockerCommunicator import DockerCommunicator
+
+
+class MinifiControllerExecutor:
+    def __init__(self, container_communicator: DockerCommunicator):
+        self.container_communicator = container_communicator
+
+    def update_flow(self, container_name: str):
+        self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--updateflow", 
"/tmp/resources/minifi-controller/config.yml"])
+
+    def updated_config_is_persisted(self, container_name: str) -> bool:
+        (code, output) = 
self.container_communicator.execute_command(container_name, ["cat", 
"/opt/minifi/minifi-current/conf/config.yml"])
+        return code == 0 and "2f2a3b47-f5ba-49f6-82b5-bc1c86b96f38" in output
+
+    def stop_component(self, component: str, container_name: str):
+        self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--stop", component])
+
+    def start_component(self, component: str, container_name: str):
+        self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--start", component])
+
+    def is_component_running(self, component: str, container_name: str) -> 
bool:
+        (code, output) = 
self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--list", "components"])
+        return code == 0 and component + ", running: true" in output
+
+    def get_connections(self, container_name: str):
+        (_, output) = 
self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--list", "connections"])
+        connections = []
+        for line in output.split('\n'):
+            if not line.startswith('[') and not line.startswith('Connection 
Names'):
+                connections.append(line)
+        return connections
+
+    def get_full_connection_count(self, container_name: str) -> int:
+        (_, output) = 
self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--getfull"])
+        for line in output.split('\n'):
+            if "are full" in line:
+                return int(line.split(' ')[0])
+        return -1
+
+    def get_connection_size(self, connection: str, container_name: str):
+        (_, output) = 
self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--getsize", connection])
+        for line in output.split('\n'):
+            if "Size/Max of " + connection in line:
+                size_and_max = line.split(connection)[1].split('/')
+                return (int(size_and_max[0].strip()), 
int(size_and_max[1].strip()))
+        return (-1, -1)
+
+    def get_manifest(self, container_name: str) -> str:
+        (_, output) = 
self.container_communicator.execute_command(container_name, 
["/opt/minifi/minifi-current/bin/minificontroller", "--manifest"])
+        manifest = ""
+        for line in output.split('\n'):
+            if not line.startswith('['):
+                manifest += line
+        return manifest
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py 
b/docker/test/integration/cluster/containers/MinifiContainer.py
index e784c647b..d9f47a254 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -34,6 +34,7 @@ class MinifiOptions:
         self.config_format = "json"
         self.use_flow_config_from_url = False
         self.set_ssl_context_properties = False
+        self.enable_controller_socket = False
 
 
 class MinifiContainer(FlowContainer):
@@ -46,6 +47,7 @@ class MinifiContainer(FlowContainer):
 
         super().__init__(config_dir, name, 'minifi-cpp', copy.copy(vols), 
network, image_store, command)
         self.container_specific_config_dir = 
self._create_container_config_dir(self.config_dir)
+        os.chmod(self.container_specific_config_dir, 0o777)
 
     def _create_container_config_dir(self, config_dir):
         container_config_dir = os.path.join(config_dir, str(uuid.uuid4()))
@@ -69,8 +71,10 @@ class MinifiContainer(FlowContainer):
             assert False, "Invalid flow configuration format: 
{}".format(self.options.config_format)
         test_flow_yaml = serializer.serialize(self.start_nodes, 
self.controllers)
         logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
-        with open(os.path.join(self.container_specific_config_dir, 
"config.yml"), 'wb') as config_file:
+        absolute_flow_config_path = 
os.path.join(self.container_specific_config_dir, "config.yml")
+        with open(absolute_flow_config_path, 'wb') as config_file:
             config_file.write(test_flow_yaml.encode('utf-8'))
+        os.chmod(absolute_flow_config_path, 0o777)
 
     def _create_properties(self):
         properties_file_path = 
os.path.join(self.container_specific_config_dir, 'minifi.properties')
@@ -114,6 +118,12 @@ class MinifiContainer(FlowContainer):
             if self.options.use_flow_config_from_url:
                 
f.write("nifi.c2.flow.url=http://minifi-c2-server:10090/c2/config?class=minifi-test-class\n";)
 
+            if self.options.enable_controller_socket:
+                f.write("controller.socket.enable=true\n")
+                f.write("controller.socket.host=localhost\n")
+                f.write("controller.socket.port=9998\n")
+                f.write("controller.socket.local.any.interface=false\n")
+
     def _setup_config(self):
         self._create_properties()
         if not self.options.use_flow_config_from_url:
diff --git a/docker/test/integration/features/CMakeLists.txt 
b/docker/test/integration/features/CMakeLists.txt
index f0ca97a77..93abb54cd 100644
--- a/docker/test/integration/features/CMakeLists.txt
+++ b/docker/test/integration/features/CMakeLists.txt
@@ -15,76 +15,82 @@
 # specific language governing permissions and limitations
 # under the License.
 
+set(ENABLED_BEHAVE_TESTS "")
+
 if (ENABLE_LIBRDKAFKA)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/kafka.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/kafka.feature")
 endif()
 
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/attributes_to_json.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/core_functionality.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/defragtextflowfiles.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/file_system_operations.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/hashcontent.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/replace_text.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/routetext.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/s2s.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/syslog_listener.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/network_listener.feature")
-set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/minifi_c2_server.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/attributes_to_json.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/core_functionality.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/defragtextflowfiles.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/file_system_operations.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/hashcontent.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/replace_text.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/routetext.feature")
+list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/s2s.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/syslog_listener.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/network_listener.feature")
+list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/minifi_c2_server.feature")
 
 if (ENABLE_AZURE)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/azure_storage.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/azure_storage.feature")
 endif()
 
 if (ENABLE_GCP)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/google_cloud_storage.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/google_cloud_storage.feature")
 endif()
 
 if (NOT DISABLE_CIVET)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/http.feature")
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/https.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/http.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/https.feature")
 endif()
 
 if (ENABLE_KUBERNETES)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_logging.feature")
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_metrics.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_logging.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_metrics.feature")
 endif()
 
 if (ENABLE_MQTT)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/mqtt.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/mqtt.feature")
 endif()
 
 if (ENABLE_OPC)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/opcua.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/opcua.feature")
 endif()
 
 if (ENABLE_PYTHON_SCRIPTING)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/python.feature")
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/python_script.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/python.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/python_script.feature")
 endif()
 
 if (ENABLE_LUA_SCRIPTING)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/lua_script.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/lua_script.feature")
 endif()
 
 if (ENABLE_AWS)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/s3.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/s3.feature")
 endif()
 
 if (ENABLE_SPLUNK)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/splunk.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/splunk.feature")
 endif()
 
 if (ENABLE_SQL)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/sql.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/sql.feature")
 endif()
 
 if (ENABLE_ELASTICSEARCH AND NOT CI_BUILD) # Elasticsearch/Opensearch 
containers require more RAM than what the CI environment has
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/elasticsearch.feature")
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/opensearch.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/elasticsearch.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/opensearch.feature")
 endif()
 
 if (ENABLE_PROMETHEUS)
-    set(ENABLED_BEHAVE_TESTS 
"${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/prometheus.feature")
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/prometheus.feature")
+endif()
+
+if (NOT DISABLE_CURL AND NOT DISABLE_CONTROLLER)
+    list(APPEND ENABLED_BEHAVE_TESTS 
"${CMAKE_CURRENT_SOURCE_DIR}/minifi_controller.feature")
 endif()
 
 file(GLOB FEATURE_FILES  "*.feature")
diff --git a/docker/test/integration/features/minifi_controller.feature 
b/docker/test/integration/features/minifi_controller.feature
new file mode 100644
index 000000000..700a782bd
--- /dev/null
+++ b/docker/test/integration/features/minifi_controller.feature
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: MiNiFi Controller functionalities
+  Test MiNiFi Controller functionalities
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: Flow config can be updated through MiNiFi controller
+    Given a GenerateFlowFile processor
+    And a file with the content "test" is present in "/tmp/input"
+    And controller socket properties are set up
+    When all instances start up
+    And MiNiFi config is updated through MiNiFi controller
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    And the updated config is persisted
+
+  Scenario: A component can be stopped
+    Given a GenerateFlowFile processor
+    And a file with the content "test" is present in "/tmp/input"
+    And controller socket properties are set up
+    When all instances start up
+    And the GenerateFlowFile component is stopped through MiNiFi controller
+    Then the GenerateFlowFile component is not running
+    And the FlowController component is running
+
+  Scenario: If FlowController is stopped all other components are stopped
+    Given a GenerateFlowFile processor
+    And a file with the content "test" is present in "/tmp/input"
+    And controller socket properties are set up
+    When all instances start up
+    And the FlowController component is stopped through MiNiFi controller
+    Then the GenerateFlowFile component is not running
+    And the FlowController component is not running
+
+  Scenario: FlowController can be stopped and restarted
+    Given a GenerateFlowFile processor
+    And a file with the content "test" is present in "/tmp/input"
+    And controller socket properties are set up
+    When all instances start up
+    And the FlowController component is stopped through MiNiFi controller
+    And the FlowController component is started through MiNiFi controller
+    Then the GenerateFlowFile component is running
+    And the FlowController component is running
+
+  Scenario: Queue state can be queried
+    Given a GenerateFlowFile processor
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And the "success" relationship of the GenerateFlowFile processor is 
connected to the LogAttribute
+    And controller socket properties are set up
+    When all instances start up
+    And MiNiFi config is updated through MiNiFi controller
+    Then connection "GetFile/success/PutFile" can be seen through MiNiFi 
controller
+    And 0 connections can be seen full through MiNiFi controller
+    And connection "GetFile/success/PutFile" has 0 size and 2000 max size 
through MiNiFi controller
+
+  Scenario: Manifest can be retrieved
+    Given a GenerateFlowFile processor
+    And a file with the content "test" is present in "/tmp/input"
+    And controller socket properties are set up
+    When all instances start up
+    Then manifest can be retrieved through MiNiFi controller
diff --git a/docker/test/integration/resources/minifi-controller/config.yml 
b/docker/test/integration/resources/minifi-controller/config.yml
new file mode 100644
index 000000000..60bf34b12
--- /dev/null
+++ b/docker/test/integration/resources/minifi-controller/config.yml
@@ -0,0 +1,30 @@
+MiNiFi Config Version: 3
+Flow Controller:
+  name: MiNiFi Flow
+Processors:
+- name: Get files from /tmp/input
+  id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96f38
+  class: org.apache.nifi.minifi.processors.GetFile
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 1000 ms
+  Properties:
+    Input Directory: /tmp/input
+- name: Put files to /tmp/output
+  id: e143601d-de4f-44ba-a6ec-d1f97d77ec94
+  class: org.apache.nifi.minifi.processors.PutFile
+  scheduling strategy: EVENT_DRIVEN
+  auto-terminated relationships list:
+  - failure
+  - success
+  Properties:
+    Conflict Resolution Strategy: fail
+    Create Missing Directories: 'true'
+    Directory: /tmp/output
+Connections:
+- name: GetFile/success/PutFile
+  id: 098a56ba-f4bf-4323-a3f3-6f8a5e3586bf
+  source id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96f38
+  source relationship names:
+  - success
+  destination id: e143601d-de4f-44ba-a6ec-d1f97d77ec94
+Remote Process Groups: []
diff --git a/docker/test/integration/steps/steps.py 
b/docker/test/integration/steps/steps.py
index 83baf1d44..06978dcdb 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1064,3 +1064,109 @@ def step_impl(context, peak_usage_percent: str, 
duration: str) -> None:
 @given(u'a MiNiFi CPP server with yaml config')
 def step_impl(context):
     context.test.set_yaml_in_minifi()
+
+
+# MiNiFi controller
+@given(u'controller socket properties are set up')
+def step_impl(context):
+    context.test.set_controller_socket_properties_in_minifi()
+
+
+@when(u'MiNiFi config is updated through MiNiFi controller in the 
\"{minifi_container_name}\" flow')
+def step_impl(context, minifi_container_name: str):
+    context.test.update_flow_config_through_controller(minifi_container_name)
+
+
+@when(u'MiNiFi config is updated through MiNiFi controller')
+def step_impl(context):
+    context.execute_steps("""when MiNiFi config is updated through MiNiFi 
controller in the \"{minifi_container_name}\" 
flow""".format(minifi_container_name="minifi-cpp-flow"))
+
+
+@then(u'the updated config is persisted in the \"{minifi_container_name}\" 
flow')
+def step_impl(context, minifi_container_name: str):
+    
context.test.check_minifi_controller_updated_config_is_persisted(minifi_container_name)
+
+
+@then(u'the updated config is persisted')
+def step_impl(context):
+    context.execute_steps("""then the updated config is persisted in the 
\"{minifi_container_name}\" 
flow""".format(minifi_container_name="minifi-cpp-flow"))
+
+
+@when(u'the {component} component is stopped through MiNiFi controller in the 
\"{minifi_container_name}\" flow')
+def step_impl(context, minifi_container_name: str, component: str):
+    context.test.stop_component_through_controller(component, 
minifi_container_name)
+
+
+@when(u'the {component} component is stopped through MiNiFi controller')
+def step_impl(context, component: str):
+    context.execute_steps("""when the {component} component is stopped through 
MiNiFi controller in the \"{minifi_container_name}\" 
flow""".format(component=component, minifi_container_name="minifi-cpp-flow"))
+
+
+@when(u'the {component} component is started through MiNiFi controller in the 
\"{minifi_container_name}\" flow')
+def step_impl(context, minifi_container_name: str, component: str):
+    context.test.start_component_through_controller(component, 
minifi_container_name)
+
+
+@when(u'the {component} component is started through MiNiFi controller')
+def step_impl(context, component: str):
+    context.execute_steps("""when the {component} component is started through 
MiNiFi controller in the \"{minifi_container_name}\" 
flow""".format(component=component, minifi_container_name="minifi-cpp-flow"))
+
+
+@then(u'the {component} component is not running in the 
\"{minifi_container_name}\" flow')
+def step_impl(context, component: str, minifi_container_name: str):
+    context.test.check_component_not_running_through_controller(component, 
minifi_container_name)
+
+
+@then(u'the {component} component is not running')
+def step_impl(context, component: str):
+    context.execute_steps("""then the {component} component is not running in 
the \"{minifi_container_name}\" flow""".format(component=component, 
minifi_container_name="minifi-cpp-flow"))
+
+
+@then(u'the {component} component is running in the 
\"{minifi_container_name}\" flow')
+def step_impl(context, component: str, minifi_container_name: str):
+    context.test.check_component_running_through_controller(component, 
minifi_container_name)
+
+
+@then(u'the {component} component is running')
+def step_impl(context, component: str):
+    context.execute_steps("""then the {component} component is running in the 
\"{minifi_container_name}\" flow""".format(component=component, 
minifi_container_name="minifi-cpp-flow"))
+
+
+@then(u'connection \"{connection}\" can be seen through MiNiFi controller in 
the \"{minifi_container_name}\" flow')
+def step_impl(context, connection: str, minifi_container_name: str):
+    context.test.connection_found_through_controller(connection, 
minifi_container_name)
+
+
+@then(u'connection \"{connection}\" can be seen through MiNiFi controller')
+def step_impl(context, connection: str):
+    context.execute_steps("""then connection \"{connection}\" can be seen 
through MiNiFi controller in the \"{minifi_container_name}\" 
flow""".format(connection=connection, minifi_container_name="minifi-cpp-flow"))
+
+
+@then(u'{connection_count:d} connections can be seen full through MiNiFi 
controller in the \"{minifi_container_name}\" flow')
+def step_impl(context, connection_count: int, minifi_container_name: str):
+    context.test.check_connections_full_through_controller(connection_count, 
minifi_container_name)
+
+
+@then(u'{connection_count:d} connections can be seen full through MiNiFi 
controller')
+def step_impl(context, connection_count: int):
+    context.execute_steps("""then {connection_count:d} connections can be seen 
full through MiNiFi controller in the \"{minifi_container_name}\" 
flow""".format(connection_count=connection_count, 
minifi_container_name="minifi-cpp-flow"))
+
+
+@then(u'connection \"{connection}\" has {size:d} size and {max_size:d} max 
size through MiNiFi controller in the \"{minifi_container_name}\" flow')
+def step_impl(context, connection: str, size: int, max_size: int, 
minifi_container_name: str):
+    context.test.check_connection_size_through_controller(connection, size, 
max_size, minifi_container_name)
+
+
+@then(u'connection \"{connection}\" has {size:d} size and {max_size:d} max 
size through MiNiFi controller')
+def step_impl(context, connection: str, size: int, max_size: int):
+    context.execute_steps("""then connection \"{connection}\" has {size:d} 
size and {max_size:d} max size through MiNiFi controller in the 
\"{minifi_container_name}\" flow""".format(connection=connection, size=size, 
max_size=max_size, minifi_container_name="minifi-cpp-flow"))
+
+
+@then(u'manifest can be retrieved through MiNiFi controller in the 
\"{minifi_container_name}\" flow')
+def step_impl(context, minifi_container_name: str):
+    
context.test.manifest_can_be_retrieved_through_minifi_controller(minifi_container_name)
+
+
+@then(u'manifest can be retrieved through MiNiFi controller')
+def step_impl(context):
+    context.execute_steps("""then manifest can be retrieved through MiNiFi 
controller in the \"{minifi_container_name}\" 
flow""".format(minifi_container_name="minifi-cpp-flow"))
diff --git a/docker/test/integration/utils.py b/docker/test/integration/utils.py
index 07e388431..15db614d2 100644
--- a/docker/test/integration/utils.py
+++ b/docker/test/integration/utils.py
@@ -25,13 +25,10 @@ def retry_check(max_tries=5, retry_interval=1):
     def retry_check_func(func):
         @functools.wraps(func)
         def retry_wrapper(*args, **kwargs):
-            current_retry_count = 0
-            while current_retry_count < max_tries:
-                if not func(*args, **kwargs):
-                    current_retry_count += 1
-                    time.sleep(retry_interval)
-                else:
+            for i in range(max_tries):
+                if func(*args, **kwargs):
                     return True
+                time.sleep(retry_interval)
             return False
         return retry_wrapper
     return retry_check_func
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index 60a8832e8..dbd0844f0 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -89,7 +89,7 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
   }
 
   bool isRunning() const override {
-    return running_.load() || updating_.load();
+    return running_.load() || updating_.isUpdating();
   }
 
   virtual bool isInitialized() {
@@ -149,6 +149,18 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
   std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() 
override;
 
  private:
+  class UpdateState {
+   public:
+    bool isUpdating() const { return update_block_count_ > 0; }
+    void beginUpdate() { ++update_block_count_; }
+    void endUpdate() { --update_block_count_; }
+    void lock() { beginUpdate(); }
+    void unlock() { endUpdate(); }
+
+   private:
+    std::atomic<uint32_t> update_block_count_;
+  };
+
   /**
    * Loads the flow as specified in the flow config file or if not present
    * tries to fetch it from the C2 server (if enabled).
@@ -171,7 +183,7 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
 
   std::recursive_mutex mutex_;
   std::atomic<bool> running_;
-  std::atomic<bool> updating_;
+  UpdateState updating_;
   std::atomic<bool> initialized_;
   std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
   std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_;
diff --git a/libminifi/include/c2/ControllerSocketProtocol.h 
b/libminifi/include/c2/ControllerSocketProtocol.h
index e4c7b23cd..67bb47ef3 100644
--- a/libminifi/include/c2/ControllerSocketProtocol.h
+++ b/libminifi/include/c2/ControllerSocketProtocol.h
@@ -18,6 +18,10 @@
 
 #include <memory>
 #include <string>
+#include <condition_variable>
+#include <thread>
+#include <mutex>
+#include <atomic>
 
 #include "io/StreamFactory.h"
 #include "io/BaseStream.h"
@@ -26,6 +30,7 @@
 #include "core/state/nodes/StateMonitor.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "ControllerSocketReporter.h"
+#include "utils/MinifiConcurrentQueue.h"
 
 namespace org::apache::nifi::minifi::c2 {
 
@@ -61,6 +66,43 @@ class ControllerSocketProtocol {
   std::weak_ptr<ControllerSocketReporter> controller_socket_reporter_;
   std::shared_ptr<Configure> configuration_;
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<ControllerSocketProtocol>::getLogger();
+  std::mutex initialization_mutex_;
+
+  // Some commands need to restart the controller socket to reinitialize the 
socket with new data for example new SSL data in case of a flow update
+  // These commands are handled on a separate thread, and while these commands 
are handled other incoming commands are dropped
+  class SocketRestartCommandProcessor {
+   public:
+    explicit SocketRestartCommandProcessor(state::StateMonitor& update_sink_);
+    ~SocketRestartCommandProcessor();
+
+    enum class Command {
+      FLOW_UPDATE,
+      START
+    };
+
+    struct CommandData {
+      Command command;
+      std::string data;
+    };
+
+    void enqueue(const CommandData& command_data) {
+      is_socket_restarting_ = true;
+      command_queue_.enqueue(command_data);
+    }
+
+    bool isSocketRestarting() const {
+      return is_socket_restarting_;
+    }
+
+   private:
+    mutable std::atomic_bool is_socket_restarting_ = false;
+    state::StateMonitor& update_sink_;
+    std::thread command_processor_thread_;
+    std::atomic_bool running_ = true;
+    utils::ConditionConcurrentQueue<CommandData> command_queue_;
+  };
+
+  SocketRestartCommandProcessor socket_restart_processor_;
 };
 
 }  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 896dd88d9..082ac92dd 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -53,7 +53,6 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
                                std::shared_ptr<utils::file::FileSystem> 
filesystem, std::function<void()> request_restart)
     : 
core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()),
       running_(false),
-      updating_(false),
       initialized_(false),
       thread_pool_(5, false, nullptr, "Flowcontroller threadpool"),
       configuration_(std::move(configure)),
@@ -128,10 +127,9 @@ bool FlowController::applyConfiguration(const std::string 
&source, const std::st
 
   logger_->log_info("Starting to reload Flow Controller with flow control name 
%s, version %d", newRoot->getName(), newRoot->getVersion());
 
-  updating_ = true;
   bool started = false;
-
   {
+    std::scoped_lock<UpdateState> update_lock(updating_);
     std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
     stop();
 
@@ -155,8 +153,6 @@ bool FlowController::applyConfiguration(const std::string 
&source, const std::st
     }
   }
 
-  updating_ = false;
-
   if (started) {
     auto flowVersion = flow_configuration_->getFlowVersion();
     if (flowVersion) {
@@ -193,7 +189,6 @@ int16_t FlowController::stop() {
     this->content_repo_->stop();
     // stop the ControllerServices
     disableAllControllerServices();
-    initialized_ = false;
     running_ = false;
   }
   return 0;
@@ -265,14 +260,17 @@ void FlowController::load(bool reload) {
     io::NetworkPrioritizerFactory::getInstance()->clearPrioritizer();
   }
 
-  if (!root_wrapper_.initialized()) {
-    logger_->log_info("Instantiating new flow");
-    root_wrapper_.setNewRoot(loadInitialFlow());
-    if (c2_agent_ && !c2_agent_->isControllerRunning()) {
-      // TODO(lordgamez): this initialization configures the C2 sender 
protocol (e.g. RESTSender) which may contain an SSL Context service from the 
flow config
-      // for SSL communication. This service may change on flow update and we 
should take care of the SSL Context Service change in the C2 Agent.
-      c2_agent_->initialize(this, this, this);
-      c2_agent_->start();
+  {
+    std::scoped_lock<UpdateState> update_lock(updating_);
+    if (!root_wrapper_.initialized()) {
+      logger_->log_info("Instantiating new flow");
+      root_wrapper_.setNewRoot(loadInitialFlow());
+      if (c2_agent_ && !c2_agent_->isControllerRunning()) {
+        // TODO(lordgamez): this initialization configures the C2 sender 
protocol (e.g. RESTSender) which may contain an SSL Context service from the 
flow config
+        // for SSL communication. This service may change on flow update and 
we should take care of the SSL Context Service change in the C2 Agent.
+        c2_agent_->initialize(this, this, this);
+        c2_agent_->start();
+      }
     }
   }
 
@@ -333,7 +331,7 @@ int16_t FlowController::start() {
     logger_->log_info("Starting Flow Controller");
     enableAllControllerServices();
     if (controller_socket_protocol_) {
-      // Initialization is postponed after initializing the flow so the 
controller socket may load the SSL context defined in the flow configuration
+      // Initialization is postponed after controller services are enabled so 
the controller socket may load the SSL context defined in the flow configuration
       controller_socket_protocol_->initialize();
     }
     timer_scheduler_->start();
@@ -403,7 +401,7 @@ int16_t FlowController::clearConnection(const std::string 
&connection) {
 }
 
 void 
FlowController::executeOnAllComponents(std::function<void(state::StateController&)>
 func) {
-  if (updating_ || !initialized_) {
+  if (updating_.isUpdating()) {
     return;
   }
   std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -413,7 +411,7 @@ void 
FlowController::executeOnAllComponents(std::function<void(state::StateContr
 }
 
 void FlowController::executeOnComponent(const std::string &id_or_name, 
std::function<void(state::StateController&)> func) {
-  if (updating_ || !initialized_) {
+  if (updating_.isUpdating()) {
     return;
   }
   std::lock_guard<std::recursive_mutex> lock(mutex_);
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp 
b/libminifi/src/c2/ControllerSocketProtocol.cpp
index c715c43a3..b64597a83 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -30,17 +30,47 @@
 
 namespace org::apache::nifi::minifi::c2 {
 
+ControllerSocketProtocol::SocketRestartCommandProcessor::SocketRestartCommandProcessor(state::StateMonitor&
 update_sink) :
+    update_sink_(update_sink) {
+  command_queue_.start();
+  command_processor_thread_ = std::thread([this] {
+    while (running_) {
+      CommandData command_data;
+      if (command_queue_.dequeueWait(command_data)) {
+        if (command_data.command == Command::FLOW_UPDATE) {
+          update_sink_.applyUpdate("ControllerSocketProtocol", 
command_data.data, true);
+        } else if (command_data.command == Command::START) {
+          update_sink_.executeOnComponent(command_data.data, 
[](state::StateController& component) {
+            component.start();
+          });
+        }
+      }
+      is_socket_restarting_ = false;
+    }
+  });
+}
+
+ControllerSocketProtocol::SocketRestartCommandProcessor::~SocketRestartCommandProcessor()
 {
+  running_ = false;
+  command_queue_.stop();
+  if (command_processor_thread_.joinable()) {
+    command_processor_thread_.join();
+  }
+}
+
 
ControllerSocketProtocol::ControllerSocketProtocol(core::controller::ControllerServiceProvider&
 controller, state::StateMonitor& update_sink,
     std::shared_ptr<Configure> configuration, const 
std::shared_ptr<ControllerSocketReporter>& controller_socket_reporter)
       : controller_(controller),
         update_sink_(update_sink),
         controller_socket_reporter_(controller_socket_reporter),
-        configuration_(std::move(configuration)) {
+        configuration_(std::move(configuration)),
+        socket_restart_processor_(update_sink_) {
   gsl_Expects(configuration_);
   stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_);
 }
 
 void ControllerSocketProtocol::initialize() {
+  std::unique_lock<std::mutex> lock(initialization_mutex_);
   std::shared_ptr<minifi::controllers::SSLContextService> secure_context;
   std::string context_name;
   if (configuration_->get(Configure::controller_ssl_context_service, 
context_name)) {
@@ -100,9 +130,14 @@ void ControllerSocketProtocol::handleStart(io::BaseStream 
*stream) {
   std::string component_str;
   const auto size = stream->read(component_str);
   if (!io::isError(size)) {
-    update_sink_.executeOnComponent(component_str, [](state::StateController& 
component) {
-      component.start();
-    });
+    if (component_str == "FlowController") {
+      // Starting flow controller resets socket
+      
socket_restart_processor_.enqueue({SocketRestartCommandProcessor::Command::START,
 component_str});
+    } else {
+      update_sink_.executeOnComponent(component_str, 
[](state::StateController& component) {
+        component.start();
+      });
+    }
   } else {
     logger_->log_debug("Connection broke");
   }
@@ -149,7 +184,7 @@ void ControllerSocketProtocol::handleUpdate(io::BaseStream 
*stream) {
     std::ifstream tf(ff_loc);
     std::string flow_configuration((std::istreambuf_iterator<char>(tf)),
         std::istreambuf_iterator<char>());
-    update_sink_.applyUpdate("ControllerSocketProtocol", flow_configuration);
+    
socket_restart_processor_.enqueue({SocketRestartCommandProcessor::Command::FLOW_UPDATE,
 flow_configuration});
   }
 }
 
@@ -179,7 +214,7 @@ void 
ControllerSocketProtocol::writeQueueSizesResponse(io::BaseStream *stream) {
 
 void ControllerSocketProtocol::writeComponentsResponse(io::BaseStream *stream) 
{
   std::vector<std::pair<std::string, bool>> components;
-  update_sink_.executeOnAllComponents([&components](state::StateController& 
component){
+  update_sink_.executeOnAllComponents([&components](state::StateController& 
component) {
     components.emplace_back(component.getComponentName(), 
component.isRunning());
   });
   io::BufferStream resp;
@@ -296,6 +331,12 @@ void 
ControllerSocketProtocol::handleCommand(io::BaseStream *stream) {
     logger_->log_debug("Connection broke");
     return;
   }
+
+  if (socket_restart_processor_.isSocketRestarting()) {
+    logger_->log_debug("Socket restarting, dropping command");
+    return;
+  }
+
   switch (head) {
     case Operation::START:
       handleStart(stream);
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 509c0542d..942cefdf4 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -133,6 +133,7 @@ void 
ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeSch
   for (const auto processor : failed_processors_) {
     try {
       logger_->log_debug("Starting %s", processor->getName());
+      processor->setScheduledState(core::ScheduledState::RUNNING);
       switch (processor->getSchedulingStrategy()) {
         case TIMER_DRIVEN:
           timeScheduler.schedule(processor);
@@ -147,10 +148,12 @@ void 
ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeSch
     }
     catch (const std::exception &e) {
       logger_->log_error("Failed to start processor %s (%s): %s", 
processor->getUUIDStr(), processor->getName(), e.what());
+      processor->setScheduledState(core::ScheduledState::STOPPED);
       failed_processors.insert(processor);
     }
     catch (...) {
       logger_->log_error("Failed to start processor %s (%s)", 
processor->getUUIDStr(), processor->getName());
+      processor->setScheduledState(core::ScheduledState::STOPPED);
       failed_processors.insert(processor);
     }
   }

Reply via email to