This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 2318a18  [Broker] Fix NPE when subscription is already removed (#14363)
2318a18 is described below

commit 2318a180c06c5d885af8cbbbcdae4f2ea8468cc1
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Mar 25 12:03:49 2022 -0500

    [Broker] Fix NPE when subscription is already removed (#14363)
    
    * [Broker] Fix NPE when subscription is already removed
    
    * Cover same case for NonPersistentTopic
    
    Master Issue: #14362
    
    ### Motivation
    
    There is current a race condition when we remove a subscription. The race 
and how to reproduce it is described in the #14362. One of the consequences of 
the race is that there is a chance we try to remove the subscription from the 
topic twice. This leads to an NPE, as described in the issue.
    
    ### Modifications
    
    * Verify that the `sub` is not null before getting its stats.
    
    ### Verifying this change
    
    This is a trivial change.
    
    (cherry picked from commit aee1e7dbc55099c6b7cdc49e7b5e1c4cd66994ce)
---
 .../broker/service/nonpersistent/NonPersistentTopic.java       | 10 ++++++----
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 ++++++----
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 71f1764..d242ed3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1063,10 +1063,12 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         // That creates deadlock. so, execute remove it in different thread.
         return CompletableFuture.runAsync(() -> {
             NonPersistentSubscription sub = 
subscriptions.remove(subscriptionName);
-            // preserve accumulative stats form removed subscription
-            SubscriptionStatsImpl stats = sub.getStats();
-            bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
-            msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+            if (sub != null) {
+                // preserve accumulative stats form removed subscription
+                SubscriptionStatsImpl stats = sub.getStats();
+                bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+                msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+            }
         }, brokerService.executor());
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 235ea52..bc06ea4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1050,10 +1050,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     void removeSubscription(String subscriptionName) {
         PersistentSubscription sub = subscriptions.remove(subscriptionName);
-        // preserve accumulative stats form removed subscription
-        SubscriptionStatsImpl stats = sub.getStats(false, false, false);
-        bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
-        msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+        if (sub != null) {
+            // preserve accumulative stats form removed subscription
+            SubscriptionStatsImpl stats = sub.getStats(false, false, false);
+            bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+            msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+        }
     }
 
     /**

Reply via email to