This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c7ba1b0b73 NIFI-14767 Enrich flowfile attributes with mqtt topic segments produced by ConsumeMQTT
c7ba1b0b73 is described below

commit c7ba1b0b73c11ec8454ac44e51fae5fd2e9b83e6
Author: tpalfy <[email protected]>
AuthorDate: Fri Jul 11 17:13:48 2025 +0200

    NIFI-14767 Enrich flowfile attributes with mqtt topic segments produced by ConsumeMQTT
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10108.
---
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   |  34 +++++-
 .../nifi/processors/mqtt/TestConsumeMQTT.java      | 120 ++++++++++++++++++++-
 2 files changed, 149 insertions(+), 5 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 
b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index c0da9cff39..7cbff78996 100644
--- 
a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ 
b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -105,11 +105,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
     public final static String RECORD_COUNT_KEY = "record.count";
     public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
     public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
+    public final static String TOPIC_SEGMENT_PREFIX = "mqtt.topic.segment.";
+    public final static String TOPIC_SEPARATOR = "/";
     public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos";
     public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
     public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";
 
     public final static String TOPIC_FIELD_KEY = "_topic";
+    public final static String TOPIC_SEGMENTS_FIELD_KEY = "_topicSegments";
     public final static String QOS_FIELD_KEY = "_qos";
     public final static String IS_DUPLICATE_FIELD_KEY = "_isDuplicate";
     public final static String IS_RETAINED_FIELD_KEY = "_isRetained";
@@ -440,7 +443,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getRawBrokerUris());
-        attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+        addTopicAttributes(attrs, mqttMessage.getTopic());
         attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
         attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
         attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
@@ -449,6 +452,20 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
         return messageFlowfile;
     }
 
