[ https://issues.apache.org/jira/browse/NIFI-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342634#comment-15342634 ]

ASF GitHub Bot commented on NIFI-2045:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/548#discussion_r67948120
  
    --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java ---
    @@ -289,8 +296,10 @@ public void process(final OutputStream out) throws IOException {
                 String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
                 session.getProvenanceReporter().receive(messageFlowfile, transitUri);
                 session.transfer(messageFlowfile, REL_MESSAGE);
    -            mqttQueue.remove(mqttMessage);
                 session.commit();
    +            if (!mqttQueue.remove(mqttMessage)) {
    +                logger.warn("Mqtt message " + mqttMessage + " had already been removed from queue, possible duplication of flow files");
    --- End diff --
    
    Thinking through how this warning would be useful to a user, I would
    like it to include some kind of unique identifier (like a message ID),
    but there aren't any in MQTT. So I think it would be better to reword
    the warning so that it references the FlowFile that could contain
    duplicate data. With the UUID of that FlowFile in the warning, the user
    may be able to use provenance to find the other FlowFile holding the
    same data.
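
A minimal sketch of the rewording suggested above, written without NiFi on the classpath. The class, method names, and message text are hypothetical; in the processor itself the UUID would presumably come from the FlowFile's `uuid` attribute and the string would be passed to `logger.warn`.

```java
import java.util.Queue;

// Hypothetical sketch, not the ConsumeMQTT implementation.
class DuplicateWarningSketch {

    // Builds a warning keyed on the FlowFile UUID rather than on the MQTT message.
    static String duplicateWarning(String flowFileUuid) {
        return "FlowFile " + flowFileUuid
                + " may contain data duplicated in another FlowFile:"
                + " its MQTT message had already been removed from the queue";
    }

    // Removes the message after a (presumed successful) commit.
    // Returns the warning text if the message was no longer queued, else null.
    static String removeAfterCommit(Queue<String> queue, String message, String flowFileUuid) {
        if (!queue.remove(message)) {
            return duplicateWarning(flowFileUuid);
        }
        return null;
    }
}
```

With the UUID in the text, a user could search provenance for that FlowFile and then look for a sibling with the same content.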


> ConsumeMQTT can lose a message if session commit fails
> ------------------------------------------------------
>
>                 Key: NIFI-2045
>                 URL: https://issues.apache.org/jira/browse/NIFI-2045
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: Bryan Rosander
>            Priority: Critical
>
> ConsumeMQTT removes the received message from the queue before committing the 
> session.  This means that if the session commit fails and the session rolls 
> back, the message will be lost.
> https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L292
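
The ordering problem described in the issue can be illustrated with a small stand-alone sketch. It is an assumption-laden model, not NiFi code: `Session` is a hypothetical stand-in for a session whose commit may throw, and the queue holds plain strings instead of MQTT messages.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of "commit first, then remove from the queue".
class CommitThenRemoveSketch {

    // Stand-in for a session; commit() may throw to simulate a failed commit.
    interface Session {
        void commit();
    }

    // Returns true if the head message was committed and then removed.
    // On a failed commit the message stays queued, so it can be retried.
    static boolean deliver(LinkedBlockingQueue<String> queue, Session session) {
        String message = queue.peek(); // look at the head without removing it
        if (message == null) {
            return false;
        }
        try {
            session.commit();
        } catch (RuntimeException e) {
            return false; // commit failed: the message is still in the queue
        }
        queue.remove(message); // safe to drop only after the commit succeeded
        return true;
    }
}
```

Removing before the commit would invert the two steps, which is the case where a rollback loses the message.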



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
