This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9287e873df8f13f9ac88846eaa49dd411a8ebc4a
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Aug 23 21:15:14 2021 +0300

    [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)

    (cherry picked from commit 8027ab4e8763486de16d4b2f850b234b70a16b27)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c23b360..d4d64e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -175,8 +175,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             for (Entry entry : entries) {
                 int stickyKeyHash = getStickyKeyHash(entry);
                 Consumer c = selector.select(stickyKeyHash);
-                groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
-                consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+                if (c != null) {
+                    groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
+                    consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+                } else {
+                    entry.release();
+                }
             }
 
             AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
