This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 978762d434056d291f64c192a8c96af313dd1ad8 Author: Lari Hotari <[email protected]> AuthorDate: Fri Oct 10 11:46:55 2025 +0300 [fix][client] Fix getPendingQueueSize for PartitionedTopicProducerStatsRecorderImpl: avoid NPE and implement aggregation (#24830) Co-authored-by: Zixuan Liu <[email protected]> (cherry picked from commit a43762ba421c4517c9803f9b7bda986153a9351b) --- .../PartitionedTopicProducerStatsRecorderImpl.java | 8 +++++++ .../client/impl/ProducerStatsRecorderImpl.java | 2 +- .../client/impl/ProducerStatsRecorderImplTest.java | 25 ++++++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java index 2f73a6af406..65aeac40ce7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java @@ -35,6 +35,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco private final DoubleAdder sendMsgsRateAggregate; private final DoubleAdder sendBytesRateAggregate; private int partitions = 0; + private int pendingQueueSize; public PartitionedTopicProducerStatsRecorderImpl() { super(); @@ -46,6 +47,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco void reset() { super.reset(); partitions = 0; + pendingQueueSize = 0; } void updateCumulativeStats(String partition, ProducerStats stats) { @@ -58,6 +60,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco sendMsgsRateAggregate.add(stats.getSendMsgsRate()); sendBytesRateAggregate.add(stats.getSendBytesRate()); partitions++; + pendingQueueSize += stats.getPendingQueueSize(); } @Override @@ -75,5 +78,10 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco return partitionStats; } + @Override + public int getPendingQueueSize() { + return pendingQueueSize; + } + private static final Logger log = LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java index d828d729cc7..5e8c746ed89 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java @@ -332,7 +332,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { @Override public int getPendingQueueSize() { - return producer.getPendingQueueSize(); + return producer != null ? producer.getPendingQueueSize() : 0; } public void cancelStatsTimeout() { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java index 8f648bfd9ff..1981c2c8ff5 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java @@ -93,4 +93,29 @@ public class ProducerStatsRecorderImplTest { assertTrue(recorder2.getSendBytesRate() > 0); assertTrue(recorder2.getSendMsgsRate() > 0); } + + @Test + public void testPartitionedTopicProducerStatsPendingQueueSizeDoesntNPE() { + PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl(); + assertEquals(recorder.getPendingQueueSize(), 0); + } + + @Test + public void testProducerStatsPendingQueueSizeDoesntNPE() { + ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(); + assertEquals(recorder.getPendingQueueSize(), 0); + } + + @Test + public void testPartitionedTopicProducerStatsPendingQueueSizeAggregated() { + PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl(); + + ProducerStatsRecorderImpl individualStats = spy(new ProducerStatsRecorderImpl()); + when(individualStats.getPendingQueueSize()).thenReturn(1); + recorder.updateCumulativeStats("1", individualStats); + recorder.updateCumulativeStats("2", individualStats); + recorder.updateCumulativeStats("3", individualStats); + + assertEquals(recorder.getPendingQueueSize(), 3); + } }
