This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 16da3724cac [fix][broker] Increment topic stats outbound message
counters after messages have been written to the TCP/IP connection (#17043)
16da3724cac is described below
commit 16da3724cacdb66dee744c2c6b2d75258d7e95ae
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 11 07:12:21 2022 +0300
[fix][broker] Increment topic stats outbound message counters after
messages have been written to the TCP/IP connection (#17043)
(cherry picked from commit 2bc933ee71421f60879c99c7888c96e95d5c9386)
---
.../org/apache/pulsar/broker/service/Consumer.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index ba6b3c3c297..2fea1a1b854 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -290,14 +290,19 @@ public class Consumer {
topicName, subscription, ackedCount, totalMessages,
consumerId, avgMessagesPerEntry.get());
}
incrementUnackedMessages(unackedMessages);
- msgOut.recordMultipleEvents(totalMessages, totalBytes);
- msgOutCounter.add(totalMessages);
- bytesOutCounter.add(totalBytes);
- chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
-
-
- return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
- entries, batchSizes, batchIndexesAcks, redeliveryTracker);
+ Future<Void> writeAndFlushPromise =
+ cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
+ entries, batchSizes, batchIndexesAcks, redeliveryTracker);
+ writeAndFlushPromise.addListener(status -> {
+ // only increment counters after the messages have been successfully written to the TCP/IP connection
+ if (status.isSuccess()) {
+ msgOut.recordMultipleEvents(totalMessages, totalBytes);
+ msgOutCounter.add(totalMessages);
+ bytesOutCounter.add(totalBytes);
+ chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
+ }
+ });
+ return writeAndFlushPromise;
}
private void incrementUnackedMessages(int ackedMessages) {