jdfrozen opened a new issue, #21903: URL: https://github.com/apache/pulsar/issues/21903
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

2.7.4

### Minimal reproduce step

Producer: send a large number of messages.

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ProductInit {

    public static void main(String[] args) {
        try {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://127.0.0.1:6650")
                    .ioThreads(2)
                    .listenerThreads(10)
                    .build();
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("persistent://DTS/hhhh/aa")
                    .enableBatching(false)
                    .create();
            // Send 100,000 messages, roughly one per millisecond
            new Thread(() -> {
                for (int i = 0; i < 100000; i++) {
                    try {
                        String msg = String.valueOf(i);
                        TypedMessageBuilder<String> typedMessageBuilder = producer.newMessage().value(msg);
                        typedMessageBuilder.send();
                        Thread.sleep(1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
```

Consumer: subscribe with `SubscriptionMode.NonDurable` and consume messages slowly.

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;

@Slf4j
public class NodeInit1 {

    public static void main(String[] args) {
        try {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://127.0.0.1:6650")
                    .ioThreads(2)
                    .listenerThreads(10)
                    .build();
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("persistent://DTS/hhhh/aa")
                    .subscriptionName("sub")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionMode(SubscriptionMode.NonDurable)
                    .subscribe();
            // Consume about one message per second so that a backlog builds up
            new Thread(() -> {
                while (true) {
                    try {
                        Message<String> message = consumer.receive();
                        Thread.sleep(1000);
                        consumer.acknowledge(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
```

Then add new consumers, either to the current subscription or to another subscription.

### What did you expect to see?

The new consumers join normally.

### What did you see instead?

The new consumers fail to join, and the broker throws `java.lang.NullPointerException`:

```
10:36:13.035 [pulsar-io-49-8] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://DTS/hhhh/aa] Failed to create subscription: sub-1 error: java.util.concurrent.CompletionException: java.lang.NullPointerException
```

After a while, the broker stats updater also fails:

```
10:11:39.979 [pulsar-stats-updater-50-1] ERROR org.apache.pulsar.broker.service.PulsarStats - Failed to generate namespace stats for namespace DTS/hhhh: null
java.lang.NullPointerException: null
	at org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.removeCursor(ManagedCursorContainer.java:128) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.deactivateCursor(ManagedLedgerImpl.java:3122) ~[classes/:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.setInactive(ManagedCursorImpl.java:956) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkBackloggedCursors$74(PersistentTopic.java:2063) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkBackloggedCursors(PersistentTopic.java:2058) ~[classes/:?]
	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:141) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
	at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
	at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) ~[classes/:?]
	at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1370) ~[classes/:?]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [classes/:?]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.12.0.jar:4.12.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_211]
	at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) [?:1.8.0_211]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) [?:1.8.0_211]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_211]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_211]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_211]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_211]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
```

### Anything else?
After browsing the source code, here is what I think goes wrong. `checkBackloggedCursors` in `org.apache.pulsar.broker.service.persistent.PersistentTopic` runs each time a new consumer is added (and, as the second stack trace shows, also periodically from `PulsarStats.updateStats`):

```java
public void checkBackloggedCursors() {
    // activate caught up cursors which include consumers
    subscriptions.forEach((subName, subscription) -> {
        if (!subscription.getConsumers().isEmpty()
                && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
            subscription.getCursor().setActive();
        } else {
            subscription.getCursor().setInactive();
        }
    });
}
```

For a backlogged cursor, `setInactive()` calls `deactivateCursor` in `org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl`:

```java
public void deactivateCursor(ManagedCursor cursor) {
    synchronized (activeCursors) {
        if (activeCursors.get(cursor.getName()) != null) {
            activeCursors.removeCursor(cursor.getName());
            if (!activeCursors.hasDurableCursors()) {
                // cleanup cache if there is no active subscription
                entryCache.clear();
            } else {
                // if removed subscription was the slowest subscription : update cursor and let it clear cache:
                // till new slowest-cursor's read-position
                discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(),
                        getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
            }
        }
        if (!cursor.isDurable()) {
            nonDurableActiveCursors.removeCursor(cursor.getName());
        }
    }
}
```

Unlike the `activeCursors` removal, the `nonDurableActiveCursors.removeCursor(...)` call is not guarded by a presence check. For a non-durable cursor that is already inactive, each further `setInactive()` therefore appears to remove a name that `nonDurableActiveCursors` no longer holds, which would match the NPE at `ManagedCursorContainer.removeCursor(ManagedCursorContainer.java:128)` above.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
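To make the suspected failure mode in the quoted `deactivateCursor` code concrete, here is a minimal, self-contained Java model of its bookkeeping. It is not Pulsar code: `CursorContainer`, `Ledger`, and their methods are hypothetical stand-ins, and the assumption that removing an absent name throws `NullPointerException` is inferred from the stack trace, not read from `ManagedCursorContainer` itself. `deactivateGuarded` shows one possible presence check, not the upstream fix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the cursor bookkeeping in deactivateCursor().
// NOT Pulsar code: all class and method names here are illustrative stand-ins.
public class DeactivateCursorSketch {

    // Stand-in for ManagedCursorContainer: a heap-like list plus a name -> slot index.
    // Assumption (inferred from the stack trace): removal does not tolerate an absent name.
    static final class CursorContainer {
        private final Map<String, Integer> index = new HashMap<>();
        private final List<String> heap = new ArrayList<>();

        void add(String name) {
            index.put(name, heap.size());
            heap.add(name);
        }

        boolean contains(String name) {
            return index.containsKey(name);
        }

        void removeUnchecked(String name) {
            Integer slot = index.remove(name);
            int idx = slot; // unboxing null throws NullPointerException when name is absent
            int last = heap.size() - 1;
            String moved = heap.get(last);
            heap.set(idx, moved); // move the last element into the freed slot
            heap.remove(last);    // remove(int): drop the now-duplicated last slot
            if (!moved.equals(name)) {
                index.put(moved, idx);
            }
        }
    }

    static final class Ledger {
        final CursorContainer activeCursors = new CursorContainer();
        final CursorContainer nonDurableActiveCursors = new CursorContainer();

        void activate(String name, boolean durable) {
            if (!activeCursors.contains(name)) {
                activeCursors.add(name);
            }
            if (!durable && !nonDurableActiveCursors.contains(name)) {
                nonDurableActiveCursors.add(name);
            }
        }

        // Mirrors the quoted structure: the non-durable removal sits outside the guard.
        void deactivateAsReported(String name, boolean durable) {
            if (activeCursors.contains(name)) {
                activeCursors.removeUnchecked(name);
            }
            if (!durable) {
                nonDurableActiveCursors.removeUnchecked(name);
            }
        }

        // One possible guard (an assumption, not the upstream patch): check presence first.
        void deactivateGuarded(String name, boolean durable) {
            if (activeCursors.contains(name)) {
                activeCursors.removeUnchecked(name);
            }
            if (!durable && nonDurableActiveCursors.contains(name)) {
                nonDurableActiveCursors.removeUnchecked(name);
            }
        }
    }

    public static void main(String[] args) {
        Ledger reported = new Ledger();
        reported.activate("sub", false);
        reported.deactivateAsReported("sub", false); // first pass: cursor was active
        try {
            reported.deactivateAsReported("sub", false); // second pass: already inactive
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE on repeated deactivate");
        }

        Ledger guarded = new Ledger();
        guarded.activate("sub", false);
        guarded.deactivateGuarded("sub", false);
        guarded.deactivateGuarded("sub", false); // no-op, no exception
        System.out.println("guarded deactivate is idempotent");
    }
}
```

Running `main` deactivates the same non-durable cursor twice, as two consecutive `checkBackloggedCursors` passes over a backlogged subscription would, first with the unguarded variant and then with the guarded one. A durable cursor never reaches the unguarded removal, which is consistent with the problem showing up only for `SubscriptionMode.NonDurable`.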
