This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f03d8d9a79a0dfe9fbb660a8b030c9550707faea
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 28 18:12:41 2025 +0300

    [fix][io] Acknowledge RabbitMQ message after processing the message 
successfully (#24354)
    
    (cherry picked from commit 76c6f6a043018c4278c741f77410ef4938ba6489)
---
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java  | 31 +++++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index b0b7ef31b08..16b624bdc10 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -96,11 +96,22 @@ public class RabbitMQSource extends PushSource<byte[]> {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body)
                 throws IOException {
-            source.consume(new 
RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
             long deliveryTag = envelope.getDeliveryTag();
-            // positively acknowledge all deliveries up to this delivery tag 
to reduce network traffic
-            // since manual message acknowledgments are turned on by default
-            this.getChannel().basicAck(deliveryTag, true);
+            source.consume(new 
RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body, () -> {
+                // acknowledge this delivery tag to RabbitMQ
+                try {
+                    this.getChannel().basicAck(deliveryTag, false);
+                } catch (IOException e) {
+                    logger.error("Error while acknowledging envelope {}.", 
envelope, e);
+                }
+            }, () -> {
+                // negatively acknowledge this delivery tag to RabbitMQ
+                try {
+                    this.getChannel().basicNack(deliveryTag, false, true);
+                } catch (IOException e) {
+                    logger.error("Error while negatively acknowledging 
envelope {}.", envelope, e);
+                }
+            }));
         }
     }
 
@@ -108,5 +119,17 @@ public class RabbitMQSource extends PushSource<byte[]> {
     private static class RabbitMQRecord implements Record<byte[]> {
         private final Optional<String> key;
         private final byte[] value;
+        private final Runnable ackFunction;
+        private final Runnable nackFunction;
+
+        @Override
+        public void ack() {
+            ackFunction.run();
+        }
+
+        @Override
+        public void fail() {
+            nackFunction.run();
+        }
     }
 }
\ No newline at end of file

Reply via email to