This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1a71c3563c9d51ad6c756f54c6d46aa9750ebe73 Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Apr 3 17:46:18 2023 +0200 MINIFICPP-2087 Fix deadlock when fetching initial flow from C2 url Closes #1546 Signed-off-by: Marton Szasz <[email protected]> --- docker/test/integration/MiNiFi_integration_test_driver.py | 8 ++++++++ docker/test/integration/cluster/ContainerStore.py | 3 +++ docker/test/integration/cluster/DockerTestCluster.py | 3 +++ .../test/integration/cluster/containers/MinifiContainer.py | 12 ++++++++++-- docker/test/integration/features/minifi_c2_server.feature | 9 +++++++++ docker/test/integration/steps/steps.py | 11 +++++++++++ libminifi/src/FlowController.cpp | 5 +++-- 7 files changed, 47 insertions(+), 4 deletions(-) diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py index 34dbfb502..56ce946cb 100644 --- a/docker/test/integration/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/MiNiFi_integration_test_driver.py @@ -77,6 +77,11 @@ class MiNiFi_integration_test: self.cluster.deploy_container('opensearch') assert self.cluster.wait_for_container_startup_to_finish('opensearch') + def start_minifi_c2_server(self): + self.cluster.acquire_container("minifi-c2-server", "minifi-c2-server") + self.cluster.deploy_container('minifi-c2-server') + assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') + def start(self, container_name=None): if container_name is not None: logging.info("Starting container %s", container_name) @@ -310,6 +315,9 @@ class MiNiFi_integration_test: def enable_c2_with_ssl_in_minifi(self): self.cluster.enable_c2_with_ssl_in_minifi() + def fetch_flow_config_from_c2_url_in_minifi(self): + self.cluster.fetch_flow_config_from_c2_url_in_minifi() + def enable_prometheus_in_minifi(self): self.cluster.enable_prometheus_in_minifi() diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 1f7126c41..8a2e3d847 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -151,6 +151,9 @@ class ContainerStore: def enable_c2_with_ssl_in_minifi(self): self.minifi_options.enable_c2_with_ssl = True + def fetch_flow_config_from_c2_url_in_minifi(self): + self.minifi_options.use_flow_config_from_url = True + def enable_prometheus_in_minifi(self): self.minifi_options.enable_prometheus = True diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index d62d56a58..c71e04af5 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -76,6 +76,9 @@ class DockerTestCluster: def enable_c2_with_ssl_in_minifi(self): self.container_store.enable_c2_with_ssl_in_minifi() + def fetch_flow_config_from_c2_url_in_minifi(self): + self.container_store.fetch_flow_config_from_c2_url_in_minifi() + def enable_prometheus_in_minifi(self): self.container_store.enable_prometheus_in_minifi() diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index 409ad6f60..def15df26 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -32,6 +32,7 @@ class MinifiOptions: self.enable_prometheus = False self.enable_sql = False self.config_format = "json" + self.use_flow_config_from_url = False class MinifiContainer(FlowContainer): @@ -102,12 +103,16 @@ class MinifiContainer(FlowContainer): f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936\n") f.write("nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus\n") + if self.options.use_flow_config_from_url: + f.write("nifi.c2.flow.url=http://minifi-c2-server:10090/c2/config?class=minifi-test-class\n") + def _setup_config(self): - self._create_config() self._create_properties() + if not self.options.use_flow_config_from_url: + self._create_config() + self.vols[os.path.join(self.container_specific_config_dir, 'config.yml')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'config.yml'), "mode": "rw"} self.vols[os.path.join(self.container_specific_config_dir, 'minifi.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi.properties'), "mode": "rw"} - self.vols[os.path.join(self.container_specific_config_dir, 'config.yml')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'config.yml'), "mode": "rw"} self.vols[os.path.join(self.container_specific_config_dir, 'minifi-log.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi-log.properties'), "mode": "rw"} def deploy(self): @@ -122,6 +127,9 @@ class MinifiContainer(FlowContainer): else: image = 'apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION + if self.options.use_flow_config_from_url: + self.command = ["/bin/sh", "-c", "rm " + MinifiContainer.MINIFI_ROOT + "/conf/config.yml && ./bin/minifi.sh run"] + self.client.containers.run( image, detach=True, diff --git a/docker/test/integration/features/minifi_c2_server.feature b/docker/test/integration/features/minifi_c2_server.feature index 2a32260f2..d4e7e11bf 100644 --- a/docker/test/integration/features/minifi_c2_server.feature +++ b/docker/test/integration/features/minifi_c2_server.feature @@ -34,3 +34,12 @@ Feature: MiNiFi can communicate with Apache NiFi MiNiFi C2 server When all instances start up Then the MiNiFi C2 SSL server logs contain the following message: "acknowledged with a state of FULLY_APPLIED(DONE)" in less than 60 seconds And a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds + + Scenario: MiNiFi can get flow config from C2 server through flow url when it is not available at start + Given flow configuration path is set up in flow url property + And C2 is enabled in MiNiFi + And a file with the content "test" is present in "/tmp/input" + And a MiNiFi C2 server is started + When all instances start up + Then the MiNiFi C2 server logs contain the following message: "acknowledged with a state of FULLY_APPLIED(DONE)" in less than 30 seconds + And a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py index 9dce360d7..bcc78ffcc 100644 --- a/docker/test/integration/steps/steps.py +++ b/docker/test/integration/steps/steps.py @@ -1013,6 +1013,11 @@ def step_impl(context): context.test.acquire_container("minifi-c2-server", "minifi-c2-server") +@given(u'a MiNiFi C2 server is started') +def step_impl(context): + context.test.start_minifi_c2_server() + + @then("the MiNiFi C2 server logs contain the following message: \"{log_message}\" in less than {duration}") def step_impl(context, log_message, duration): context.test.check_container_log_contents("minifi-c2-server", log_message, humanfriendly.parse_timespan(duration)) @@ -1028,6 +1033,12 @@ def step_impl(context): context.test.acquire_container("minifi-c2-server", "minifi-c2-server-ssl") +@given(u'flow configuration path is set up in flow url property') +def step_impl(context): + context.test.acquire_container("minifi-cpp-flow", "minifi-cpp") + context.test.fetch_flow_config_from_c2_url_in_minifi() + + # MiNiFi memory usage @then(u'the peak memory usage of the agent is more than {size} in less than {duration}') def step_impl(context, size: str, duration: str) -> None: diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index f8efae4a0..a40b00411 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -152,6 +152,7 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st auto flowVersion = flow_configuration_->getFlowVersion(); if (flowVersion) { logger_->log_debug("Setting flow id to %s", flowVersion->getFlowId()); + logger_->log_debug("Setting flow url to %s", flowVersion->getFlowIdentifier()->getRegistryUrl()); configuration_->set(Configure::nifi_c2_flow_id, flowVersion->getFlowId()); configuration_->set(Configure::nifi_c2_flow_url, flowVersion->getFlowIdentifier()->getRegistryUrl()); } else { @@ -389,7 +390,7 @@ int16_t FlowController::clearConnection(const std::string &connection) { } void FlowController::executeOnAllComponents(std::function<void(state::StateController&)> func) { - if (updating_) { + if (updating_ || !initialized_) { return; } std::lock_guard<std::recursive_mutex> lock(mutex_); @@ -399,7 +400,7 @@ void FlowController::executeOnAllComponents(std::function<void(state::StateContr } void FlowController::executeOnComponent(const std::string &id_or_name, std::function<void(state::StateController&)> func) { - if (updating_) { + if (updating_ || !initialized_) { return; } std::lock_guard<std::recursive_mutex> lock(mutex_);
