This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f9d569b6e94c7aa1a8d836766ad618407979e30e Author: Lari Hotari <[email protected]> AuthorDate: Thu Aug 11 07:12:21 2022 +0300 [fix][broker] Increment topic stats outbound message counters after messages have been written to the TCP/IP connection (#17043) --- .../org/apache/pulsar/broker/service/Consumer.java | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 37dfe087e7f..5c7646921fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -332,14 +332,19 @@ public class Consumer { topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get()); } incrementUnackedMessages(unackedMessages); - msgOut.recordMultipleEvents(totalMessages, totalBytes); - msgOutCounter.add(totalMessages); - bytesOutCounter.add(totalBytes); - chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); - - - return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, - entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); + Future<Void> writeAndFlushPromise = + cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, + entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); + writeAndFlushPromise.addListener(status -> { + // only increment counters after the messages have been successfully written to the TCP/IP connection + if (status.isSuccess()) { + msgOut.recordMultipleEvents(totalMessages, totalBytes); + msgOutCounter.add(totalMessages); + bytesOutCounter.add(totalBytes); + chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); + } + }); + return writeAndFlushPromise; } private void incrementUnackedMessages(int ackedMessages) {
