This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1a71c3563c9d51ad6c756f54c6d46aa9750ebe73
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Apr 3 17:46:18 2023 +0200

    MINIFICPP-2087 Fix deadlock when fetching initial flow from C2 url
    
    Closes #1546
    Signed-off-by: Marton Szasz <[email protected]>
---
 docker/test/integration/MiNiFi_integration_test_driver.py    |  8 ++++++++
 docker/test/integration/cluster/ContainerStore.py            |  3 +++
 docker/test/integration/cluster/DockerTestCluster.py         |  3 +++
 .../test/integration/cluster/containers/MinifiContainer.py   | 12 ++++++++++--
 docker/test/integration/features/minifi_c2_server.feature    |  9 +++++++++
 docker/test/integration/steps/steps.py                       | 11 +++++++++++
 libminifi/src/FlowController.cpp                             |  5 +++--
 7 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py 
b/docker/test/integration/MiNiFi_integration_test_driver.py
index 34dbfb502..56ce946cb 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -77,6 +77,11 @@ class MiNiFi_integration_test:
         self.cluster.deploy_container('opensearch')
         assert self.cluster.wait_for_container_startup_to_finish('opensearch')
 
+    def start_minifi_c2_server(self):
+        self.cluster.acquire_container("minifi-c2-server", "minifi-c2-server")
+        self.cluster.deploy_container('minifi-c2-server')
+        assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server')
+
     def start(self, container_name=None):
         if container_name is not None:
             logging.info("Starting container %s", container_name)
@@ -310,6 +315,9 @@ class MiNiFi_integration_test:
     def enable_c2_with_ssl_in_minifi(self):
         self.cluster.enable_c2_with_ssl_in_minifi()
 
+    def fetch_flow_config_from_c2_url_in_minifi(self):
+        self.cluster.fetch_flow_config_from_c2_url_in_minifi()
+
     def enable_prometheus_in_minifi(self):
         self.cluster.enable_prometheus_in_minifi()
 
diff --git a/docker/test/integration/cluster/ContainerStore.py 
b/docker/test/integration/cluster/ContainerStore.py
index 1f7126c41..8a2e3d847 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -151,6 +151,9 @@ class ContainerStore:
     def enable_c2_with_ssl_in_minifi(self):
         self.minifi_options.enable_c2_with_ssl = True
 
+    def fetch_flow_config_from_c2_url_in_minifi(self):
+        self.minifi_options.use_flow_config_from_url = True
+
     def enable_prometheus_in_minifi(self):
         self.minifi_options.enable_prometheus = True
 
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index d62d56a58..c71e04af5 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -76,6 +76,9 @@ class DockerTestCluster:
     def enable_c2_with_ssl_in_minifi(self):
         self.container_store.enable_c2_with_ssl_in_minifi()
 
+    def fetch_flow_config_from_c2_url_in_minifi(self):
+        self.container_store.fetch_flow_config_from_c2_url_in_minifi()
+
     def enable_prometheus_in_minifi(self):
         self.container_store.enable_prometheus_in_minifi()
 
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py 
b/docker/test/integration/cluster/containers/MinifiContainer.py
index 409ad6f60..def15df26 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -32,6 +32,7 @@ class MinifiOptions:
         self.enable_prometheus = False
         self.enable_sql = False
         self.config_format = "json"
+        self.use_flow_config_from_url = False
 
 
 class MinifiContainer(FlowContainer):
@@ -102,12 +103,16 @@ class MinifiContainer(FlowContainer):
                 
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936\n")
                 
f.write("nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus\n")
 
+            if self.options.use_flow_config_from_url:
+                f.write("nifi.c2.flow.url=http://minifi-c2-server:10090/c2/config?class=minifi-test-class\n")
+
     def _setup_config(self):
-        self._create_config()
         self._create_properties()
+        if not self.options.use_flow_config_from_url:
+            self._create_config()
+            self.vols[os.path.join(self.container_specific_config_dir, 
'config.yml')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 
'config.yml'), "mode": "rw"}
 
         self.vols[os.path.join(self.container_specific_config_dir, 
'minifi.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 
'conf', 'minifi.properties'), "mode": "rw"}
-        self.vols[os.path.join(self.container_specific_config_dir, 
'config.yml')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 
'config.yml'), "mode": "rw"}
         self.vols[os.path.join(self.container_specific_config_dir, 
'minifi-log.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 
'conf', 'minifi-log.properties'), "mode": "rw"}
 
     def deploy(self):
