This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 642461c  Expose the receiver queue size to client consumer stats 
(#8663)
642461c is described below

commit 642461cf658c1f0ed7eed350a990fcaa39182703
Author: feynmanlin <[email protected]>
AuthorDate: Sun Nov 22 09:59:54 2020 +0800

    Expose the receiver queue size to client consumer stats (#8663)
    
    Fixes #8650
    ### Motivation
    Currently, we log the receiver queue size, but we don't expose it through 
the client consumer stats. We should expose it.
    
    ### Modifications
    Add `getMsgNumInReceiverQueue()` and `getMsgNumInSubReceiverQueue()` to the `ConsumerStats` API.
---
 .../client/api/SimpleProducerConsumerTest.java     | 56 ++++++++++++++++++++--
 .../apache/pulsar/client/api/ConsumerStats.java    | 14 ++++++
 .../pulsar/client/impl/ConsumerStatsDisabled.java  | 11 +++++
 .../client/impl/ConsumerStatsRecorderImpl.java     | 55 +++++++++++++++++----
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 5 files changed, 125 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index b4d32d5..55dab8b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -36,7 +36,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import com.google.common.reflect.Reflection;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -72,11 +71,9 @@ import java.util.stream.Collectors;
 
 import lombok.Cleanup;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.ReflectionUtils;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -84,7 +81,6 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -100,6 +96,7 @@ import 
org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -3647,4 +3644,55 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         Assert.assertTrue(producer.getLastDisconnectedTimestamp() > 0);
         Assert.assertTrue(consumer.getLastDisconnectedTimestamp() > 0);
     }
+
+    @Test
+    public void testGetStats() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testGetStats" 
+ UUID.randomUUID();
+        final String subName = "my-sub";
+        final int receiveQueueSize = 100;
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) 
client.newConsumer(Schema.STRING)
+                
.topic(topicName).receiverQueueSize(receiveQueueSize).subscriptionName(subName).subscribe();
+        Assert.assertNull(consumer.getStats().getMsgNumInSubReceiverQueue());
+        
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 
0);
+
+        for (int i = 0; i < receiveQueueSize; i++) {
+            producer.sendAsync("msg" + i);
+        }
+        //Give some time to consume
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 
receiveQueueSize));
+        consumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testGetStatsForPartitionedTopic() throws Exception {
+        final String topicName = 
"persistent://my-property/my-ns/testGetStatsForPartitionedTopic";
+        final String subName = "my-sub";
+        final int receiveQueueSize = 100;
+
+        admin.topics().createPartitionedTopic(topicName, 3);
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+        MultiTopicsConsumerImpl<String> consumer = 
(MultiTopicsConsumerImpl<String>) client.newConsumer(Schema.STRING)
+                
.topic(topicName).receiverQueueSize(receiveQueueSize).subscriptionName(subName).subscribe();
+        
Assert.assertEquals(consumer.getStats().getMsgNumInSubReceiverQueue().size(), 
3);
+        
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 
0);
+
+        consumer.getStats().getMsgNumInSubReceiverQueue()
+                .forEach((key, value) -> Assert.assertEquals((int) value, 0));
+
+        for (int i = 0; i < receiveQueueSize; i++) {
+            producer.sendAsync("msg" + i);
+        }
+        //Give some time to consume
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 
receiveQueueSize));
+        consumer.close();
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
index c34c348..04b2ee1 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Serializable;
+import java.util.Map;
+
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -101,4 +103,16 @@ public interface ConsumerStats extends Serializable {
      * @return Total number of message acknowledgments failures on this 
consumer
      */
     long getTotalAcksFailed();
+
+    /**
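+     * Get the number of messages currently held in the receiver queue.
+     * @return the current message count, or {@code null} when it is not available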
+     */
+    Integer getMsgNumInReceiverQueue();
+
+    /**
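+     * Get the number of messages in the receiver queue of each sub-consumer.
+     * @return a map from sub-consumer id to its queued message count, or {@code null} when not available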
+     */
+    Map<Long, Integer> getMsgNumInSubReceiverQueue();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index 8ced7a6..7304548 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.pulsar.client.api.ConsumerStats;
@@ -116,6 +117,16 @@ public class ConsumerStatsDisabled implements 
ConsumerStatsRecorder {
     }
 
     @Override
+    public Integer getMsgNumInReceiverQueue() {
+        return null;
+    }
+
+    @Override
+    public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
+        return null;
+    }
+
+    @Override
     public double getRateMsgsReceived() {
         return 0;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index c1a2559..c9b2c49 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -20,10 +20,14 @@ package org.apache.pulsar.client.impl;
 
 import java.io.IOException;
 import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -42,7 +46,7 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
     private Timeout statTimeout;
-    private ConsumerImpl<?> consumer;
+    private Consumer<?> consumer;
     private PulsarClientImpl pulsarClient;
     private long oldTime;
     private long statsIntervalSeconds;
@@ -65,6 +69,11 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
     private static final DecimalFormat THROUGHPUT_FORMAT = new 
DecimalFormat("0.00");
 
     public ConsumerStatsRecorderImpl() {
+        this(null);
+    }
+
+    public ConsumerStatsRecorderImpl(Consumer<?> consumer) {
+        this.consumer = consumer;
         numMsgsReceived = new LongAdder();
         numBytesReceived = new LongAdder();
         numReceiveFailed = new LongAdder();
@@ -80,7 +89,7 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
     }
 
     public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, 
ConsumerConfigurationData<?> conf,
-            ConsumerImpl<?> consumer) {
+            Consumer<?> consumer) {
         this.pulsarClient = pulsarClient;
         this.consumer = consumer;
         this.statsIntervalSeconds = 
pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -112,9 +121,10 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
         }
 
         stat = (timeout) -> {
-            if (timeout.isCancelled()) {
+            if (timeout.isCancelled() || !(consumer instanceof ConsumerImpl)) {
                 return;
             }
+            ConsumerImpl<?> consumerImpl = (ConsumerImpl<?>) consumer;
             try {
                 long now = System.nanoTime();
                 double elapsed = (now - oldTime) / 1e9;
@@ -135,7 +145,6 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
 
                 receivedMsgsRate = currentNumMsgsReceived / elapsed;
                 receivedBytesRate = currentNumBytesReceived / elapsed;
-
                 if ((currentNumMsgsReceived | currentNumBytesReceived | 
currentNumReceiveFailed | currentNumAcksSent
                         | currentNumAcksFailed) != 0) {
                     log.info(
@@ -143,15 +152,15 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
                                     + "Consume throughput received: {} msgs/s 
--- {} Mbit/s --- "
                                     + "Ack sent rate: {} ack/s --- " + "Failed 
messages: {} --- batch messages: {} ---"
                                     + "Failed acks: {}",
-                            consumer.getTopic(), consumer.getSubscription(), 
consumer.consumerName,
-                            consumer.incomingMessages.size(), 
THROUGHPUT_FORMAT.format(receivedMsgsRate),
+                            consumerImpl.getTopic(), 
consumerImpl.getSubscription(), consumerImpl.consumerName,
+                            consumerImpl.incomingMessages.size(), 
THROUGHPUT_FORMAT.format(receivedMsgsRate),
                             THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 
1024 / 1024),
                             THROUGHPUT_FORMAT.format(currentNumAcksSent / 
elapsed), currentNumReceiveFailed,
                             currentNumBatchReceiveFailed, 
currentNumAcksFailed);
                 }
             } catch (Exception e) {
-                log.error("[{}] [{}] [{}]: {}", consumer.getTopic(), 
consumer.subscription, consumer.consumerName,
-                        e.getMessage());
+                log.error("[{}] [{}] [{}]: {}", consumerImpl.getTopic(), 
consumerImpl.subscription
+                        , consumerImpl.consumerName, e.getMessage());
             } finally {
                 // schedule the next stat info
                 statTimeout = pulsarClient.timer().newTimeout(stat, 
statsIntervalSeconds, TimeUnit.SECONDS);
@@ -230,22 +239,47 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
         totalAcksFailed.add(stats.getTotalAcksFailed());
     }
 
+    @Override
+    public Integer getMsgNumInReceiverQueue() {
+        if (consumer instanceof ConsumerBase) {
+            return ((ConsumerBase<?>) consumer).incomingMessages.size();
+        }
+        return null;
+    }
+
+    @Override
+    public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
+        if (consumer instanceof MultiTopicsConsumerImpl) {
+            List<ConsumerImpl<?>> consumerList = ((MultiTopicsConsumerImpl) 
consumer).getConsumers();
+            return consumerList.stream().collect(
+                    Collectors.toMap((consumerImpl) -> consumerImpl.consumerId
+                            , (consumerImpl) -> 
consumerImpl.incomingMessages.size())
+            );
+        }
+        return null;
+    }
+
+    @Override
     public long getNumMsgsReceived() {
         return numMsgsReceived.longValue();
     }
 
+    @Override
     public long getNumBytesReceived() {
         return numBytesReceived.longValue();
     }
 
+    @Override
     public long getNumAcksSent() {
         return numAcksSent.longValue();
     }
 
+    @Override
     public long getNumAcksFailed() {
         return numAcksFailed.longValue();
     }
 
+    @Override
     public long getNumReceiveFailed() {
         return numReceiveFailed.longValue();
     }
@@ -255,14 +289,17 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
         return numBatchReceiveFailed.longValue();
     }
 
+    @Override
     public long getTotalMsgsReceived() {
         return totalMsgsReceived.longValue();
     }
 
+    @Override
     public long getTotalBytesReceived() {
         return totalBytesReceived.longValue();
     }
 
+    @Override
     public long getTotalReceivedFailed() {
         return totalReceiveFailed.longValue();
     }
@@ -272,10 +309,12 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
         return totalBatchReceiveFailed.longValue();
     }
 
+    @Override
     public long getTotalAcksSent() {
         return totalAcksSent.longValue();
     }
 
+    @Override
     public long getTotalAcksFailed() {
         return totalAcksFailed.longValue();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 26e82dd..49e31f4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -157,7 +157,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         this.internalConfig = getInternalConsumerConfig();
-        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? 
new ConsumerStatsRecorderImpl() : null;
+        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? 
new ConsumerStatsRecorderImpl(this) : null;
 
         // start track and auto subscribe partition increasement
         if (conf.isAutoUpdatePartitions()) {

Reply via email to