This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4ce967ea6727afa0a36245b5755fa67f43a170da Author: Tao Jiuming <[email protected]> AuthorDate: Tue Jun 21 15:53:03 2022 +0800 [fix][broker][monitoring] fix message ack rate (#16108) (cherry picked from commit 8869d8c18361fcd5fcf731f9edf2d38ae07cc0cf) --- .../org/apache/pulsar/broker/service/Consumer.java | 32 ++++--- .../pulsar/broker/stats/ConsumerStatsTest.java | 97 +++++++++++++++------- 2 files changed, 87 insertions(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 349fbd860e4..9b0d678ffc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -368,7 +368,7 @@ public class Consumer { } public CompletableFuture<Void> messageAcked(CommandAck ack) { - CompletableFuture<Void> future; + CompletableFuture<Long> future; this.lastAckedTimestamp = System.currentTimeMillis(); Map<String, Long> properties = Collections.emptyMap(); @@ -404,11 +404,12 @@ public class Consumer { if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) { List<PositionImpl> positionsAcked = Collections.singletonList(position); future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(), - ack.getTxnidLeastBits(), positionsAcked); + ack.getTxnidLeastBits(), positionsAcked) + .thenApply(unused -> 1L); } else { List<Position> positionsAcked = Collections.singletonList(position); subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties); - future = CompletableFuture.completedFuture(null); + future = CompletableFuture.completedFuture(1L); } } else { if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) { @@ -419,16 +420,16 @@ public class Consumer { } return future - .whenComplete((__, t) -> { - if (t == null) { - this.messageAckRate.recordEvent(ack.getMessageIdsCount()); - } + .thenApply(v -> { + this.messageAckRate.recordEvent(v); + return null; }); } //this method is for individual ack not carry the transaction - private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String, Long> properties) { + private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) { List<Position> positionsAcked = new ArrayList<>(); + long totalAckCount = 0; for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; @@ -461,10 +462,12 @@ public class Consumer { checkCanRemovePendingAcksAndHandle(position, msgId); checkAckValidationError(ack, position); + + totalAckCount += ackedCount; } subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties); - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - completableFuture.complete(null); + CompletableFuture<Long> completableFuture = new CompletableFuture<>(); + completableFuture.complete(totalAckCount); if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> { //check if the position can remove from the consumer pending acks. @@ -482,7 +485,7 @@ public class Consumer { //this method is for individual ack carry the transaction - private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) { + private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) { // Individual ack List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>(); if (!isTransactionEnabled()) { @@ -490,6 +493,7 @@ public class Consumer { new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); } + LongAdder totalAckCount = new LongAdder(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; @@ -518,6 +522,8 @@ public class Consumer { checkCanRemovePendingAcksAndHandle(position, msgId); checkAckValidationError(ack, position); + + totalAckCount.add(ackedCount); } CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(), @@ -533,7 +539,7 @@ public class Consumer { } })); } - return completableFuture; + return completableFuture.thenApply(__ -> totalAckCount.sum()); } private long getBatchSize(MessageIdData msgId) { @@ -756,9 +762,9 @@ public class Consumer { messageAckRate.calculateRate(); stats.msgRateOut = msgOut.getRate(); - stats.messageAckRate = messageAckRate.getRate(); stats.msgThroughputOut = msgOut.getValueRate(); stats.msgRateRedeliver = msgRedeliver.getRate(); + stats.messageAckRate = messageAckRate.getValueRate(); stats.chunkedMessageRate = chunkedMessageRate.getRate(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 412efe69632..e35be235f3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -29,7 +29,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -53,6 +53,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @Slf4j @@ -249,56 +250,94 @@ public class ConsumerStatsTest extends ProducerConsumerBase { private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics) throws Exception { - final int messages = 100; + final int messages = 1000; String subName = "test_sub"; + CountDownLatch latch = new CountDownLatch(messages); @Cleanup - Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true).batchingMaxMessages(10).create(); + + MessageListener<String> listener = (consumer, msg) -> { + try { + consumer.acknowledge(msg); + latch.countDown(); + } catch (PulsarClientException e) { + //ignore + } + }; @Cleanup - Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName) - .subscriptionName(subName).isAckReceiptEnabled(true).subscribe(); + Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .messageListener(listener) + .subscribe(); + @Cleanup + Consumer<String> c2 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .messageListener(listener) + .subscribe(); String namespace = TopicName.get(topicName).getNamespace(); for (int i = 0; i < messages; i++) { - producer.send(UUID.randomUUID().toString()); + producer.sendAsync(UUID.randomUUID().toString()); } + producer.flush(); - for (int i = 0; i < messages; i++) { - Message<String> message = consumer.receive(20, TimeUnit.SECONDS); - if (message == null) { - break; - } - - consumer.acknowledge(message); - } + latch.await(20, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(1); Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get(); Subscription subscription = topic.getSubscription(subName); List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers(); - Assert.assertEquals(consumers.size(), 1); + Assert.assertEquals(consumers.size(), 2); org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0); + org.apache.pulsar.broker.service.Consumer consumer2 = consumers.get(1); consumer1.updateRates(); + consumer2.updateRates(); ByteArrayOutputStream output = new ByteArrayOutputStream(); PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output); String metricStr = output.toString(StandardCharsets.UTF_8.name()); Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr); - Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate"); - Assert.assertTrue(metrics.size() > 0); - - int num = 0; - for (PrometheusMetricsTest.Metric metric : metrics) { - if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) { - num++; - Assert.assertTrue(metric.value > 0); - } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) { - num++; - Assert.assertTrue(metric.value > 0); - } + Collection<PrometheusMetricsTest.Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate"); + + String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out"; + Collection<PrometheusMetricsTest.Metric> rateOutMetric = metricsMap.get(rateOutMetricName); + Assert.assertTrue(ackRateMetric.size() > 0); + Assert.assertTrue(rateOutMetric.size() > 0); + + if (exposeTopicLevelMetrics) { + String consumer1Name = consumer1.consumerName(); + String consumer2Name = consumer2.consumerName(); + double totalAckRate = ackRateMetric.stream() + .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name) + || metric.tags.get("consumer_name").equals(consumer2Name)) + .mapToDouble(metric -> metric.value).sum(); + double totalRateOut = rateOutMetric.stream() + .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name) + || metric.tags.get("consumer_name").equals(consumer2Name)) + .mapToDouble(metric -> metric.value).sum(); + + Assert.assertTrue(totalAckRate > 0D); + Assert.assertTrue(totalRateOut > 0D); + Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D); + } else { + double totalAckRate = ackRateMetric.stream() + .filter(metric -> namespace.equals(metric.tags.get("namespace"))) + .mapToDouble(metric -> metric.value).sum(); + double totalRateOut = rateOutMetric.stream() + .filter(metric -> namespace.equals(metric.tags.get("namespace"))) + .mapToDouble(metric -> metric.value).sum(); + + Assert.assertTrue(totalAckRate > 0D); + Assert.assertTrue(totalRateOut > 0D); + Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D); } - - Assert.assertTrue(num > 0); } }