@@ -122,6 +127,9 @@ class MinifiContainer(FlowContainer):
         else:
             image = 'apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + 
MinifiContainer.MINIFI_VERSION
 
+        if self.options.use_flow_config_from_url:
+            self.command = ["/bin/sh", "-c", "rm " + 
MinifiContainer.MINIFI_ROOT + "/conf/config.yml && ./bin/minifi.sh run"]
+
         self.client.containers.run(
             image,
             detach=True,
diff --git a/docker/test/integration/features/minifi_c2_server.feature 
b/docker/test/integration/features/minifi_c2_server.feature
index 2a32260f2..d4e7e11bf 100644
--- a/docker/test/integration/features/minifi_c2_server.feature
+++ b/docker/test/integration/features/minifi_c2_server.feature
@@ -34,3 +34,12 @@ Feature: MiNiFi can communicate with Apache NiFi MiNiFi C2 
server
     When all instances start up
     Then the MiNiFi C2 SSL server logs contain the following message: 
"acknowledged with a state of FULLY_APPLIED(DONE)" in less than 60 seconds
     And a flowfile with the content "test" is placed in the monitored 
directory in less than 10 seconds
+
+  Scenario: MiNiFi can get flow config from C2 server through flow url when it 
is not available at start
+    Given flow configuration path is set up in flow url property
+    And C2 is enabled in MiNiFi
+    And a file with the content "test" is present in "/tmp/input"
+    And a MiNiFi C2 server is started
+    When all instances start up
+    Then the MiNiFi C2 server logs contain the following message: 
"acknowledged with a state of FULLY_APPLIED(DONE)" in less than 30 seconds
+    And a flowfile with the content "test" is placed in the monitored 
directory in less than 10 seconds
diff --git a/docker/test/integration/steps/steps.py 
b/docker/test/integration/steps/steps.py
index 9dce360d7..bcc78ffcc 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1013,6 +1013,11 @@ def step_impl(context):
     context.test.acquire_container("minifi-c2-server", "minifi-c2-server")
 
 
+@given(u'a MiNiFi C2 server is started')
+def step_impl(context):
+    context.test.start_minifi_c2_server()
+
+
 @then("the MiNiFi C2 server logs contain the following message: 
\"{log_message}\" in less than {duration}")
 def step_impl(context, log_message, duration):
     context.test.check_container_log_contents("minifi-c2-server", log_message, 
humanfriendly.parse_timespan(duration))
@@ -1028,6 +1033,12 @@ def step_impl(context):
     context.test.acquire_container("minifi-c2-server", "minifi-c2-server-ssl")
 
 
+@given(u'flow configuration path is set up in flow url property')
+def step_impl(context):
+    context.test.acquire_container("minifi-cpp-flow", "minifi-cpp")
+    context.test.fetch_flow_config_from_c2_url_in_minifi()
+
+
 # MiNiFi memory usage
 @then(u'the peak memory usage of the agent is more than {size} in less than 
{duration}')
 def step_impl(context, size: str, duration: str) -> None:
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index f8efae4a0..a40b00411 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -152,6 +152,7 @@ bool FlowController::applyConfiguration(const std::string 
&source, const std::st
     auto flowVersion = flow_configuration_->getFlowVersion();
     if (flowVersion) {
       logger_->log_debug("Setting flow id to %s", flowVersion->getFlowId());
+      logger_->log_debug("Setting flow url to %s", 
flowVersion->getFlowIdentifier()->getRegistryUrl());
       configuration_->set(Configure::nifi_c2_flow_id, 
flowVersion->getFlowId());
       configuration_->set(Configure::nifi_c2_flow_url, 
flowVersion->getFlowIdentifier()->getRegistryUrl());
     } else {
@@ -389,7 +390,7 @@ int16_t FlowController::clearConnection(const std::string 
&connection) {
 }
 
 void 
FlowController::executeOnAllComponents(std::function<void(state::StateController&)>
 func) {
-  if (updating_) {
+  if (updating_ || !initialized_) {
     return;
   }
   std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -399,7 +400,7 @@ void 
FlowController::executeOnAllComponents(std::function<void(state::StateContr
 }
 
 void FlowController::executeOnComponent(const std::string &id_or_name, 
std::function<void(state::StateController&)> func) {
-  if (updating_) {
+  if (updating_ || !initialized_) {
     return;
   }
   std::lock_guard<std::recursive_mutex> lock(mutex_);

Reply via email to