gaozhangmin commented on pull request #13939:
URL: https://github.com/apache/pulsar/pull/13939#issuecomment-1021800310


   > @gaozhangmin I'm not sure I see the case where the cursor is added 
multiple times:
   > 
   > ```
   > ledger.getScheduledExecutor()
   >                         .schedule(() -> checkForNewEntries(op, callback, 
ctx),
   >                                 config.getNewEntriesCheckDelayInMillis(), 
TimeUnit.MILLISECONDS);
   > ```
   > 
   > This is called only the first time, as a kind of optimization to avoid 
inserting in the queue for cases where consumer and producers are very close. 
After the first time is unsuccessful, then we insert in the queue and wait for 
the notification. There shouldn't be more than 1 entry per cursor in that queue.
   
   @merlimat You can verify by this test
   ```
   @Test
       public void testSub() throws Exception {
           final String topicName = 
"persistent://prop/ns-abc/stuckSubscriptionTopic";
           for (int i=0;i<10000;i++) {
               Consumer<String> consumer1 = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                       
.subscriptionType(SubscriptionType.Failover).subscriptionName("test").subscribe();
               consumer1.close();
           }
           PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
           ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.ledger);
           System.out.println(ml.getWaitingCursorsCount());
   
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to