jdfrozen opened a new issue, #21903:
URL: https://github.com/apache/pulsar/issues/21903

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Version
   
   2.7.4
   
   ### Minimal reproduce step
   
   A producer sends a large number of messages:
   `
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.TypedMessageBuilder;
   import org.springframework.stereotype.Component;
   
   @Component
   @Slf4j
   public class ProductInit {
       public static void main(String[] args) {
           try {
               PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://127.0.0.1:6650")
                   .ioThreads(2)
                   .listenerThreads(10)
                   .build();
               Producer<String> producer = client.newProducer(Schema.STRING)
                   .topic("persistent://DTS/hhhh/aa")
                   .enableBatching(false)
                   .create();
               new Thread(() -> {
                    for (int i = 0; i < 100000; i++) {
                        try {
                            String msg = String.valueOf(i);
                            TypedMessageBuilder<String> typedMessageBuilder = producer.newMessage().value(msg);
                            typedMessageBuilder.send();
                            // Pause 1 ms between sends.
                            Thread.sleep(1);
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   }
               }).start();
            } catch (Exception e) {
                log.error("Failed to start the producer", e);
           }
       }
   
   }`
   
   A consumer using the NonDurable subscription mode consumes the messages slowly:
   `
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.SubscriptionMode;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   @Slf4j
   public class NodeInit1 {
       public static void main(String[] args) {
           try {
               PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://127.0.0.1:6650")
                   .ioThreads(2)
                   .listenerThreads(10)
                   .build();
                Consumer<String> consumer = client.newConsumer(Schema.STRING)
                   .topic("persistent://DTS/hhhh/aa")
                   .subscriptionName("sub")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionMode(SubscriptionMode.NonDurable)
                   .subscribe();
               new Thread(() -> {
                   while (true) {
                       try {
                           Message<String> message = consumer.receive();
                            // Simulate slow processing: one second per message.
                            Thread.sleep(1000);
                           consumer.acknowledge(message);
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   }
               }).start();
            } catch (Exception e) {
                log.error("Failed to start the consumer", e);
           }
       }
   }
   `
   Next, I add new consumers, either to the current subscription or to another subscription.
   
   
   ### What did you expect to see?
   
   The new consumers join normally.
   
   ### What did you see instead?
   
   The join fails, and the broker logs a java.lang.NullPointerException:
   10:36:13.035 [pulsar-io-49-8] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://DTS/hhhh/aa] Failed to create subscription: sub-1 error: java.util.concurrent.CompletionException: java.lang.NullPointerException
   
   
   After a while, the following error also appears:
   `10:11:39.979 [pulsar-stats-updater-50-1] ERROR org.apache.pulsar.broker.service.PulsarStats - Failed to generate namespace stats for namespace DTS/hhhh: null
   java.lang.NullPointerException: null
        at org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.removeCursor(ManagedCursorContainer.java:128) ~[classes/:?]
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.deactivateCursor(ManagedLedgerImpl.java:3122) ~[classes/:?]
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.setInactive(ManagedCursorImpl.java:956) ~[classes/:?]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkBackloggedCursors$74(PersistentTopic.java:2063) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkBackloggedCursors(PersistentTopic.java:2058) ~[classes/:?]
        at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:141) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
        at org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
        at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[classes/:?]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[classes/:?]
        at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) ~[classes/:?]
        at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1370) ~[classes/:?]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [classes/:?]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.12.0.jar:4.12.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_211]
        at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) [?:1.8.0_211]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_211]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]`
   
   ### Anything else?
   
   After browsing the source code, here is what goes wrong: each time a new consumer is added, checkBackloggedCursors is executed.
   
   org.apache.pulsar.broker.service.persistent.PersistentTopic
   `    public void checkBackloggedCursors() {
           // activate caught up cursors which include consumers
           subscriptions.forEach((subName, subscription) -> {
               if (!subscription.getConsumers().isEmpty()
                    && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
                   subscription.getCursor().setActive();
               } else {
                   subscription.getCursor().setInactive();
               }
           });
       }`
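The condition quoted above can be read as a simple predicate: a cursor stays active only while its subscription has at least one consumer and its backlog is below the threshold. The following is a minimal sketch of that predicate, not Pulsar code. The class name `BacklogCheckSketch`, the method `shouldBeActive`, and the threshold value of 1000 are assumptions made for illustration. With the slow consumer from the reproduction steps, the backlog grows quickly, so the `setInactive()` branch is the one that is taken.

```java
// Hypothetical stand-in for the condition in checkBackloggedCursors; not Pulsar source.
final class BacklogCheckSketch {

    /** Returns true when the quoted condition would call setActive(), false for setInactive(). */
    static boolean shouldBeActive(int consumerCount, long backlogEntries, long thresholdEntries) {
        return consumerCount > 0 && backlogEntries < thresholdEntries;
    }

    public static void main(String[] args) {
        long threshold = 1000; // assumed value, for illustration only

        // Consumer connected and nearly caught up: cursor is activated.
        System.out.println(shouldBeActive(1, 10, threshold));      // true
        // Consumer connected but far behind (the slow consumer case): cursor is deactivated.
        System.out.println(shouldBeActive(1, 50_000, threshold));  // false
        // No consumers at all: cursor is deactivated regardless of backlog.
        System.out.println(shouldBeActive(0, 10, threshold));      // false
    }
}
```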
   
   org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
   `    public void deactivateCursor(ManagedCursor cursor) {
           synchronized (activeCursors) {
               if (activeCursors.get(cursor.getName()) != null) {
                   activeCursors.removeCursor(cursor.getName());
                   if (!activeCursors.hasDurableCursors()) {
                       // cleanup cache if there is no active subscription
                       entryCache.clear();
                   } else {
                        // if removed subscription was the slowest subscription: update cursor and let it clear cache
                        // till new slowest-cursor's read-position
                        discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(),
                                getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
                   }
               }
               if (!cursor.isDurable()) {
                   nonDurableActiveCursors.removeCursor(cursor.getName());
               }
           }
       }`
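In the method quoted above, the `activeCursors` branch checks that the cursor is present before removing it, while the final `nonDurableActiveCursors.removeCursor(...)` call has no such check. The stack trace ends in `ManagedCursorContainer.removeCursor`. One possible explanation, not confirmed against the 2.7.4 source, is that `removeCursor` dereferences the result of a lookup that returns null when the cursor was never registered. The sketch below is a simplified, hypothetical container written only to illustrate that failure mode; `CursorContainerSketch`, `Item`, `removeUnguarded`, and `removeGuarded` are invented names and do not reproduce Pulsar's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a cursor container; not Pulsar source.
final class CursorContainerSketch {

    static final class Item {
        final String name;
        final boolean durable;

        Item(String name, boolean durable) {
            this.name = name;
            this.durable = durable;
        }
    }

    private final Map<String, Item> cursors = new HashMap<>();
    private int durableCount = 0;

    void add(String name, boolean durable) {
        // Only count the cursor if it was not already registered.
        if (cursors.putIfAbsent(name, new Item(name, durable)) == null && durable) {
            durableCount++;
        }
    }

    // Unguarded removal: assumes the name is always present.
    void removeUnguarded(String name) {
        Item item = cursors.remove(name);
        if (item.durable) { // throws NullPointerException when the name was never added
            durableCount--;
        }
    }

    // Guarded removal: tolerates a missing name and reports whether anything was removed.
    boolean removeGuarded(String name) {
        Item item = cursors.remove(name);
        if (item == null) {
            return false;
        }
        if (item.durable) {
            durableCount--;
        }
        return true;
    }

    public static void main(String[] args) {
        CursorContainerSketch container = new CursorContainerSketch();
        System.out.println(container.removeGuarded("sub")); // false
        try {
            container.removeUnguarded("sub");
        } catch (NullPointerException e) {
            System.out.println("NPE on missing cursor");
        }
    }
}
```

If this is what happens in the broker, either a null check inside `removeCursor` or a presence check before the `nonDurableActiveCursors.removeCursor(...)` call would avoid the exception. Which of the two is appropriate depends on the actual container code.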
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

