This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ca1e4357daecf56dbfaf78a968c23f7b21bdfef4 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed May 31 15:34:40 2023 +0200 MINIFICPP-2112 Fix flow update and restart with minifi controller Closes #1568 Signed-off-by: Marton Szasz <[email protected]> --- cmake/MiNiFiOptions.cmake | 2 + controller/tests/ControllerTests.cpp | 12 +-- .../integration/MiNiFi_integration_test_driver.py | 33 +++++++ docker/test/integration/cluster/ContainerStore.py | 3 + .../test/integration/cluster/DockerTestCluster.py | 45 ++++++++- .../cluster/DockerTestDirectoryBindings.py | 1 + .../cluster/MinifiControllerExecutor.py | 68 +++++++++++++ .../cluster/containers/MinifiContainer.py | 12 ++- docker/test/integration/features/CMakeLists.txt | 64 +++++++------ .../integration/features/minifi_controller.feature | 75 +++++++++++++++ .../resources/minifi-controller/config.yml | 30 ++++++ docker/test/integration/steps/steps.py | 106 +++++++++++++++++++++ docker/test/integration/utils.py | 9 +- libminifi/include/FlowController.h | 16 +++- libminifi/include/c2/ControllerSocketProtocol.h | 42 ++++++++ libminifi/src/FlowController.cpp | 32 +++---- libminifi/src/c2/ControllerSocketProtocol.cpp | 53 +++++++++-- libminifi/src/core/ProcessGroup.cpp | 3 + 18 files changed, 538 insertions(+), 68 deletions(-) diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake index 1da46a8da..9600f34b3 100644 --- a/cmake/MiNiFiOptions.cmake +++ b/cmake/MiNiFiOptions.cmake @@ -121,6 +121,8 @@ add_minifi_option(ENABLE_KUBERNETES "Enables the Kubernetes extensions." ON) add_minifi_option(ENABLE_TEST_PROCESSORS "Enables test processors" OFF) add_minifi_option(ENABLE_PROMETHEUS "Enables Prometheus support." ON) add_minifi_option(DISABLE_JEMALLOC "Disables jemalloc." OFF) +add_minifi_option(DISABLE_CURL "Disables curl." OFF) +add_minifi_option(DISABLE_CONTROLLER "Disables build of MiNiFi controller binary." OFF) set_minifi_cache_variable(CUSTOM_MALLOC OFF "Overwrite malloc implementation.") set_property(CACHE CUSTOM_MALLOC PROPERTY STRINGS "jemalloc" "mimalloc" "rpmalloc" OFF) diff --git a/controller/tests/ControllerTests.cpp b/controller/tests/ControllerTests.cpp index c88af54df..82f90e1bc 100644 --- a/controller/tests/ControllerTests.cpp +++ b/controller/tests/ControllerTests.cpp @@ -314,14 +314,14 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test listComponents", "[controllerTests } using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; - REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return controller_->isRunning(); }, 20ms)); + REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return controller_->isRunning(); }, 20ms)); { auto socket = createSocket(); minifi::controller::stopComponent(std::move(socket), "TestStateController"); } - REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return !controller_->isRunning(); }, 20ms)); + REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return !controller_->isRunning(); }, 20ms)); { auto socket = createSocket(); @@ -353,14 +353,14 @@ TEST_CASE_METHOD(ControllerTestFixture, "TestClear", "[controllerTests]") { } using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; - REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return controller_->isRunning(); }, 20ms)); + REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return controller_->isRunning(); }, 20ms)); for (auto i = 0; i < 3; ++i) { auto socket = createSocket(); minifi::controller::clearConnection(std::move(socket), "connection"); } - REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 3 == update_sink_->clear_calls; }, 20ms)); + REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 3 == update_sink_->clear_calls; }, 20ms)); } TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", "[controllerTests]") { @@ -384,7 +384,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", "[controllerTests]") { } using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; - REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return controller_->isRunning(); }, 20ms)); + REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return controller_->isRunning(); }, 20ms)); std::stringstream ss; @@ -393,7 +393,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", "[controllerTests]") { minifi::controller::updateFlow(std::move(socket), ss, "connection"); } - REQUIRE(verifyEventHappenedInPollTime(500ms, [&] { return 1 == update_sink_->update_calls; }, 20ms)); + REQUIRE(verifyEventHappenedInPollTime(5s, [&] { return 1 == update_sink_->update_calls; }, 20ms)); REQUIRE(0 == update_sink_->clear_calls); } diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py index 981b0802c..bcaddb33b 100644 --- a/docker/test/integration/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/MiNiFi_integration_test_driver.py @@ -332,3 +332,36 @@ class MiNiFi_integration_test: def set_yaml_in_minifi(self): self.cluster.set_yaml_in_minifi() + + def set_controller_socket_properties_in_minifi(self): + self.cluster.set_controller_socket_properties_in_minifi() + + def update_flow_config_through_controller(self, container_name: str): + self.cluster.update_flow_config_through_controller(container_name) + + def check_minifi_controller_updated_config_is_persisted(self, container_name: str): + assert self.cluster.check_minifi_controller_updated_config_is_persisted(container_name) or self.cluster.log_app_output() + + def stop_component_through_controller(self, component: str, container_name: str): + self.cluster.stop_component_through_controller(component, container_name) + + def start_component_through_controller(self, component: str, container_name: str): + self.cluster.start_component_through_controller(component, container_name) + + def check_component_not_running_through_controller(self, component: str, container_name: str): + assert self.cluster.check_component_not_running_through_controller(component, container_name) or self.cluster.log_app_output() + + def check_component_running_through_controller(self, component: str, container_name: str): + assert self.cluster.check_component_running_through_controller(component, container_name) or self.cluster.log_app_output() + + def connection_found_through_controller(self, connection: str, container_name: str): + assert self.cluster.connection_found_through_controller(connection, container_name) or self.cluster.log_app_output() + + def check_connections_full_through_controller(self, connection_count: int, container_name: str): + assert self.cluster.check_connections_full_through_controller(connection_count, container_name) or self.cluster.log_app_output() + + def check_connection_size_through_controller(self, connection: str, size: int, max_size: int, container_name: str): + assert self.cluster.check_connection_size_through_controller(connection, size, max_size, container_name) or self.cluster.log_app_output() + + def manifest_can_be_retrieved_through_minifi_controller(self, container_name: str): + assert self.cluster.manifest_can_be_retrieved_through_minifi_controller(container_name) or self.cluster.log_app_output() diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index a463a5e0c..7a511b7c5 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -166,6 +166,9 @@ class ContainerStore: def set_yaml_in_minifi(self): self.minifi_options.config_format = "yaml" + def set_controller_socket_properties_in_minifi(self): + self.minifi_options.enable_controller_socket = True + def get_startup_finished_log_entry(self, container_name): return self.containers[container_name].get_startup_finished_log_entry() diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index c38b0cfb3..732837245 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -19,6 +19,7 @@ import re from .LogSource import LogSource from .ContainerStore import ContainerStore from .DockerCommunicator import DockerCommunicator +from .MinifiControllerExecutor import MinifiControllerExecutor from .checkers.AwsChecker import AwsChecker from .checkers.AzureChecker import AzureChecker from .checkers.ElasticSearchChecker import ElasticSearchChecker @@ -26,7 +27,7 @@ from .checkers.GcsChecker import GcsChecker from .checkers.PostgresChecker import PostgresChecker from .checkers.PrometheusChecker import PrometheusChecker from .checkers.SplunkChecker import SplunkChecker -from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage +from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check class DockerTestCluster: @@ -42,6 +43,7 @@ class DockerTestCluster: self.postgres_checker = PostgresChecker(self.container_communicator) self.splunk_checker = SplunkChecker(self.container_communicator) self.prometheus_checker = PrometheusChecker() + self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator) def cleanup(self): self.container_store.cleanup() @@ -91,6 +93,9 @@ class DockerTestCluster: def set_yaml_in_minifi(self): self.container_store.set_yaml_in_minifi() + def set_controller_socket_properties_in_minifi(self): + self.container_store.set_controller_socket_properties_in_minifi() + def get_app_log(self, container_name): log_source = self.container_store.log_source(container_name) if log_source == LogSource.FROM_DOCKER_CONTAINER: @@ -271,3 +276,41 @@ class DockerTestCluster: time.sleep(1) logging.warning(f"Memory usage ({current_memory_usage}) is more than the maximum asserted memory usage ({max_memory_usage})") return False + + def update_flow_config_through_controller(self, container_name: str): + self.minifi_controller_executor.update_flow(container_name) + + @retry_check(10, 1) + def check_minifi_controller_updated_config_is_persisted(self, container_name: str) -> bool: + return self.minifi_controller_executor.updated_config_is_persisted(container_name) + + def stop_component_through_controller(self, component: str, container_name: str): + self.minifi_controller_executor.stop_component(component, container_name) + + def start_component_through_controller(self, component: str, container_name: str): + self.minifi_controller_executor.start_component(component, container_name) + + @retry_check(10, 1) + def check_component_not_running_through_controller(self, component: str, container_name: str) -> bool: + return not self.minifi_controller_executor.is_component_running(component, container_name) + + @retry_check(10, 1) + def check_component_running_through_controller(self, component: str, container_name: str) -> bool: + return self.minifi_controller_executor.is_component_running(component, container_name) + + @retry_check(10, 1) + def connection_found_through_controller(self, connection: str, container_name: str) -> bool: + return connection in self.minifi_controller_executor.get_connections(container_name) + + @retry_check(10, 1) + def check_connections_full_through_controller(self, connection_count: int, container_name: str) -> bool: + return self.minifi_controller_executor.get_full_connection_count(container_name) == connection_count + + @retry_check(10, 1) + def check_connection_size_through_controller(self, connection: str, size: int, max_size: int, container_name: str) -> bool: + return self.minifi_controller_executor.get_connection_size(connection, container_name) == (size, max_size) + + @retry_check(10, 1000) + def manifest_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool: + manifest = self.minifi_controller_executor.get_manifest(container_name) + return '"agentManifest": {' in manifest and '"componentManifest": {' in manifest and '"agentType": "cpp"' in manifest diff --git a/docker/test/integration/cluster/DockerTestDirectoryBindings.py b/docker/test/integration/cluster/DockerTestDirectoryBindings.py index 228e3837a..259d3a44d 100644 --- a/docker/test/integration/cluster/DockerTestDirectoryBindings.py +++ b/docker/test/integration/cluster/DockerTestDirectoryBindings.py @@ -58,6 +58,7 @@ class DockerTestDirectoryBindings: shutil.copytree(test_dir + "/resources/elasticsearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/elasticsearch") shutil.copytree(test_dir + "/resources/opensearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/opensearch") shutil.copytree(test_dir + "/resources/minifi-c2-server-ssl/certs", self.data_directories[self.test_id]["resources_dir"] + "/minifi-c2-server-ssl") + shutil.copytree(test_dir + "/resources/minifi-controller", self.data_directories[self.test_id]["resources_dir"] + "/minifi-controller") shutil.copytree(test_dir + "/resources/minifi", self.data_directories[self.test_id]["minifi_config_dir"], dirs_exist_ok=True) def get_data_directories(self, test_id): diff --git a/docker/test/integration/cluster/MinifiControllerExecutor.py b/docker/test/integration/cluster/MinifiControllerExecutor.py new file mode 100644 index 000000000..d7290df89 --- /dev/null +++ b/docker/test/integration/cluster/MinifiControllerExecutor.py @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from .DockerCommunicator import DockerCommunicator + + +class MinifiControllerExecutor: + def __init__(self, container_communicator: DockerCommunicator): + self.container_communicator = container_communicator + + def update_flow(self, container_name: str): + self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--updateflow", "/tmp/resources/minifi-controller/config.yml"]) + + def updated_config_is_persisted(self, container_name: str) -> bool: + (code, output) = self.container_communicator.execute_command(container_name, ["cat", "/opt/minifi/minifi-current/conf/config.yml"]) + return code == 0 and "2f2a3b47-f5ba-49f6-82b5-bc1c86b96f38" in output + + def stop_component(self, component: str, container_name: str): + self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--stop", component]) + + def start_component(self, component: str, container_name: str): + self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--start", component]) + + def is_component_running(self, component: str, container_name: str) -> bool: + (code, output) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--list", "components"]) + return code == 0 and component + ", running: true" in output + + def get_connections(self, container_name: str): + (_, output) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--list", "connections"]) + connections = [] + for line in output.split('\n'): + if not line.startswith('[') and not line.startswith('Connection Names'): + connections.append(line) + return connections + + def get_full_connection_count(self, container_name: str) -> int: + (_, output) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--getfull"]) + for line in output.split('\n'): + if "are full" in line: + return int(line.split(' ')[0]) + return -1 + + def get_connection_size(self, connection: str, container_name: str): + (_, output) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--getsize", connection]) + for line in output.split('\n'): + if "Size/Max of " + connection in line: + size_and_max = line.split(connection)[1].split('/') + return (int(size_and_max[0].strip()), int(size_and_max[1].strip())) + return (-1, -1) + + def get_manifest(self, container_name: str) -> str: + (_, output) = self.container_communicator.execute_command(container_name, ["/opt/minifi/minifi-current/bin/minificontroller", "--manifest"]) + manifest = "" + for line in output.split('\n'): + if not line.startswith('['): + manifest += line + return manifest diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index e784c647b..d9f47a254 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -34,6 +34,7 @@ class MinifiOptions: self.config_format = "json" self.use_flow_config_from_url = False self.set_ssl_context_properties = False + self.enable_controller_socket = False class MinifiContainer(FlowContainer): @@ -46,6 +47,7 @@ class MinifiContainer(FlowContainer): super().__init__(config_dir, name, 'minifi-cpp', copy.copy(vols), network, image_store, command) self.container_specific_config_dir = self._create_container_config_dir(self.config_dir) + os.chmod(self.container_specific_config_dir, 0o777) def _create_container_config_dir(self, config_dir): container_config_dir = os.path.join(config_dir, str(uuid.uuid4())) @@ -69,8 +71,10 @@ class MinifiContainer(FlowContainer): assert False, "Invalid flow configuration format: {}".format(self.options.config_format) test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers) logging.info('Using generated flow config yml:\n%s', test_flow_yaml) - with open(os.path.join(self.container_specific_config_dir, "config.yml"), 'wb') as config_file: + absolute_flow_config_path = os.path.join(self.container_specific_config_dir, "config.yml") + with open(absolute_flow_config_path, 'wb') as config_file: config_file.write(test_flow_yaml.encode('utf-8')) + os.chmod(absolute_flow_config_path, 0o777) def _create_properties(self): properties_file_path = os.path.join(self.container_specific_config_dir, 'minifi.properties') @@ -114,6 +118,12 @@ class MinifiContainer(FlowContainer): if self.options.use_flow_config_from_url: f.write("nifi.c2.flow.url=http://minifi-c2-server:10090/c2/config?class=minifi-test-class\n") + if self.options.enable_controller_socket: + f.write("controller.socket.enable=true\n") + f.write("controller.socket.host=localhost\n") + f.write("controller.socket.port=9998\n") + f.write("controller.socket.local.any.interface=false\n") + def _setup_config(self): self._create_properties() if not self.options.use_flow_config_from_url: diff --git a/docker/test/integration/features/CMakeLists.txt b/docker/test/integration/features/CMakeLists.txt index f0ca97a77..93abb54cd 100644 --- a/docker/test/integration/features/CMakeLists.txt +++ b/docker/test/integration/features/CMakeLists.txt @@ -15,76 +15,82 @@ # specific language governing permissions and limitations # under the License. +set(ENABLED_BEHAVE_TESTS "") + if (ENABLE_LIBRDKAFKA) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/kafka.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/kafka.feature") endif() -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/attributes_to_json.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/core_functionality.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/defragtextflowfiles.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/file_system_operations.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/hashcontent.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/replace_text.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/routetext.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/s2s.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/syslog_listener.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/network_listener.feature") -set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/minifi_c2_server.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/attributes_to_json.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/core_functionality.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/defragtextflowfiles.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/file_system_operations.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/hashcontent.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/replace_text.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/routetext.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/s2s.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/syslog_listener.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/network_listener.feature") +list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/minifi_c2_server.feature") if (ENABLE_AZURE) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/azure_storage.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/azure_storage.feature") endif() if (ENABLE_GCP) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/google_cloud_storage.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/google_cloud_storage.feature") endif() if (NOT DISABLE_CIVET) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/http.feature") - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/https.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/http.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/https.feature") endif() if (ENABLE_KUBERNETES) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_logging.feature") - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_metrics.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_logging.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/kubernetes_metrics.feature") endif() if (ENABLE_MQTT) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/mqtt.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/mqtt.feature") endif() if (ENABLE_OPC) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/opcua.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/opcua.feature") endif() if (ENABLE_PYTHON_SCRIPTING) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/python.feature") - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/python_script.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/python.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/python_script.feature") endif() if (ENABLE_LUA_SCRIPTING) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/lua_script.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/lua_script.feature") endif() if (ENABLE_AWS) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/s3.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/s3.feature") endif() if (ENABLE_SPLUNK) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/splunk.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/splunk.feature") endif() if (ENABLE_SQL) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/sql.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/sql.feature") endif() if (ENABLE_ELASTICSEARCH AND NOT CI_BUILD) # Elasticsearch/Opensearch containers require more RAM than what the CI environment has - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/elasticsearch.feature") - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/opensearch.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/elasticsearch.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/opensearch.feature") endif() if (ENABLE_PROMETHEUS) - set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/prometheus.feature") + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/prometheus.feature") +endif() + +if (NOT DISABLE_CURL AND NOT DISABLE_CONTROLLER) + list(APPEND ENABLED_BEHAVE_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/minifi_controller.feature") endif() file(GLOB FEATURE_FILES "*.feature") diff --git a/docker/test/integration/features/minifi_controller.feature b/docker/test/integration/features/minifi_controller.feature new file mode 100644 index 000000000..700a782bd --- /dev/null +++ b/docker/test/integration/features/minifi_controller.feature @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Feature: MiNiFi Controller functionalities + Test MiNiFi Controller functionalities + + Background: + Given the content of "/tmp/output" is monitored + + Scenario: Flow config can be updated through MiNiFi controller + Given a GenerateFlowFile processor + And a file with the content "test" is present in "/tmp/input" + And controller socket properties are set up + When all instances start up + And MiNiFi config is updated through MiNiFi controller + Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + And the updated config is persisted + + Scenario: A component can be stopped + Given a GenerateFlowFile processor + And a file with the content "test" is present in "/tmp/input" + And controller socket properties are set up + When all instances start up + And the GenerateFlowFile component is stopped through MiNiFi controller + Then the GenerateFlowFile component is not running + And the FlowController component is running + + Scenario: If FlowController is stopped all other components are stopped + Given a GenerateFlowFile processor + And a file with the content "test" is present in "/tmp/input" + And controller socket properties are set up + When all instances start up + And the FlowController component is stopped through MiNiFi controller + Then the GenerateFlowFile component is not running + And the FlowController component is not running + + Scenario: FlowController can be stopped and restarted + Given a GenerateFlowFile processor + And a file with the content "test" is present in "/tmp/input" + And controller socket properties are set up + When all instances start up + And the FlowController component is stopped through MiNiFi controller + And the FlowController component is started through MiNiFi controller + Then the GenerateFlowFile component is running + And the FlowController component is running + + Scenario: Queue state can be queried + Given a GenerateFlowFile processor + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And the "success" relationship of the GenerateFlowFile processor is connected to the LogAttribute + And controller socket properties are set up + When all instances start up + And MiNiFi config is updated through MiNiFi controller + Then connection "GetFile/success/PutFile" can be seen through MiNiFi controller + And 0 connections can be seen full through MiNiFi controller + And connection "GetFile/success/PutFile" has 0 size and 2000 max size through MiNiFi controller + + Scenario: Manifest can be retrieved + Given a GenerateFlowFile processor + And a file with the content "test" is present in "/tmp/input" + And controller socket properties are set up + When all instances start up + Then manifest can be retrieved through MiNiFi controller diff --git a/docker/test/integration/resources/minifi-controller/config.yml b/docker/test/integration/resources/minifi-controller/config.yml new file mode 100644 index 000000000..60bf34b12 --- /dev/null +++ b/docker/test/integration/resources/minifi-controller/config.yml @@ -0,0 +1,30 @@ +MiNiFi Config Version: 3 +Flow Controller: + name: MiNiFi Flow +Processors: +- name: Get files from /tmp/input + id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96f38 + class: org.apache.nifi.minifi.processors.GetFile + scheduling strategy: TIMER_DRIVEN + scheduling period: 1000 ms + Properties: + Input Directory: /tmp/input +- name: Put files to /tmp/output + id: e143601d-de4f-44ba-a6ec-d1f97d77ec94 + class: org.apache.nifi.minifi.processors.PutFile + scheduling strategy: EVENT_DRIVEN + auto-terminated relationships list: + - failure + - success + Properties: + Conflict Resolution Strategy: fail + Create Missing Directories: 'true' + Directory: /tmp/output +Connections: +- name: GetFile/success/PutFile + id: 098a56ba-f4bf-4323-a3f3-6f8a5e3586bf + source id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96f38 + source relationship names: + - success + destination id: e143601d-de4f-44ba-a6ec-d1f97d77ec94 +Remote Process Groups: [] diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py index 83baf1d44..06978dcdb 100644 --- a/docker/test/integration/steps/steps.py +++ b/docker/test/integration/steps/steps.py @@ -1064,3 +1064,109 @@ def step_impl(context, peak_usage_percent: str, duration: str) -> None: @given(u'a MiNiFi CPP server with yaml config') def step_impl(context): context.test.set_yaml_in_minifi() + + +# MiNiFi controller +@given(u'controller socket properties are set up') +def step_impl(context): + context.test.set_controller_socket_properties_in_minifi() + + +@when(u'MiNiFi config is updated through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, minifi_container_name: str): + context.test.update_flow_config_through_controller(minifi_container_name) + + +@when(u'MiNiFi config is updated through MiNiFi controller') +def step_impl(context): + context.execute_steps("""when MiNiFi config is updated through MiNiFi controller in the \"{minifi_container_name}\" flow""".format(minifi_container_name="minifi-cpp-flow")) + + +@then(u'the updated config is persisted in the \"{minifi_container_name}\" flow') +def step_impl(context, minifi_container_name: str): + context.test.check_minifi_controller_updated_config_is_persisted(minifi_container_name) + + +@then(u'the updated config is persisted') +def step_impl(context): + context.execute_steps("""then the updated config is persisted in the \"{minifi_container_name}\" flow""".format(minifi_container_name="minifi-cpp-flow")) + + +@when(u'the {component} component is stopped through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, minifi_container_name: str, component: str): + context.test.stop_component_through_controller(component, minifi_container_name) + + +@when(u'the {component} component is stopped through MiNiFi controller') +def step_impl(context, component: str): + context.execute_steps("""when the {component} component is stopped through MiNiFi controller in the \"{minifi_container_name}\" flow""".format(component=component, minifi_container_name="minifi-cpp-flow")) + + +@when(u'the {component} component is started through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, minifi_container_name: str, component: str): + context.test.start_component_through_controller(component, minifi_container_name) + + +@when(u'the {component} component is started through MiNiFi controller') +def step_impl(context, component: str): + context.execute_steps("""when the {component} component is started through MiNiFi controller in the \"{minifi_container_name}\" flow""".format(component=component, minifi_container_name="minifi-cpp-flow")) + + +@then(u'the {component} component is not running in the \"{minifi_container_name}\" flow') +def step_impl(context, component: str, minifi_container_name: str): + context.test.check_component_not_running_through_controller(component, minifi_container_name) + + +@then(u'the {component} component is not running') +def step_impl(context, component: str): + context.execute_steps("""then the {component} component is not running in the \"{minifi_container_name}\" flow""".format(component=component, minifi_container_name="minifi-cpp-flow")) + + +@then(u'the {component} component is running in the \"{minifi_container_name}\" flow') +def step_impl(context, component: str, minifi_container_name: str): + context.test.check_component_running_through_controller(component, minifi_container_name) + + +@then(u'the {component} component is running') +def step_impl(context, component: str): + context.execute_steps("""then the {component} component is running in the \"{minifi_container_name}\" flow""".format(component=component, minifi_container_name="minifi-cpp-flow")) + + +@then(u'connection \"{connection}\" can be seen through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, connection: str, minifi_container_name: str): + context.test.connection_found_through_controller(connection, minifi_container_name) + + +@then(u'connection \"{connection}\" can be seen through MiNiFi controller') +def step_impl(context, connection: str): + context.execute_steps("""then connection \"{connection}\" can be seen through MiNiFi controller in the \"{minifi_container_name}\" flow""".format(connection=connection, minifi_container_name="minifi-cpp-flow")) + + +@then(u'{connection_count:d} connections can be seen full through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, connection_count: int, minifi_container_name: str): + context.test.check_connections_full_through_controller(connection_count, minifi_container_name) + + +@then(u'{connection_count:d} connections can be seen full through MiNiFi controller') +def step_impl(context, connection_count: int): + context.execute_steps("""then {connection_count:d} connections can be seen full through MiNiFi controller in the \"{minifi_container_name}\" flow""".format(connection_count=connection_count, minifi_container_name="minifi-cpp-flow")) + + +@then(u'connection \"{connection}\" has {size:d} size and {max_size:d} max size through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, connection: str, size: int, max_size: int, minifi_container_name: str): + context.test.check_connection_size_through_controller(connection, size, max_size, minifi_container_name) + + +@then(u'connection \"{connection}\" has {size:d} size and {max_size:d} max size through MiNiFi controller') +def step_impl(context, connection: str, size: int, max_size: int): + context.execute_steps("""then connection \"{connection}\" has {size:d} size and {max_size:d} max size through MiNiFi controller in the \"{minifi_container_name}\" flow""".format(connection=connection, size=size, max_size=max_size, minifi_container_name="minifi-cpp-flow")) + + +@then(u'manifest can be retrieved through MiNiFi controller in the \"{minifi_container_name}\" flow') +def step_impl(context, minifi_container_name: str): + context.test.manifest_can_be_retrieved_through_minifi_controller(minifi_container_name) + + +@then(u'manifest can be retrieved through MiNiFi controller') +def step_impl(context): + context.execute_steps("""then manifest can be retrieved through MiNiFi controller in the \"{minifi_container_name}\" flow""".format(minifi_container_name="minifi-cpp-flow")) diff --git a/docker/test/integration/utils.py b/docker/test/integration/utils.py index 07e388431..15db614d2 100644 --- a/docker/test/integration/utils.py +++ b/docker/test/integration/utils.py @@ -25,13 +25,10 @@ def retry_check(max_tries=5, retry_interval=1): def retry_check_func(func): @functools.wraps(func) def retry_wrapper(*args, **kwargs): - current_retry_count = 0 - while current_retry_count < max_tries: - if not func(*args, **kwargs): - current_retry_count += 1 - time.sleep(retry_interval) - else: + for i in range(max_tries): + if func(*args, **kwargs): return True + time.sleep(retry_interval) return False return retry_wrapper return retry_check_func diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 60a8832e8..dbd0844f0 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -89,7 +89,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi } bool isRunning() const override { - return running_.load() || updating_.load(); + return running_.load() || updating_.isUpdating(); } virtual bool isInitialized() { @@ -149,6 +149,18 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() override; private: + class UpdateState { + public: + bool isUpdating() const { return update_block_count_ > 0; } + void beginUpdate() { ++update_block_count_; } + void endUpdate() { --update_block_count_; } + void lock() { beginUpdate(); } + void unlock() { endUpdate(); } + + private: + std::atomic<uint32_t> update_block_count_; + }; + /** * Loads the flow as specified in the flow config file or if not present * tries to fetch it from the C2 server (if enabled). @@ -171,7 +183,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi std::recursive_mutex mutex_; std::atomic<bool> running_; - std::atomic<bool> updating_; + UpdateState updating_; std::atomic<bool> initialized_; std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_; std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_; diff --git a/libminifi/include/c2/ControllerSocketProtocol.h b/libminifi/include/c2/ControllerSocketProtocol.h index e4c7b23cd..67bb47ef3 100644 --- a/libminifi/include/c2/ControllerSocketProtocol.h +++ b/libminifi/include/c2/ControllerSocketProtocol.h @@ -18,6 +18,10 @@ #include <memory> #include <string> +#include <condition_variable> +#include <thread> +#include <mutex> +#include <atomic> #include "io/StreamFactory.h" #include "io/BaseStream.h" @@ -26,6 +30,7 @@ #include "core/state/nodes/StateMonitor.h" #include "core/controller/ControllerServiceProvider.h" #include "ControllerSocketReporter.h" +#include "utils/MinifiConcurrentQueue.h" namespace org::apache::nifi::minifi::c2 { @@ -61,6 +66,43 @@ class ControllerSocketProtocol { std::weak_ptr<ControllerSocketReporter> controller_socket_reporter_; std::shared_ptr<Configure> configuration_; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ControllerSocketProtocol>::getLogger(); + std::mutex initialization_mutex_; + + // Some commands need to restart the controller socket to reinitialize the socket with new data for example new SSL data in case of a flow update + // These commands are handled on a separate thread, and while these commands are handled other incoming commands are dropped + class SocketRestartCommandProcessor { + public: + explicit SocketRestartCommandProcessor(state::StateMonitor& update_sink_); + ~SocketRestartCommandProcessor(); + + enum class Command { + FLOW_UPDATE, + START + }; + + struct CommandData { + Command command; + std::string data; + }; + + void enqueue(const CommandData& command_data) { + is_socket_restarting_ = true; + command_queue_.enqueue(command_data); + } + + bool isSocketRestarting() const { + return is_socket_restarting_; + } + + private: + mutable std::atomic_bool is_socket_restarting_ = false; + state::StateMonitor& update_sink_; + std::thread command_processor_thread_; + std::atomic_bool running_ = true; + utils::ConditionConcurrentQueue<CommandData> command_queue_; + }; + + SocketRestartCommandProcessor socket_restart_processor_; }; } // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 896dd88d9..082ac92dd 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -53,7 +53,6 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo std::shared_ptr<utils::file::FileSystem> filesystem, std::function<void()> request_restart) : core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()), running_(false), - updating_(false), initialized_(false), thread_pool_(5, false, nullptr, "Flowcontroller threadpool"), configuration_(std::move(configure)), @@ -128,10 +127,9 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName(), newRoot->getVersion()); - updating_ = true; bool started = false; - { + std::scoped_lock<UpdateState> update_lock(updating_); std::lock_guard<std::recursive_mutex> flow_lock(mutex_); stop(); @@ -155,8 +153,6 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st } } - updating_ = false; - if (started) { auto flowVersion = flow_configuration_->getFlowVersion(); if (flowVersion) { @@ -193,7 +189,6 @@ int16_t FlowController::stop() { this->content_repo_->stop(); // stop the ControllerServices disableAllControllerServices(); - initialized_ = false; running_ = false; } return 0; @@ -265,14 +260,17 @@ void FlowController::load(bool reload) { io::NetworkPrioritizerFactory::getInstance()->clearPrioritizer(); } - if (!root_wrapper_.initialized()) { - logger_->log_info("Instantiating new flow"); - root_wrapper_.setNewRoot(loadInitialFlow()); - if (c2_agent_ && !c2_agent_->isControllerRunning()) { - // TODO(lordgamez): this initialization configures the C2 sender protocol (e.g. RESTSender) which may contain an SSL Context service from the flow config - // for SSL communication. This service may change on flow update and we should take care of the SSL Context Service change in the C2 Agent. - c2_agent_->initialize(this, this, this); - c2_agent_->start(); + { + std::scoped_lock<UpdateState> update_lock(updating_); + if (!root_wrapper_.initialized()) { + logger_->log_info("Instantiating new flow"); + root_wrapper_.setNewRoot(loadInitialFlow()); + if (c2_agent_ && !c2_agent_->isControllerRunning()) { + // TODO(lordgamez): this initialization configures the C2 sender protocol (e.g. RESTSender) which may contain an SSL Context service from the flow config + // for SSL communication. This service may change on flow update and we should take care of the SSL Context Service change in the C2 Agent. + c2_agent_->initialize(this, this, this); + c2_agent_->start(); + } } } @@ -333,7 +331,7 @@ int16_t FlowController::start() { logger_->log_info("Starting Flow Controller"); enableAllControllerServices(); if (controller_socket_protocol_) { - // Initialization is postponed after initializing the flow so the controller socket may load the SSL context defined in the flow configuration + // Initialization is postponed after controller services are enabled so the controller socket may load the SSL context defined in the flow configuration controller_socket_protocol_->initialize(); } timer_scheduler_->start(); @@ -403,7 +401,7 @@ int16_t FlowController::clearConnection(const std::string &connection) { } void FlowController::executeOnAllComponents(std::function<void(state::StateController&)> func) { - if (updating_ || !initialized_) { + if (updating_.isUpdating()) { return; } std::lock_guard<std::recursive_mutex> lock(mutex_); @@ -413,7 +411,7 @@ void FlowController::executeOnAllComponents(std::function<void(state::StateContr } void FlowController::executeOnComponent(const std::string &id_or_name, std::function<void(state::StateController&)> func) { - if (updating_ || !initialized_) { + if (updating_.isUpdating()) { return; } std::lock_guard<std::recursive_mutex> lock(mutex_); diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index c715c43a3..b64597a83 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -30,17 +30,47 @@ namespace org::apache::nifi::minifi::c2 { +ControllerSocketProtocol::SocketRestartCommandProcessor::SocketRestartCommandProcessor(state::StateMonitor& update_sink) : + update_sink_(update_sink) { + command_queue_.start(); + command_processor_thread_ = std::thread([this] { + while (running_) { + CommandData command_data; + if (command_queue_.dequeueWait(command_data)) { + if (command_data.command == Command::FLOW_UPDATE) { + update_sink_.applyUpdate("ControllerSocketProtocol", command_data.data, true); + } else if (command_data.command == Command::START) { + update_sink_.executeOnComponent(command_data.data, [](state::StateController& component) { + component.start(); + }); + } + } + is_socket_restarting_ = false; + } + }); +} + +ControllerSocketProtocol::SocketRestartCommandProcessor::~SocketRestartCommandProcessor() { + running_ = false; + command_queue_.stop(); + if (command_processor_thread_.joinable()) { + command_processor_thread_.join(); + } +} + ControllerSocketProtocol::ControllerSocketProtocol(core::controller::ControllerServiceProvider& controller, state::StateMonitor& update_sink, std::shared_ptr<Configure> configuration, const std::shared_ptr<ControllerSocketReporter>& controller_socket_reporter) : controller_(controller), update_sink_(update_sink), controller_socket_reporter_(controller_socket_reporter), - configuration_(std::move(configuration)) { + configuration_(std::move(configuration)), + socket_restart_processor_(update_sink_) { gsl_Expects(configuration_); stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_); } void ControllerSocketProtocol::initialize() { + std::unique_lock<std::mutex> lock(initialization_mutex_); std::shared_ptr<minifi::controllers::SSLContextService> secure_context; std::string context_name; if (configuration_->get(Configure::controller_ssl_context_service, context_name)) { @@ -100,9 +130,14 @@ void ControllerSocketProtocol::handleStart(io::BaseStream *stream) { std::string component_str; const auto size = stream->read(component_str); if (!io::isError(size)) { - update_sink_.executeOnComponent(component_str, [](state::StateController& component) { - component.start(); - }); + if (component_str == "FlowController") { + // Starting flow controller resets socket + socket_restart_processor_.enqueue({SocketRestartCommandProcessor::Command::START, component_str}); + } else { + update_sink_.executeOnComponent(component_str, [](state::StateController& component) { + component.start(); + }); + } } else { logger_->log_debug("Connection broke"); } @@ -149,7 +184,7 @@ void ControllerSocketProtocol::handleUpdate(io::BaseStream *stream) { std::ifstream tf(ff_loc); std::string flow_configuration((std::istreambuf_iterator<char>(tf)), std::istreambuf_iterator<char>()); - update_sink_.applyUpdate("ControllerSocketProtocol", flow_configuration); + socket_restart_processor_.enqueue({SocketRestartCommandProcessor::Command::FLOW_UPDATE, flow_configuration}); } } @@ -179,7 +214,7 @@ void ControllerSocketProtocol::writeQueueSizesResponse(io::BaseStream *stream) { void ControllerSocketProtocol::writeComponentsResponse(io::BaseStream *stream) { std::vector<std::pair<std::string, bool>> components; - update_sink_.executeOnAllComponents([&components](state::StateController& component){ + update_sink_.executeOnAllComponents([&components](state::StateController& component) { components.emplace_back(component.getComponentName(), component.isRunning()); }); io::BufferStream resp; @@ -296,6 +331,12 @@ void ControllerSocketProtocol::handleCommand(io::BaseStream *stream) { logger_->log_debug("Connection broke"); return; } + + if (socket_restart_processor_.isSocketRestarting()) { + logger_->log_debug("Socket restarting, dropping command"); + return; + } + switch (head) { case Operation::START: handleStart(stream); diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 509c0542d..942cefdf4 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -133,6 +133,7 @@ void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeSch for (const auto processor : failed_processors_) { try { logger_->log_debug("Starting %s", processor->getName()); + processor->setScheduledState(core::ScheduledState::RUNNING); switch (processor->getSchedulingStrategy()) { case TIMER_DRIVEN: timeScheduler.schedule(processor); @@ -147,10 +148,12 @@ void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeSch } catch (const std::exception &e) { logger_->log_error("Failed to start processor %s (%s): %s", processor->getUUIDStr(), processor->getName(), e.what()); + processor->setScheduledState(core::ScheduledState::STOPPED); failed_processors.insert(processor); } catch (...) { logger_->log_error("Failed to start processor %s (%s)", processor->getUUIDStr(), processor->getName()); + processor->setScheduledState(core::ScheduledState::STOPPED); failed_processors.insert(processor); } }
