This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a1ee3af66307c25c18e5ebe987505c7639f6f64a
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Oct 27 14:46:02 2025 +0100

    MINIFICPP-2603 Add Record Reader and Record Writer properties to MQTT 
processors
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    
    Closes #2004
---
 PROCESSORS.md                                      |  20 +-
 docker/requirements.txt                            |   1 +
 .../test/integration/cluster/DockerTestCluster.py  |   5 +
 .../checkers/MqttHelper.py}                        |  28 +-
 .../cluster/containers/FlowContainer.py            |   6 +
 .../cluster/containers/MqttBrokerContainer.py      |   1 +
 .../features/MiNiFi_integration_test_driver.py     |   3 +
 docker/test/integration/features/mqtt.feature      |  53 +++
 docker/test/integration/features/steps/steps.py    |  43 ++-
 .../XMLRecordSetWriter.py}                         |  24 +-
 .../integration/minifi/core/ControllerService.py   |   3 +
 .../mqtt/processors/AbstractMQTTProcessor.cpp      |  14 +
 extensions/mqtt/processors/AbstractMQTTProcessor.h |  18 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         |  78 ++++-
 extensions/mqtt/processors/ConsumeMQTT.h           |  42 ++-
 extensions/mqtt/processors/PublishMQTT.cpp         |  59 +++-
 extensions/mqtt/processors/PublishMQTT.h           |  13 +-
 extensions/mqtt/tests/ConsumeMQTTTests.cpp         | 375 ++++++++++++++++-----
 extensions/mqtt/tests/PublishMQTTTests.cpp         | 134 ++++++--
 .../processors/ConvertRecord.cpp                   |  13 +-
 .../standard-processors/processors/ConvertRecord.h |   7 +-
 .../standard-processors/processors/SplitRecord.cpp |  13 +-
 .../standard-processors/processors/SplitRecord.h   |   8 +-
 .../minifi-cpp/controllers/RecordConverter.h       |  32 ++
 24 files changed, 770 insertions(+), 223 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 2ce4022ec..1fc8af238 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -392,7 +392,7 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 
 ### Description
 
-This Processor gets the contents of a FlowFile from a MQTT broker for a 
specified topic. The the payload of the MQTT message becomes content of a 
FlowFile
+This Processor gets the contents of a FlowFile from a MQTT broker for a 
specified topic. The the payload of the MQTT message becomes content of a 
FlowFile. If Record Reader and Record Writer are set, then the MQTT message 
specific attributes are not set in the flow file, because different attributes 
can be set for different records. In this case if Add Attributes As Fields is 
set to true, the attributes will be added to each record as fields.
 
 ### Properties
 
@@ -411,6 +411,9 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 | Attribute From Content Type |               |                                
      | Name of FlowFile attribute to be filled from content type of received 
message. MQTT 5.x only.                                                         
    |
 | Topic Alias Maximum         | 0             |                                
      | Maximum number of topic aliases to use. If set to 0, then topic aliases 
cannot be used. MQTT 5.x only.                                                  
  |
 | Receive Maximum             | 65535         |                                
      | Maximum number of unacknowledged messages allowed. MQTT 5.x only.       
                                                                                
  |
+| Record Reader               |               |                                
      | The Record Reader to use for parsing received MQTT Messages into 
Records.                                                                        
         |
+| Record Writer               |               |                                
      | The Record Writer to use for serializing Records before writing them to 
a FlowFile.                                                                     
  |
+| Add Attributes As Fields    | true          | true<br/>false                 
      | If setting this property to true, default fields are going to be added 
in each record: _topic, _qos, _isDuplicate, _isRetained.                        
   |
 | **Quality of Service**      | 0             | 0<br/>1<br/>2                  
      | The Quality of Service (QoS) of messages.                               
                                                                                
  |
 | Connection Timeout          | 10 sec        |                                
      | Maximum time interval the client will wait for the network connection 
to the MQTT broker                                                              
    |
 | Keep Alive Interval         | 60 sec        |                                
      | Defines the maximum time interval between messages sent or received     
                                                                                
  |
@@ -435,10 +438,15 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 ### Output Attributes
 
-| Attribute   | Relationship | Description               |
-|-------------|--------------|---------------------------|
-| mqtt.broker |              | URI of the sending broker |
-| mqtt.topic  |              | Topic of the message      |
+| Attribute            | Relationship | Description                            
                                                                                
               |
+|----------------------|--------------|---------------------------------------------------------------------------------------------------------------------------------------|
+| mqtt.broker          |              | URI of the sending broker              
                                                                                
               |
+| mqtt.topic           |              | Topic of the message                   
                                                                                
               |
+| mqtt.topic.segment.n |              | The nth topic segment of the message   
                                                                                
               |
+| mqtt.qos             |              | The quality of service for this 
message.                                                                        
                      |
+| mqtt.isDuplicate     |              | Whether or not this message might be a 
duplicate of one which has already been received.                               
               |
+| mqtt.isRetained      |              | Whether or not this message was from a 
current publisher, or was "retained" by the server as the last message 
published on the topic. |
+| record.count         |              | The number of records received         
                                                                                
               |
 
 
 ## ConsumeWindowsEventLog
@@ -2198,6 +2206,8 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | Retain                  | false         | true<br/>false                     
  | Retain published message in broker                                          
                                                |
 | Message Expiry Interval |               |                                    
  | Time while message is valid and will be forwarded by broker. MQTT 5.x only. 
                                                |
 | Content Type            |               |                                    
  | Content type of the message. MQTT 5.x only.<br/>**Supports Expression 
Language: true**                                      |
+| Record Reader           |               |                                    
  | The Record Reader to use for parsing the incoming FlowFile into Records.    
                                                |
+| Record Writer           |               |                                    
  | The Record Writer to use for serializing Records before publishing them as 
an MQTT Message.                                 |
 | **Quality of Service**  | 0             | 0<br/>1<br/>2                      
  | The Quality of Service (QoS) of messages.                                   
                                                |
 | Connection Timeout      | 10 sec        |                                    
  | Maximum time interval the client will wait for the network connection to 
the MQTT broker                                    |
 | Keep Alive Interval     | 60 sec        |                                    
  | Defines the maximum time interval between messages sent or received         
                                                |
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 0fcf30a8f..2a2da597a 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -11,3 +11,4 @@ prometheus-api-client==0.5.5
 humanfriendly==10.0
 requests<2.29  # https://github.com/docker/docker-py/issues/3113
 couchbase==4.3.5
+paho-mqtt==2.1.0
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index aac42d4aa..620f84267 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -37,6 +37,7 @@ from .checkers.SplunkChecker import SplunkChecker
 from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
 from .checkers.ModbusChecker import ModbusChecker
 from .checkers.CouchbaseChecker import CouchbaseChecker
+from .checkers.MqttHelper import MqttHelper
 from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, 
retry_check
 
 
@@ -58,6 +59,7 @@ class DockerTestCluster:
         self.modbus_checker = ModbusChecker(self.container_communicator)
         self.couchbase_checker = CouchbaseChecker()
         self.kafka_checker = KafkaHelper(self.container_communicator, 
feature_id)
+        self.mqtt_helper = MqttHelper()
 
     def cleanup(self):
         self.container_store.cleanup()
@@ -457,3 +459,6 @@ class DockerTestCluster:
 
     def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, 
expected_data: str, expected_data_type: str):
         return self.couchbase_checker.is_data_present_in_couchbase(doc_id, 
bucket_name, expected_data, expected_data_type)
+
+    def publish_test_mqtt_message(self, topic: str, message: str):
+        self.mqtt_helper.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/minifi/core/ControllerService.py 
b/docker/test/integration/cluster/checkers/MqttHelper.py
similarity index 58%
copy from docker/test/integration/minifi/core/ControllerService.py
copy to docker/test/integration/cluster/checkers/MqttHelper.py
index 02d32e4a7..719911c97 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/cluster/checkers/MqttHelper.py
@@ -12,26 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import paho.mqtt.client as mqtt
 
 
-import uuid
-import logging
-
-
-class ControllerService(object):
-    def __init__(self, name=None, properties=None):
-
-        self.id = str(uuid.uuid4())
-        self.instance_id = str(uuid.uuid4())
-
-        if name is None:
-            self.name = str(uuid.uuid4())
-            logging.info('Controller service name was not provided; using 
generated name \'%s\'', self.name)
-        else:
-            self.name = name
-
-        if properties is None:
-            properties = {}
-
-        self.properties = properties
-        self.linked_services = []
+class MqttHelper:
+    def publish_test_mqtt_message(self, topic: str, message: str):
+        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 
"docker_test_client_id")
+        client.connect("localhost", 1883, 60)
+        client.publish(topic, message)
+        client.disconnect()
diff --git a/docker/test/integration/cluster/containers/FlowContainer.py 
b/docker/test/integration/cluster/containers/FlowContainer.py
index 5b5dabe71..b3ac65c68 100644
--- a/docker/test/integration/cluster/containers/FlowContainer.py
+++ b/docker/test/integration/cluster/containers/FlowContainer.py
@@ -47,6 +47,12 @@ class FlowContainer(Container):
     def add_controller(self, controller):
         self.controllers.append(controller)
 
