This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bd86e4e9d8d [feat][client] PIP-253 Expose ProducerStats for DeadLetter and RetryLetter producers in ConsumerStats (#20239)
bd86e4e9d8d is described below

commit bd86e4e9d8dc2fe75ac8e29d628ffc5f7f9aab8c
Author: Kai <[email protected]>
AuthorDate: Sun Oct 29 02:28:03 2023 -0700

    [feat][client] PIP-253 Expose ProducerStats for DeadLetter and RetryLetter producers in ConsumerStats (#20239)
---
 .../client/api/SimpleProducerConsumerStatTest.java | 132 +++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerStats.java    |  10 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   4 +
 .../pulsar/client/impl/ConsumerStatsDisabled.java  |  21 ++++
 .../pulsar/client/impl/ConsumerStatsRecorder.java  |   5 +
 .../client/impl/ConsumerStatsRecorderImpl.java     |  25 ++++
 .../impl/MultiTopicConsumerStatsRecorderImpl.java  |  21 ++++
 .../src/main/resources/findbugsExclude.xml         |  10 ++
 8 files changed, 228 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 40e401d2866..b74e396cea5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -560,4 +560,136 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test
+    public void testRetryLetterAndDeadLetterStats() throws 
PulsarClientException, InterruptedException {
+        final String topicName = 
"persistent://my-property/my-ns/testRetryLetterAndDeadLetterStats";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .enableRetry(true)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(3)
+                        
.retryLetterTopic("persistent://my-property/my-ns/retry-topic")
+                        
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
+                        .build())
+                .subscriptionName("sub")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 1;
+        for (int i = 0; i < messages; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+
+        for (int i = 0; i < messages * 4; i++) {
+            // nack and reconsumeLater
+            Message msg = consumer.receive(1, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            ConsumerStats stats = consumer.getStats();
+            ProducerStats retryStats = stats.getRetryLetterProducerStats();
+            ProducerStats deadLetterStats = stats.getDeadLetterProducerStats();
+            assertNotNull(retryStats);
+            assertNotNull(deadLetterStats);
+            assertEquals(retryStats.getTotalMsgsSent(), 3);
+            assertEquals(deadLetterStats.getTotalMsgsSent(), 1);
+        });
+    }
+    @Test
+    public void testDeadLetterStats() throws PulsarClientException, 
InterruptedException {
+        final String topicName = 
"persistent://my-property/my-ns/testDeadLetterStats";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(1)
+                        
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
+                        .build())
+                .subscriptionName("sub")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 1;
+        for (int i = 0; i < messages; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+
+        for (int i = 0; i < messages * 2; i++) {
+            // nack and reconsumeLater
+            Message msg = consumer.receive(1, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.negativeAcknowledge(msg);
+            }
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            ConsumerStats stats = consumer.getStats();
+            ProducerStats dlqStats = stats.getDeadLetterProducerStats();
+            assertNotNull(dlqStats);
+            assertEquals(dlqStats.getTotalMsgsSent(), 1);
+        });
+    }
+
+    @Test
+    public void testPartitionedRetryLetterAndDeadLetterStats()
+            throws PulsarClientException, InterruptedException, 
PulsarAdminException {
+        final String topicName = 
"persistent://my-property/my-ns/testPartitionedRetryLetterAndDeadLetterStats";
+
+        admin.topics().createPartitionedTopic(topicName, 10);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .enableRetry(true)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(3)
+                        
.retryLetterTopic("persistent://my-property/my-ns/retry-topic")
+                        
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
+                        .build())
+                .subscriptionName("sub")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        final int messages = 30;
+        for (int i = 0; i < messages; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+
+        for (int i = 0; i < messages * 4; i++) {
+            // nack and reconsumeLater
+            Message msg = consumer.receive(1, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            ConsumerStats stats = consumer.getStats();
+            ProducerStats retryStats = stats.getRetryLetterProducerStats();
+            ProducerStats deadLetterStats = stats.getDeadLetterProducerStats();
+            assertNotNull(retryStats);
+            assertNotNull(deadLetterStats);
+            assertEquals(retryStats.getTotalMsgsSent(), 3 * messages);
+            assertEquals(deadLetterStats.getTotalMsgsSent(), messages);
+        });
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
index 529101ecde3..7935e05d55b 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -122,4 +122,14 @@ public interface ConsumerStats extends Serializable {
     default Map<String, ConsumerStats> getPartitionStats() {
         return Collections.emptyMap();
     }
+
+    /**
+     * @return producer stats for deadLetterProducer if available
+     */
+    ProducerStats getDeadLetterProducerStats();
+
+    /**
+     * @return producer stats for retryLetterProducer if available
+     */
+    ProducerStats getRetryLetterProducerStats();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f390b80a7f0..15cfeb26741 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -617,6 +617,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             .enableChunking(true)
                             .blockIfQueueFull(false)
                             .create();
+                    
stats.setRetryLetterProducerStats(retryLetterProducer.getStats());
                 }
             } catch (Exception e) {
                 log.error("Create retry letter producer exception with topic: 
{}",
@@ -2168,6 +2169,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                     .enableBatching(false)
                                     .enableChunking(true)
                                     .createAsync();
+                    deadLetterProducer.thenAccept(dlqProducer -> {
+                        
stats.setDeadLetterProducerStats(dlqProducer.getStats());
+                    });
                 }
             } finally {
                 createProducerLock.writeLock().unlock();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index e8719753bef..374b88ab3eb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerStats;
 
 public class ConsumerStatsDisabled implements ConsumerStatsRecorder {
     private static final long serialVersionUID = 1L;
@@ -124,6 +125,16 @@ public class ConsumerStatsDisabled implements 
ConsumerStatsRecorder {
         return null;
     }
 
+    @Override
+    public ProducerStats getDeadLetterProducerStats() {
+        return null;
+    }
+
+    @Override
+    public ProducerStats getRetryLetterProducerStats() {
+        return null;
+    }
+
     @Override
     public double getRateMsgsReceived() {
         return 0;
@@ -148,4 +159,14 @@ public class ConsumerStatsDisabled implements 
ConsumerStatsRecorder {
     public void updateCumulativeStats(ConsumerStats stats) {
         // do nothing
     }
+
+    @Override
+    public void setDeadLetterProducerStats(ProducerStats producerStats) {
+        // do nothing
+    }
+
+    @Override
+    public void setRetryLetterProducerStats(ProducerStats producerStats) {
+        // do nothing
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
index 1a7de725f31..1d0d9e734b3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
@@ -22,6 +22,7 @@ import io.netty.util.Timeout;
 import java.util.Optional;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerStats;
 
 public interface ConsumerStatsRecorder extends ConsumerStats {
     void updateNumMsgsReceived(Message<?> message);
@@ -39,4 +40,8 @@ public interface ConsumerStatsRecorder extends ConsumerStats {
     void reset();
 
     void updateCumulativeStats(ConsumerStats stats);
+
+    void setDeadLetterProducerStats(ProducerStats producerStats);
+
+    void setRetryLetterProducerStats(ProducerStats producerStats);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 3a47ddc5d4b..8dfc0af8e1d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.slf4j.Logger;
@@ -63,6 +64,10 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
     private volatile double receivedMsgsRate;
     private volatile double receivedBytesRate;
 
+    volatile ProducerStats deadLetterProducerStats;
+
+    volatile ProducerStats retryLetterProducerStats;
+
     private static final DecimalFormat THROUGHPUT_FORMAT = new 
DecimalFormat("0.00");
 
     public ConsumerStatsRecorderImpl() {
@@ -259,6 +264,26 @@ public class ConsumerStatsRecorderImpl implements 
ConsumerStatsRecorder {
         return null;
     }
 
+    @Override
+    public ProducerStats getDeadLetterProducerStats() {
+        return deadLetterProducerStats;
+    }
+
+    @Override
+    public ProducerStats getRetryLetterProducerStats() {
+        return retryLetterProducerStats;
+    }
+
+    @Override
+    public void setDeadLetterProducerStats(ProducerStats producerStats) {
+        this.deadLetterProducerStats = producerStats;
+    }
+
+    @Override
+    public void setRetryLetterProducerStats(ProducerStats producerStats) {
+        this.retryLetterProducerStats = producerStats;
+    }
+
     @Override
     public long getNumMsgsReceived() {
         return numMsgsReceived.longValue();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
index 17018be02be..eb4a339e20b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.MultiTopicConsumerStats;
+import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +33,10 @@ public class MultiTopicConsumerStatsRecorderImpl extends 
ConsumerStatsRecorderIm
     private static final long serialVersionUID = 1L;
     private Map<String, ConsumerStats> partitionStats = new 
ConcurrentHashMap<>();
 
+    private PartitionedTopicProducerStatsRecorderImpl deadLetterStats = new 
PartitionedTopicProducerStatsRecorderImpl();
+    private PartitionedTopicProducerStatsRecorderImpl retryLetterStats =
+            new PartitionedTopicProducerStatsRecorderImpl();
+
     public MultiTopicConsumerStatsRecorderImpl() {
         super();
     }
@@ -55,5 +60,21 @@ public class MultiTopicConsumerStatsRecorderImpl extends 
ConsumerStatsRecorderIm
         return partitionStats;
     }
 
+    @Override
+    public ProducerStats getDeadLetterProducerStats() {
+        deadLetterStats.reset();
+        partitionStats.forEach((partition, consumerStats) -> 
deadLetterStats.updateCumulativeStats(partition,
+                consumerStats.getDeadLetterProducerStats()));
+        return deadLetterStats;
+    }
+
+    @Override
+    public ProducerStats getRetryLetterProducerStats() {
+        retryLetterStats.reset();
+        partitionStats.forEach((partition, consumerStats) -> 
retryLetterStats.updateCumulativeStats(partition,
+                consumerStats.getRetryLetterProducerStats()));
+        return retryLetterStats;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class);
 }
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml 
b/pulsar-client/src/main/resources/findbugsExclude.xml
index f47f9b4a31a..0e05d20cb9b 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -1033,4 +1033,14 @@
         <Method name="getAckSet"/>
         <Bug pattern="EI_EXPOSE_REP"/>
     </Match>
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.ConsumerImpl"/>
+        <Method name="getStats"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"/>
+        <Method name="getStats"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
 </FindBugsFilter>

Reply via email to