This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f03d8d9a79a0dfe9fbb660a8b030c9550707faea
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 28 18:12:41 2025 +0300

    [fix][io] Acknowledge RabbitMQ message after processing the message successfully (#24354)

    (cherry picked from commit 76c6f6a043018c4278c741f77410ef4938ba6489)
---
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java | 31 +++++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index b0b7ef31b08..16b624bdc10 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -96,11 +96,22 @@ public class RabbitMQSource extends PushSource<byte[]> {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                 throws IOException {
-            source.consume(new RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
             long deliveryTag = envelope.getDeliveryTag();
-            // positively acknowledge all deliveries up to this delivery tag to reduce network traffic
-            // since manual message acknowledgments are turned on by default
-            this.getChannel().basicAck(deliveryTag, true);
+            source.consume(new RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body, () -> {
+                // acknowledge this delivery tag to RabbitMQ
+                try {
+                    this.getChannel().basicAck(deliveryTag, false);
+                } catch (IOException e) {
+                    logger.error("Error while acknowledging envelope {}.", envelope, e);
+                }
+            }, () -> {
+                // negatively acknowledge this delivery tag to RabbitMQ
+                try {
+                    this.getChannel().basicNack(deliveryTag, false, true);
+                } catch (IOException e) {
+                    logger.error("Error while negatively acknowledging envelope {}.", envelope, e);
+                }
+            }));
         }
     }
 
@@ -108,5 +119,17 @@ public class RabbitMQSource extends PushSource<byte[]> {
     private static class RabbitMQRecord implements Record<byte[]> {
         private final Optional<String> key;
         private final byte[] value;
+        private final Runnable ackFunction;
+        private final Runnable nackFunction;
+
+        @Override
+        public void ack() {
+            ackFunction.run();
+        }
+
+        @Override
+        public void fail() {
+            nackFunction.run();
+        }
     }
 }
\ No newline at end of file