+    def get_controller(self, name):
+        for controller in self.controllers:
+            if controller.name == name:
+                return controller
+        raise ValueError(f"Controller with name '{name}' not found")
+
     def add_parameter_to_flow_config(self, parameter_context_name, 
parameter_name, parameter_value):
         if parameter_context_name in self.parameter_contexts:
             
self.parameter_contexts[parameter_context_name].append(Parameter(parameter_name,
 parameter_value))
diff --git a/docker/test/integration/cluster/containers/MqttBrokerContainer.py 
b/docker/test/integration/cluster/containers/MqttBrokerContainer.py
index b33c41b7e..faa168f31 100644
--- a/docker/test/integration/cluster/containers/MqttBrokerContainer.py
+++ b/docker/test/integration/cluster/containers/MqttBrokerContainer.py
@@ -34,6 +34,7 @@ class MqttBrokerContainer(Container):
             self.image_store.get_image(self.get_engine()),
             detach=True,
             name=self.name,
+            ports={'1883/tcp': 1883},
             network=self.network.name,
             entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py 
b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 455a8b5ce..e74e1772f 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -517,3 +517,6 @@ class MiNiFi_integration_test:
 
     def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: 
str, expected_data: str, expected_data_type: str):
         assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, 
expected_data, expected_data_type)
+
+    def publish_test_mqtt_message(self, topic, message):
+        self.cluster.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/features/mqtt.feature 
b/docker/test/integration/features/mqtt.feature
index 17300956e..64ef62f6b 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -90,8 +90,10 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
     And a ConsumeMQTT processor set up to communicate with an MQTT broker 
instance
     And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"<version>"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor
     And "ConsumeMQTT" processor is a start node
     And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
 
     And an MQTT broker is set up in correspondence with the PublishMQTT and 
ConsumeMQTT
 
@@ -101,6 +103,12 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
     And a file with the content "test" is placed in "/tmp/input"
     And a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(4 bytes\)"
+    And the Minifi logs contain the following message: "key:mqtt.broker 
value:mqtt-broker-" in less than 60 seconds
+    And the Minifi logs contain the following message: "key:mqtt.topic 
value:testtopic" in less than 1 seconds
+    And the Minifi logs contain the following message: 
"key:mqtt.topic.segment.0 value:testtopic" in less than 1 seconds
+    And the Minifi logs contain the following message: "key:mqtt.qos value:0" 
in less than 1 seconds
+    And the Minifi logs contain the following message: "key:mqtt.isDuplicate 
value:false" in less than 1 seconds
+    And the Minifi logs contain the following message: "key:mqtt.isRetained 
value:false" in less than 1 seconds
 
     Examples: MQTT versions
     | version  |
@@ -505,3 +513,48 @@ Feature: Sending data to MQTT streaming platform using 
PublishMQTT
     And "publisher-client" flow is killed
     And the MQTT broker has a log line matching "Sending PUBLISH to 
consumer-client"
     And a flowfile with the content "last_will_message" is placed in the 
monitored directory in less than 60 seconds
+
+  Scenario: A MiNiFi instance uses record reader and writer to convert 
consumed message from an MQTT broker
+    Given a XMLReader controller service is set up
+    And a JsonRecordSetWriter controller service is set up with "Array" output 
grouping
+    And a ConsumeMQTT processor with the "Topic" property set to 
"test/my/topic"
+    And the "MQTT Version" property of the ConsumeMQTT processor is set to 
"3.x AUTO"
+    And the "Record Reader" property of the ConsumeMQTT processor is set to 
"XMLReader"
+    And the "Record Writer" property of the ConsumeMQTT processor is set to 
"JsonRecordSetWriter"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor
+    And the "success" relationship of the ConsumeMQTT processor is connected 
to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And an MQTT broker is set up in correspondence with the ConsumeMQTT
+
+    When both instances start up
+    And a test message "<root><element>test</element></root>" is published to 
the MQTT broker on topic "test/my/topic"
+
+    Then the MQTT broker has a log line matching "Received SUBSCRIBE from 
consumer-client"
+    And a flowfile with the JSON content '[{"_isRetained": false, 
"_isDuplicate": false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], 
"_topic": "test/my/topic", "element": "test"}]' is placed in the monitored 
directory in less than 60 seconds
+    And the Minifi logs contain the following message: "key:record.count 
value:1" in less than 60 seconds
+    And the Minifi logs contain the following message: "key:mqtt.broker 
value:mqtt-broker-" in less than 1 seconds
+
+  Scenario: A MiNiFi instance uses record reader and writer to convert and 
publish records to an MQTT broker
+    Given a JsonTreeReader controller service is set up
+    And a XMLRecordSetWriter controller service is set up
+    And the "Name of Record Tag" property of the XMLRecordSetWriter controller 
is set to "record"
+    And the "Name of Root Tag" property of the XMLRecordSetWriter controller 
is set to "root"
+    And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content '[{"string": "test"}, {"int": 42}]' is present 
in '/tmp/input'
+    And a PublishMQTT processor set up to communicate with an MQTT broker 
instance
+    And the "MQTT Version" property of the PublishMQTT processor is set to 
"3.x AUTO"
+    And the "Record Reader" property of the PublishMQTT processor is set to 
"JsonTreeReader"
+    And the "Record Writer" property of the PublishMQTT processor is set to 
"XMLRecordSetWriter"
+    And a UpdateAttribute processor with the "filename" property set to 
"${UUID()}.xml"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected 
to the UpdateAttribute
+    And the "success" relationship of the UpdateAttribute processor is 
connected to the PutFile
+    And an MQTT broker is set up in correspondence with the PublishMQTT
+
+    When both instances start up
+
+    Then two flowfiles with the contents '<?xml 
version="1.0"?><root><record><string>test</string></record></root>' and '<?xml 
version="1.0"?><root><record><int>42</int></record></root>' are placed in the 
monitored directory in less than 60 seconds
+    And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(72 bytes\)"
+    And the MQTT broker has a log line matching "Received PUBLISH from 
.*testtopic.*\(64 bytes\)"
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index 129d8df57..590fa06d0 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -25,6 +25,8 @@ from minifi.controllers.ODBCService import ODBCService
 from minifi.controllers.KubernetesControllerService import 
KubernetesControllerService
 from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
 from minifi.controllers.JsonTreeReader import JsonTreeReader
+from minifi.controllers.XMLReader import XMLReader
+from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter
 from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
 from minifi.controllers.XMLReader import XMLReader
 
@@ -182,6 +184,12 @@ def step_impl(context, property_name, processor_name, 
property_value):
         processor.set_property(property_name, property_value)
 
 
