This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9287e873df8f13f9ac88846eaa49dd411a8ebc4a
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Aug 23 21:15:14 2021 +0300

    [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)
    
    (cherry picked from commit 8027ab4e8763486de16d4b2f850b234b70a16b27)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java           | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c23b360..d4d64e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -175,8 +175,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         for (Entry entry : entries) {
             int stickyKeyHash = getStickyKeyHash(entry);
             Consumer c = selector.select(stickyKeyHash);
-            groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
-            consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+            if (c != null) {
+                groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
+                consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+            } else {
+                entry.release();
+            }
         }
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());

Reply via email to