[ https://issues.apache.org/jira/browse/NIFI-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342791#comment-15342791 ]

ASF GitHub Bot commented on NIFI-2045:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/548#discussion_r67957184
  
    --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java ---
    @@ -288,9 +289,13 @@ public void process(final OutputStream out) throws IOException {
     
                 String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
                 session.getProvenanceReporter().receive(messageFlowfile, transitUri);
    +            String uuid = messageFlowfile.getAttribute(CoreAttributes.UUID.key());
                 session.transfer(messageFlowfile, REL_MESSAGE);
    -            mqttQueue.remove(mqttMessage);
                 session.commit();
    +            if (!mqttQueue.remove(mqttMessage)) {
    --- End diff --
    
    To avoid needlessly concatenating the Strings for the logger message, this condition should also check that warn is enabled:
    `... && getLogger().isWarnEnabled()`
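
A minimal sketch of the guard suggested above, assuming the warning text is built by string concatenation inside the `if` body. The `Log` interface, `removeAndWarn`, `buildMessage`, and the `String` element type are hypothetical stand-ins used so the example compiles on its own; they are not the NiFi API or the code in the pull request.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class GuardedWarnSketch {

    /** Hypothetical stand-in for the processor's logger (assumption, not the NiFi API). */
    public interface Log {
        boolean isWarnEnabled();
        void warn(String message);
    }

    /** Counts how often the warning text was actually assembled. */
    public static int messagesBuilt = 0;

    /** Hypothetical helper: builds the warning text by concatenation. */
    static String buildMessage(String uuid, String payload) {
        messagesBuilt++;
        return new StringBuilder("FlowFile ").append(uuid)
                .append(" was transferred, but message '").append(payload)
                .append("' was not found in the queue").toString();
    }

    /** Removes the message and warns only if removal failed AND warn is enabled. */
    public static boolean removeAndWarn(LinkedBlockingQueue<String> queue,
                                        String payload, String uuid, Log log) {
        boolean removed = queue.remove(payload);
        // Short-circuit: the message is never built when warn is disabled.
        if (!removed && log.isWarnEnabled()) {
            log.warn(buildMessage(uuid, payload));
        }
        return removed;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Log quiet = new Log() {
            public boolean isWarnEnabled() { return false; }
            public void warn(String message) { System.out.println(message); }
        };
        removeAndWarn(queue, "payload", "uuid-1", quiet);
        System.out.println("messages built: " + messagesBuilt);
    }
}
```

Putting the `isWarnEnabled()` check in the same condition keeps the common path (message removed successfully) free of any extra work, and skips the concatenation when warnings are filtered out.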


> ConsumeMQTT can lose a message if session commit fails
> ------------------------------------------------------
>
>                 Key: NIFI-2045
>                 URL: https://issues.apache.org/jira/browse/NIFI-2045
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: Bryan Rosander
>            Priority: Critical
>
> ConsumeMQTT removes the received message from the queue before committing the 
> session.  This means that if the session commit fails and the session rolls 
> back, the message will be lost.
> https://github.com/apache/nifi/blob/f47af1ce8336c9305916f00738976f3505b01b0b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L292
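
The ordering problem described above can be sketched as follows. This is an illustration under stated assumptions, not the processor's code: `Session`, `deliver`, and the `String` element type are hypothetical stand-ins, and the sketch assumes the message is only peeked (not taken) from the queue before the commit.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class CommitThenRemoveSketch {

    /** Hypothetical stand-in for a session whose commit may fail (not the NiFi API). */
    public interface Session {
        void commit();
    }

    /**
     * Peeks the head message, commits, and only then removes the message.
     * If commit() throws, the message is still in the queue for a retry.
     */
    public static boolean deliver(LinkedBlockingQueue<String> queue, Session session) {
        String message = queue.peek();   // look at the head without removing it
        if (message == null) {
            return false;                // nothing to deliver
        }
        session.commit();                // may throw; the queue is untouched so far
        return queue.remove(message);    // remove only after a successful commit
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        try {
            deliver(queue, () -> { throw new IllegalStateException("commit failed"); });
        } catch (IllegalStateException e) {
            System.out.println("after failed commit, queue size: " + queue.size());
        }
        deliver(queue, () -> { });
        System.out.println("after successful commit, queue size: " + queue.size());
    }
}
```

With the removal placed before `commit()`, the failed-commit path would leave the queue empty and the message would have no copy left to retry from.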



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
