This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2dcff09 NIFI-9410: Fix for ConsumeMQTT processor in stateless environment
2dcff09 is described below
commit 2dcff09f7f97539fdf1ffd9e7b3fe75670a5adf2
Author: Peter Gyori <[email protected]>
AuthorDate: Tue Nov 23 17:04:00 2021 +0100
NIFI-9410: Fix for ConsumeMQTT processor in stateless environment
NIFI-9410: Added displayName to the QoS processor property
This closes #5549.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 2755585..7c5a7ff 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -145,7 +145,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
public static final PropertyDescriptor PROP_QOS = new
PropertyDescriptor.Builder()
.name("Quality of Service(QoS)")
- .description("The Quality of Service(QoS) to receive the message
with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least
once', '2' for 'exactly once'.")
+ .displayName("Quality of Service (QoS)")
+ .description("The Quality of Service (QoS) to receive the message
with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least
once', '2' for 'exactly once'.")
.required(true)
.defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
.allowableValues(
@@ -387,7 +388,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
return;
}
- if(context.getProperty(RECORD_READER).isSet()) {
+ if (context.getProperty(RECORD_READER).isSet()) {
transferQueueRecord(context, session);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
transferQueueDemarcator(context, session);
@@ -440,7 +441,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
session.getProvenanceReporter().receive(messageFlowfile,
getTransitUri(mqttMessage.getTopic()));
session.transfer(messageFlowfile, REL_MESSAGE);
- session.commitAsync(() -> mqttQueue.remove(mqttMessage));
+ session.commitAsync();
+ mqttQueue.remove(mqttMessage);
}
}