[ 
https://issues.apache.org/jira/browse/NIFI-5117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edward Armes updated NIFI-5117:
-------------------------------
    Description: 
The AMQP Consumer performs a "basicGet()". The was this basicGet is called 
results in the message being dequeued from the AMQP queue.

If a processor instances fails to submit a flow file to the output as a result 
in the case of an error in "session.write()" or the processor is unexpectedly 
halted before the flow file is created and persisted, the message consumer from 
an AMQP queue is lost and can't be recovered.

Reference: 
[https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicGet-java.lang.String-boolean-:]

A potential fix here would be to:
 # AMQPConsumer.java: Change the call "basicGet(this.queueName, true)" -> 
"basicGet(this.queueName, false)"
 # AMQPConsumer.java: New method that wraps the basicAck() and basicNack() 
methods to taking a long (the delivery tag) and boolean (successes) if 
successes is true basicAck() is called is false basicNack() with requeue is 
called
 # ConsumerAMQP.java: An additional call(s) to "consumer" to call the new 
method as needed in case of successes and error.

  was:
The AMQP Consumer performs a "basicGet()". The was this basicGet is called 
results in the message being dequeued from the AMQP queue.

If a processor instances fails to submit a flow file to the output as a result 
in the case of an error in "session.write()" or the processor is unexpectedly 
halted before the flow file is created and persisted, the message consumer from 
an AMQP queue is lost and can't be recovered.

Reference: 
https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicGet-java.lang.String-boolean-:

A potential fix here would be to:
 # AMQPConsumer.java: Change the call "basicGet(this.queueName, true)" -> 
"basicGet(this.queueName, false)" 
 # AMQPConsumer.java: New method that wraps the basicAck() and basicNack() 
methods to taking a long (the delivery tag) and boolean (successes) if 
successes is true basicAck() is called is false basicNack() with requeue is 
called
 # ConsumerAMQP.java: An additional call(s) to "consumer" to call the new 
method as needed in case of successes and error.

Happy to submit a patch if that helps


> AMQP Consumer: Error during creation of Flow File results in lost message
> -------------------------------------------------------------------------
>
>                 Key: NIFI-5117
>                 URL: https://issues.apache.org/jira/browse/NIFI-5117
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Edward Armes
>            Priority: Major
>
> The AMQP Consumer performs a "basicGet()". The was this basicGet is called 
> results in the message being dequeued from the AMQP queue.
> If a processor instances fails to submit a flow file to the output as a 
> result in the case of an error in "session.write()" or the processor is 
> unexpectedly halted before the flow file is created and persisted, the 
> message consumer from an AMQP queue is lost and can't be recovered.
> Reference: 
> [https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicGet-java.lang.String-boolean-:]
> A potential fix here would be to:
>  # AMQPConsumer.java: Change the call "basicGet(this.queueName, true)" -> 
> "basicGet(this.queueName, false)"
>  # AMQPConsumer.java: New method that wraps the basicAck() and basicNack() 
> methods to taking a long (the delivery tag) and boolean (successes) if 
> successes is true basicAck() is called is false basicNack() with requeue is 
> called
>  # ConsumerAMQP.java: An additional call(s) to "consumer" to call the new 
> method as needed in case of successes and error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to