massakam opened a new pull request #5185: [pulsar-broker] Fix bug that message 
delivery stops after resetting cursor for failover subscription
URL: https://github.com/apache/pulsar/pull/5185
 
 
   ### Motivation
   
   Resetting the cursor for a subscription in Failover mode may cause message 
delivery to stop. This can be reproduced with the following procedure:
   
   1. Connect multiple consumers to a subscription in Failover mode
   1. Reset the subscription cursor to a past position
   1. Close some consumers
   1. The remaining consumers may not receive new messages from the topic
   
   At this point, the active consumer is one that has already been closed:
   
   ```js
   "subscriptions" : {
     "sub1" : {
       "msgRateOut" : 0.0,
       "msgThroughputOut" : 0.0,
       "msgRateRedeliver" : 0.0,
       "msgBacklog" : 57604,
       "blockedSubscriptionOnUnackedMsgs" : false,
       "unackedMessages" : 0,
       "type" : "Failover",
       "activeConsumerName" : "04b6c", // This consumer is already closed!
       "msgRateExpired" : 0.0,
       "consumers" : [ {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "06317b",
         "availablePermits" : 564,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:25.413+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:36968"
       }, {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "37edc",
         "availablePermits" : 1000,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:27.77+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:38392"
       }, {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "822f0",
         "availablePermits" : 1000,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:27.769+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:38380"
       }, {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "msgRateRedeliver" : 0.0,
         "consumerName" : "b91282",
         "availablePermits" : 1000,
         "unackedMessages" : 0,
         "blockedConsumerOnUnackedMsgs" : false,
         "metadata" : { },
         "connectedSince" : "2019-09-11T18:56:25.413+09:00",
         "clientVersion" : "2.3.2",
         "address" : "/xxx.xxx.xxx.xxx:38408"
       } ]
     }
   },
   ```
   
   This is because `AbstractDispatcherSingleActiveConsumer#closeFuture` is not 
null, so `pickAndScheduleActiveConsumer()` is not called and the active 
consumer does not change.
   
https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L181-L184
   
   `closeFuture` becomes non-null when `disconnectAllConsumers()` is called. 
Once a value is assigned, it is never set back to null.
   
https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L217-L218
   
   `disconnectAllConsumers()` is called when unloading or deleting a topic, as 
well as when resetting the cursor.
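
The sequence described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the Pulsar implementation: the class `StuckDispatcherSketch`, the string consumer names, and the "first name in sorted order" selection rule are hypothetical simplifications. Only the `closeFuture == null` guard and the fact that `closeFuture` is never cleared are taken from the description.

```java
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a single-active-consumer dispatcher.
// It mirrors only the closeFuture guard described in this PR.
class StuckDispatcherSketch {
    private final TreeSet<String> consumers = new TreeSet<>();
    private String activeConsumer;
    private CompletableFuture<Void> closeFuture; // null until a disconnect is requested

    // Assumption: the active consumer is simply the first name in sorted order.
    private void pickAndScheduleActiveConsumer() {
        activeConsumer = consumers.first();
    }

    synchronized void addConsumer(String name) {
        consumers.add(name);
        pickAndScheduleActiveConsumer();
    }

    synchronized void removeConsumer(String name) {
        consumers.remove(name);
        if (consumers.isEmpty()) {
            activeConsumer = null;
        }
        // The guard at issue: once closeFuture is non-null, no new active consumer is picked.
        if (closeFuture == null && !consumers.isEmpty()) {
            pickAndScheduleActiveConsumer();
        }
    }

    // Called on unload, delete, and cursor reset; closeFuture is never cleared afterwards.
    synchronized void disconnectAllConsumers() {
        closeFuture = new CompletableFuture<>();
        consumers.clear();
        activeConsumer = null;
        closeFuture.complete(null);
    }

    synchronized String getActiveConsumer() {
        return activeConsumer;
    }

    synchronized boolean isConnected(String name) {
        return consumers.contains(name);
    }

    // Replays the reproduction steps against the model.
    static StuckDispatcherSketch reproduce() {
        StuckDispatcherSketch d = new StuckDispatcherSketch();
        d.addConsumer("consumer-a");
        d.addConsumer("consumer-b");
        d.disconnectAllConsumers();     // cursor reset disconnects everyone
        d.addConsumer("consumer-a");    // clients reconnect
        d.addConsumer("consumer-b");
        d.removeConsumer("consumer-a"); // the active consumer is closed
        return d;
    }

    public static void main(String[] args) {
        StuckDispatcherSketch d = reproduce();
        System.out.println("active = " + d.getActiveConsumer());
        System.out.println("still connected = " + d.isConnected(d.getActiveConsumer()));
    }
}
```

In this model the active consumer stays `consumer-a` after it has been removed, which corresponds to the stale `activeConsumerName` in the stats above.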
   
   ### Modifications
   
   Added a `resetCloseFuture()` method to the Dispatcher classes that sets 
`closeFuture` back to null once the cursor reset has completed.
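
As a rough illustration of the intended effect, the following self-contained sketch models a dispatcher that clears `closeFuture` after a cursor reset. It is a simplified model under stated assumptions, not the code in this PR: the class `ResettableDispatcherSketch`, its `resetCursor()` helper, the string consumer names, and the "first name in sorted order" selection rule are hypothetical. Only the `resetCloseFuture()` name and the `closeFuture == null` guard come from the description.

```java
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a dispatcher with the fix applied.
class ResettableDispatcherSketch {
    private final TreeSet<String> consumers = new TreeSet<>();
    private String activeConsumer;
    private CompletableFuture<Void> closeFuture; // null while the dispatcher is usable

    // Assumption: the active consumer is simply the first name in sorted order.
    private void pickAndScheduleActiveConsumer() {
        activeConsumer = consumers.first();
    }

    synchronized void addConsumer(String name) {
        consumers.add(name);
        pickAndScheduleActiveConsumer();
    }

    synchronized void removeConsumer(String name) {
        consumers.remove(name);
        if (consumers.isEmpty()) {
            activeConsumer = null;
        }
        // Same guard as before; it passes again once closeFuture has been cleared.
        if (closeFuture == null && !consumers.isEmpty()) {
            pickAndScheduleActiveConsumer();
        }
    }

    synchronized void disconnectAllConsumers() {
        closeFuture = new CompletableFuture<>();
        consumers.clear();
        activeConsumer = null;
        closeFuture.complete(null);
    }

    // The fix: return closeFuture to null so later removals can re-elect.
    synchronized void resetCloseFuture() {
        closeFuture = null;
    }

    // Hypothetical stand-in for the cursor reset path: disconnect, then clear the future.
    synchronized void resetCursor() {
        disconnectAllConsumers();
        // (the cursor would be repositioned here)
        resetCloseFuture();
    }

    synchronized String getActiveConsumer() {
        return activeConsumer;
    }

    // Replays the reproduction steps against the fixed model.
    static ResettableDispatcherSketch reproduce() {
        ResettableDispatcherSketch d = new ResettableDispatcherSketch();
        d.addConsumer("consumer-a");
        d.addConsumer("consumer-b");
        d.resetCursor();                // disconnects everyone, then clears closeFuture
        d.addConsumer("consumer-a");    // clients reconnect
        d.addConsumer("consumer-b");
        d.removeConsumer("consumer-a"); // the active consumer is closed
        return d;
    }

    public static void main(String[] args) {
        System.out.println("active = " + reproduce().getActiveConsumer());
    }
}
```

With `closeFuture` cleared, removing `consumer-a` lets the model pick `consumer-b` as the new active consumer, so delivery can continue.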

