turcsanyip commented on code in PR #11213:
URL: https://github.com/apache/nifi/pull/11213#discussion_r3288850157


##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java:
##########
@@ -278,5 +278,43 @@ protected Connection createConnection(ProcessContext context, ExecutorService ex
         public Connection getConnection() {
             return connection;
         }
+
+        public TestChannel getTestChannel() {
+            return connection.getTestChannel();
+        }
+    }
+
+    /**
+     * When the broker closes the channel with a 404 (exchange not found), the FlowFile
+     * must route to REL_FAILURE — not cause an unhandled processor exception.
+     */
+    @Test
+    public void validateFlowFileRoutedToFailureWhenBrokerClosesChannel() {

Review Comment:
   Please move these test methods up, just after the last existing test method 
(and before the private method / nested class).



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java:
##########
@@ -278,5 +278,43 @@ protected Connection createConnection(ProcessContext context, ExecutorService ex
         public Connection getConnection() {
             return connection;
         }
+
+        public TestChannel getTestChannel() {
+            return connection.getTestChannel();
+        }
+    }
+
+    /**
+     * When the broker closes the channel with a 404 (exchange not found), the FlowFile
+     * must route to REL_FAILURE — not cause an unhandled processor exception.
+     */
+    @Test
+    public void validateFlowFileRoutedToFailureWhenBrokerClosesChannel() {
+        final LocalPublishAMQP proc = new LocalPublishAMQP();
+        final TestRunner testRunner = TestRunners.newTestRunner(proc);
+        setConnectionProperties(testRunner);
+        testRunner.setProperty(PublishAMQP.DELIVERY_GUARANTEE, PublishAMQP.DeliveryGuarantee.AT_LEAST_ONCE);
+        proc.getTestChannel().setSimulateShutdownOnConfirm(true);
+
+        testRunner.enqueue("Hello Joe".getBytes());
+        testRunner.run();
+
+        assertTrue(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
+        assertNotNull(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst());

Review Comment:
   To assert that the FlowFile has been transferred to a given relationship we 
typically use:
   ```
           testRunner.assertTransferCount(PublishAMQP.REL_SUCCESS, 0);
           testRunner.assertTransferCount(PublishAMQP.REL_FAILURE, 1);
   ```
   
   Or it could be even simpler here:
   ```
           testRunner.assertAllFlowFilesTransferred(PublishAMQP.REL_FAILURE);
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java:
##########
@@ -83,23 +139,24 @@ public String toString() {
     }
 
     /**
-     * Listener to listen and WARN-log undeliverable messages which are returned
-     * back to the sender. Since in the current implementation messages are sent
-     * with 'mandatory' bit set, such messages must have final destination
-     * otherwise they are silently dropped which could cause a confusion
-     * especially during early stages of flow development. This implies that
-     * bindings between exchange -> routingKey -> queue must exist and are
-     * typically done by AMQP administrator. This logger simply helps to monitor
-     * for such conditions by logging such messages as warning. In the future
-     * this can be extended to provide other type of functionality (e.g., fail
-     * processor etc.)
+     * Listens for messages returned by the broker when they cannot be routed to any queue
+     * (mandatory=true publish with no matching binding).
+     *
+     * In {@link PublishAMQP.DeliveryGuarantee#AT_MOST_ONCE} mode (the default), this listener
+     * only logs a warning — matching the original behaviour.
+     *
+     * In {@link PublishAMQP.DeliveryGuarantee#AT_LEAST_ONCE} mode, the return reason is also
+     * stored in {@link #undeliverableReturnReason} so that {@link #publish} can detect it after
+     * {@code waitForConfirms()} synchronizes the two threads and throw an {@link AMQPException}
+     * to trigger REL_FAILURE routing.
      */
     private final class UndeliverableMessageLogger implements ReturnListener {
         @Override
         public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException {
-            String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
-                    + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
-            processorLog.warn(logMessage);
+            final String reason = "exchange='" + exchangeName + "' routingKey='" + routingKey
+                    + "' replyCode=" + replyCode + " replyText='" + replyText + "'";
+            undeliverableReturnReason.set(reason);

Review Comment:
   Line 45 says: _Only populated when {@link #useConfirms} is true._
   So an `if` should be added:
   
   ```suggestion
               if (useConfirms) {
                   undeliverableReturnReason.set(reason);
               }
   ```
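
   A minimal, self-contained sketch of what that guard changes, assuming `undeliverableReturnReason` is an `AtomicReference<String>` (inferred only from the `set(...)` call in the diff). The class name and the `onReturn` / `getReason` helpers are hypothetical stand-ins for illustration, not code from the PR:

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical stand-in for the publisher; only the guarded write is modelled.
   class GuardedReturnReasonSketch {
       private final boolean useConfirms;
       // Assumption: the real field is an AtomicReference<String>.
       private final AtomicReference<String> undeliverableReturnReason = new AtomicReference<>();

       GuardedReturnReasonSketch(boolean useConfirms) {
           this.useConfirms = useConfirms;
       }

       // Stand-in for handleReturn(...): store the reason only when confirms are in use.
       void onReturn(String reason) {
           if (useConfirms) {
               undeliverableReturnReason.set(reason);
           }
       }

       String getReason() {
           return undeliverableReturnReason.get();
       }

       public static void main(String[] args) {
           GuardedReturnReasonSketch atMostOnce = new GuardedReturnReasonSketch(false);
           atMostOnce.onReturn("replyCode=312");
           System.out.println(atMostOnce.getReason() == null);

           GuardedReturnReasonSketch atLeastOnce = new GuardedReturnReasonSketch(true);
           atLeastOnce.onReturn("replyCode=312");
           System.out.println("replyCode=312".equals(atLeastOnce.getReason()));
       }
   }
   ```

   With the guard in place the field stays unset when confirms are disabled, which matches the documented contract of the field.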



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java:
##########
@@ -105,4 +104,66 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce
         connection.close();
     }
 
+    /**
+     * Verifies that a {@link com.rabbitmq.client.ShutdownSignalException} thrown by
+     * {@code waitForConfirms()} (e.g., broker closes channel with 404 NOT_FOUND because the
+     * exchange does not exist) is converted to {@link AMQPException} so the FlowFile routes
+     * to REL_FAILURE instead of surfacing as an unhandled processor error.
+     */
+    @Test
+    public void failPublishWhenBrokerClosesChannelDuringConfirm() {

Review Comment:
   Similar to the `succeedsPublishWhenMessageUndeliverableInAtMostOnceMode()` test method name, an `*InAtLeastOnce` suffix could be added to the names of these methods.



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java:
##########
@@ -83,23 +139,24 @@ public String toString() {
     }
 
     /**
-     * Listener to listen and WARN-log undeliverable messages which are returned
-     * back to the sender. Since in the current implementation messages are sent
-     * with 'mandatory' bit set, such messages must have final destination
-     * otherwise they are silently dropped which could cause a confusion
-     * especially during early stages of flow development. This implies that
-     * bindings between exchange -> routingKey -> queue must exist and are
-     * typically done by AMQP administrator. This logger simply helps to monitor
-     * for such conditions by logging such messages as warning. In the future
-     * this can be extended to provide other type of functionality (e.g., fail
-     * processor etc.)
+     * Listens for messages returned by the broker when they cannot be routed to any queue
+     * (mandatory=true publish with no matching binding).
+     *
+     * In {@link PublishAMQP.DeliveryGuarantee#AT_MOST_ONCE} mode (the default), this listener
+     * only logs a warning — matching the original behaviour.
+     *
+     * In {@link PublishAMQP.DeliveryGuarantee#AT_LEAST_ONCE} mode, the return reason is also
+     * stored in {@link #undeliverableReturnReason} so that {@link #publish} can detect it after
+     * {@code waitForConfirms()} synchronizes the two threads and throw an {@link AMQPException}
+     * to trigger REL_FAILURE routing.
      */
     private final class UndeliverableMessageLogger implements ReturnListener {
         @Override
         public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException {
-            String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
-                    + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
-            processorLog.warn(logMessage);
+            final String reason = "exchange='" + exchangeName + "' routingKey='" + routingKey

Review Comment:
   It seems to me that the prefix for the reason is "Message returned as 
undeliverable by broker: " in both cases where the text is used (logging and 
exception). So the prefix could be added right here.
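
   A rough sketch of that idea, assuming the prefix text quoted above is used verbatim. The class name, the `PREFIX` constant, and the `buildReason` helper are hypothetical and only illustrate building the full message once, so that logging and the exception can reuse it:

   ```java
   // Hypothetical helper class; not part of the PR.
   class ReturnReasonSketch {
       // Prefix text taken from this review comment; the constant name is an assumption.
       static final String PREFIX = "Message returned as undeliverable by broker: ";

       // Builds the complete reason, prefix included, from the handleReturn(...) arguments.
       static String buildReason(String exchangeName, String routingKey, int replyCode, String replyText) {
           return PREFIX + "exchange='" + exchangeName + "' routingKey='" + routingKey
                   + "' replyCode=" + replyCode + " replyText='" + replyText + "'";
       }

       public static void main(String[] args) {
           // Both the log call and the exception message could then use this one string.
           System.out.println(buildReason("myExchange", "key1", 312, "NO_ROUTE"));
       }
   }
   ```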


