Peter Gyori created NIFI-9410:
---------------------------------

             Summary: ConsumeMQTT does not work in stateless flows
                 Key: NIFI-9410
                 URL: https://issues.apache.org/jira/browse/NIFI-9410
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
            Reporter: Peter Gyori
            Assignee: Peter Gyori


When ConsumeMQTT is executed in a stateless environment, the {{transferQueue()}} 
method gets into an endless loop ({{while (!mqttQueue.isEmpty())}}). At the 
end of each iteration of this loop

{code:java}
session.commitAsync(() -> mqttQueue.remove(mqttMessage));
{code}

is supposed to remove the message from the queue. However, because the commit 
is asynchronous, the loop is already processing the same message in the next 
iteration by the time the callback would run. The session cannot commit, so it 
rolls back, and the message is never removed from the queue.

The endless loop consumes all available resources, and NiFi must be restarted 
to break it.
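
The behaviour described above can be reproduced outside NiFi with a small 
simulation. This is only a sketch under stated assumptions, not the actual 
ConsumeMQTT code: {{DeferredSession}}, {{countIterations}} and the iteration 
cap are hypothetical names introduced for illustration, the loop is assumed to 
read the head of the queue with {{peek()}}, and the rollback is not modelled.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class TransferQueueSketch {

    // Hypothetical stand-in for a session whose commit callback is deferred
    // until some later point (assumption: this mimics the stateless case).
    static class DeferredSession {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        void commitAsync(Runnable onSuccess) {
            pending.add(onSuccess); // stored now, executed later
        }

        void runPendingCallbacks() {
            Runnable callback;
            while ((callback = pending.poll()) != null) {
                callback.run();
            }
        }
    }

    // Same loop shape as in the report, with a cap added so the sketch ends.
    static int countIterations(LinkedBlockingQueue<String> mqttQueue,
                               DeferredSession session,
                               int maxIterations) {
        int iterations = 0;
        while (!mqttQueue.isEmpty() && iterations < maxIterations) {
            // Assumption: the head is read without being removed.
            String mqttMessage = mqttQueue.peek();
            // Removal only happens when the callback eventually runs.
            session.commitAsync(() -> mqttQueue.remove(mqttMessage));
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> mqttQueue = new LinkedBlockingQueue<>();
        mqttQueue.add("message-1");
        DeferredSession session = new DeferredSession();

        int iterations = countIterations(mqttQueue, session, 5);
        System.out.println("iterations=" + iterations
                + " queueSize=" + mqttQueue.size());

        session.runPendingCallbacks();
        System.out.println("queueSize after callbacks=" + mqttQueue.size());
    }
}
{code}

With a single queued message and a cap of 5, the loop runs all 5 iterations 
over the same message and the queue still holds it afterwards. The message is 
removed only once the deferred callbacks run, which the uncapped loop never 
allows.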



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