+    void addTopicAttributes(
+            final Map<String, String> attributes,
+            final String topic
+    ) {
+        attributes.put(TOPIC_ATTRIBUTE_KEY, topic);
+
+        if (topic != null && !topic.isEmpty()) {
+            final String[] segments = topic.split(TOPIC_SEPARATOR, -1);
+            for (int topicSegmentIndex = 0; topicSegmentIndex < 
segments.length; topicSegmentIndex++) {
+                attributes.put(TOPIC_SEGMENT_PREFIX + topicSegmentIndex, 
segments[topicSegmentIndex]);
+            }
+        }
+    }
+
     private void transferQueueRecord(final ProcessContext context, final 
ProcessSession session) {
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
@@ -500,6 +517,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
                                         final List<RecordField> fields = new 
ArrayList<>(writeSchema.getFields());
 
                                         fields.add(new 
RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+                                        fields.add(new 
RecordField(TOPIC_SEGMENTS_FIELD_KEY, 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
                                         fields.add(new 
RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
                                         fields.add(new 
RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
                                         fields.add(new 
RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
@@ -518,7 +536,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
 
                             try {
                                 if 
(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
-                                    record.setValue(TOPIC_FIELD_KEY, 
mqttMessage.getTopic());
+                                    addTopicFields(record, 
mqttMessage.getTopic());
                                     record.setValue(QOS_FIELD_KEY, 
mqttMessage.getQos());
                                     record.setValue(IS_RETAINED_FIELD_KEY, 
mqttMessage.isRetained());
                                     record.setValue(IS_DUPLICATE_FIELD_KEY, 
mqttMessage.isDuplicate());
@@ -591,6 +609,18 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
         logger.info("Successfully processed {} records for {}", count, 
flowFile);
     }
 
+    private void addTopicFields(
+            final Record record,
+            final String topic
+    ) {
+        record.setValue(TOPIC_FIELD_KEY, topic);
+
+        if (topic != null && !topic.isEmpty()) {
+            final String[] topicSegments = topic.split(TOPIC_SEPARATOR, -1);
+            record.setValue(TOPIC_SEGMENTS_FIELD_KEY, topicSegments);
+        }
+    }
+
     private void closeWriter(final RecordSetWriter writer) {
         try {
             if (writer != null) {
diff --git 
a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
 
b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 63d7c14abf..cc1969802d 100644
--- 
a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ 
b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -40,7 +40,9 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -55,6 +57,7 @@ import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonReco
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -73,7 +76,7 @@ public class TestConsumeMQTT {
     private static final String INVALID_BROKER_URI = "http://localhost:1883";
     private static final String INVALID_CLUSTERED_BROKER_URI = 
"ssl://localhost:1883,tcp://localhost:1884";
     private static final String CLIENT_ID = "TestClient";
-    private static final String TOPIC_NAME = "testTopic";
+    private static final String TOPIC_NAME = "test/topic";
     private static final String INTERNAL_QUEUE_SIZE = "100";
 
     private static final String STRING_MESSAGE = "testMessage";
@@ -495,8 +498,8 @@ public class TestConsumeMQTT {
 
         final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
         assertEquals(1, flowFiles.size());
-        assertEquals("[{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
-                        + "{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
+        assertEquals("[{\"name\":\"Apache 
NiFi\",\"_topic\":\"test/topic\",\"_topicSegments\":[\"test\",\"topic\"],\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+                        + "{\"name\":\"Apache 
NiFi\",\"_topic\":\"test/topic\",\"_topicSegments\":[\"test\",\"topic\"],\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
                 new String(flowFiles.getFirst().toByteArray()));
 
         final List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
@@ -645,6 +648,117 @@ public class TestConsumeMQTT {
         assertTrue(mqttQueue.contains(mock));
     }
 
+    @Test
+    void addTopicAttributesWithMultipleTopicSegments() {
+        final String topic = "home/livingroom/temperature";
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full 
topic + 3 for segments)");
+        assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should 
be present");
+        assertEquals("home", attributes.get("mqtt.topic.segment.0"));
+        assertEquals("livingroom", attributes.get("mqtt.topic.segment.1"));
+        assertEquals("temperature", attributes.get("mqtt.topic.segment.2"));
+        assertNull(attributes.get("mqtt.topic.segment.3"), "No further 
segments expected");
+    }
+
+    @Test
+    void addTopicAttributesWithLeadingSlashInTopic() {
+        final String topic = "/sensors/light";
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full 
topic + 3 for segments)");
+        assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should 
be present");
+        assertEquals("", attributes.get("mqtt.topic.segment.0"), "Segment 0 
should be empty for leading slash");
+        assertEquals("sensors", attributes.get("mqtt.topic.segment.1"));
+        assertEquals("light", attributes.get("mqtt.topic.segment.2"));
+        assertNull(attributes.get("mqtt.topic.segment.3"), "No further 
segments expected");
+    }
+
+    @Test
+    void addTopicAttributesWithTrailingSlashInTopicName() {
+        final String topic = "data/device/";
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full 
topic + 3 for segments)");
+        assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should 
be present");
+        assertEquals("data", attributes.get("mqtt.topic.segment.0"));
+        assertEquals("device", attributes.get("mqtt.topic.segment.1"));
+        assertEquals("", attributes.get("mqtt.topic.segment.2"), "Segment 2 
should be empty for trailing slash");
+        assertNull(attributes.get("mqtt.topic.segment.3"), "No further 
segments expected");
+    }
+
+    @Test
+    void addTopicAttributesWithSingleSlashAsTopic() {
+        final String topic = "/";
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(3, attributes.size(), "Expected 3 attributes (1 for full 
topic + 2 for segments)");
+        assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should 
be present");
+        assertEquals("", attributes.get("mqtt.topic.segment.0"));
+        assertEquals("", attributes.get("mqtt.topic.segment.1"));
+        assertNull(attributes.get("mqtt.topic.segment.2"), "No further 
segments expected");
+    }
+
+    @Test
+    void addTopicAttributes_consecutiveSlashesTopic_addsCorrectly() {
+        final String topic = "status//alerts";
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full 
topic + 3 for segments)");
+        assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should 
be present");
+        assertEquals("status", attributes.get("mqtt.topic.segment.0"));
+        assertEquals("", attributes.get("mqtt.topic.segment.1"));
+        assertEquals("alerts", attributes.get("mqtt.topic.segment.2"));
+        assertNull(attributes.get("mqtt.topic.segment.3"), "No further 
segments expected");
+    }
+
+    @Test
+    void addTopicAttributesWithEmptyTopic() {
+        final String topic = "";
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(1, attributes.size(), "Expected only 1 attribute (for 
full topic)");
+        assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should 
be present and empty");
+        assertNull(attributes.get("mqtt.topic.segment.0"), "No segments should 
be added for empty topic");
+    }
+
+    @Test
+    void addTopicAttributesWithNullTopic() {
+        final String topic = null;
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(1, attributes.size(), "Expected only 1 attribute (for 
full topic)");
+        assertNull(attributes.get("mqtt.topic"), "Full topic should be null");
+        assertNull(attributes.get("mqtt.topic.segment.0"), "No segments should 
be added for null topic");
+    }
+
+    @Test
+    void addTopicAttributesWithTopicWithoutSlashes() {
+        final String topic = "sensors";
+
+        final Map<String, String> attributes = new HashMap<>();
+        new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+        assertEquals(2, attributes.size(), "Expected 2 attributes (1 for full 
topic + 1 for segment)");
+        assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should 
be present");
+        assertEquals("sensors", attributes.get("mqtt.topic.segment.0"));
+        assertNull(attributes.get("mqtt.topic.segment.1"), "No further 
segments expected");
+    }
+
     private TestRunner initializeTestRunner() {
         if (mqttTestClient != null) {
             throw new IllegalStateException("mqttTestClient should be null, 
using ConsumeMQTT's default client!");

Reply via email to