This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 978762d434056d291f64c192a8c96af313dd1ad8
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Oct 10 11:46:55 2025 +0300

    [fix][client] Fix getPendingQueueSize for PartitionedTopicProducerStatsRecorderImpl: avoid NPE and implement aggregation (#24830)
    
    Co-authored-by: Zixuan Liu <[email protected]>
    (cherry picked from commit a43762ba421c4517c9803f9b7bda986153a9351b)
---
 .../PartitionedTopicProducerStatsRecorderImpl.java |  8 +++++++
 .../client/impl/ProducerStatsRecorderImpl.java     |  2 +-
 .../client/impl/ProducerStatsRecorderImplTest.java | 25 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
index 2f73a6af406..65aeac40ce7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
@@ -35,6 +35,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco
     private final DoubleAdder sendMsgsRateAggregate;
     private final DoubleAdder sendBytesRateAggregate;
     private int partitions = 0;
+    private int pendingQueueSize;
 
     public PartitionedTopicProducerStatsRecorderImpl() {
         super();
@@ -46,6 +47,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco
     void reset() {
         super.reset();
         partitions = 0;
+        pendingQueueSize = 0;
     }
 
     void updateCumulativeStats(String partition, ProducerStats stats) {
@@ -58,6 +60,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco
         sendMsgsRateAggregate.add(stats.getSendMsgsRate());
         sendBytesRateAggregate.add(stats.getSendBytesRate());
         partitions++;
+        pendingQueueSize += stats.getPendingQueueSize();
     }
 
     @Override
@@ -75,5 +78,10 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco
         return partitionStats;
     }
 
+    @Override
+    public int getPendingQueueSize() {
+        return pendingQueueSize;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index d828d729cc7..5e8c746ed89 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -332,7 +332,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
 
     @Override
     public int getPendingQueueSize() {
-        return producer.getPendingQueueSize();
+        return producer != null ? producer.getPendingQueueSize() : 0;
     }
 
     public void cancelStatsTimeout() {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index 8f648bfd9ff..1981c2c8ff5 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -93,4 +93,29 @@ public class ProducerStatsRecorderImplTest {
         assertTrue(recorder2.getSendBytesRate() > 0);
         assertTrue(recorder2.getSendMsgsRate() > 0);
     }
+
+    @Test
+    public void testPartitionedTopicProducerStatsPendingQueueSizeDoesntNPE() {
+        PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl();
+        assertEquals(recorder.getPendingQueueSize(), 0);
+    }
+
+    @Test
+    public void testProducerStatsPendingQueueSizeDoesntNPE() {
+        ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl();
+        assertEquals(recorder.getPendingQueueSize(), 0);
+    }
+
+    @Test
+    public void testPartitionedTopicProducerStatsPendingQueueSizeAggregated() {
+        PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl();
+
+        ProducerStatsRecorderImpl individualStats = spy(new ProducerStatsRecorderImpl());
+        when(individualStats.getPendingQueueSize()).thenReturn(1);
+        recorder.updateCumulativeStats("1", individualStats);
+        recorder.updateCumulativeStats("2", individualStats);
+        recorder.updateCumulativeStats("3", individualStats);
+
+        assertEquals(recorder.getPendingQueueSize(), 3);
+    }
 }

Reply via email to