This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e730451c933bf184796014fd538dd4e9ae8debc7
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Jan 20 17:18:25 2026 +0100

    MINIFICPP-2685 Move MQTT tests to modular docker tests
    
    Signed-off-by: Gabor Gyimesi <[email protected]>
    
    This closes #2091
---
 .../minifi_test_framework/containers/container.py  |  25 ++
 .../minifi_test_framework/steps/checking_steps.py  |  14 +-
 .../src/minifi_test_framework/steps/core_steps.py  |  46 ++-
 .../steps/flow_building_steps.py                   |  10 +-
 docker/RunBehaveTests.sh                           |   3 +-
 docker/requirements.txt                            |   1 -
 docker/test/integration/cluster/ContainerStore.py  |   9 -
 .../test/integration/cluster/DockerTestCluster.py  |   5 -
 docker/test/integration/cluster/ImageStore.py      |  11 -
 .../cluster/containers/MqttBrokerContainer.py      |  40 ---
 .../features/MiNiFi_integration_test_driver.py     |   3 -
 docker/test/integration/features/steps/steps.py    |  27 --
 .../integration/minifi/processors/ConsumeMQTT.py   |  30 --
 .../integration/minifi/processors/PublishMQTT.py   |  30 --
 .../features/steps/couchbase_server_container.py   |   4 +-
 .../tests/features/steps/grafana_loki_container.py |   4 +-
 extensions/mqtt/tests/features/environment.py      |  44 +++
 .../mqtt/tests}/features/mqtt.feature              | 398 ++++++++++++---------
 .../features/resources/publish_mqtt_message.py     |  20 +-
 .../tests/features/steps/mqtt_broker_container.py  |  63 ++++
 extensions/mqtt/tests/features/steps/steps.py      |  76 ++++
 21 files changed, 521 insertions(+), 342 deletions(-)

diff --git a/behave_framework/src/minifi_test_framework/containers/container.py 
b/behave_framework/src/minifi_test_framework/containers/container.py
index 8c59586f5..f3268d2dd 100644
--- a/behave_framework/src/minifi_test_framework/containers/container.py
+++ b/behave_framework/src/minifi_test_framework/containers/container.py
@@ -20,6 +20,7 @@ import os
 import shlex
 import tempfile
 import tarfile
+import uuid
 
 import docker
 from docker.models.networks import Network
@@ -50,6 +51,24 @@ class Container:
     def add_host_file(self, host_path: str, container_path: str, mode: str = 
"ro"):
         self.host_files.append(HostFile(container_path, host_path, mode))
 
