This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bd86e4e9d8d [feat][client] PIP-253 Expose ProducerStats for DeadLetter
and RetryLetter producers in ConsumerStats (#20239)
bd86e4e9d8d is described below
commit bd86e4e9d8dc2fe75ac8e29d628ffc5f7f9aab8c
Author: Kai <[email protected]>
AuthorDate: Sun Oct 29 02:28:03 2023 -0700
[feat][client] PIP-253 Expose ProducerStats for DeadLetter and RetryLetter
producers in ConsumerStats (#20239)
---
.../client/api/SimpleProducerConsumerStatTest.java | 132 +++++++++++++++++++++
.../apache/pulsar/client/api/ConsumerStats.java | 10 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 +
.../pulsar/client/impl/ConsumerStatsDisabled.java | 21 ++++
.../pulsar/client/impl/ConsumerStatsRecorder.java | 5 +
.../client/impl/ConsumerStatsRecorderImpl.java | 25 ++++
.../impl/MultiTopicConsumerStatsRecorderImpl.java | 21 ++++
.../src/main/resources/findbugsExclude.xml | 10 ++
8 files changed, 228 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 40e401d2866..b74e396cea5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -560,4 +560,136 @@ public class SimpleProducerConsumerStatTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test
+ public void testRetryLetterAndDeadLetterStats() throws
PulsarClientException, InterruptedException {
+ final String topicName =
"persistent://my-property/my-ns/testRetryLetterAndDeadLetterStats";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+ .enableRetry(true)
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(3)
+
.retryLetterTopic("persistent://my-property/my-ns/retry-topic")
+
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
+ .build())
+ .subscriptionName("sub")
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+
+ final int messages = 1;
+ for (int i = 0; i < messages; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+
+ for (int i = 0; i < messages * 4; i++) {
+ // nack and reconsumeLater
+ Message msg = consumer.receive(1, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ Awaitility.await().untilAsserted(() -> {
+ ConsumerStats stats = consumer.getStats();
+ ProducerStats retryStats = stats.getRetryLetterProducerStats();
+ ProducerStats deadLetterStats = stats.getDeadLetterProducerStats();
+ assertNotNull(retryStats);
+ assertNotNull(deadLetterStats);
+ assertEquals(retryStats.getTotalMsgsSent(), 3);
+ assertEquals(deadLetterStats.getTotalMsgsSent(), 1);
+ });
+ }
+ @Test
+ public void testDeadLetterStats() throws PulsarClientException,
InterruptedException {
+ final String topicName =
"persistent://my-property/my-ns/testDeadLetterStats";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(1)
+
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
+ .build())
+ .subscriptionName("sub")
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+
+ final int messages = 1;
+ for (int i = 0; i < messages; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+
+ for (int i = 0; i < messages * 2; i++) {
+ // nack and reconsumeLater
+ Message msg = consumer.receive(1, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.negativeAcknowledge(msg);
+ }
+ }
+
+ Awaitility.await().untilAsserted(() -> {
+ ConsumerStats stats = consumer.getStats();
+ ProducerStats dlqStats = stats.getDeadLetterProducerStats();
+ assertNotNull(dlqStats);
+ assertEquals(dlqStats.getTotalMsgsSent(), 1);
+ });
+ }
+
+ @Test
+ public void testPartitionedRetryLetterAndDeadLetterStats()
+ throws PulsarClientException, InterruptedException,
PulsarAdminException {
+ final String topicName =
"persistent://my-property/my-ns/testPartitionedRetryLetterAndDeadLetterStats";
+
+ admin.topics().createPartitionedTopic(topicName, 10);
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+ .enableRetry(true)
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(3)
+
.retryLetterTopic("persistent://my-property/my-ns/retry-topic")
+
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
+ .build())
+ .subscriptionName("sub")
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ final int messages = 30;
+ for (int i = 0; i < messages; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+
+ for (int i = 0; i < messages * 4; i++) {
+ // nack and reconsumeLater
+ Message msg = consumer.receive(1, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ Awaitility.await().untilAsserted(() -> {
+ ConsumerStats stats = consumer.getStats();
+ ProducerStats retryStats = stats.getRetryLetterProducerStats();
+ ProducerStats deadLetterStats = stats.getDeadLetterProducerStats();
+ assertNotNull(retryStats);
+ assertNotNull(deadLetterStats);
+ assertEquals(retryStats.getTotalMsgsSent(), 3 * messages);
+ assertEquals(deadLetterStats.getTotalMsgsSent(), messages);
+ });
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
index 529101ecde3..7935e05d55b 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -122,4 +122,14 @@ public interface ConsumerStats extends Serializable {
default Map<String, ConsumerStats> getPartitionStats() {
return Collections.emptyMap();
}
+
+ /**
+ * @return producer stats for deadLetterProducer if available
+ */
+ ProducerStats getDeadLetterProducerStats();
+
+ /**
+ * @return producer stats for retryLetterProducer if available
+ */
+ ProducerStats getRetryLetterProducerStats();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f390b80a7f0..15cfeb26741 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -617,6 +617,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.enableChunking(true)
.blockIfQueueFull(false)
.create();
+
stats.setRetryLetterProducerStats(retryLetterProducer.getStats());
}
} catch (Exception e) {
log.error("Create retry letter producer exception with topic:
{}",
@@ -2168,6 +2169,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.enableBatching(false)
.enableChunking(true)
.createAsync();
+ deadLetterProducer.thenAccept(dlqProducer -> {
+
stats.setDeadLetterProducerStats(dlqProducer.getStats());
+ });
}
} finally {
createProducerLock.writeLock().unlock();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index e8719753bef..374b88ab3eb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerStats;
public class ConsumerStatsDisabled implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
@@ -124,6 +125,16 @@ public class ConsumerStatsDisabled implements
ConsumerStatsRecorder {
return null;
}
+ @Override
+ public ProducerStats getDeadLetterProducerStats() {
+ return null;
+ }
+
+ @Override
+ public ProducerStats getRetryLetterProducerStats() {
+ return null;
+ }
+
@Override
public double getRateMsgsReceived() {
return 0;
@@ -148,4 +159,14 @@ public class ConsumerStatsDisabled implements
ConsumerStatsRecorder {
public void updateCumulativeStats(ConsumerStats stats) {
// do nothing
}
+
+ @Override
+ public void setDeadLetterProducerStats(ProducerStats producerStats) {
+ // do nothing
+ }
+
+ @Override
+ public void setRetryLetterProducerStats(ProducerStats producerStats) {
+ // do nothing
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
index 1a7de725f31..1d0d9e734b3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
@@ -22,6 +22,7 @@ import io.netty.util.Timeout;
import java.util.Optional;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerStats;
public interface ConsumerStatsRecorder extends ConsumerStats {
void updateNumMsgsReceived(Message<?> message);
@@ -39,4 +40,8 @@ public interface ConsumerStatsRecorder extends ConsumerStats {
void reset();
void updateCumulativeStats(ConsumerStats stats);
+
+ void setDeadLetterProducerStats(ProducerStats producerStats);
+
+ void setRetryLetterProducerStats(ProducerStats producerStats);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 3a47ddc5d4b..8dfc0af8e1d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
@@ -63,6 +64,10 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
private volatile double receivedMsgsRate;
private volatile double receivedBytesRate;
+ volatile ProducerStats deadLetterProducerStats;
+
+ volatile ProducerStats retryLetterProducerStats;
+
private static final DecimalFormat THROUGHPUT_FORMAT = new
DecimalFormat("0.00");
public ConsumerStatsRecorderImpl() {
@@ -259,6 +264,26 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
return null;
}
+ @Override
+ public ProducerStats getDeadLetterProducerStats() {
+ return deadLetterProducerStats;
+ }
+
+ @Override
+ public ProducerStats getRetryLetterProducerStats() {
+ return retryLetterProducerStats;
+ }
+
+ @Override
+ public void setDeadLetterProducerStats(ProducerStats producerStats) {
+ this.deadLetterProducerStats = producerStats;
+ }
+
+ @Override
+ public void setRetryLetterProducerStats(ProducerStats producerStats) {
+ this.retryLetterProducerStats = producerStats;
+ }
+
@Override
public long getNumMsgsReceived() {
return numMsgsReceived.longValue();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
index 17018be02be..eb4a339e20b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.MultiTopicConsumerStats;
+import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,10 @@ public class MultiTopicConsumerStatsRecorderImpl extends
ConsumerStatsRecorderIm
private static final long serialVersionUID = 1L;
private Map<String, ConsumerStats> partitionStats = new
ConcurrentHashMap<>();
+ private PartitionedTopicProducerStatsRecorderImpl deadLetterStats = new
PartitionedTopicProducerStatsRecorderImpl();
+ private PartitionedTopicProducerStatsRecorderImpl retryLetterStats =
+ new PartitionedTopicProducerStatsRecorderImpl();
+
public MultiTopicConsumerStatsRecorderImpl() {
super();
}
@@ -55,5 +60,21 @@ public class MultiTopicConsumerStatsRecorderImpl extends
ConsumerStatsRecorderIm
return partitionStats;
}
+ @Override
+ public ProducerStats getDeadLetterProducerStats() {
+ deadLetterStats.reset();
+ partitionStats.forEach((partition, consumerStats) ->
deadLetterStats.updateCumulativeStats(partition,
+ consumerStats.getDeadLetterProducerStats()));
+ return deadLetterStats;
+ }
+
+ @Override
+ public ProducerStats getRetryLetterProducerStats() {
+ retryLetterStats.reset();
+ partitionStats.forEach((partition, consumerStats) ->
retryLetterStats.updateCumulativeStats(partition,
+ consumerStats.getRetryLetterProducerStats()));
+ return retryLetterStats;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class);
}
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml
b/pulsar-client/src/main/resources/findbugsExclude.xml
index f47f9b4a31a..0e05d20cb9b 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -1033,4 +1033,14 @@
<Method name="getAckSet"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.ConsumerImpl"/>
+ <Method name="getStats"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"/>
+ <Method name="getStats"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
</FindBugsFilter>