+@given("the \"{property_name}\" property of the {controller_name} controller 
is set to \"{property_value}\"")
+def step_impl(context, property_name, controller_name, property_value):
+    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container.get_controller(controller_name).set_property(property_name, 
property_value)
+
+
 @given("the \"{property_name}\" properties of the {processor_name_one} and 
{processor_name_two} processors are set to the same random guid")
 def step_impl(context, property_name, processor_name_one, processor_name_two):
     uuid_str = str(uuid.uuid4())
@@ -430,20 +438,30 @@ def step_impl(context, processor_name):
 
 
 # Record set reader and writer
-@given("a JsonRecordSetWriter controller service is set up with \"{}\" output 
grouping")
-def step_impl(context, output_grouping: str):
+@given("a JsonRecordSetWriter controller service is set up with \"{}\" output 
grouping in the \"{minifi_container_name}\" flow")
+def step_impl(context, output_grouping: str, minifi_container_name: str):
     json_record_set_writer = JsonRecordSetWriter(name="JsonRecordSetWriter", 
output_grouping=output_grouping)
-    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container = context.test.acquire_container(context=context, 
name=minifi_container_name)
     container.add_controller(json_record_set_writer)
 
 
-@given("a JsonTreeReader controller service is set up")
-def step_impl(context):
+@given("a JsonTreeReader controller service is set up in the 
\"{minifi_container_name}\" flow")
+def step_impl(context, minifi_container_name: str):
     json_record_set_reader = JsonTreeReader("JsonTreeReader")
-    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container = context.test.acquire_container(context=context, 
name=minifi_container_name)
     container.add_controller(json_record_set_reader)
 
 
+@given("a JsonRecordSetWriter controller service is set up with \"{}\" output 
grouping")
+def step_impl(context, output_grouping: str):
+    context.execute_steps(f"given a JsonRecordSetWriter controller service is 
set up with \"{output_grouping}\" output grouping in the \"minifi-cpp-flow\" 
flow")
+
+
+@given("a JsonTreeReader controller service is set up")
+def step_impl(context):
+    context.execute_steps("given a JsonTreeReader controller service is set up 
in the \"minifi-cpp-flow\" flow")
+
+
 @given("a XMLReader controller service is set up")
 def step_impl(context):
     xml_reader = XMLReader("XMLReader")
@@ -451,6 +469,13 @@ def step_impl(context):
     container.add_controller(xml_reader)
 
 
+@given("a XMLRecordSetWriter controller service is set up")
+def step_impl(context):
+    xml_record_set_writer = XMLRecordSetWriter("XMLRecordSetWriter")
+    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container.add_controller(xml_record_set_writer)
+
+
 # Kubernetes
 def __set_up_the_kubernetes_controller_service(context, processor_name, 
service_property_name, properties):
     kubernetes_controller_service = KubernetesControllerService("Kubernetes 
Controller Service", properties)
@@ -775,6 +800,7 @@ def step_impl(context, content, duration):
 
 
 @then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are 
placed in the monitored directory in less than {duration}")
+@then("two flowfiles with the contents '{content_1}' and '{content_2}' are 
placed in the monitored directory in less than {duration}")
 def step_impl(context, content_1, content_2, duration):
     context.test.check_for_multiple_files_generated(2, 
humanfriendly.parse_timespan(duration), [content_1, content_2])
 
@@ -933,6 +959,11 @@ def step_impl(context, log_count, log_pattern):
     context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 
60, count=int(log_count))
 
 
+@when("a test message \"{message}\" is published to the MQTT broker on topic 
\"{topic}\"")
+def step_impl(context, message, topic):
+    context.test.publish_test_mqtt_message(topic, message)
+
+
 @then("the \"{minifi_container_name}\" flow has a log line matching 
\"{log_pattern}\" in less than {duration}")
 def step_impl(context, minifi_container_name, log_pattern, duration):
     context.test.check_container_log_matches_regex(minifi_container_name, 
log_pattern, humanfriendly.parse_timespan(duration), count=1)
diff --git a/docker/test/integration/minifi/core/ControllerService.py 
b/docker/test/integration/minifi/controllers/XMLRecordSetWriter.py
similarity index 58%
copy from docker/test/integration/minifi/core/ControllerService.py
copy to docker/test/integration/minifi/controllers/XMLRecordSetWriter.py
index 02d32e4a7..82c2df6f3 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/minifi/controllers/XMLRecordSetWriter.py
@@ -14,24 +14,10 @@
 # limitations under the License.
 
 
-import uuid
-import logging
+from ..core.ControllerService import ControllerService
 
 
-class ControllerService(object):
-    def __init__(self, name=None, properties=None):
-
-        self.id = str(uuid.uuid4())
-        self.instance_id = str(uuid.uuid4())
-
-        if name is None:
-            self.name = str(uuid.uuid4())
-            logging.info('Controller service name was not provided; using 
generated name \'%s\'', self.name)
-        else:
-            self.name = name
-
-        if properties is None:
-            properties = {}
-
-        self.properties = properties
-        self.linked_services = []
+class XMLRecordSetWriter(ControllerService):
+    def __init__(self, name=None):
+        super(XMLRecordSetWriter, self).__init__(name=name)
+        self.service_class = 'XMLRecordSetWriter'
diff --git a/docker/test/integration/minifi/core/ControllerService.py 
b/docker/test/integration/minifi/core/ControllerService.py
index 02d32e4a7..bc26c27bc 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -35,3 +35,6 @@ class ControllerService(object):
 
         self.properties = properties
         self.linked_services = []
+
+    def set_property(self, name, value):
+        self.properties[name] = value
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp 
b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 3fc5e9f66..4d928fafe 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -109,6 +109,20 @@ void 
AbstractMQTTProcessor::onSchedule(core::ProcessContext& context, core::Proc
   readProperties(context);
   checkProperties(context);
   initializeClient();
+
+  auto record_set_reader = 
utils::parseOptionalControllerService<core::RecordSetReader>(context, 
RecordReader, getUUID());
+  auto record_set_writer = 
utils::parseOptionalControllerService<core::RecordSetWriter>(context, 
RecordWriter, getUUID());
+
+  if ((record_set_reader == nullptr) != (record_set_writer == nullptr)) {
+    throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT processor 
requires both or neither Record Reader and Record Writer to be set");
+  }
+
+  if (record_set_reader) {
+    record_converter_ = core::RecordConverter{
+      .record_set_reader = gsl::make_not_null(std::move(record_set_reader)),
+      .record_set_writer = gsl::make_not_null(std::move(record_set_writer)),
+    };
+  }
 }
 
 void AbstractMQTTProcessor::initializeClient() {
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h 
b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index 303c0f492..5b7a8def6 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -23,6 +23,7 @@
 #include <vector>
 #include <shared_mutex>
 #include <future>
+#include <optional>
 
 #include "minifi-cpp/core/PropertyDefinition.h"
 #include "core/ProcessorImpl.h"
@@ -32,6 +33,7 @@
 #include "core/logging/LoggerFactory.h"
 #include "utils/Enum.h"
 #include "MQTTAsync.h"
+#include "minifi-cpp/controllers/RecordConverter.h"
 
 namespace org::apache::nifi::minifi::processors::mqtt {
 enum class MqttVersions {
@@ -166,6 +168,14 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
       .withDescription("Private key passphrase")
       .isSensitive(true)
       .build();
+  EXTENSIONAPI static constexpr auto RecordReader = 
core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
+      .withDescription("The Record Reader to use for parsing received MQTT 
Messages into Records.")
+      .withAllowedTypes<minifi::core::RecordSetReader>()
+      .build();
+  EXTENSIONAPI static constexpr auto RecordWriter = 
core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
+      .withDescription("The Record Writer to use for serializing Records 
before writing them to a FlowFile.")
+      .withAllowedTypes<minifi::core::RecordSetWriter>()
+      .build();
   EXTENSIONAPI static constexpr auto BasicProperties = 
std::to_array<core::PropertyReference>({
       BrokerURI,
       ClientID,
@@ -186,7 +196,9 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
       SecurityCA,
       SecurityCert,
       SecurityPrivateKey,
-      SecurityPrivateKeyPassword
+      SecurityPrivateKeyPassword,
+      RecordReader,
+      RecordWriter
   });
 
   void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
factory) override;
@@ -256,13 +268,15 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
   std::optional<std::chrono::seconds> maximum_session_expiry_interval_;
   std::optional<std::chrono::seconds> server_keep_alive_;
 
+  std::optional<core::RecordConverter> record_converter_;
+
  private:
   using ConnectFinishedTask = std::packaged_task<void(MQTTAsync_successData*, 
MQTTAsync_successData5*, MQTTAsync_failureData*, MQTTAsync_failureData5*)>;
 
   /**
    * Initializes local MQTT client and connects to broker.
    */
-  void initializeClient();
+  virtual void initializeClient();
 
   /**
    * Calls disconnect() and releases local MQTT client
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp 
b/extensions/mqtt/processors/ConsumeMQTT.cpp
index dd66c8913..5b5405a0a 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -16,7 +16,6 @@
  */
 #include "ConsumeMQTT.h"
 
-
 #include <cinttypes>
 #include <memory>
 #include <set>
@@ -26,9 +25,10 @@
 #include "minifi-cpp/core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
+#include "io/BufferStream.h"
+#include "utils/ProcessorConfigUtils.h"
 #include "utils/StringUtils.h"
 #include "utils/ValueParser.h"
-#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -37,6 +37,12 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(Relationships);
 }
 
+void ConsumeMQTT::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& factory) {
+  AbstractMQTTProcessor::onSchedule(context, factory);
+
+  add_attributes_as_fields_ = utils::parseBoolProperty(context, 
AddAttributesAsFields);
+}
+
 void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
   if (queue_.size_approx() >= max_queue_size_) {
     logger_->log_error("MQTT queue full");
@@ -58,8 +64,57 @@ void ConsumeMQTT::readProperties(core::ProcessContext& 
context) {
   receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context, 
ReceiveMaximum));
 }
 
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& 
session) {
-  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, 
const SmartMessage& message) const {
+  if (!add_attributes_as_fields_) {
+    return;
+  }
+
+  for (auto& record : new_records) {
+    record.emplace("_topic", core::RecordField(message.topic));
+    auto topic_segments = utils::string::split(message.topic, "/");
+    core::RecordArray topic_segments_array;
+    for (const auto& topic_segment : topic_segments) {
+      topic_segments_array.emplace_back(core::RecordField(topic_segment));
+    }
+    record.emplace("_topicSegments", 
core::RecordField(std::move(topic_segments_array)));
+    record.emplace("_qos", core::RecordField(message.contents->qos));
+    record.emplace("_isDuplicate", core::RecordField(message.contents->dup > 
0));
+    record.emplace("_isRetained", core::RecordField(message.contents->retained 
> 0));
+  }
+}
+
+void ConsumeMQTT::transferMessagesAsRecords(core::ProcessSession& session) {
+  gsl_Expects(record_converter_);
+  auto msg_queue = getReceivedMqttMessages();
+  core::RecordSet record_set;
+  while (!msg_queue.empty()) {
+    io::BufferStream buffer_stream;
+    buffer_stream.write(reinterpret_cast<const 
uint8_t*>(msg_queue.front().contents->payload), 
gsl::narrow<size_t>(msg_queue.front().contents->payloadlen));
+    auto new_records_result = 
record_converter_->record_set_reader->read(buffer_stream);
+    if (!new_records_result) {
+      logger_->log_error("Failed to read records from MQTT message: {}", 
new_records_result.error());
+      msg_queue.pop();
+      continue;
+    }
+    auto& new_records = new_records_result.value();
+    addAttributesAsRecordFields(new_records, msg_queue.front());
+    record_set.reserve(record_set.size() + new_records.size());
+    record_set.insert(record_set.end(), 
std::make_move_iterator(new_records.begin()), 
std::make_move_iterator(new_records.end()));
+    msg_queue.pop();
+  }
+  if (record_set.empty()) {
+    logger_->log_debug("No records to write, skipping FlowFile creation");
+    return;
+  }
+  std::shared_ptr<core::FlowFile> flow_file = session.create();
+  record_converter_->record_set_writer->write(record_set, flow_file, session);
+  session.putAttribute(*flow_file, RecordCountOutputAttribute.name, 
std::to_string(record_set.size()));
+  session.putAttribute(*flow_file, BrokerOutputAttribute.name, uri_);
+  session.transfer(flow_file, Success);
+}
+
+void ConsumeMQTT::transferMessagesAsFlowFiles(core::ProcessSession& session) {
+  auto msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
     std::shared_ptr<core::FlowFile> flow_file = session.create();
@@ -76,6 +131,13 @@ void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, 
core::ProcessSession& ses
       putUserPropertiesAsAttributes(message, flow_file, session);
       session.putAttribute(*flow_file, BrokerOutputAttribute.name, uri_);
       session.putAttribute(*flow_file, TopicOutputAttribute.name, 
message.topic);
+      auto topic_segments = utils::string::split(message.topic, "/");
+      for (size_t i = 0; i < topic_segments.size(); ++i) {
+        session.putAttribute(*flow_file, "mqtt.topic.segment." + 
std::to_string(i), topic_segments[i]);
+      }
+      session.putAttribute(*flow_file, QosOutputAttribute.name, 
std::to_string(message.contents->qos));
+      session.putAttribute(*flow_file, IsDuplicateOutputAttribute.name, 
message.contents->dup > 0 ? "true" : "false");
+      session.putAttribute(*flow_file, IsRetainedOutputAttribute.name, 
message.contents->retained > 0 ? "true" : "false");
       fillAttributeFromContentType(message, flow_file, session);
       logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID {} topic {}", flow_file->getUUIDStr(), message.topic);
       session.transfer(flow_file, Success);
@@ -84,6 +146,14 @@ void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, 
core::ProcessSession& ses
   }
 }
 
+void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& 
session) {
+  if (record_converter_) {
+    transferMessagesAsRecords(session);
+  } else {
+    transferMessagesAsFlowFiles(session);
+  }
+}
+
 std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
   std::queue<SmartMessage> msg_queue;
   SmartMessage message;
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h 
b/extensions/mqtt/processors/ConsumeMQTT.h
index 384fdeed3..b4434d46e 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -42,7 +42,8 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
   using AbstractMQTTProcessor::AbstractMQTTProcessor;
 
   EXTENSIONAPI static constexpr const char* Description = "This Processor gets 
the contents of a FlowFile from a MQTT broker for a specified topic. "
-      "The the payload of the MQTT message becomes content of a FlowFile";
+      "The the payload of the MQTT message becomes content of a FlowFile. If 
Record Reader and Record Writer are set, then the MQTT message specific 
attributes are not set in the flow file, "
+      "because different attributes can be set for different records. In this 
case if Add Attributes As Fields is set to true, the attributes will be added 
to each record as fields.";
 
   EXTENSIONAPI static constexpr auto Topic = 
core::PropertyDefinitionBuilder<>::createProperty("Topic")
       .withDescription("The topic to subscribe to.")
@@ -81,6 +82,11 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor 
{
       
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
       .withDefaultValue(MQTT_MAX_RECEIVE_MAXIMUM_STR)
       .build();
+  EXTENSIONAPI static constexpr auto AddAttributesAsFields = 
core::PropertyDefinitionBuilder<>::createProperty("Add Attributes As Fields")
+      .withDescription("If setting this property to true, default fields are 
going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
+      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
+      .withDefaultValue("true")
+      .build();
   EXTENSIONAPI static constexpr auto Properties = 
utils::array_cat(AbstractMQTTProcessor::BasicProperties, 
std::to_array<core::PropertyReference>({
       Topic,
       CleanSession,
@@ -89,7 +95,8 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
       QueueBufferMaxMessage,
       AttributeFromContentType,
       TopicAliasMaximum,
-      ReceiveMaximum
+      ReceiveMaximum,
+      AddAttributesAsFields
   }), AbstractMQTTProcessor::AdvancedProperties);
 
   EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "FlowFiles that are sent successfully 
to the destination are transferred to this relationship"};
@@ -97,7 +104,15 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
 
   EXTENSIONAPI static constexpr auto BrokerOutputAttribute = 
core::OutputAttributeDefinition<0>{"mqtt.broker", {}, "URI of the sending 
broker"};
   EXTENSIONAPI static constexpr auto TopicOutputAttribute = 
core::OutputAttributeDefinition<0>{"mqtt.topic", {}, "Topic of the message"};
-  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::array<core::OutputAttributeReference, 2>{BrokerOutputAttribute, 
TopicOutputAttribute};
+  EXTENSIONAPI static constexpr auto TopicSegmentOutputAttribute = 
core::OutputAttributeDefinition<0>{"mqtt.topic.segment.n", {}, "The nth topic 
segment of the message"};
+  EXTENSIONAPI static constexpr auto QosOutputAttribute = 
core::OutputAttributeDefinition<0>{"mqtt.qos", {}, "The quality of service for 
this message."};
+  EXTENSIONAPI static constexpr auto IsDuplicateOutputAttribute = 
core::OutputAttributeDefinition<0>{"mqtt.isDuplicate", {},
+      "Whether or not this message might be a duplicate of one which has 
already been received."};
+  EXTENSIONAPI static constexpr auto IsRetainedOutputAttribute = 
core::OutputAttributeDefinition<0>{"mqtt.isRetained", {},
+      "Whether or not this message was from a current publisher, or was 
\"retained\" by the server as the last message published on the topic."};
+  EXTENSIONAPI static constexpr auto RecordCountOutputAttribute = 
core::OutputAttributeDefinition<0>{"record.count", {}, "The number of records 
received"};
+  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::to_array<core::OutputAttributeReference>({BrokerOutputAttribute, 
TopicOutputAttribute, TopicSegmentOutputAttribute,
+      QosOutputAttribute, IsDuplicateOutputAttribute, 
IsRetainedOutputAttribute, RecordCountOutputAttribute});
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
@@ -109,6 +124,15 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
   void readProperties(core::ProcessContext& context) override;
   void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& 
session) override;
   void initialize() override;
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
factory) override;
+
+ protected:
+    /**
+   * Enqueues received MQTT message into internal message queue.
+   * Called as a callback on a separate thread than onTrigger, as a reaction 
to message incoming.
+   * @param message message to put to queue
+   */
+  void enqueueReceivedMQTTMsg(SmartMessage message);
 
  private:
   class WriteCallback {
@@ -142,13 +166,6 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
   void onSubscriptionFailure5(MQTTAsync_failureData5* response);
   void onMessageReceived(SmartMessage smart_message) override;
 
-  /**
-   * Enqueues received MQTT message into internal message queue.
-   * Called as a callback on a separate thread than onTrigger, as a reaction 
to message incoming.
-   * @param message message to put to queue
-   */
-  void enqueueReceivedMQTTMsg(SmartMessage message);
-
   /**
    * Called in onTrigger to return the whole internal message queue
    * @return message queue of messages received since previous onTrigger
@@ -193,6 +210,10 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
 
   void setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& connect_props) 
const override;
 
+  void transferMessagesAsRecords(core::ProcessSession& session);
+  void addAttributesAsRecordFields(core::RecordSet& new_records, const 
SmartMessage& message) const;
+  void transferMessagesAsFlowFiles(core::ProcessSession& session);
+
   std::string topic_;
   bool clean_session_ = true;
   bool clean_start_ = true;
@@ -205,6 +226,7 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
   std::unordered_map<uint16_t, std::string> alias_to_topic_;
 
   moodycamel::ConcurrentQueue<SmartMessage> queue_;
+  bool add_attributes_as_fields_ = true;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp 
b/extensions/mqtt/processors/PublishMQTT.cpp
index 23a8af251..1952dc057 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -56,29 +56,62 @@ void PublishMQTT::readProperties(core::ProcessContext& 
context) {
 }
 
 void PublishMQTT::onTriggerImpl(core::ProcessContext& context, 
core::ProcessSession& session) {
-  std::shared_ptr<core::FlowFile> flow_file = session.get();
+  std::shared_ptr<core::FlowFile> original_flow_file = session.get();
 
-  if (!flow_file) {
+  if (!original_flow_file) {
     context.yield();
     return;
   }
 
+  std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+  if (record_converter_) {
+    nonstd::expected<core::RecordSet, std::error_code> record_set;
+    session.read(original_flow_file, [this, &record_set](const 
std::shared_ptr<io::InputStream>& input_stream) {
+      record_set = record_converter_->record_set_reader->read(*input_stream);
+      return gsl::narrow<int64_t>(input_stream->size());
+    });
+
+    if (!record_set) {
+      logger_->log_error("Failed to read FlowFile [{}] as RecordSet, error: 
{}", original_flow_file->getUUIDStr(), record_set.error().message());
+      session.transfer(original_flow_file, Failure);
+      return;
+    }
+
+    for (auto&& record : *record_set) {
+      auto new_flow_file = session.create(original_flow_file.get());
+      if (!new_flow_file) {
+        logger_->log_error("Failed to create new FlowFile from record");
+        continue;
+      }
+      std::vector<core::Record> records;
+      records.emplace_back(std::move(record));
+      record_converter_->record_set_writer->write(records, new_flow_file, 
session);
+      flow_files.push_back(std::move(new_flow_file));
+    }
+
+    session.remove(original_flow_file);
+  } else {
+    flow_files.push_back(original_flow_file);
+  }
+
   // broker's Receive Maximum can change after reconnect
   
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  const auto topic = getTopic(context, flow_file.get());
-  try {
-    const auto result = session.readBuffer(flow_file);
-    if (result.status < 0 || !sendMessage(result.buffer, topic, 
getContentType(context, flow_file.get()), flow_file)) {
-      logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on 
broker {}", flow_file->getUUIDStr(), topic, uri_);
+  for (const auto& flow_file : flow_files) {
+    const auto topic = getTopic(context, flow_file.get());
+    try {
+      const auto result = session.readBuffer(flow_file);
+      if (result.status < 0 || !sendMessage(result.buffer, topic, 
getContentType(context, flow_file.get()), flow_file)) {
+        logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' 
on broker {}", flow_file->getUUIDStr(), topic, uri_);
+        session.transfer(flow_file, Failure);
+        return;
+      }
+      logger_->log_debug("Sent flow file [{}] with length {} to MQTT topic 
'{}' on broker {}", flow_file->getUUIDStr(), result.status, topic, uri_);
+      session.transfer(flow_file, Success);
+    } catch (const Exception& ex) {
+      logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on 
broker {}, exception string: '{}'", flow_file->getUUIDStr(), topic, uri_, 
ex.what());
       session.transfer(flow_file, Failure);
-      return;
     }
-    logger_->log_debug("Sent flow file [{}] with length {} to MQTT topic '{}' 
on broker {}", flow_file->getUUIDStr(), result.status, topic, uri_);
-    session.transfer(flow_file, Success);
-  } catch (const Exception& ex) {
-    logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on 
broker {}, exception string: '{}'", flow_file->getUUIDStr(), topic, uri_, 
ex.what());
-    session.transfer(flow_file, Failure);
   }
 }
 
diff --git a/extensions/mqtt/processors/PublishMQTT.h 
b/extensions/mqtt/processors/PublishMQTT.h
index c59c4bd3f..4e0145486 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -86,6 +86,9 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
   void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& 
session) override;
   void initialize() override;
 
+ protected:
+  virtual bool sendMessage(const std::vector<std::byte>& buffer, const 
std::string& topic, const std::string& content_type, const 
std::shared_ptr<core::FlowFile>& flow_file);
+
  private:
   /**
    * Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's 
Receive Maximum
@@ -134,16 +137,6 @@ class PublishMQTT : public 
processors::AbstractMQTTProcessor {
    */
   std::string getContentType(core::ProcessContext& context, const 
core::FlowFile* const flow_file) const;
 
-  /**
-   * Sends an MQTT message asynchronously
-   * @param buffer contents of the message
-   * @param topic topic of the message
-   * @param content_type Content Type for MQTT 5
-   * @param flow_file Flow File being processed
-   * @return success of message sending
-   */
-  bool sendMessage(const std::vector<std::byte>& buffer, const std::string& 
topic, const std::string& content_type, const std::shared_ptr<core::FlowFile>& 
flow_file);
-
   /**
    * Callback for asynchronous message sending
    * @param success if message sending was successful
diff --git a/extensions/mqtt/tests/ConsumeMQTTTests.cpp 
b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
index 6e22381c6..8b02e2df1 100644
--- a/extensions/mqtt/tests/ConsumeMQTTTests.cpp
+++ b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
@@ -20,139 +20,350 @@
 #include "catch2/matchers/catch_matchers_string.hpp"
 #include "unit/TestBase.h"
 #include "../processors/ConsumeMQTT.h"
+#include "core/Resource.h"
+#include "unit/SingleProcessorTestController.h"
+#include "rapidjson/document.h"
+#include "unit/ProcessorUtils.h"
 
-namespace {
-struct Fixture {
-  Fixture() {
-    
LogTestController::getInstance().setDebug<minifi::processors::ConsumeMQTT>();
-    plan_ = testController_.createPlan();
-    consumeMqttProcessor_ = plan_->addProcessor("ConsumeMQTT", 
"consumeMqttProcessor");
+namespace org::apache::nifi::minifi::test {
+void verifyXmlJsonResult(const std::string& json_content, size_t 
expected_record_count, bool add_attributes_as_fields) {
+  rapidjson::Document document;
+  document.Parse(json_content.c_str());
+  REQUIRE(document.IsArray());
+  REQUIRE(document.GetArray().Size() == expected_record_count);
+  for (size_t i = 0; i < expected_record_count; ++i) {
+    auto& current_record = document[gsl::narrow<rapidjson::SizeType>(i)];
+    REQUIRE(current_record.IsObject());
+    REQUIRE(current_record.HasMember("int_value"));
+    uint64_t int_result = current_record["int_value"].GetInt64();
+    CHECK(int_result == 42);
+    REQUIRE(current_record.HasMember("string_value"));
+    std::string string_result = current_record["string_value"].GetString();
+    CHECK(string_result == "test");
+
+    if (add_attributes_as_fields) {
+      string_result = current_record["_topic"].GetString();
+      CHECK(string_result == "mytopic/segment/" + std::to_string(i));
+      auto array = current_record["_topicSegments"].GetArray();
+      CHECK(array.Size() == 3);
+      string_result = array[0].GetString();
+      CHECK(string_result == "mytopic");
+      string_result = array[1].GetString();
+      CHECK(string_result == "segment");
+      string_result = array[2].GetString();
+      CHECK(string_result == std::to_string(i));
+      int_result = current_record["_qos"].GetInt64();
+      CHECK(int_result == i);
+      bool bool_result = current_record["_isDuplicate"].GetBool();
+      if (i == 0) {
+        CHECK_FALSE(bool_result);
+      } else {
+        CHECK(bool_result);
+      }
+      bool_result = current_record["_isRetained"].GetBool();
+      if (i == 0) {
+        CHECK_FALSE(bool_result);
+      } else {
+        CHECK(bool_result);
+      }
+    } else {
+      CHECK_FALSE(current_record.HasMember("_topic"));
+      CHECK_FALSE(current_record.HasMember("_qos"));
+      CHECK_FALSE(current_record.HasMember("_isDuplicate"));
+      CHECK_FALSE(current_record.HasMember("_isRetained"));
+    }
   }
+}
 
-  Fixture(Fixture&&) = delete;
-  Fixture(const Fixture&) = delete;
-  Fixture& operator=(Fixture&&) = delete;
-  Fixture& operator=(const Fixture&) = delete;
+class TestConsumeMQTTProcessor : public minifi::processors::ConsumeMQTT {
+ public:
+  using SmartMessage = processors::AbstractMQTTProcessor::SmartMessage;
+  using MQTTMessageDeleter = 
processors::AbstractMQTTProcessor::MQTTMessageDeleter;
+  explicit TestConsumeMQTTProcessor(minifi::core::ProcessorMetadata metadata)
+      : minifi::processors::ConsumeMQTT(std::move(metadata)) {}
 
-  ~Fixture() {
+  using ConsumeMQTT::enqueueReceivedMQTTMsg;
+
+  void initializeClient() override {
+  }
+
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) 
override {
+    minifi::processors::ConsumeMQTT::onTriggerImpl(context, session);
+  }
+};
+
+REGISTER_RESOURCE(TestConsumeMQTTProcessor, Processor);
+
+struct ConsumeMqttTestFixture {
+  ConsumeMqttTestFixture()
+      : 
test_controller_(utils::make_processor<TestConsumeMQTTProcessor>("TestConsumeMQTTProcessor")),
+        consume_mqtt_processor_(test_controller_.getProcessor()) {
+    REQUIRE(consume_mqtt_processor_ != nullptr);
+    LogTestController::getInstance().setDebug<TestConsumeMQTTProcessor>();
+  }
+
+  ConsumeMqttTestFixture(ConsumeMqttTestFixture&&) = delete;
+  ConsumeMqttTestFixture(const ConsumeMqttTestFixture&) = delete;
+  ConsumeMqttTestFixture& operator=(ConsumeMqttTestFixture&&) = delete;
+  ConsumeMqttTestFixture& operator=(const ConsumeMqttTestFixture&) = delete;
+
+  ~ConsumeMqttTestFixture() {
     LogTestController::getInstance().reset();
   }
 
-  TestController testController_;
-  std::shared_ptr<TestPlan> plan_;
-  core::Processor* consumeMqttProcessor_ = nullptr;
+  SingleProcessorTestController test_controller_;
+  core::Processor* consume_mqtt_processor_ = nullptr;
 };
-}  // namespace
 
 using namespace std::literals::chrono_literals;
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyTopic", "[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
-      Catch::Matchers::EndsWith("Expected valid value from 
\"consumeMqttProcessor::Topic\", but got PropertyNotSet (Property Error:2)"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyTopic", 
"[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
+      Catch::Matchers::EndsWith("Expected valid value from 
\"TestConsumeMQTTProcessor::Topic\", but got PropertyNotSet (Property 
Error:2)"));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyBrokerURI", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
-      Catch::Matchers::EndsWith("Expected valid value from 
\"consumeMqttProcessor::Broker URI\", but got PropertyNotSet (Property 
Error:2)"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyBrokerURI", 
"[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
+      Catch::Matchers::EndsWith("Expected valid value from 
\"TestConsumeMQTTProcessor::Broker URI\", but got PropertyNotSet (Property 
Error:2)"));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name,
 "subscriber"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name,
 "1"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name,
 "false"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, 
"ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
   REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages 
are not preserved during client disconnection "
-    "by the broker when QoS is less than 1 for durable (non-clean) sessions. 
Only subscriptions are preserved.", 1s));
+    "by the broker when QoS is less than 1 for durable (non-clean) sessions. 
Only subscriptions are preserved.", 0s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name,
 "subscriber"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name,
 "0"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name,
 "false"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, 
"ConsumeMQTTTest_DurableSessionWithQoS0", "[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
 
   REQUIRE(LogTestController::getInstance().contains("[warning] Messages are 
not preserved during client disconnection "
     "by the broker when QoS is less than 1 for durable (non-clean) sessions. 
Only subscriptions are preserved.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID_V_5", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name,
 "subscriber"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name,
 "1"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
 
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name,
 "1 h"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, 
"ConsumeMQTTTest_DurableSessionWithID_V_5", "[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
+    
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
   REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages 
are not preserved during client disconnection "
-                                                          "by the broker when 
QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only 
subscriptions are preserved.", 1s));
+                                                          "by the broker when 
QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only 
subscriptions are preserved.", 0s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0_V_5", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name,
 "subscriber"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name,
 "0"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
 
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name,
 "1 h"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, 
"ConsumeMQTTTest_DurableSessionWithQoS0_V_5", "[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
+    
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
 
   REQUIRE(LogTestController::getInstance().contains("[warning] Messages are 
not preserved during client disconnection "
                                                     "by the broker when QoS is 
less than 1 for durable (Session Expiry Interval > 0) sessions. Only 
subscriptions are preserved.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanStart_V_3", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanStart.name,
 "true"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_CleanStart_V_3", 
"[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::CleanStart.name, "true"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x 
specification does not support Clean Start. Property is not used.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_SessionExpiryInterval_V_3", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name,
 "1 h"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, 
"ConsumeMQTTTest_SessionExpiryInterval_V_3", "[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x 
specification does not support Session Expiry Intervals. Property is not 
used.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanSession_V_5", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
 
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name,
 "0 s"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name,
 "true"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_CleanSession_V_5", 
"[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
+    
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "0 s"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::CleanSession.name, "true"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 5.0 
specification does not support Clean Session. Property is not used.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_TopicAliasMaximum_V_3", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::TopicAliasMaximum.name,
 "1"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, 
"ConsumeMQTTTest_TopicAliasMaximum_V_3", "[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::TopicAliasMaximum.name, "1"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x 
specification does not support Topic Alias Maximum. Property is not used.", 
1s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_ReceiveMaximum_V_3", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::ReceiveMaximum.name,
 "1"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_ReceiveMaximum_V_3", 
"[consumeMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::ReceiveMaximum.name, "1"));
 
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x 
specification does not support Receive Maximum. Property is not used.", 1s));
 }
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read XML messages and write them to 
json records", "[consumeMQTTTest]") {
+  test_controller_.plan->addController("XMLReader", "XMLReader");
+  test_controller_.plan->addController("JsonRecordSetWriter", 
"JsonRecordSetWriter");
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+
+  bool add_attributes_as_fields = true;
+  SECTION("Add attributes as fields by default") {
+  }
+
+  SECTION("Do not add attributes as fields") {
+    add_attributes_as_fields = false;
+    REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::AddAttributesAsFields.name, "false"));
+  }
+
+  const size_t expected_record_count = 2;
+  const std::string payload = 
R"(<root><int_value>42</int_value><string_value>test</string_value></root>)";
+  for (size_t i = 0; i < expected_record_count; ++i) {
+    TestConsumeMQTTProcessor::SmartMessage 
message{std::unique_ptr<MQTTAsync_message, 
TestConsumeMQTTProcessor::MQTTMessageDeleter>(
+        new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, 
.struct_version = gsl::narrow<int>(i), .payloadlen = 
gsl::narrow<int>(payload.size()),
+                              .payload = const_cast<char*>(payload.data()), 
.qos = gsl::narrow<int>(i), .retained = gsl::narrow<int>(i), .dup = 
gsl::narrow<int>(i),
+                              .msgid = gsl::narrow<int>(i + 1), .properties = 
{}}),
+      std::string{"mytopic/segment/" + std::to_string(i)}};  // 
NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+
+    auto& test_processor = 
dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
+    test_processor.enqueueReceivedMQTTMsg(std::move(message));
+  }
+  const auto trigger_results = test_controller_.trigger();
+  CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).size() == 1);
+  const auto flow_file = 
trigger_results.at(TestConsumeMQTTProcessor::Success).at(0);
+
+  auto string_content = test_controller_.plan->getContent(flow_file);
+  verifyXmlJsonResult(string_content, expected_record_count, 
add_attributes_as_fields);
+
+  CHECK(*flow_file->getAttribute("record.count") == "2");
+  CHECK(*flow_file->getAttribute("mqtt.broker") == "127.0.0.1:1883");
+}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Invalid XML payload does not result 
in new flow files", "[consumeMQTTTest]") {
+  test_controller_.plan->addController("XMLReader", "XMLReader");
+  test_controller_.plan->addController("JsonRecordSetWriter", 
"JsonRecordSetWriter");
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+
+  const std::string payload = "invalid xml payload";
+  TestConsumeMQTTProcessor::SmartMessage message{
+    std::unique_ptr<MQTTAsync_message, 
TestConsumeMQTTProcessor::MQTTMessageDeleter>(
+      new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version 
= 1, .payloadlen = gsl::narrow<int>(payload.size()),
+                            .payload = const_cast<char*>(payload.data()), .qos 
= 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
+    std::string{"mytopic"}};  // 
NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+  auto& test_processor = 
dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
+  test_processor.enqueueReceivedMQTTMsg(std::move(message));
+
+  const auto trigger_results = test_controller_.trigger();
+  CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).empty());
+  REQUIRE(LogTestController::getInstance().contains("[error] Failed to read 
records from MQTT message", 1s));
+}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read MQTT message and write it to a 
flow file", "[consumeMQTTTest]") {
+  std::vector<std::string> expected_topic_segments;
+  std::string topic;
+
+  SECTION("Single topic segment") {
+    expected_topic_segments = {"mytopic"};
+    topic = "mytopic";
+  }
+
+  SECTION("Multiple topic segments") {
+    expected_topic_segments = {"my", "topic", "segment"};
+    topic = "my/topic/segment";
+  }
+
+  SECTION("Empty topic segment") {
+    expected_topic_segments = {"mytopic", "", "segment"};
+    topic = "mytopic//segment";
+  }
+
+  SECTION("Empty topic segment at the end") {
+    expected_topic_segments = {"mytopic", ""};
+    topic = "mytopic/";
+  }
+
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+
+  const size_t expected_flow_file_count = 2;
+  const std::string payload = "test MQTT payload";
+  for (size_t i = 0; i < expected_flow_file_count; ++i) {
+    TestConsumeMQTTProcessor::SmartMessage 
message{std::unique_ptr<MQTTAsync_message, 
TestConsumeMQTTProcessor::MQTTMessageDeleter>(
+        new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, 
.struct_version = 1, .payloadlen = gsl::narrow<int>(payload.size()),
+                              .payload = const_cast<char*>(payload.data()), 
.qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
+      std::string{topic}};  // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+    auto& test_processor = 
dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
+    test_processor.enqueueReceivedMQTTMsg(std::move(message));
+  }
+  const auto trigger_results = test_controller_.trigger();
+  CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).size() == 
expected_flow_file_count);
+  for (size_t i = 0; i < expected_flow_file_count; ++i) {
+    const auto flow_file = 
trigger_results.at(TestConsumeMQTTProcessor::Success).at(i);
+    auto string_content = test_controller_.plan->getContent(flow_file);
+    CHECK(string_content == payload);
+
+    CHECK(*flow_file->getAttribute("mqtt.broker") == "127.0.0.1:1883");
+    CHECK(*flow_file->getAttribute("mqtt.topic") == topic);
+    for (size_t j = 0; j < expected_topic_segments.size(); ++j) {
+      CHECK(*flow_file->getAttribute("mqtt.topic.segment." + 
std::to_string(j)) == expected_topic_segments[j]);
+    }
+    CHECK(*flow_file->getAttribute("mqtt.qos") == "1");
+    CHECK(*flow_file->getAttribute("mqtt.isDuplicate") == "false");
+    CHECK(*flow_file->getAttribute("mqtt.isRetained") == "false");
+  }
+}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Test scheduling failure if 
non-existent recordset reader or writer is set", "[consumeMQTTTest]") {
+  test_controller_.plan->addController("XMLReader", "XMLReader");
+  test_controller_.plan->addController("JsonRecordSetWriter", 
"JsonRecordSetWriter");
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  SECTION("RecordReader is set to invalid controller service") {
+    REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordReader.name, "invalid_reader"));
+    REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+    REQUIRE_THROWS_WITH(test_controller_.trigger(), 
Catch::Matchers::EndsWith("Controller service 'Record Reader' = 
'invalid_reader' not found"));
+  }
+
+  SECTION("RecordWriter is set to invalid controller service") {
+    REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
+    REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, 
minifi::processors::ConsumeMQTT::RecordWriter.name, "invalid_writer"));
+    REQUIRE_THROWS_WITH(test_controller_.trigger(), 
Catch::Matchers::EndsWith("Controller service 'Record Writer' = 
'invalid_writer' not found"));
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp 
b/extensions/mqtt/tests/PublishMQTTTests.cpp
index bcc84f65f..98ff17280 100644
--- a/extensions/mqtt/tests/PublishMQTTTests.cpp
+++ b/extensions/mqtt/tests/PublishMQTTTests.cpp
@@ -22,62 +22,85 @@
 #include "catch2/matchers/catch_matchers_string.hpp"
 #include "unit/TestBase.h"
 #include "../processors/PublishMQTT.h"
+#include "unit/SingleProcessorTestController.h"
+#include "core/Resource.h"
+#include "controllers/XMLRecordSetWriter.h"
+#include "unit/ProcessorUtils.h"
 
 using namespace std::literals::chrono_literals;
 
-namespace {
-struct Fixture {
-  Fixture() {
-    
LogTestController::getInstance().setDebug<minifi::processors::PublishMQTT>();
-    plan_ = testController_.createPlan();
-    publishMqttProcessor_ = plan_->addProcessor("PublishMQTT", 
"publishMqttProcessor");
+namespace org::apache::nifi::minifi::test {
+
+class TestPublishMQTTProcessor : public minifi::processors::PublishMQTT {
+ public:
+  explicit TestPublishMQTTProcessor(minifi::core::ProcessorMetadata metadata)
+      : minifi::processors::PublishMQTT(std::move(metadata)) {}
+
+  void initializeClient() override {
+  }
+
+  bool sendMessage(const std::vector<std::byte>&, const std::string&, const 
std::string&, const std::shared_ptr<core::FlowFile>&) override {
+    return true;
   }
 
-  Fixture(Fixture&&) = delete;
-  Fixture(const Fixture&) = delete;
-  Fixture& operator=(Fixture&&) = delete;
-  Fixture& operator=(const Fixture&) = delete;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) 
override {
+    minifi::processors::PublishMQTT::onTriggerImpl(context, session);
+  }
+};
 
-  ~Fixture() {
+REGISTER_RESOURCE(TestPublishMQTTProcessor, Processor);
+
+struct PublishMQTTTestFixture {
+  PublishMQTTTestFixture()
+      : 
test_controller_(utils::make_processor<TestPublishMQTTProcessor>("TestPublishMQTTProcessor")),
+        publish_mqtt_processor_(test_controller_.getProcessor()) {
+    REQUIRE(publish_mqtt_processor_ != nullptr);
+    LogTestController::getInstance().setDebug<TestPublishMQTTProcessor>();
+  }
+
+  PublishMQTTTestFixture(PublishMQTTTestFixture&&) = delete;
+  PublishMQTTTestFixture(const PublishMQTTTestFixture&) = delete;
+  PublishMQTTTestFixture& operator=(PublishMQTTTestFixture&&) = delete;
+  PublishMQTTTestFixture& operator=(const PublishMQTTTestFixture&) = delete;
+
+  ~PublishMQTTTestFixture() {
     LogTestController::getInstance().reset();
   }
 
-  TestController testController_;
-  std::shared_ptr<TestPlan> plan_;
-  core::Processor* publishMqttProcessor_ = nullptr;
+  SingleProcessorTestController test_controller_;
+  core::Processor* publish_mqtt_processor_ = nullptr;
 };
-}  // namespace
 
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyTopic", "[publishMQTTTest]") {
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_),
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyTopic", 
"[publishMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_),
       Catch::Matchers::EndsWith("Process Schedule Operation: PublishMQTT: 
Topic is required"));
 }
 
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyBrokerURI", 
"[publishMQTTTest]") {
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name,
 "mytopic"));
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_),
-      Catch::Matchers::EndsWith("Expected valid value from 
\"publishMqttProcessor::Broker URI\", but got PropertyNotSet (Property 
Error:2)"));
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyBrokerURI", 
"[publishMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+  
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_),
+      Catch::Matchers::EndsWith("Expected valid value from 
\"TestPublishMQTTProcessor::Broker URI\", but got PropertyNotSet (Property 
Error:2)"));
 }
 
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyClientID_V_3", 
"[publishMQTTTest]") {
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::MessageExpiryInterval.name,
 "60 sec"));
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_));
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyClientID_V_3", 
"[publishMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::MessageExpiryInterval.name, "60 sec"));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x 
specification does not support Message Expiry Intervals. Property is not 
used.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_ContentType_V_3", 
"[publishMQTTTest]") {
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::ContentType.name,
 "text/plain"));
-  REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_));
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_ContentType_V_3", 
"[publishMQTTTest]") {
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::ContentType.name, "text/plain"));
+  
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_));
   REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x 
specification does not support Content Types. Property is not used.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the number of in-flight 
messages as a metric") {
-  const auto node = publishMqttProcessor_->getResponseNode();
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTT can publish the number 
of in-flight messages as a metric") {
+  const auto node = publish_mqtt_processor_->getResponseNode();
 
   SECTION("heartbeat metric") {
     const auto serialized_nodes = 
minifi::state::response::ResponseNode::serializeAndMergeResponseNodes({node});
@@ -94,3 +117,46 @@ TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the 
number of in-flight messa
     CHECK(it->value == 0.0);
   }
 }
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test sending XML message records", 
"[publishMQTTTest]") {
+  test_controller_.plan->addController("JsonTreeReader", "JsonTreeReader");
+  auto xml_writer = test_controller_.plan->addController("XMLRecordSetWriter", 
"XMLRecordSetWriter");
+  REQUIRE(test_controller_.plan->setProperty(xml_writer, 
minifi::standard::XMLRecordSetWriter::NameOfRootTag.name, "root"));
+  REQUIRE(test_controller_.plan->setProperty(xml_writer, 
minifi::standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"));
+
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::RecordReader.name, "JsonTreeReader"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::RecordWriter.name, "XMLRecordSetWriter"));
+
+  const auto trigger_results = test_controller_.trigger(R"([{"element1": 
"value1"}, {"element2": "42"}])");
+  CHECK(trigger_results.at(TestPublishMQTTProcessor::Success).size() == 2);
+  const auto flow_file_1 = 
trigger_results.at(TestPublishMQTTProcessor::Success).at(0);
+
+  auto string_content = test_controller_.plan->getContent(flow_file_1);
+  CHECK(string_content == R"(<?xml 
version="1.0"?><root><record><element1>value1</element1></record></root>)");
+
+  const auto flow_file_2 = 
trigger_results.at(TestPublishMQTTProcessor::Success).at(1);
+  string_content = test_controller_.plan->getContent(flow_file_2);
+  CHECK(string_content == R"(<?xml 
version="1.0"?><root><record><element2>42</element2></record></root>)");
+}
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if 
non-existent recordset reader or writer is set", "[publishMQTTTest]") {
+  test_controller_.plan->addController("XMLReader", "XMLReader");
+  test_controller_.plan->addController("JsonRecordSetWriter", 
"JsonRecordSetWriter");
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+  SECTION("RecordReader is set to invalid controller service") {
+    REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::RecordReader.name, "invalid_reader"));
+    REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+    REQUIRE_THROWS_WITH(test_controller_.trigger(), 
Catch::Matchers::EndsWith("Controller service 'Record Reader' = 
'invalid_reader' not found"));
+  }
+
+  SECTION("RecordWriter is set to invalid controller service") {
+    REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::RecordReader.name, "XMLReader"));
+    REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, 
minifi::processors::PublishMQTT::RecordWriter.name, "invalid_writer"));
+    REQUIRE_THROWS_WITH(test_controller_.trigger(), 
Catch::Matchers::EndsWith("Controller service 'Record Writer' = 
'invalid_writer' not found"));
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/processors/ConvertRecord.cpp 
b/extensions/standard-processors/processors/ConvertRecord.cpp
index fb85990c1..cf37cc191 100644
--- a/extensions/standard-processors/processors/ConvertRecord.cpp
+++ b/extensions/standard-processors/processors/ConvertRecord.cpp
@@ -20,17 +20,20 @@
 #include "nonstd/expected.hpp"
 #include "utils/GeneralUtils.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "minifi-cpp/utils/gsl.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void ConvertRecord::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  record_set_reader_ = 
utils::parseControllerService<core::RecordSetReader>(context, RecordReader, 
getUUID());
-  record_set_writer_ = 
utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, 
getUUID());
+  record_converter_ = core::RecordConverter{
+    .record_set_reader = 
utils::parseControllerService<core::RecordSetReader>(context, RecordReader, 
getUUID()),
+    .record_set_writer = 
utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, 
getUUID())
+  };
   include_zero_record_flow_files_ = utils::parseBoolProperty(context, 
IncludeZeroRecordFlowFiles);
 }
 
 void ConvertRecord::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
-  gsl_Expects(record_set_reader_ && record_set_writer_);
+  gsl_Expects(record_converter_);
   const auto flow_file = session.get();
   if (!flow_file) {
     context.yield();
@@ -39,7 +42,7 @@ void ConvertRecord::onTrigger(core::ProcessContext& context, 
core::ProcessSessio
 
   nonstd::expected<core::RecordSet, std::error_code> record_set;
   session.read(flow_file, [this, &record_set](const 
std::shared_ptr<io::InputStream>& input_stream) {
-    record_set = record_set_reader_->read(*input_stream);
+    record_set = record_converter_->record_set_reader->read(*input_stream);
     return gsl::narrow<int64_t>(input_stream->size());
   });
   if (!record_set) {
@@ -55,7 +58,7 @@ void ConvertRecord::onTrigger(core::ProcessContext& context, 
core::ProcessSessio
     return;
   }
 
-  record_set_writer_->write(*record_set, flow_file, session);
+  record_converter_->record_set_writer->write(*record_set, flow_file, session);
   
flow_file->setAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name,
 std::to_string(record_set->size()));
   session.transfer(flow_file, Success);
 }
diff --git a/extensions/standard-processors/processors/ConvertRecord.h 
b/extensions/standard-processors/processors/ConvertRecord.h
index daf1ffe27..5827b9f62 100644
--- a/extensions/standard-processors/processors/ConvertRecord.h
+++ b/extensions/standard-processors/processors/ConvertRecord.h
@@ -19,13 +19,13 @@
 #include <memory>
 #include <string_view>
 #include <utility>
+#include <optional>
 
 #include "core/AbstractProcessor.h"
 #include "core/ProcessSession.h"
 #include "core/PropertyDefinitionBuilder.h"
 #include "minifi-cpp/core/RelationshipDefinition.h"
-#include "controllers/RecordSetReader.h"
-#include "controllers/RecordSetWriter.h"
+#include "minifi-cpp/controllers/RecordConverter.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -77,8 +77,7 @@ class ConvertRecord : public 
core::AbstractProcessor<ConvertRecord> {
   void onTrigger(core::ProcessContext& context, core::ProcessSession& session) 
override;
 
  private:
-  std::shared_ptr<core::RecordSetReader> record_set_reader_;
-  std::shared_ptr<core::RecordSetWriter> record_set_writer_;
+  std::optional<core::RecordConverter> record_converter_;
   bool include_zero_record_flow_files_ = true;
 };
 
diff --git a/extensions/standard-processors/processors/SplitRecord.cpp 
b/extensions/standard-processors/processors/SplitRecord.cpp
index 5cc4f3153..fb32a5baa 100644
--- a/extensions/standard-processors/processors/SplitRecord.cpp
+++ b/extensions/standard-processors/processors/SplitRecord.cpp
@@ -19,12 +19,16 @@
 #include "core/Resource.h"
 #include "nonstd/expected.hpp"
 #include "utils/GeneralUtils.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "minifi-cpp/utils/gsl.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void SplitRecord::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  record_set_reader_ = 
utils::parseControllerService<core::RecordSetReader>(context, RecordReader, 
getUUID());
-  record_set_writer_ = 
utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, 
getUUID());
+  record_converter_ = core::RecordConverter{
+    .record_set_reader = 
utils::parseControllerService<core::RecordSetReader>(context, RecordReader, 
getUUID()),
+    .record_set_writer = 
utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, 
getUUID())
+  };
 }
 
 nonstd::expected<std::size_t, std::string> 
SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const 
core::FlowFile& original_flow_file) {
@@ -36,6 +40,7 @@ nonstd::expected<std::size_t, std::string> 
SplitRecord::readRecordsPerSplit(core
 }
 
 void SplitRecord::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  gsl_Expects(record_converter_);
   const auto original_flow_file = session.get();
   if (!original_flow_file) {
     context.yield();
@@ -51,7 +56,7 @@ void SplitRecord::onTrigger(core::ProcessContext& context, 
core::ProcessSession&
 
   nonstd::expected<core::RecordSet, std::error_code> record_set;
   session.read(original_flow_file, [this, &record_set](const 
std::shared_ptr<io::InputStream>& input_stream) {
-    record_set = record_set_reader_->read(*input_stream);
+    record_set = record_converter_->record_set_reader->read(*input_stream);
     return gsl::narrow<int64_t>(input_stream->size());
   });
   if (!record_set) {
@@ -84,7 +89,7 @@ void SplitRecord::onTrigger(core::ProcessContext& context, 
core::ProcessSession&
     split_flow_file->setAttribute("fragment.count", 
std::to_string(fragment_count));
     split_flow_file->setAttribute("segment.original.filename", 
original_flow_file->getAttribute("filename").value_or(""));
 
-    record_set_writer_->write(slice_record_set, split_flow_file, session);
+    record_converter_->record_set_writer->write(slice_record_set, 
split_flow_file, session);
     session.transfer(split_flow_file, Splits);
     ++fragment_index;
   }
diff --git a/extensions/standard-processors/processors/SplitRecord.h 
b/extensions/standard-processors/processors/SplitRecord.h
index 5057d7e1c..c594024d3 100644
--- a/extensions/standard-processors/processors/SplitRecord.h
+++ b/extensions/standard-processors/processors/SplitRecord.h
@@ -16,6 +16,8 @@
  */
 #pragma once
 
+#include <optional>
+
 #include "minifi-cpp/core/Annotation.h"
 #include "minifi-cpp/core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -24,8 +26,7 @@
 #include "core/PropertyDefinitionBuilder.h"
 #include "minifi-cpp/core/RelationshipDefinition.h"
 #include "minifi-cpp/core/logging/Logger.h"
-#include "minifi-cpp/controllers/RecordSetReader.h"
-#include "minifi-cpp/controllers/RecordSetWriter.h"
+#include "minifi-cpp/controllers/RecordConverter.h"
 #include "core/AbstractProcessor.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -87,8 +88,7 @@ class SplitRecord final : public 
core::AbstractProcessor<SplitRecord> {
  private:
   static nonstd::expected<std::size_t, std::string> 
readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& 
original_flow_file);
 
-  std::shared_ptr<core::RecordSetReader> record_set_reader_;
-  std::shared_ptr<core::RecordSetWriter> record_set_writer_;
+  std::optional<core::RecordConverter> record_converter_;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/minifi-api/include/minifi-cpp/controllers/RecordConverter.h 
b/minifi-api/include/minifi-cpp/controllers/RecordConverter.h
new file mode 100644
index 000000000..295dd2013
--- /dev/null
+++ b/minifi-api/include/minifi-cpp/controllers/RecordConverter.h
@@ -0,0 +1,32 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "RecordSetReader.h"
+#include "RecordSetWriter.h"
+#include "minifi-cpp/utils/gsl.h"
+
+namespace org::apache::nifi::minifi::core {
+
+struct RecordConverter {
+  gsl::not_null<std::shared_ptr<core::RecordSetReader>> record_set_reader;
+  gsl::not_null<std::shared_ptr<core::RecordSetWriter>> record_set_writer;
+};
+
+}  // namespace org::apache::nifi::minifi::core

Reply via email to