[ 
https://issues.apache.org/jira/browse/NIFI-15483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083103#comment-18083103
 ] 

ASF subversion and git services commented on NIFI-15483:
--------------------------------------------------------

Commit 8d15f03739761a777b69d743b3c3cb7e80de8f60 in nifi's branch 
refs/heads/main from Rakesh Kumar Singh
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=8d15f037397 ]

NIFI-15483: Fixed PublishAMQP routing FlowFiles to success when broker cannot 
deliver message

PublishAMQP uses mandatory=true on basicPublish() so the broker returns
messages it cannot route to any queue. However, the return arrives
asynchronously via ReturnListener.handleReturn() on the AMQP I/O thread,
after the publishing thread has already moved on to
session.transfer(REL_SUCCESS).
The UndeliverableMessageLogger only logged a warning; it never signaled
failure back to publish() or onTrigger(), so every unroutable message was
silently counted as a success despite never reaching any consumer.

Fix:
- Enabled Publisher Confirms (channel.confirmSelect()) in the constructor.
  The broker's basic.return frame for an unroutable message is guaranteed
  to arrive before the corresponding confirm frame, so waitForConfirms()
  acts as a synchronization barrier that makes return detection reliable.
- Added an AtomicReference<String> field (undeliverableReturnReason) that
  UndeliverableMessageLogger.handleReturn() populates with exchange/routingKey/
  replyCode/replyText when a message is returned.
- publish() now resets the field before each call, calls waitForConfirms()
  with a 5-second timeout to synchronize with the broker, then checks the
  field and throws AMQPException if the message was returned, causing
  onTrigger() to route to REL_FAILURE.
- Broker NACKs (e.g., resource alarm) are also now surfaced as AMQPException
  because waitForConfirms() returns false on NACK.
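
The return-then-confirm ordering described above can be illustrated with a
small stand-alone simulation. This is only a sketch under stated assumptions,
not the code from the commit: it uses no AMQP client library, and FakeBroker,
ReturnCallback, Publisher and outcome() are hypothetical names invented for
the illustration. A CountDownLatch stands in for waitForConfirms(), and
IllegalStateException stands in for AMQPException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Stand-alone simulation: no AMQP client library is used here.
public class ConfirmBarrierSketch {

    /** Hypothetical stand-in for ReturnListener.handleReturn(). */
    interface ReturnCallback {
        void handleReturn(int replyCode, String replyText, String exchange, String routingKey);
    }

    /** Hypothetical broker that answers on a separate "I/O" thread. */
    static final class FakeBroker {
        private final boolean routable;

        FakeBroker(boolean routable) {
            this.routable = routable;
        }

        void publish(String exchange, String routingKey, ReturnCallback onReturn, CountDownLatch confirm) {
            new Thread(() -> {
                if (!routable) {
                    // The return is delivered first ...
                    onReturn.handleReturn(312, "NO_ROUTE", exchange, routingKey);
                }
                // ... and the confirm always follows it.
                confirm.countDown();
            }).start();
        }
    }

    static final class Publisher {
        private final AtomicReference<String> undeliverableReturnReason = new AtomicReference<>();
        private final FakeBroker broker;

        Publisher(FakeBroker broker) {
            this.broker = broker;
        }

        void publish(String exchange, String routingKey) throws InterruptedException {
            undeliverableReturnReason.set(null); // reset before each call
            CountDownLatch confirm = new CountDownLatch(1);
            broker.publish(exchange, routingKey,
                    (code, text, ex, key) -> undeliverableReturnReason.set(
                            "exchange=" + ex + ", routingKey=" + key
                                    + ", replyCode=" + code + ", replyText=" + text),
                    confirm);
            // Barrier: once the confirm has arrived, any return has already been recorded.
            if (!confirm.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("No confirm within 5 seconds");
            }
            String reason = undeliverableReturnReason.get();
            if (reason != null) {
                throw new IllegalStateException("Message returned by broker: " + reason);
            }
        }
    }

    /** Maps the publish result to the relationship a caller would choose. */
    static String outcome(boolean routable) {
        try {
            new Publisher(new FakeBroker(routable)).publish("some.exchange", "some.key");
            return "success";
        } catch (IllegalStateException e) {
            return "failure";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "failure";
        }
    }

    public static void main(String[] args) {
        System.out.println("routable   -> " + outcome(true));
        System.out.println("unroutable -> " + outcome(false));
    }
}
```

Because the simulated return is always recorded before the latch is released,
the publishing thread cannot observe a stale null after the wait, which is the
property the fix relies on.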

- Added regression tests to verify that AMQPPublisher and PublishAMQP correctly
  route FlowFiles to REL_FAILURE for all broker-side failure modes.
- Added ShutdownSignalException to the catch block in AMQPPublisher.publish(),
  converting the channel-close signal into AMQPException so PublishAMQP routes
  the FlowFile to REL_FAILURE with a descriptive error message.
- Added ShutdownSignalException import.

NIFI-15483: Added Delivery Guarantee property to make Publisher Confirms opt-in

Added a new Delivery Guarantee property to PublishAMQP with two options:

At most once (default): works like the original - sends the message without
waiting for a broker reply. If the message cannot be delivered, only a warning
is logged and the FlowFile routes to success. Best for high throughput.

At least once: turns on RabbitMQ Publisher Confirms. The processor waits for
the broker to confirm the message before routing. If the message is returned
or the broker sends a NACK, the FlowFile routes to failure instead of success.
This prevents silent data loss but can be much slower, especially with remote
brokers.
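
The routing behaviour of the two options can be summarised as a small decision
function. This is a hedged sketch, not the processor's implementation: the
enum names, BrokerOutcome and route() are hypothetical, and the real property
is defined through NiFi's own property API, which is not shown here.

```java
// Illustrative only: names below are assumptions, not taken from the NiFi code base.
public class DeliveryGuaranteeSketch {

    enum DeliveryGuarantee { AT_MOST_ONCE, AT_LEAST_ONCE }

    /** What the broker did with the message (hypothetical classification). */
    enum BrokerOutcome { CONFIRMED, RETURNED, NACKED }

    /** Returns the relationship name the FlowFile would be routed to. */
    static String route(DeliveryGuarantee guarantee, BrokerOutcome outcome) {
        if (guarantee == DeliveryGuarantee.AT_MOST_ONCE) {
            // The broker reply is not awaited; a returned message is only logged.
            return "success";
        }
        // At least once: only a confirmed, non-returned message counts as success.
        return outcome == BrokerOutcome.CONFIRMED ? "success" : "failure";
    }

    public static void main(String[] args) {
        for (DeliveryGuarantee g : DeliveryGuarantee.values()) {
            for (BrokerOutcome o : BrokerOutcome.values()) {
                System.out.println(g + " + " + o + " -> " + route(g, o));
            }
        }
    }
}
```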

This closes #11213.

Signed-off-by: Peter Turcsanyi <[email protected]>


> PublishAMQP doesn't route on publish failure
> --------------------------------------------
>
>                 Key: NIFI-15483
>                 URL: https://issues.apache.org/jira/browse/NIFI-15483
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.6.0
>         Environment: RHEL 9, NiFi 2.6.0 + RabbitMQ 4.2
>            Reporter: Will James
>            Assignee: Rakesh Kumar Singh
>            Priority: Critical
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The issue from NIFI-5639 still persists in NiFi v2. Essentially, if you 
> attempt to publish a message using PublishAMQP with invalid properties (e.g. 
> incorrect exchange name or user permission), the message fails to 
> publish but still goes to the 'success' relationship.  This contradicts 
> the PublishAMQP documentation for the failure relationship: 'All 
> FlowFiles that cannot be routed to the AMQP destination are routed to this 
> relationship.'
> Steps to reproduce:
>  # Set up a *PublishAMQP* processor that successfully publishes to a RabbitMQ 
> queue
>  # Change the _*Exchange Name*_ property on the *PublishAMQP* to an exchange 
> that doesn't exist in RabbitMQ.
>  # Send a valid flow file to the *PublishAMQP* processor. RabbitMQ will fail 
> to process the message, but NiFi still routes it to 'success'.  Depending on 
> the configuration of RabbitMQ, there may be an error bulletin on the NiFi 
> processor. I've seen this happen when publishing using a user with limited 
> queue/exchange permissions - NiFi shows a permission denied response and 
> still routes to success. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
