This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f9d569b6e94c7aa1a8d836766ad618407979e30e
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 11 07:12:21 2022 +0300

    [fix][broker] Increment topic stats outbound message counters after 
messages have been written to the TCP/IP connection (#17043)
---
 .../org/apache/pulsar/broker/service/Consumer.java  | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 37dfe087e7f..5c7646921fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -332,14 +332,19 @@ public class Consumer {
                    topicName, subscription, ackedCount, totalMessages, 
consumerId, avgMessagesPerEntry.get());
         }
         incrementUnackedMessages(unackedMessages);
-        msgOut.recordMultipleEvents(totalMessages, totalBytes);
-        msgOutCounter.add(totalMessages);
-        bytesOutCounter.add(totalBytes);
-        chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
-
-
-        return cnx.getCommandSender().sendMessagesToConsumer(consumerId, 
topicName, subscription, partitionIdx,
-                entries, batchSizes, batchIndexesAcks, redeliveryTracker, 
epoch);
+        Future<Void> writeAndFlushPromise =
+                cnx.getCommandSender().sendMessagesToConsumer(consumerId, 
topicName, subscription, partitionIdx,
+                        entries, batchSizes, batchIndexesAcks, 
redeliveryTracker, epoch);
+        writeAndFlushPromise.addListener(status -> {
+            // only increment counters after the messages have been 
successfully written to the TCP/IP connection
+            if (status.isSuccess()) {
+                msgOut.recordMultipleEvents(totalMessages, totalBytes);
+                msgOutCounter.add(totalMessages);
+                bytesOutCounter.add(totalBytes);
+                chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 
0);
+            }
+        });
+        return writeAndFlushPromise;
     }
 
     private void incrementUnackedMessages(int ackedMessages) {

Reply via email to