Edward Armes created NIFI-5117:
----------------------------------

             Summary: AMQP Consumer: Error during creation of Flow File results 
in lost message
                 Key: NIFI-5117
                 URL: https://issues.apache.org/jira/browse/NIFI-5117
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
            Reporter: Edward Armes


The AMQP Consumer performs a "basicGet()". The was this basicGet is called 
results in the message being dequeued from the AMQP queue.

If a processor instances fails to submit a flow file to the output as a result 
in the case of an error in "session.write()" or the processor is unexpectedly 
halted before the flow file is created and persisted, the message consumer from 
an AMQP queue is lost and can't be recovered.

Reference: 
https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicGet-java.lang.String-boolean-:

A potential fix here would be to:
 # AMQPConsumer.java: Change the call "basicGet(this.queueName, true)" -> 
"basicGet(this.queueName, false)" 
 # AMQPConsumer.java: New method that wraps the basicAck() and basicNack() 
methods to taking a long (the delivery tag) and boolean (successes) if 
successes is true basicAck() is called is false basicNack() with requeue is 
called
 # ConsumerAMQP.java: An additional call(s) to "consumer" to call the new 
method as needed in case of successes and error.

Happy to submit a patch if that helps



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to