+    def add_file_to_running_container(self, content: str, path: str):
+        if not self.container:
+            logging.error("Container is not running. Cannot add file.")
+            raise RuntimeError("Container is not running. Cannot add file.")
+
+        mkdir_command = f"mkdir -p {shlex.quote(path)}"
+        exit_code, output = self.exec_run(mkdir_command)
+        if exit_code != 0:
+            logging.error(f"Error creating directory '{path}' in container: 
{output}")
+            raise RuntimeError(f"Error creating directory '{path}' in 
container: {output}")
+
+        full_path = os.path.join(path, str(uuid.uuid4()))
+        pipe_command = f"printf %s {shlex.quote(content)} > 
{shlex.quote(full_path)}"
+        exit_code, output = self.exec_run(f"sh -c {shlex.quote(pipe_command)}")
+        if exit_code != 0:
+            logging.error(f"Error adding file to running container: {output}")
+            raise RuntimeError(f"Error adding file to running container: 
{output}")
+
     def _write_content_to_file(self, filepath: str, permissions: int | None, 
content: str | bytes):
         write_mode = "w"
         if isinstance(content, bytes):
@@ -74,7 +93,13 @@ class Container:
                 self._write_content_to_file(file_path, None, content)
             self.volumes[temp_path] = {"bind": directory.path, "mode": 
directory.mode}
 
+    def is_deployed(self) -> bool:
+        return self.container is not None
+
     def deploy(self) -> bool:
+        if self.is_deployed():
+            logging.info(f"Container '{self.container_name}' is already 
deployed.")
+            return True
         self._temp_dir = tempfile.TemporaryDirectory()
         self._configure_volumes_of_container_files()
         self._configure_volumes_of_container_dirs()
diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py 
b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
index ef5b1a338..c6593c1bf 100644
--- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
@@ -36,13 +36,18 @@ def step_impl(context: MinifiTestContext, content: str, 
path: str, duration: str
         timeout_seconds=timeout_in_seconds, bail_condition=lambda: 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].exited, context=context)
 
 
-@then('a single file with the content "{content}" is placed in the 
"{directory}" directory in less than {duration}')
-def step_impl(context: MinifiTestContext, content: str, directory: str, 
duration: str):
+@then('in the "{container_name}" container a single file with the content 
"{content}" is placed in the "{directory}" directory in less than {duration}')
+def step_impl(context: MinifiTestContext, container_name: str, content: str, 
directory: str, duration: str):
     new_content = content.replace("\\n", "\n")
     timeout_in_seconds = humanfriendly.parse_timespan(duration)
     assert wait_for_condition(
-        condition=lambda: 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].directory_has_single_file_with_content(directory,
 new_content),
-        timeout_seconds=timeout_in_seconds, bail_condition=lambda: 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].exited, context=context)
+        condition=lambda: 
context.containers[container_name].directory_has_single_file_with_content(directory,
 new_content),
+        timeout_seconds=timeout_in_seconds, bail_condition=lambda: 
context.containers[container_name].exited, context=context)
+
+
+@then('a single file with the content "{content}" is placed in the 
"{directory}" directory in less than {duration}')
+def step_impl(context: MinifiTestContext, content: str, directory: str, 
duration: str):
+    context.execute_steps(f'then in the "{DEFAULT_MINIFI_CONTAINER_NAME}" 
container a single file with the content "{content}" is placed in the 
"{directory}" directory in less than {duration}')
 
 
 @then('in the "{container_name}" container at least one file with the content 
"{content}" is placed in the "{directory}" directory in less than {duration}')
@@ -147,6 +152,7 @@ def step_impl(context: MinifiTestContext, directory: str, 
regex_str: str, durati
 
 
 @then('files with contents "{content_one}" and "{content_two}" are placed in 
the "{directory}" directory in less than {timeout}')
+@then("files with contents '{content_one}' and '{content_two}' are placed in 
the '{directory}' directory in less than {timeout}")
 def step_impl(context: MinifiTestContext, directory: str, timeout: str, 
content_one: str, content_two: str):
     timeout_seconds = humanfriendly.parse_timespan(timeout)
     c1 = content_one.replace("\\n", "\n")
diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py 
b/behave_framework/src/minifi_test_framework/steps/core_steps.py
index 6b488587c..b332b7e46 100644
--- a/behave_framework/src/minifi_test_framework/steps/core_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py
@@ -61,10 +61,27 @@ def step_impl(context: MinifiTestContext, file_name: str, 
content: str, path: st
     
context.get_or_create_default_minifi_container().files.append(File(os.path.join(path,
 file_name), new_content))
 
 
-@step('a file with the content "{content}" is present in "{path}"')
+@given('a file with the content "{content}" is present in "{path}" in the 
"{container_name}" flow')
+def step_impl(context: MinifiTestContext, content: str, path: str, 
container_name: str):
+    new_content = content.replace("\\n", "\n")
+    
context.get_or_create_minifi_container(container_name).files.append(File(os.path.join(path,
 str(uuid.uuid4())), new_content))
+
+
+@given('a file with the content "{content}" is present in "{path}"')
+@given("a file with the content '{content}' is present in '{path}'")
 def step_impl(context: MinifiTestContext, content: str, path: str):
+    context.execute_steps(f"given a file with the content \"{content}\" is 
present in \"{path}\" in the \"{DEFAULT_MINIFI_CONTAINER_NAME}\" flow")
+
+
+@when('a file with the content "{content}" is placed in "{path}" in the 
"{container_name}" flow')
+def step_impl(context: MinifiTestContext, content: str, path: str, 
container_name: str):
     new_content = content.replace("\\n", "\n")
-    
context.get_or_create_default_minifi_container().files.append(File(os.path.join(path,
 str(uuid.uuid4())), new_content))
+    
context.containers[container_name].add_file_to_running_container(new_content, 
path)
+
+
+@when('a file with the content "{content}" is placed in "{path}"')
+def step_impl(context: MinifiTestContext, content: str, path: str):
+    context.execute_steps(f"when a file with the content \"{content}\" is 
placed in \"{path}\" in the \"{DEFAULT_MINIFI_CONTAINER_NAME}\" flow")
 
 
 @given("an empty file is present in \"{path}\"")
@@ -94,11 +111,31 @@ def step_impl(context: MinifiTestContext):
     context.get_or_create_default_minifi_container().stop()
 
 
+@when("the \"{container_name}\" flow is stopped")
+def step_impl(context: MinifiTestContext, container_name: str):
+    context.get_or_create_minifi_container(container_name).stop()
+
+
 @when("MiNiFi is restarted")
 def step_impl(context: MinifiTestContext):
     context.get_or_create_default_minifi_container().restart()
 
 
+@when("the \"{container_name}\" flow is restarted")
+def step_impl(context: MinifiTestContext, container_name: str):
+    context.get_or_create_minifi_container(container_name).restart()
+
+
+@when("the \"{container_name}\" flow is started")
+def step_impl(context: MinifiTestContext, container_name: str):
+    context.get_or_create_minifi_container(container_name).start()
+
+
+@when("the \"{container_name}\" flow is killed")
+def step_impl(context: MinifiTestContext, container_name: str):
+    context.get_or_create_minifi_container(container_name).kill()
+
+
 @given("OpenSSL FIPS mode is enabled in MiNiFi")
 def step_impl(context: MinifiTestContext):
     context.get_or_create_default_minifi_container().enable_openssl_fips_mode()
@@ -117,3 +154,8 @@ def step_impl(context: MinifiTestContext):
 @given("flow configuration path is set up in flow url property")
 def step_impl(context: MinifiTestContext):
     
context.get_or_create_default_minifi_container().fetch_flow_config_from_flow_url()
+
+
+@step("{duration} later")
+def step_impl(context: MinifiTestContext, duration: str):
+    time.sleep(humanfriendly.parse_timespan(duration))
diff --git 
a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py 
b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
index 474dd5a0a..770f222a7 100644
--- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
@@ -70,10 +70,16 @@ def step_impl(context: MinifiTestContext, processor_type: 
str, property_name: st
     context.containers["nifi"].flow_definition.add_processor(processor)
 
 
+@given('a {processor_type} processor with the name "{processor_name}" in the 
"{minifi_container_name}" flow')
+def step_impl(context: MinifiTestContext, processor_type: str, processor_name: 
str, minifi_container_name: str):
+    processor = Processor(processor_type, processor_name)
+    
context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_processor(processor)
+
+
 @given('a {processor_type} processor with the name "{processor_name}"')
 def step_impl(context: MinifiTestContext, processor_type: str, processor_name: 
str):
-    processor = Processor(processor_type, processor_name)
-    
context.get_or_create_default_minifi_container().flow_definition.add_processor(processor)
+    context.execute_steps(
+        f'given a {processor_type} processor with the name "{processor_name}" 
in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow')
 
 
 @given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh
index dff7fccc9..f982e3c6a 100755
--- a/docker/RunBehaveTests.sh
+++ b/docker/RunBehaveTests.sh
@@ -207,4 +207,5 @@ exec \
     "${docker_dir}/../extensions/gcp/tests/features" \
     "${docker_dir}/../extensions/grafana-loki/tests/features" \
     "${docker_dir}/../extensions/lua/tests/features/" \
-    "${docker_dir}/../extensions/civetweb/tests/features/"
+    "${docker_dir}/../extensions/civetweb/tests/features/" \
+    "${docker_dir}/../extensions/mqtt/tests/features/"
diff --git a/docker/requirements.txt b/docker/requirements.txt
index a50c768a0..0092b5e25 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -10,4 +10,3 @@ azure-storage-blob==12.24.1
 prometheus-api-client==0.5.5
 humanfriendly==10.0
 requests<2.29  # https://github.com/docker/docker-py/issues/3113
-paho-mqtt==2.1.0
diff --git a/docker/test/integration/cluster/ContainerStore.py 
b/docker/test/integration/cluster/ContainerStore.py
index f139acd21..4b1b69a29 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -21,7 +21,6 @@ from .containers.NifiContainer import NiFiOptions
 from .containers.AzureStorageServerContainer import AzureStorageServerContainer
 from .containers.HttpProxyContainer import HttpProxyContainer
 from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer
-from .containers.MqttBrokerContainer import MqttBrokerContainer
 from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer
 from .containers.SyslogTcpClientContainer import SyslogTcpClientContainer
 from .containers.MinifiAsPodInKubernetesCluster import 
MinifiAsPodInKubernetesCluster
@@ -128,14 +127,6 @@ class ContainerStore:
                                                                         
network=self.network,
                                                                         
image_store=self.image_store,
                                                                         
command=command))
-        elif engine == 'mqtt-broker':
-            return self.containers.setdefault(container_name,
-                                              
MqttBrokerContainer(feature_context=feature_context,
-                                                                  
name=container_name,
-                                                                  
vols=self.vols,
-                                                                  
network=self.network,
-                                                                  
image_store=self.image_store,
-                                                                  
command=command))
         elif engine == "syslog-udp-client":
             return self.containers.setdefault(container_name,
                                               SyslogUdpClientContainer(
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index 458e0bd88..006aa97c5 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -23,7 +23,6 @@ from .checkers.AzureChecker import AzureChecker
 from .checkers.PostgresChecker import PostgresChecker
 from .checkers.PrometheusChecker import PrometheusChecker
 from .checkers.ModbusChecker import ModbusChecker
-from .checkers.MqttHelper import MqttHelper
 from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage
 
 
@@ -37,7 +36,6 @@ class DockerTestCluster:
         self.postgres_checker = PostgresChecker(self.container_communicator)
         self.prometheus_checker = PrometheusChecker()
         self.modbus_checker = ModbusChecker(self.container_communicator)
-        self.mqtt_helper = MqttHelper()
 
     def cleanup(self):
         self.container_store.cleanup()
@@ -278,6 +276,3 @@ class DockerTestCluster:
 
     def enable_ssl_in_nifi(self):
         self.container_store.enable_ssl_in_nifi()
-
-    def publish_test_mqtt_message(self, topic: str, message: str):
-        self.mqtt_helper.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/cluster/ImageStore.py 
b/docker/test/integration/cluster/ImageStore.py
index 50025e37a..a729cd030 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -63,8 +63,6 @@ class ImageStore:
             image = self.__build_http_proxy_image()
         elif container_engine == "postgresql-server":
             image = self.__build_postgresql_server_image()
-        elif container_engine == "mqtt-broker":
-            image = self.__build_mqtt_broker_image()
         else:
             raise Exception("There is no associated image for " + 
container_engine)
 
@@ -277,15 +275,6 @@ class ImageStore:
                 """.format(base_image='postgres:17.4'))
         return self.__build_image(dockerfile)
 
-    def __build_mqtt_broker_image(self):
-        dockerfile = dedent("""\
-            FROM {base_image}
-            RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
-            CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", 
"/mosquitto-no-auth.conf"]
-            """.format(base_image='eclipse-mosquitto:2.0.14'))
-
-        return self.__build_image(dockerfile)
-
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
diff --git a/docker/test/integration/cluster/containers/MqttBrokerContainer.py 
b/docker/test/integration/cluster/containers/MqttBrokerContainer.py
deleted file mode 100644
index faa168f31..000000000
--- a/docker/test/integration/cluster/containers/MqttBrokerContainer.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-from .Container import Container
-
-
-class MqttBrokerContainer(Container):
-    def __init__(self, feature_context, name, vols, network, image_store, 
command=None):
-        super().__init__(feature_context, name, 'mqtt-broker', vols, network, 
image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "mosquitto version [0-9\\.]+ running"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running MQTT broker docker container...')
-        self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
-            detach=True,
-            name=self.name,
-            ports={'1883/tcp': 1883},
-            network=self.network.name,
-            entrypoint=self.command)
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py 
b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 7a36309cd..a85c48bb7 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -389,6 +389,3 @@ class MiNiFi_integration_test:
 
     def enable_ssl_in_nifi(self):
         self.cluster.enable_ssl_in_nifi()
-
-    def publish_test_mqtt_message(self, topic, message):
-        self.cluster.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index ce222141b..521679e6f 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -121,7 +121,6 @@ def step_impl(context, processor_type, 
minifi_container_name):
 
 @given("a {processor_type} processor")
 @given("a {processor_type} processor set up to communicate with an Azure blob 
storage")
-@given("a {processor_type} processor set up to communicate with an MQTT broker 
instance")
 def step_impl(context, processor_type):
     __create_processor(context, processor_type, processor_type, None, None, 
"minifi-cpp-flow")
 
@@ -486,16 +485,6 @@ def step_impl(context, processor_name, 
service_property_name, property_name, pro
     __set_up_the_kubernetes_controller_service(context, processor_name, 
service_property_name, {property_name: property_value})
 
 
-# MQTT setup
-@when("an MQTT broker is set up in correspondence with the PublishMQTT")
-@given("an MQTT broker is set up in correspondence with the PublishMQTT")
-@given("an MQTT broker is set up in correspondence with the ConsumeMQTT")
-@given("an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT")
-def step_impl(context):
-    context.test.acquire_container(context=context, name="mqtt-broker", 
engine="mqtt-broker")
-    context.test.start('mqtt-broker')
-
-
 # azure storage setup
 @given("an Azure storage server is set up")
 def step_impl(context):
@@ -779,22 +768,6 @@ def step_impl(context, regex, duration):
     context.test.check_minifi_log_matches_regex(regex, 
humanfriendly.parse_timespan(duration))
 
 
-# MQTT
-@then("the MQTT broker has a log line matching \"{log_pattern}\"")
-def step_impl(context, log_pattern):
-    context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 
60, count=1)
-
-
-@then("the MQTT broker has {log_count} log lines matching \"{log_pattern}\"")
-def step_impl(context, log_count, log_pattern):
-    context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 
60, count=int(log_count))
-
-
-@when("a test message \"{message}\" is published to the MQTT broker on topic 
\"{topic}\"")
-def step_impl(context, message, topic):
-    context.test.publish_test_mqtt_message(topic, message)
-
-
 @then("the \"{minifi_container_name}\" flow has a log line matching 
\"{log_pattern}\" in less than {duration}")
 def step_impl(context, minifi_container_name, log_pattern, duration):
     context.test.check_container_log_matches_regex(minifi_container_name, 
log_pattern, humanfriendly.parse_timespan(duration), count=1)
diff --git a/docker/test/integration/minifi/processors/ConsumeMQTT.py 
b/docker/test/integration/minifi/processors/ConsumeMQTT.py
deleted file mode 100644
index 8c64be6a3..000000000
--- a/docker/test/integration/minifi/processors/ConsumeMQTT.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class ConsumeMQTT(Processor):
-    def __init__(self, context, schedule={'scheduling strategy': 
'TIMER_DRIVEN'}):
-        super(ConsumeMQTT, self).__init__(
-            context=context,
-            clazz='ConsumeMQTT',
-            properties={
-                'Broker URI': f'mqtt-broker-{context.feature_id}:1883',
-                'Topic': 'testtopic',
-                'Client ID': 'consumer-client'},
-            auto_terminate=['success'],
-            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishMQTT.py 
b/docker/test/integration/minifi/processors/PublishMQTT.py
deleted file mode 100644
index 41387ce76..000000000
--- a/docker/test/integration/minifi/processors/PublishMQTT.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class PublishMQTT(Processor):
-    def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
-        super(PublishMQTT, self).__init__(
-            context=context,
-            clazz='PublishMQTT',
-            properties={
-                'Broker URI': f'mqtt-broker-{context.feature_id}:1883',
-                'Topic': 'testtopic',
-                'Client ID': 'publisher-client'},
-            auto_terminate=['success', 'failure'],
-            schedule=schedule)
diff --git 
a/extensions/couchbase/tests/features/steps/couchbase_server_container.py 
b/extensions/couchbase/tests/features/steps/couchbase_server_container.py
index 96cddf821..893ba9de9 100644
--- a/extensions/couchbase/tests/features/steps/couchbase_server_container.py
+++ b/extensions/couchbase/tests/features/steps/couchbase_server_container.py
@@ -82,8 +82,8 @@ class CouchbaseServerContainer(Container):
             self.client.containers.run("minifi-couchbase-helper:latest", 
["python", "-c", command], remove=True, stdout=True, stderr=True, 
network=self.network.name)
             return True
         except ContainerError as e:
-            stdout = e.stdout.decode("utf-8", errors="replace") if e.stdout 
else ""
-            stderr = e.stderr.decode("utf-8", errors="replace") if e.stderr 
else ""
+            stdout = e.stdout.decode("utf-8", errors="replace") if hasattr(e, 
"stdout") and e.stdout else ""
+            stderr = e.stderr.decode("utf-8", errors="replace") if hasattr(e, 
"stderr") and e.stderr else ""
             logging.error(f"Python command '{command}' failed in couchbase 
helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'")
             return False
         except Exception as e:
diff --git 
a/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py 
b/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py
index 46fc84b5a..3d7e497aa 100644
--- a/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py
+++ b/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py
@@ -142,8 +142,8 @@ analytics:
                                        remove=True, stdout=True, stderr=True, 
network=self.network.name)
             return True
         except ContainerError as e:
-            stdout = e.stdout.decode("utf-8", errors="replace") if e.stdout 
else ""
-            stderr = e.stderr.decode("utf-8", errors="replace") if e.stderr 
else ""
+            stdout = e.stdout.decode("utf-8", errors="replace") if hasattr(e, 
"stdout") and e.stdout else ""
+            stderr = e.stderr.decode("utf-8", errors="replace") if hasattr(e, 
"stderr") and e.stderr else ""
             logging.error(f"Failed to run python command in grafana loki 
helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'")
             return False
         except Exception as e:
diff --git a/extensions/mqtt/tests/features/environment.py 
b/extensions/mqtt/tests/features/environment.py
new file mode 100644
index 000000000..b3bbe9df0
--- /dev/null
+++ b/extensions/mqtt/tests/features/environment.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pathlib import Path
+from minifi_test_framework.containers.docker_image_builder import 
DockerImageBuilder
+from minifi_test_framework.core.hooks import common_before_scenario
+from minifi_test_framework.core.hooks import common_after_scenario
+
+
+def before_all(context):
+    check_log_lines_path = Path(__file__).resolve().parent / "resources" / 
"publish_mqtt_message.py"
+    check_log_lines_content = None
+    with open(check_log_lines_path, "rb") as f:
+        check_log_lines_content = f.read()
+    dockerfile = """
+FROM python:3.13-slim-bookworm
+RUN pip install paho-mqtt==2.1.0
+COPY publish_mqtt_message.py /scripts/publish_mqtt_message.py"""
+    mqtt_helper_builder = DockerImageBuilder(
+        image_tag="minifi-mqtt-helper:latest",
+        dockerfile_content=dockerfile,
+        files_on_context={"publish_mqtt_message.py": check_log_lines_content}
+    )
+    mqtt_helper_builder.build()
+
+
+def before_scenario(context, scenario):
+    common_before_scenario(context, scenario)
+
+
+def after_scenario(context, scenario):
+    common_after_scenario(context, scenario)
diff --git a/docker/test/integration/features/mqtt.feature 
b/extensions/mqtt/tests/features/mqtt.feature
similarity index 60%
rename from docker/test/integration/features/mqtt.feature
rename to extensions/mqtt/tests/features/mqtt.feature
index 64ef62f6b..1f4bd10d4 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/extensions/mqtt/tests/features/mqtt.feature
@@ -19,22 +19,21 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   As a user of MiNiFi
   I need to have PublishMQTT and ConsumeMQTT processors
 
-  Background:
-    Given the content of "/tmp/output" is monitored
-
   Scenario Outline: A MiNiFi instance transfers data to an MQTT broker
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
     And the "success" relationship of the PublishMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT
+    And an MQTT broker is started
 
-    When both instances start up
-    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When the MiNiFi instance starts up
+    Then a single file with the content "test" is placed in the "/tmp/output" 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
 
     Examples: MQTT versions
@@ -48,12 +47,14 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
     And the "success" relationship of the PublishMQTT processor is connected 
to the PutFile
     And the "failure" relationship of the PublishMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
     When the MiNiFi instance starts up
-    Then no files are placed in the monitored directory in 30 seconds of 
running time
+    Then no files are placed in the "/tmp/output" directory in 30 seconds of 
running time
 
     Examples: MQTT versions
     | version  |
@@ -66,14 +67,16 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
     And the "success" relationship of the PublishMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
     When the MiNiFi instance starts up
-    Then no files are placed in the monitored directory in 10 seconds of 
running time
+    Then no files are placed in the "/tmp/output" directory in 10 seconds of 
running time
 
-    When an MQTT broker is set up in correspondence with the PublishMQTT
-    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When an MQTT broker is started
+    Then a single file with the content "test" is placed in the "/tmp/output" 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
 
     Examples: MQTT versions
@@ -84,24 +87,28 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: A MiNiFi instance publishes and consumes data to/from an 
MQTT broker
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And PublishMQTT is EVENT_DRIVEN
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And a LogAttribute processor
-    And "ConsumeMQTT" processor is a start node
+    And LogAttribute is EVENT_DRIVEN
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "test" is placed in "/tmp/input"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "test" is placed in "/tmp/input"
+    Then a single file with the content "test" is placed in the "/tmp/output" 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
     And the Minifi logs contain the following message: "key:mqtt.broker 
value:mqtt-broker-" in less than 60 seconds
     And the Minifi logs contain the following message: "key:mqtt.topic 
value:testtopic" in less than 1 seconds
@@ -120,24 +127,27 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: Subscription to topics with wildcards
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And PublishMQTT is EVENT_DRIVEN
     And the "Topic" property of the PublishMQTT processor is set to 
"test/my/topic"
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
     And the "Topic" property of the ConsumeMQTT processor is set to "<topic 
wildcard pattern>"
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And "ConsumeMQTT" processor is a start node
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "test" is placed in "/tmp/input"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "test" is placed in "/tmp/input"
+    Then a single file with the content "test" is placed in the "/tmp/output" 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*test/my/topic.*\(4 bytes\)"
 
     Examples: Topic wildcard patterns
@@ -150,33 +160,37 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: Subscription and publishing with disconnecting clients in 
persistent sessions
     # publishing MQTT client
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input" in the "publisher-client" flow
-    And a PublishMQTT processor in the "publisher-client" flow
-    And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
-    And the "Quality of Service" property of the PublishMQTT processor is set 
to "1"
-    And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance in the "publisher-client" flow
+    And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>" in the "publisher-client" flow
+    And the "Quality of Service" property of the PublishMQTT processor is set 
to "1" in the "publisher-client" flow
+    And in the "publisher-client" flow the "success" relationship of the 
GetFile processor is connected to the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated in the 
"publisher-client" flow
 
     # consuming MQTT client
-    And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
-    And the "Quality of Service" property of the ConsumeMQTT processor is set 
to "1"
-    And the "<persistent_session_property_1>" property of the ConsumeMQTT 
processor is set to "<persistent_session_property_1_value>"
-    And the "<persistent_session_property_2>" property of the ConsumeMQTT 
processor is set to "<persistent_session_property_2_value>"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>" in the "consumer-client" flow
+    And the "Quality of Service" property of the ConsumeMQTT processor is set 
to "1" in the "consumer-client" flow
+    And the "<persistent_session_property_1>" property of the ConsumeMQTT 
processor is set to "<persistent_session_property_1_value>" in the 
"consumer-client" flow
+    And the "<persistent_session_property_2>" property of the ConsumeMQTT 
processor is set to "<persistent_session_property_2_value>" in the 
"consumer-client" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "consumer-client" flow
-    And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile is EVENT_DRIVEN in the "consumer-client" flow
+    And in the "consumer-client" flow the "success" relationship of the 
ConsumeMQTT processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated in the 
"consumer-client" flow
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
     When all instances start up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
-    And "consumer-client" flow is stopped
-    And the MQTT broker has a log line matching "Received DISCONNECT from 
consumer-client"
+    When the "consumer-client" flow is stopped
+    Then the MQTT broker has a log line matching "Received DISCONNECT from 
consumer-client"
 
-    And a file with the content "test" is placed in "/tmp/input"
-    And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
+    When a file with the content "test" is placed in "/tmp/input" in the 
"publisher-client" flow
+    Then the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
 
-    And "consumer-client" flow is restarted
-    And the MQTT broker has 2 log lines matching "New client connected from .* 
as consumer-client"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When the "consumer-client" flow is restarted
+    Then the MQTT broker has 2 log lines matching "New client connected from 
.* as consumer-client"
+    And in the "consumer-client" container a single file with the content 
"test" is placed in the "/tmp/output" directory in less than 60 seconds
 
     Examples: MQTT versions
     | version  | persistent_session_property_1 | 
persistent_session_property_1_value | persistent_session_property_2 | 
persistent_session_property_2_value |
@@ -186,24 +200,27 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: UTF-8 topics and messages
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And PublishMQTT is EVENT_DRIVEN
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And the "Topic" property of the PublishMQTT processor is set to "<topic>"
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And the "Topic" property of the ConsumeMQTT processor is set to "<topic>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And "ConsumeMQTT" processor is a start node
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "<message>" is placed in "/tmp/input"
-    And a flowfile with the content "<message>" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "<message>" is placed in "/tmp/input"
+    Then a single file with the content "<message>" is placed in the 
"/tmp/output" directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*<topic>"
 
     Examples: Topics, messages and version
@@ -214,28 +231,30 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
     | محمد بن موسی خوارزمی   | ٱلْجَبْر       | 3.x AUTO |
     | תַּלְמוּד                  | תּוֹרָה        | 3.x AUTO |
 
-
   Scenario Outline: QoS 0 message flow is correct
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And PublishMQTT is EVENT_DRIVEN
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And the "Quality of Service" property of the PublishMQTT processor is set 
to "0"
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And the "Quality of Service" property of the ConsumeMQTT processor is set 
to "0"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And "ConsumeMQTT" processor is a start node
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "test" is placed in "/tmp/input"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "test" is placed in "/tmp/input"
+    Then a single file with the content "test" is placed in the "/tmp/output" 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
publisher-client \(d0, q0, r0, m0, 'testtopic'.*\(4 bytes\)"
     And the MQTT broker has a log line matching "Sending PUBLISH to 
consumer-client \(d0, q0, r0, m0, 'testtopic',.*\(4 bytes\)\)"
 
@@ -247,24 +266,27 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: QoS 1 Subscriber sends PUBACK on a PUBLISH message, with 
correct packet ID
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And PublishMQTT is EVENT_DRIVEN
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And the "Quality of Service" property of the PublishMQTT processor is set 
to "1"
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And the "Quality of Service" property of the ConsumeMQTT processor is set 
to "1"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And "ConsumeMQTT" processor is a start node
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "test" is placed in "/tmp/input"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "test" is placed in "/tmp/input"
+    Then a single file with the content "test" is placed in the "/tmp/output" 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
     And the MQTT broker has a log line matching "Sending PUBACK to 
publisher-client \(m1, rc0\)"
     And the MQTT broker has a log line matching "Sending PUBLISH to 
consumer-client \(d0, q1, r0, m1, 'testtopic',.*\(4 bytes\)\)"
@@ -278,24 +300,27 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: QoS 2 message flow is correct
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And PublishMQTT is EVENT_DRIVEN
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
     And the "Quality of Service" property of the PublishMQTT processor is set 
to "2"
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated
 
     And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And the "Quality of Service" property of the ConsumeMQTT processor is set 
to "2"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And "ConsumeMQTT" processor is a start node
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "test" is placed in "/tmp/input"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "test" is placed in "/tmp/input"
+    Then a single file with the content "test" is placed in the "/tmp/output" 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
publisher-client.*m1, 'testtopic'.*\(4 bytes\)"
     And the MQTT broker has a log line matching "Sending PUBREC to 
publisher-client \(m1, rc0\)"
     And the MQTT broker has a log line matching "Received PUBREL from 
publisher-client \(Mid: 1\)"
@@ -313,28 +338,33 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: Retained message
     # publishing MQTT client
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input" in the "publisher-client" flow
-    And a file with the content "test" is present in "/tmp/input"
-    And a PublishMQTT processor in the "publisher-client" flow
-    And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
-    And the "Retain" property of the PublishMQTT processor is set to "true"
-    And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And the scheduling period of the GetFile processor is set to "120 seconds" 
in the "publisher-client" flow
+    And a file with the content "test" is present in "/tmp/input" in the 
"publisher-client" flow
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance in the "publisher-client" flow
+    And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>" in the "publisher-client" flow
+    And the "Retain" property of the PublishMQTT processor is set to "true" in 
the "publisher-client" flow
+    And in the "publisher-client" flow the "success" relationship of the 
GetFile processor is connected to the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated in the 
"publisher-client" flow
 
     # consuming MQTT client
-    And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>" in the "consumer-client" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "consumer-client" flow
-    And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile is EVENT_DRIVEN in the "consumer-client" flow
+    And in the "consumer-client" flow the "success" relationship of the 
ConsumeMQTT processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated in the 
"consumer-client" flow
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When all instances start up
     Then the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
 
     # consumer is joining late, but it will still see the retained message
-    And "consumer-client" flow is started
-    And the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
+    When the "consumer-client" flow is started
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
 
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    And in the "consumer-client" container a single file with the content 
"test" is placed in the "/tmp/output" directory in less than 60 seconds
 
     Examples: MQTT versions
     | version  |
@@ -344,27 +374,31 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: Last will
     # publishing MQTT client with last will
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input" in the "publisher-client" flow
-    And a PublishMQTT processor in the "publisher-client" flow
-    And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>"
-    And the "Last Will Topic" property of the PublishMQTT processor is set to 
"last_will_topic"
-    And the "Last Will Message" property of the PublishMQTT processor is set 
to "last_will_message"
-    And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance in the "publisher-client" flow
+    And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"<version>" in the "publisher-client" flow
+    And the "Last Will Topic" property of the PublishMQTT processor is set to 
"last_will_topic" in the "publisher-client" flow
+    And the "Last Will Message" property of the PublishMQTT processor is set 
to "last_will_message" in the "publisher-client" flow
+    And in the "publisher-client" flow the "success" relationship of the 
GetFile processor is connected to the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated in the 
"publisher-client" flow
 
     # consuming MQTT client set to consume last will topic
-    And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
-    And the "Topic" property of the ConsumeMQTT processor is set to 
"last_will_topic"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>" in the "consumer-client" flow
+    And the "Topic" property of the ConsumeMQTT processor is set to 
"last_will_topic" in the "consumer-client" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "consumer-client" flow
-    And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile is EVENT_DRIVEN in the "consumer-client" flow
+    And in the "consumer-client" flow the "success" relationship of the 
ConsumeMQTT processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated in the 
"consumer-client" flow
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
     When all instances start up
     Then the MQTT broker has a log line matching "Sending CONNACK to 
publisher-client"
     And the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
-    And "publisher-client" flow is killed
-    And the MQTT broker has a log line matching "Sending PUBLISH to 
consumer-client"
-    And a flowfile with the content "last_will_message" is placed in the 
monitored directory in less than 60 seconds
+    When the "publisher-client" flow is killed
+    Then the MQTT broker has a log line matching "Sending PUBLISH to 
consumer-client"
+    And in the "consumer-client" container a single file with the content 
"last_will_message" is placed in the "/tmp/output" directory in less than 60 
seconds
 
     Examples: MQTT versions
     | version  |
@@ -376,9 +410,9 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And the "Keep Alive Interval" property of the ConsumeMQTT processor is set 
to "1 sec"
 
-    And an MQTT broker is set up in correspondence with the ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     Then the MQTT broker has a log line matching "Received PINGREQ from 
consumer-client"
     Then the MQTT broker has a log line matching "Sending PINGRESP to 
consumer-client"
 
@@ -390,171 +424,201 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
   Scenario Outline: Message Expiry Interval - MQTT 5
     # publishing MQTT client
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input" in the "publisher-client" flow
-    And a PublishMQTT processor in the "publisher-client" flow
-    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0"
-    And the "Message Expiry Interval" property of the PublishMQTT processor is 
set to "<message_expiry_interval>"
-    And the "Quality of Service" property of the PublishMQTT processor is set 
to "1"
-    And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance in the "publisher-client" flow
+    And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0" in the "publisher-client" flow
+    And the "Message Expiry Interval" property of the PublishMQTT processor is 
set to "<message_expiry_interval>" in the "publisher-client" flow
+    And the "Quality of Service" property of the PublishMQTT processor is set 
to "1" in the "publisher-client" flow
+    And in the "publisher-client" flow the "success" relationship of the 
GetFile processor is connected to the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated in the 
"publisher-client" flow
 
     # consuming MQTT client
-    And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0"
-    And the "Quality of Service" property of the ConsumeMQTT processor is set 
to "1"
-    And the "Clean Start" property of the ConsumeMQTT processor is set to 
"false"
-    And the "Session Expiry Interval" property of the ConsumeMQTT processor is 
set to "1 h"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0" in the "consumer-client" flow
+    And the "Quality of Service" property of the ConsumeMQTT processor is set 
to "1" in the "consumer-client" flow
+    And the "Clean Start" property of the ConsumeMQTT processor is set to 
"false" in the "consumer-client" flow
+    And the "Session Expiry Interval" property of the ConsumeMQTT processor is 
set to "1 h" in the "consumer-client" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "consumer-client" flow
-    And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And PutFile is EVENT_DRIVEN in the "consumer-client" flow
+    And in the "consumer-client" flow the "success" relationship of the 
ConsumeMQTT processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated in the 
"consumer-client" flow
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
     When all instances start up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
-    And "consumer-client" flow is stopped
-    And the MQTT broker has a log line matching "Received DISCONNECT from 
consumer-client"
+    When the "consumer-client" flow is stopped
+    Then the MQTT broker has a log line matching "Received DISCONNECT from 
consumer-client"
 
-    And a file with the content "test" is placed in "/tmp/input"
-    And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
+    When a file with the content "test" is placed in "/tmp/input" in the 
"publisher-client" flow
+    Then the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
 
     And 2 seconds later
-    And "consumer-client" flow is restarted
-    And the MQTT broker has 2 log lines matching "New client connected from .* 
as consumer-client"
-    And <expectation_num_files> placed in the monitored directory in 
<expectation_time_limit>
+    When the "consumer-client" flow is restarted
+    Then the MQTT broker has 2 log lines matching "New client connected from 
.* as consumer-client"
+    And in the "consumer-client" container <expectation_num_files> placed in 
the "/tmp/output" directory in <expectation_time_limit>
 
     Examples: Message Expiry Intervals
-    | message_expiry_interval | expectation_num_files                  | 
expectation_time_limit     |
-    | 1 h                     | a flowfile with the content "test" is  | less 
than 60 seconds       |
-    | 1 s                     | no files are                           | 30 
seconds of running time |
+    | message_expiry_interval | expectation_num_files                     | 
expectation_time_limit     |
+    | 1 h                     | a single file with the content "test" is  | 
less than 60 seconds       |
+    | 1 s                     | no files are                              | 30 
seconds of running time |
 
   Scenario: User properties - MQTT 5
     # publishing MQTT client: GetFile -> UpdateAttribute (my_attr1:true, 
my_attr2:true) -> PublishMQTT
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input" in the "publisher-client" flow
     And a UpdateAttribute processor in the "publisher-client" flow
-    And the "my_attr1" property of the UpdateAttribute processor is set to 
"true"
-    And the "my_attr2" property of the UpdateAttribute processor is set to 
"true"
-    And a PublishMQTT processor in the "publisher-client" flow
-    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0"
-    And the "success" relationship of the GetFile processor is connected to 
the UpdateAttribute
-    And the "success" relationship of the UpdateAttribute processor is 
connected to the PublishMQTT
+    And the "my_attr1" property of the UpdateAttribute processor is set to 
"true" in the "publisher-client" flow
+    And the "my_attr2" property of the UpdateAttribute processor is set to 
"true" in the "publisher-client" flow
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance in the "publisher-client" flow
+    And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0" in the "publisher-client" flow
+    And in the "publisher-client" flow the "success" relationship of the 
GetFile processor is connected to the UpdateAttribute
+    And in the "publisher-client" flow the "success" relationship of the 
UpdateAttribute processor is connected to the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated in the 
"publisher-client" flow
 
     # consuming MQTT client: ConsumeMQTT -> RouteOnAttribute (my_attr1) -> 
RouteOnAttribute (my_attr2) -> PutFile
-    And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0" in the "consumer-client" flow
     And a RouteOnAttribute processor with the name "RouteAttr1" in the 
"consumer-client" flow
-    And the "matched_my_attr1" property of the RouteAttr1 processor is set to 
"${my_attr1}"
+    And RouteAttr1 is EVENT_DRIVEN in the "consumer-client" flow
+    And the "matched_my_attr1" property of the RouteAttr1 processor is set to 
"${my_attr1}" in the "consumer-client" flow
     And a RouteOnAttribute processor with the name "RouteAttr2" in the 
"consumer-client" flow
-    And the "matched_my_attr2" property of the RouteAttr2 processor is set to 
"${my_attr2}"
+    And RouteAttr2 is EVENT_DRIVEN in the "consumer-client" flow
+    And the "matched_my_attr2" property of the RouteAttr2 processor is set to 
"${my_attr2}" in the "consumer-client" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "consumer-client" flow
-    And the "success" relationship of the ConsumeMQTT processor is connected 
to the RouteAttr1
-    And the "matched_my_attr1" relationship of the RouteAttr1 processor is 
connected to the RouteAttr2
-    And the "matched_my_attr2" relationship of the RouteAttr2 processor is 
connected to the PutFile
+    And PutFile is EVENT_DRIVEN in the "consumer-client" flow
+    And in the "consumer-client" flow the "success" relationship of the 
ConsumeMQTT processor is connected to the RouteAttr1
+    And in the "consumer-client" flow the "matched_my_attr1" relationship of 
the RouteAttr1 processor is connected to the RouteAttr2
+    And in the "consumer-client" flow the "matched_my_attr2" relationship of 
the RouteAttr2 processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated in the 
"consumer-client" flow
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When all instances start up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "test" is placed in "/tmp/input"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "test" is placed in "/tmp/input" in the 
"publisher-client" flow
+    Then in the "consumer-client" container a single file with the content 
"test" is placed in the "/tmp/output" directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
 
   Scenario: Content type - MQTT 5
     # publishing MQTT client: GetFile -> PublishMQTT (Content Type: text/plain)
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input" in the "publisher-client" flow
-    And a PublishMQTT processor in the "publisher-client" flow
-    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0"
-    And the "Content Type" property of the PublishMQTT processor is set to 
"text/plain"
-    And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance in the "publisher-client" flow
+    And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0" in the "publisher-client" flow
+    And the "Content Type" property of the PublishMQTT processor is set to 
"text/plain" in the "publisher-client" flow
+    And in the "publisher-client" flow the "success" relationship of the 
GetFile processor is connected to the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated in the 
"publisher-client" flow
 
     # consuming MQTT client: ConsumeMQTT (Attribute From Content Type: 
content_type) -> RouteOnAttribute (content_type = text/plain) -> PutFile
-    And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0"
-    And the "Attribute From Content Type" property of the ConsumeMQTT 
processor is set to "content_type"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0" in the "consumer-client" flow
+    And the "Attribute From Content Type" property of the ConsumeMQTT 
processor is set to "content_type" in the "consumer-client" flow
     And a RouteOnAttribute processor in the "consumer-client" flow
-    And the "matched_content_type" property of the RouteOnAttribute processor 
is set to "${content_type:equals('text/plain')}"
+    And RouteOnAttribute is EVENT_DRIVEN in the "consumer-client" flow
+    And the "matched_content_type" property of the RouteOnAttribute processor 
is set to "${content_type:equals('text/plain')}" in the "consumer-client" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "consumer-client" flow
-    And the "success" relationship of the ConsumeMQTT processor is connected 
to the RouteOnAttribute
-    And the "matched_content_type" relationship of the RouteOnAttribute 
processor is connected to the PutFile
+    And PutFile is EVENT_DRIVEN in the "consumer-client" flow
+    And in the "consumer-client" flow the "success" relationship of the 
ConsumeMQTT processor is connected to the RouteOnAttribute
+    And in the "consumer-client" flow the "matched_content_type" relationship 
of the RouteOnAttribute processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated in the 
"consumer-client" flow
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
-    When both instances start up
+    When all instances start up
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
     And the MQTT broker has a log line matching "New client connected from .* 
as publisher-client"
-    And a file with the content "test" is placed in "/tmp/input"
-    And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    When a file with the content "test" is placed in "/tmp/input" in the 
"publisher-client" flow
+    Then in the "consumer-client" container a single file with the content 
"test" is placed in the "/tmp/output" directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
 
   Scenario: Will content type - MQTT 5
     # publishing MQTT client: GetFile -> PublishMQTT (Last Will Content Type: 
text/plain)
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input" in the "publisher-client" flow
-    And a PublishMQTT processor in the "publisher-client" flow
-    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0"
-    And the "Last Will Topic" property of the PublishMQTT processor is set to 
"last_will_topic"
-    And the "Last Will Message" property of the PublishMQTT processor is set 
to "last_will_message"
-    And the "Last Will Content Type" property of the PublishMQTT processor is 
set to "text/plain"
-    And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance in the "publisher-client" flow
+    And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"5.0" in the "publisher-client" flow
+    And the "Last Will Topic" property of the PublishMQTT processor is set to 
"last_will_topic" in the "publisher-client" flow
+    And the "Last Will Message" property of the PublishMQTT processor is set 
to "last_will_message" in the "publisher-client" flow
+    And the "Last Will Content Type" property of the PublishMQTT processor is 
set to "text/plain" in the "publisher-client" flow
+    And in the "publisher-client" flow the "success" relationship of the 
GetFile processor is connected to the PublishMQTT
+    And PublishMQTT's success relationship is auto-terminated in the 
"publisher-client" flow
 
     # consuming MQTT client: ConsumeMQTT (Attribute From Content Type: 
content_type, Topic: last_will_topic) -> RouteOnAttribute (content_type = 
text/plain) -> PutFile
-    And a ConsumeMQTT processor in the "consumer-client" flow
-    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0"
-    And the "Topic" property of the ConsumeMQTT processor is set to 
"last_will_topic"
-    And the "Attribute From Content Type" property of the ConsumeMQTT 
processor is set to "content_type"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance in the "consumer-client" flow
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"5.0" in the "consumer-client" flow
+    And the "Topic" property of the ConsumeMQTT processor is set to 
"last_will_topic" in the "consumer-client" flow
+    And the "Attribute From Content Type" property of the ConsumeMQTT 
processor is set to "content_type" in the "consumer-client" flow
     And a RouteOnAttribute processor in the "consumer-client" flow
-    And the "matched_content_type" property of the RouteOnAttribute processor 
is set to "${content_type:equals('text/plain')}"
+    And RouteOnAttribute is EVENT_DRIVEN in the "consumer-client" flow
+    And the "matched_content_type" property of the RouteOnAttribute processor 
is set to "${content_type:equals('text/plain')}" in the "consumer-client" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "consumer-client" flow
-    And the "success" relationship of the ConsumeMQTT processor is connected 
to the RouteOnAttribute
-    And the "matched_content_type" relationship of the RouteOnAttribute 
processor is connected to the PutFile
+    And PutFile is EVENT_DRIVEN in the "consumer-client" flow
+    And in the "consumer-client" flow the "success" relationship of the 
ConsumeMQTT processor is connected to the RouteOnAttribute
+    And in the "consumer-client" flow the "matched_content_type" relationship 
of the RouteOnAttribute processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated in the 
"consumer-client" flow
 
-    And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
+    And an MQTT broker is started
 
     When all instances start up
     Then the MQTT broker has a log line matching "Sending CONNACK to 
publisher-client"
     And the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
-    And "publisher-client" flow is killed
-    And the MQTT broker has a log line matching "Sending PUBLISH to 
consumer-client"
-    And a flowfile with the content "last_will_message" is placed in the 
monitored directory in less than 60 seconds
+    When the "publisher-client" flow is killed
+    Then the MQTT broker has a log line matching "Sending PUBLISH to 
consumer-client"
+    And in the "consumer-client" container a single file with the content 
"last_will_message" is placed in the "/tmp/output" directory in less than 60 
seconds
 
   Scenario: A MiNiFi instance uses record reader and writer to convert 
consumed message from an MQTT broker
     Given a XMLReader controller service is set up
-    And a JsonRecordSetWriter controller service is set up with "Array" output 
grouping
-    And a ConsumeMQTT processor with the "Topic" property set to 
"test/my/topic"
+    And a JsonRecordSetWriter controller service is set up and the "Output 
Grouping" property set to "Array"
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
+    And the "Topic" property of the ConsumeMQTT processor is set to 
"test/my/topic"
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"3.x AUTO"
     And the "Record Reader" property of the ConsumeMQTT processor is set to 
"XMLReader"
     And the "Record Writer" property of the ConsumeMQTT processor is set to 
"JsonRecordSetWriter"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And a LogAttribute processor
+    And LogAttribute is EVENT_DRIVEN
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
-    And an MQTT broker is set up in correspondence with the ConsumeMQTT
+    And PutFile's success relationship is auto-terminated
+
+    And an MQTT broker is started
 
-    When both instances start up
+    When the MiNiFi instance starts up
     And a test message "<root><element>test</element></root>" is published to 
the MQTT broker on topic "test/my/topic"
 
     Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
-    And a flowfile with the JSON content '[{"_isRetained": false, 
"_isDuplicate": false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], 
"_topic": "test/my/topic", "element": "test"}]' is placed in the monitored 
directory in less than 60 seconds
+    And a file with the JSON content '[{"_isRetained": false, "_isDuplicate": 
false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], "_topic": 
"test/my/topic", "element": "test"}]' is placed in the '/tmp/output' directory 
in less than 60 seconds
     And the Minifi logs contain the following message: "key:record.count 
value:1" in less than 60 seconds
     And the Minifi logs contain the following message: "key:mqtt.broker 
value:mqtt-broker-" in less than 1 seconds
 
   Scenario: A MiNiFi instance uses record reader and writer to convert and 
publish records to an MQTT broker
     Given a JsonTreeReader controller service is set up
     And a XMLRecordSetWriter controller service is set up
-    And the "Name of Record Tag" property of the XMLRecordSetWriter controller 
is set to "record"
-    And the "Name of Root Tag" property of the XMLRecordSetWriter controller 
is set to "root"
+    And the "Name of Record Tag" property of the XMLRecordSetWriter controller 
service is set to "record"
+    And the "Name of Root Tag" property of the XMLRecordSetWriter controller 
service is set to "root"
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content '[{"string": "test"}, {"int": 42}]' is present 
in '/tmp/input'
     And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And PublishMQTT is EVENT_DRIVEN
     And the "MQTT Version" property of the PublishMQTT processor is set to 
"3.x AUTO"
     And the "Record Reader" property of the PublishMQTT processor is set to 
"JsonTreeReader"
     And the "Record Writer" property of the PublishMQTT processor is set to 
"XMLRecordSetWriter"
     And a UpdateAttribute processor with the "filename" property set to 
"${UUID()}.xml"
+    And UpdateAttribute is EVENT_DRIVEN
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
     And the "success" relationship of the PublishMQTT processor is connected 
to the UpdateAttribute
     And the "success" relationship of the UpdateAttribute processor is 
connected to the PutFile
-    And an MQTT broker is set up in correspondence with the PublishMQTT
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
+    And an MQTT broker is started
+
+    When the MiNiFi instance starts up
 
-    Then two flowfiles with the contents '<?xml 
version="1.0"?><root><record><string>test</string></record></root>' and '<?xml 
version="1.0"?><root><record><int>42</int></record></root>' are placed in the 
monitored directory in less than 60 seconds
+    Then files with contents '<?xml 
version="1.0"?><root><record><string>test</string></record></root>' and '<?xml 
version="1.0"?><root><record><int>42</int></record></root>' are placed in the 
'/tmp/output' directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(72 bytes\)"
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(64 bytes\)"
diff --git a/docker/test/integration/cluster/checkers/MqttHelper.py 
b/extensions/mqtt/tests/features/resources/publish_mqtt_message.py
similarity index 62%
rename from docker/test/integration/cluster/checkers/MqttHelper.py
rename to extensions/mqtt/tests/features/resources/publish_mqtt_message.py
index 719911c97..6fe709acf 100644
--- a/docker/test/integration/cluster/checkers/MqttHelper.py
+++ b/extensions/mqtt/tests/features/resources/publish_mqtt_message.py
@@ -13,11 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import paho.mqtt.client as mqtt
+import sys
 
 
-class MqttHelper:
-    def publish_test_mqtt_message(self, topic: str, message: str):
-        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 
"docker_test_client_id")
-        client.connect("localhost", 1883, 60)
-        client.publish(topic, message)
-        client.disconnect()
+def publish_test_mqtt_message(host: str, topic: str, message: str):
+    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 
"docker_test_client_id")
+    client.connect(host, 1883, 60)
+    client.publish(topic, message)
+    client.disconnect()
+
+
+if __name__ == "__main__":
+    if sys.argv.__len__() != 4:
+        print("Usage: publish_mqtt_message.py <host> <topic> <message>")
+        sys.exit(1)
+
+    publish_test_mqtt_message(sys.argv[1], sys.argv[2], sys.argv[3])
diff --git a/extensions/mqtt/tests/features/steps/mqtt_broker_container.py 
b/extensions/mqtt/tests/features/steps/mqtt_broker_container.py
new file mode 100644
index 000000000..091c12c77
--- /dev/null
+++ b/extensions/mqtt/tests/features/steps/mqtt_broker_container.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+from textwrap import dedent
+
+from minifi_test_framework.containers.container import Container
+from minifi_test_framework.core.helpers import wait_for_condition
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+from minifi_test_framework.containers.docker_image_builder import 
DockerImageBuilder
+from docker.errors import ContainerError
+
+
+class MqttBrokerContainer(Container):
+    def __init__(self, test_context: MinifiTestContext):
+        dockerfile = dedent("""\
+            FROM {base_image}
+            RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
+            CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", 
"/mosquitto-no-auth.conf"]
+            """.format(base_image='eclipse-mosquitto:2.1.1-alpine'))
+
+        builder = DockerImageBuilder(
+            image_tag="minifi-mqtt-broker:latest",
+            dockerfile_content=dockerfile
+        )
+        builder.build()
+
+        super().__init__("minifi-mqtt-broker:latest", 
f"mqtt-broker-{test_context.scenario_id}", test_context.network)
+
+    def deploy(self):
+        super().deploy()
+        finished_str = "mosquitto version [0-9\\.]+ running"
+        return wait_for_condition(
+            condition=lambda: re.search(finished_str, self.get_logs()),
+            timeout_seconds=60,
+            bail_condition=lambda: self.exited,
+            context=None)
+
+    def publish_mqtt_message(self, topic: str, message: str):
+        try:
+            self.client.containers.run("minifi-mqtt-helper:latest", ["python", 
"/scripts/publish_mqtt_message.py", self.container_name, topic, message], 
remove=True, stdout=True, stderr=True, network=self.network.name)
+            return True
+        except ContainerError as e:
+            stdout = e.stdout.decode("utf-8", errors="replace") if hasattr(e, 
"stdout") and e.stdout else ""
+            stderr = e.stderr.decode("utf-8", errors="replace") if hasattr(e, 
"stderr") and e.stderr else ""
+            logging.error(f"Failed to publish mqtt message in mqtt helper 
docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'")
+            return False
+        except Exception as e:
+            logging.error(f"Unexpected error while publishing mqtt message in 
mqtt helper docker: '{e}'")
+            return False
diff --git a/extensions/mqtt/tests/features/steps/steps.py 
b/extensions/mqtt/tests/features/steps/steps.py
new file mode 100644
index 000000000..4bff1b2a1
--- /dev/null
+++ b/extensions/mqtt/tests/features/steps/steps.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+from behave import given, step, then, when
+
+from minifi_test_framework.steps import checking_steps        # noqa: F401
+from minifi_test_framework.steps import configuration_steps   # noqa: F401
+from minifi_test_framework.steps import core_steps            # noqa: F401
+from minifi_test_framework.steps import flow_building_steps   # noqa: F401
+from minifi_test_framework.core.minifi_test_context import 
DEFAULT_MINIFI_CONTAINER_NAME, MinifiTestContext
+from minifi_test_framework.minifi.processor import Processor
+from minifi_test_framework.core.helpers import wait_for_condition
+
+from mqtt_broker_container import MqttBrokerContainer
+
+
+@given("a {processor_name} processor set up to communicate with an MQTT broker 
instance in the \"{container_name}\" flow")
+def step_impl(context: MinifiTestContext, processor_name: str, container_name: 
str):
+    processor = Processor(processor_name, processor_name)
+    processor.add_property('Broker URI', 
f'mqtt-broker-{context.scenario_id}:1883')
+    processor.add_property('Topic', 'testtopic')
+    if processor_name == 'PublishMQTT':
+        processor.add_property('Client ID', 'publisher-client')
+    elif processor_name == 'ConsumeMQTT':
+        processor.add_property('Client ID', 'consumer-client')
+    else:
+        raise ValueError(f"Unknown processor to communicate with MQTT broker: 
{processor_name}")
+
+    
context.get_or_create_minifi_container(container_name).flow_definition.add_processor(processor)
+
+
+@given("a {processor_name} processor set up to communicate with an MQTT broker 
instance")
+def step_impl(context: MinifiTestContext, processor_name: str):
+    context.execute_steps(f'given a {processor_name} processor set up to 
communicate with an MQTT broker instance in the 
"{DEFAULT_MINIFI_CONTAINER_NAME}" flow')
+
+
+@step("an MQTT broker is started")
+def step_impl(context: MinifiTestContext):
+    context.containers["mqtt-broker"] = MqttBrokerContainer(context)
+    assert context.containers["mqtt-broker"].deploy()
+
+
+@then('the MQTT broker has a log line matching "{log_line_regex}"')
+def step_impl(context: MinifiTestContext, log_line_regex: str):
+    assert wait_for_condition(
+        condition=lambda: re.search(log_line_regex, 
context.containers["mqtt-broker"].get_logs()),
+        timeout_seconds=60,
+        bail_condition=lambda: context.containers["mqtt-broker"].exited,
+        context=None)
+
+
+@then('the MQTT broker has {log_count:d} log lines matching 
"{log_line_regex}"')
+def step_impl(context: MinifiTestContext, log_count: int, log_line_regex: str):
+    assert wait_for_condition(
+        condition=lambda: len(re.findall(log_line_regex, 
context.containers["mqtt-broker"].get_logs())) == log_count,
+        timeout_seconds=60,
+        bail_condition=lambda: context.containers["mqtt-broker"].exited,
+        context=None)
+
+
+@when("a test message \"{message}\" is published to the MQTT broker on topic 
\"{topic}\"")
+def step_impl(context: MinifiTestContext, message: str, topic: str):
+    assert context.containers["mqtt-broker"].publish_mqtt_message(topic, 
message)


Reply via email to