This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9796eb40bc68b0aefde722fd1f4f698812fca7a2 Author: Tao Jiuming <[email protected]> AuthorDate: Mon Jun 13 09:56:50 2022 +0800 [broker][monitoring] add message ack rate metric for consumer (#15674) (cherry picked from commit 88b47e5e5ac1b7fcf895bd72b0545b24bdf61f7e) --- .../org/apache/pulsar/broker/service/Consumer.java | 24 +++++-- .../apache/pulsar/broker/service/ServerCnx.java | 1 + .../pulsar/broker/service/StreamingStats.java | 1 + .../nonpersistent/NonPersistentSubscription.java | 1 + .../service/nonpersistent/NonPersistentTopic.java | 4 ++ .../service/persistent/PersistentSubscription.java | 1 + .../broker/service/persistent/PersistentTopic.java | 3 + .../stats/prometheus/AggregatedConsumerStats.java | 2 + .../stats/prometheus/AggregatedNamespaceStats.java | 2 + .../prometheus/AggregatedSubscriptionStats.java | 2 + .../stats/prometheus/NamespaceStatsAggregator.java | 3 + .../pulsar/broker/stats/prometheus/TopicStats.java | 7 ++ .../pulsar/broker/stats/ConsumerStatsTest.java | 81 ++++++++++++++++++++++ .../pulsar/common/policies/data/ConsumerStats.java | 5 ++ .../common/policies/data/SubscriptionStats.java | 5 ++ .../policies/data/stats/ConsumerStatsImpl.java | 6 ++ .../policies/data/stats/SubscriptionStatsImpl.java | 5 ++ pulsar-common/src/main/proto/PulsarApi.proto | 3 + 18 files changed, 151 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 3f50ab7a7aa..349fbd860e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -86,6 +86,7 @@ public class Consumer { private final Rate msgRedeliver; private final LongAdder msgOutCounter; private final LongAdder bytesOutCounter; + private final Rate messageAckRate; private long lastConsumedTimestamp; private long lastAckedTimestamp; @@ -162,6 +163,7 @@ public class Consumer { this.msgRedeliver = new Rate(); this.bytesOutCounter = new LongAdder(); this.msgOutCounter = new LongAdder(); + this.messageAckRate = new Rate(); this.appId = appId; // Ensure we start from compacted view @@ -366,6 +368,8 @@ public class Consumer { } public CompletableFuture<Void> messageAcked(CommandAck ack) { + CompletableFuture<Void> future; + this.lastAckedTimestamp = System.currentTimeMillis(); Map<String, Long> properties = Collections.emptyMap(); if (ack.getPropertiesCount() > 0) { @@ -399,20 +403,27 @@ public class Consumer { } if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) { List<PositionImpl> positionsAcked = Collections.singletonList(position); - return transactionCumulativeAcknowledge(ack.getTxnidMostBits(), + future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked); } else { List<Position> positionsAcked = Collections.singletonList(position); subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties); - return CompletableFuture.completedFuture(null); + future = CompletableFuture.completedFuture(null); } } else { if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) { - return individualAckWithTransaction(ack); + future = individualAckWithTransaction(ack); } else { - return individualAckNormal(ack, properties); + future = individualAckNormal(ack, properties); } } + + return future + .whenComplete((__, t) -> { + if (t == null) { + this.messageAckRate.recordEvent(ack.getMessageIdsCount()); + } + }); } //this method is for individual ack not carry the transaction @@ -742,7 +753,10 @@ public class Consumer { msgOut.calculateRate(); chunkedMessageRate.calculateRate(); msgRedeliver.calculateRate(); + messageAckRate.calculateRate(); + stats.msgRateOut = msgOut.getRate(); + stats.messageAckRate = messageAckRate.getRate(); stats.msgThroughputOut = msgOut.getValueRate(); stats.msgRateRedeliver = msgRedeliver.getRate(); stats.chunkedMessageRate = chunkedMessageRate.getRate(); @@ -755,7 +769,7 @@ public class Consumer { lastAckedTimestamp = consumerStats.lastAckedTimestamp; lastConsumedTimestamp = consumerStats.lastConsumedTimestamp; MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits); - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName, subscription, consumerStats.availablePermits, consumerId); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c4f9e590caa..f127acb1fa7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -612,6 +612,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { .setConnectedSince(consumerStats.getConnectedSince()) .setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false)) .setMsgRateExpired(subscription.getExpiredMessageRate()) + .setMessageAckRate(consumerStats.messageAckRate) .setType(subscription.getTypeString()); return Commands.serializeWithSize(cmd); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java index 02dcb8233ff..469c802b76a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java @@ -65,6 +65,7 @@ public class StreamingStats { statsStream.writePair("msgThroughputOut", stats.msgThroughputOut); statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver); statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry); + statsStream.writePair("messageAckRate", stats.messageAckRate); if (Subscription.isIndividualAckMode(subType)) { statsStream.writePair("unackedMessages", stats.unackedMessages); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index a9777f5dd0d..fed3b0f851a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -456,6 +456,7 @@ public class NonPersistentSubscription implements Subscription { ConsumerStatsImpl consumerStats = consumer.getStats(); subStats.consumers.add(consumerStats); subStats.msgRateOut += consumerStats.msgRateOut; + subStats.messageAckRate += consumerStats.messageAckRate; subStats.msgThroughputOut += consumerStats.msgThroughputOut; subStats.bytesOutCounter += consumerStats.bytesOutCounter; subStats.msgOutCounter += consumerStats.msgOutCounter; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 82d610c12b7..fb6da69eb2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -724,6 +724,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol double subMsgRateOut = 0; double subMsgThroughputOut = 0; double subMsgRateRedeliver = 0; + double subMsgAckRate = 0; // Start subscription name & consumers try { @@ -738,6 +739,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol ConsumerStatsImpl consumerStats = consumer.getStats(); subMsgRateOut += consumerStats.msgRateOut; + subMsgAckRate += consumerStats.messageAckRate; + subMsgThroughputOut += consumerStats.msgThroughputOut; subMsgRateRedeliver += consumerStats.msgRateRedeliver; @@ -752,6 +755,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false)); topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); topicStatsStream.writePair("msgRateOut", subMsgRateOut); + topicStatsStream.writePair("messageAckRate", subMsgAckRate); topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut); topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver); topicStatsStream.writePair("type", subscription.getTypeString()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 95dee2a3798..31ba5115ef6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -966,6 +966,7 @@ public class PersistentSubscription implements Subscription { subStats.bytesOutCounter += consumerStats.bytesOutCounter; subStats.msgOutCounter += consumerStats.msgOutCounter; subStats.msgRateRedeliver += consumerStats.msgRateRedeliver; + subStats.messageAckRate += consumerStats.messageAckRate; subStats.chunkedMessageRate += consumerStats.chunkedMessageRate; subStats.unackedMessages += consumerStats.unackedMessages; subStats.lastConsumedTimestamp = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b1ccf0b23ec..629158c4d50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1722,6 +1722,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal double subMsgRateOut = 0; double subMsgThroughputOut = 0; double subMsgRateRedeliver = 0; + double subMsgAckRate = 0; // Start subscription name & consumers try { @@ -1735,6 +1736,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal ConsumerStatsImpl consumerStats = consumer.getStats(); subMsgRateOut += consumerStats.msgRateOut; + subMsgAckRate += consumerStats.messageAckRate; subMsgThroughputOut += consumerStats.msgThroughputOut; subMsgRateRedeliver += consumerStats.msgRateRedeliver; @@ -1749,6 +1751,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal subscription.getNumberOfEntriesInBacklog(true)); topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); topicStatsStream.writePair("msgRateOut", subMsgRateOut); + topicStatsStream.writePair("messageAckRate", subMsgAckRate); topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut); topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver); topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java index 8b6bf7d5c96..0a4bd317df5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java @@ -28,6 +28,8 @@ public class AggregatedConsumerStats { public double msgRateOut; + public double msgAckRate; + public double msgThroughputOut; public long availablePermits; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 8bacd0f582d..5610dbab218 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -33,6 +33,7 @@ public class AggregatedNamespaceStats { public double throughputIn; public double throughputOut; + public long messageAckRate; public long bytesInCounter; public long msgInCounter; public long bytesOutCounter; @@ -122,6 +123,7 @@ public class AggregatedNamespaceStats { consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs; consumerStats.msgRateRedeliver += v.msgRateRedeliver; consumerStats.unackedMessages += v.unackedMessages; + messageAckRate += v.msgAckRate; }); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index c829be28e59..2a3d4ed533a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -36,6 +36,8 @@ public class AggregatedSubscriptionStats { public double msgRateOut; + public double messageAckRate; + public double msgThroughputOut; public long msgDelayed; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index a3a0fcda445..229739b2fce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -131,6 +131,7 @@ public class NamespaceStatsAggregator { subsStats.unackedMessages += cStats.unackedMessages; subsStats.msgRateRedeliver += cStats.msgRateRedeliver; subsStats.msgRateOut += cStats.msgRateOut; + subsStats.messageAckRate += cStats.messageAckRate; subsStats.msgThroughputOut += cStats.msgThroughputOut; subsStats.bytesOutCounter += cStats.bytesOutCounter; subsStats.msgOutCounter += cStats.msgOutCounter; @@ -240,6 +241,7 @@ public class NamespaceStatsAggregator { consumerStats.unackedMessages = conStats.unackedMessages; consumerStats.msgRateRedeliver = conStats.msgRateRedeliver; consumerStats.msgRateOut = conStats.msgRateOut; + consumerStats.msgAckRate = conStats.messageAckRate; consumerStats.msgThroughputOut = conStats.msgThroughputOut; consumerStats.bytesOutCounter = conStats.bytesOutCounter; consumerStats.msgOutCounter = conStats.msgOutCounter; @@ -324,6 +326,7 @@ public class NamespaceStatsAggregator { metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut); metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn); metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut); + metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate); metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter); metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 35dbdba274a..46be437eb23 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -246,6 +246,8 @@ class TopicStats { subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut, splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate", + subsStats.messageAckRate, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", @@ -282,6 +284,11 @@ class TopicStats { metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut, splitTopicAndPartitionIndexLabel); + + metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), + "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut, splitTopicAndPartitionIndexLabel); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 2f702ab89f6..e064e56e993 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -20,14 +20,22 @@ package org.apache.pulsar.broker.stats; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.ConsumerStats; @@ -38,9 +46,13 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; @Slf4j @@ -183,6 +195,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase { "msgThroughputOut", "bytesOutCounter", "msgOutCounter", + "messageAckRate", "msgRateRedeliver", "chunkedMessageRate", "consumerName", @@ -220,4 +233,72 @@ public class ConsumerStatsTest extends ProducerConsumerBase { consumer.close(); } + + + @Test + public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception { + String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID(); + testMessageAckRateMetric(topicName, true); + } + + @Test + public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception { + String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID(); + testMessageAckRateMetric(topicName, false); + } + + private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics) + throws Exception { + final int messages = 100; + String subName = "test_sub"; + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName(subName).isAckReceiptEnabled(true).subscribe(); + + String namespace = TopicName.get(topicName).getNamespace(); + + for (int i = 0; i < messages; i++) { + producer.send(UUID.randomUUID().toString()); + } + + for (int i = 0; i < messages; i++) { + Message<String> message = consumer.receive(20, TimeUnit.SECONDS); + if (message == null) { + break; + } + + consumer.acknowledge(message); + } + + Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get(); + Subscription subscription = topic.getSubscription(subName); + List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers(); + Assert.assertEquals(consumers.size(), 1); + org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0); + consumer1.updateRates(); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output); + String metricStr = output.toString(StandardCharsets.UTF_8); + + Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr); + Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate"); + Assert.assertTrue(metrics.size() > 0); + + int num = 0; + for (PrometheusMetricsTest.Metric metric : metrics) { + if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) { + num++; + Assert.assertTrue(metric.value > 0); + } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) { + num++; + Assert.assertTrue(metric.value > 0); + } + } + + Assert.assertTrue(num > 0); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index c78c17c8d9d..4c165083ab1 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -40,6 +40,11 @@ public interface ConsumerStats { /** Total rate of messages redelivered by this consumer (msg/s). */ double getMsgRateRedeliver(); + /** + * Total rate of message ack(msg/s). + */ + double getMessageAckRate(); + /** The total rate of chunked messages delivered to this consumer. */ double getChunkedMessageRate(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 4175afb7140..39738d2eaa0 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -40,6 +40,11 @@ public interface SubscriptionStats { /** Total rate of messages redelivered on this subscription (msg/s). */ double getMsgRateRedeliver(); + /** + * Total rate of message ack(msg/s). + */ + double getMessageAckRate(); + /** Chunked message dispatch rate. */ int getChunkedMessageRate(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index 33d0a8d1344..ae9ba67148f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -45,6 +45,11 @@ public class ConsumerStatsImpl implements ConsumerStats { /** Total rate of messages redelivered by this consumer (msg/s). */ public double msgRateRedeliver; + /** + * Total rate of message ack(msg/s). + */ + public double messageAckRate; + /** The total rate of chunked messages delivered to this consumer. */ public double chunkedMessageRate; @@ -103,6 +108,7 @@ public class ConsumerStatsImpl implements ConsumerStats { public ConsumerStatsImpl add(ConsumerStatsImpl stats) { Objects.requireNonNull(stats); this.msgRateOut += stats.msgRateOut; + this.messageAckRate += stats.messageAckRate; this.msgThroughputOut += stats.msgThroughputOut; this.bytesOutCounter += stats.bytesOutCounter; this.msgOutCounter += stats.msgOutCounter; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index 8beaa6facfd..53112295aa2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -47,6 +47,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** Total rate of messages redelivered on this subscription (msg/s). */ public double msgRateRedeliver; + /** + * Total rate of message ack(msg/s). + */ + public double messageAckRate; + /** Chunked message dispatch rate. */ public int chunkedMessageRate; diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 337c6bb2cd4..e66c2b4a70e 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -716,6 +716,9 @@ message CommandConsumerStatsResponse { /// Number of messages in the subscription backlog optional uint64 msgBacklog = 15; + + /// Total rate of messages ack. msg/s + optional double messageAckRate = 16; } message CommandGetLastMessageId {
