[
https://issues.apache.org/jira/browse/NIFI-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342634#comment-15342634
]
ASF GitHub Bot commented on NIFI-2045:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/548#discussion_r67948120
--- Diff:
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
---
@@ -289,8 +296,10 @@ public void process(final OutputStream out) throws
IOException {
String transitUri = new
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
session.getProvenanceReporter().receive(messageFlowfile,
transitUri);
session.transfer(messageFlowfile, REL_MESSAGE);
- mqttQueue.remove(mqttMessage);
session.commit();
+ if (!mqttQueue.remove(mqttMessage)) {
+ logger.warn("Mqtt message " + mqttMessage + " had already
been removed from queue, possible duplication of flow files");
--- End diff --
Thinking through this error message and how it would be useful to a user, I
would like some kind of UUID to output here (like a message id) but there
aren't any in MQTT. So I think it would be better to word the warning a
different way by referencing the FlowFile that could potentially contain
duplicate data. By having a warning message with the UUID of the FlowFile
containing potentially duplicate data the user may be able to correlate that
using provenance to find the other FlowFile with duplicate data.
> ConsumeMQTT can lose a message if session commit fails
> ------------------------------------------------------
>
> Key: NIFI-2045
> URL: https://issues.apache.org/jira/browse/NIFI-2045
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 1.0.0
> Reporter: Bryan Rosander
> Priority: Critical
>
> ConsumeMQTT removes the received message from the queue before committing the
> session. This means that if the session commit fails and the session rolls
> back, the message will be lost.
> https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L292
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)