This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c7ba1b0b73 NIFI-14767 Enrich flowfile attributes with mqtt topic segments produced by ConsumeMQTT
c7ba1b0b73 is described below
commit c7ba1b0b73c11ec8454ac44e51fae5fd2e9b83e6
Author: tpalfy <[email protected]>
AuthorDate: Fri Jul 11 17:13:48 2025 +0200
NIFI-14767 Enrich flowfile attributes with mqtt topic segments produced by ConsumeMQTT
Signed-off-by: Pierre Villard <[email protected]>
This closes #10108.
---
.../apache/nifi/processors/mqtt/ConsumeMQTT.java | 34 +++++-
.../nifi/processors/mqtt/TestConsumeMQTT.java | 120 ++++++++++++++++++++-
2 files changed, 149 insertions(+), 5 deletions(-)
diff --git a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index c0da9cff39..7cbff78996 100644
--- a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -105,11 +105,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
public final static String RECORD_COUNT_KEY = "record.count";
public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
+ public final static String TOPIC_SEGMENT_PREFIX = "mqtt.topic.segment.";
+ public final static String TOPIC_SEPARATOR = "/";
public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos";
public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";
public final static String TOPIC_FIELD_KEY = "_topic";
+ public final static String TOPIC_SEGMENTS_FIELD_KEY = "_topicSegments";
public final static String QOS_FIELD_KEY = "_qos";
public final static String IS_DUPLICATE_FIELD_KEY = "_isDuplicate";
public final static String IS_RETAINED_FIELD_KEY = "_isRetained";
@@ -440,7 +443,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
final Map<String, String> attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getRawBrokerUris());
- attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+ addTopicAttributes(attrs, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
@@ -449,6 +452,20 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
return messageFlowfile;
}
+ void addTopicAttributes(
+ final Map<String, String> attributes,
+ final String topic
+ ) {
+ attributes.put(TOPIC_ATTRIBUTE_KEY, topic);
+
+ if (topic != null && !topic.isEmpty()) {
+ final String[] segments = topic.split(TOPIC_SEPARATOR, -1);
+ for (int topicSegmentIndex = 0; topicSegmentIndex < segments.length; topicSegmentIndex++) {
+ attributes.put(TOPIC_SEGMENT_PREFIX + topicSegmentIndex, segments[topicSegmentIndex]);
+ }
+ }
+ }
+
private void transferQueueRecord(final ProcessContext context, final ProcessSession session) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
@@ -500,6 +517,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
final List<RecordField> fields = new ArrayList<>(writeSchema.getFields());
fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField(TOPIC_SEGMENTS_FIELD_KEY, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
@@ -518,7 +536,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
try {
if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
- record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
+ addTopicFields(record, mqttMessage.getTopic());
record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
@@ -591,6 +609,18 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
logger.info("Successfully processed {} records for {}", count, flowFile);
}
+ private void addTopicFields(
+ final Record record,
+ final String topic
+ ) {
+ record.setValue(TOPIC_FIELD_KEY, topic);
+
+ if (topic != null && !topic.isEmpty()) {
+ final String[] topicSegments = topic.split(TOPIC_SEPARATOR, -1);
+ record.setValue(TOPIC_SEGMENTS_FIELD_KEY, topicSegments);
+ }
+ }
+
private void closeWriter(final RecordSetWriter writer) {
try {
if (writer != null) {
diff --git a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 63d7c14abf..cc1969802d 100644
--- a/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-extension-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -40,7 +40,9 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -55,6 +57,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonReco
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -73,7 +76,7 @@ public class TestConsumeMQTT {
private static final String INVALID_BROKER_URI = "http://localhost:1883";
private static final String INVALID_CLUSTERED_BROKER_URI = "ssl://localhost:1883,tcp://localhost:1884";
private static final String CLIENT_ID = "TestClient";
- private static final String TOPIC_NAME = "testTopic";
+ private static final String TOPIC_NAME = "test/topic";
private static final String INTERNAL_QUEUE_SIZE = "100";
private static final String STRING_MESSAGE = "testMessage";
@@ -495,8 +498,8 @@ public class TestConsumeMQTT {
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(1, flowFiles.size());
- assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
- + "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
+ assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"test/topic\",\"_topicSegments\":[\"test\",\"topic\"],\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+ + "{\"name\":\"Apache NiFi\",\"_topic\":\"test/topic\",\"_topicSegments\":[\"test\",\"topic\"],\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
new String(flowFiles.getFirst().toByteArray()));
final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
@@ -645,6 +648,117 @@ public class TestConsumeMQTT {
assertTrue(mqttQueue.contains(mock));
}
+ @Test
+ void addTopicAttributesWithMultipleTopicSegments() {
+ final String topic = "home/livingroom/temperature";
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full topic + 3 for segments)");
+ assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should be present");
+ assertEquals("home", attributes.get("mqtt.topic.segment.0"));
+ assertEquals("livingroom", attributes.get("mqtt.topic.segment.1"));
+ assertEquals("temperature", attributes.get("mqtt.topic.segment.2"));
+ assertNull(attributes.get("mqtt.topic.segment.3"), "No further segments expected");
+ }
+
+ @Test
+ void addTopicAttributesWithLeadingSlashInTopic() {
+ final String topic = "/sensors/light";
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full topic + 3 for segments)");
+ assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should be present");
+ assertEquals("", attributes.get("mqtt.topic.segment.0"), "Segment 0 should be empty for leading slash");
+ assertEquals("sensors", attributes.get("mqtt.topic.segment.1"));
+ assertEquals("light", attributes.get("mqtt.topic.segment.2"));
+ assertNull(attributes.get("mqtt.topic.segment.3"), "No further segments expected");
+ }
+
+ @Test
+ void addTopicAttributesWithTrailingSlashInTopicName() {
+ final String topic = "data/device/";
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full topic + 3 for segments)");
+ assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should be present");
+ assertEquals("data", attributes.get("mqtt.topic.segment.0"));
+ assertEquals("device", attributes.get("mqtt.topic.segment.1"));
+ assertEquals("", attributes.get("mqtt.topic.segment.2"), "Segment 2 should be empty for trailing slash");
+ assertNull(attributes.get("mqtt.topic.segment.3"), "No further segments expected");
+ }
+
+ @Test
+ void addTopicAttributesWithSingleSlashAsTopic() {
+ final String topic = "/";
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(3, attributes.size(), "Expected 3 attributes (1 for full topic + 2 for segments)");
+ assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should be present");
+ assertEquals("", attributes.get("mqtt.topic.segment.0"));
+ assertEquals("", attributes.get("mqtt.topic.segment.1"));
+ assertNull(attributes.get("mqtt.topic.segment.2"), "No further segments expected");
+ }
+
+ @Test
+ void addTopicAttributes_consecutiveSlashesTopic_addsCorrectly() {
+ final String topic = "status//alerts";
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(4, attributes.size(), "Expected 4 attributes (1 for full topic + 3 for segments)");
+ assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should be present");
+ assertEquals("status", attributes.get("mqtt.topic.segment.0"));
+ assertEquals("", attributes.get("mqtt.topic.segment.1"));
+ assertEquals("alerts", attributes.get("mqtt.topic.segment.2"));
+ assertNull(attributes.get("mqtt.topic.segment.3"), "No further segments expected");
+ }
+
+ @Test
+ void addTopicAttributesWithEmptyTopic() {
+ final String topic = "";
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(1, attributes.size(), "Expected only 1 attribute (for full topic)");
+ assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should be present and empty");
+ assertNull(attributes.get("mqtt.topic.segment.0"), "No segments should be added for empty topic");
+ }
+
+ @Test
+ void addTopicAttributesWithNullTopic() {
+ final String topic = null;
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(1, attributes.size(), "Expected only 1 attribute (for full topic)");
+ assertNull(attributes.get("mqtt.topic"), "Full topic should be null");
+ assertNull(attributes.get("mqtt.topic.segment.0"), "No segments should be added for null topic");
+ }
+
+ @Test
+ void addTopicAttributesWithTopicWithoutSlashes() {
+ final String topic = "sensors";
+
+ final Map<String, String> attributes = new HashMap<>();
+ new ConsumeMQTT().addTopicAttributes(attributes, topic);
+
+ assertEquals(2, attributes.size(), "Expected 2 attributes (1 for full topic + 1 for segment)");
+ assertEquals(topic, attributes.get("mqtt.topic"), "Full topic should be present");
+ assertEquals("sensors", attributes.get("mqtt.topic.segment.0"));
+ assertNull(attributes.get("mqtt.topic.segment.1"), "No further segments expected");
+ }
+
private TestRunner initializeTestRunner() {
if (mqttTestClient != null) {
throw new IllegalStateException("mqttTestClient should be null, using ConsumeMQTT's default client!");