massakam opened a new pull request #5185: [pulsar-broker] Fix bug that message delivery stops after resetting cursor for failover subscription
URL: https://github.com/apache/pulsar/pull/5185

### Motivation

Resetting the cursor for a subscription in Failover mode may cause message delivery to stop. This can be reproduced with the following procedure:

1. Connect multiple consumers to a subscription in Failover mode
1. Reset the subscription cursor to a past position
1. Close some consumers
1. The remaining consumers may not receive new messages from the topic

At this point, the active consumer is one that has already been closed:

```js
"subscriptions" : {
  "sub1" : {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "msgRateRedeliver" : 0.0,
    "msgBacklog" : 57604,
    "blockedSubscriptionOnUnackedMsgs" : false,
    "unackedMessages" : 0,
    "type" : "Failover",
    "activeConsumerName" : "04b6c", // This consumer is already closed!
    "msgRateExpired" : 0.0,
    "consumers" : [ {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "06317b",
      "availablePermits" : 564,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:25.413+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:36968"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "37edc",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:27.77+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38392"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "822f0",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:27.769+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38380"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "b91282",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:25.413+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38408"
    } ]
  }
},
```

This happens because `AbstractDispatcherSingleActiveConsumer#closeFuture` is not null, so `pickAndScheduleActiveConsumer()` is not called and the active consumer does not change.

https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L181-L184

`closeFuture` becomes non-null when `disconnectAllConsumers()` is called. Once a value is assigned, it never returns to null.

https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L217-L218

`disconnectAllConsumers()` is called when unloading or deleting a topic, as well as when resetting the cursor.

### Modifications

Added a `resetCloseFuture()` method to the Dispatcher classes that sets `closeFuture` back to null once the cursor reset has completed.
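
The interaction between `closeFuture` and the active-consumer selection can be illustrated with a small, self-contained model. This is only a sketch and not the Pulsar implementation: `DispatcherSketch`, its `run` helper, the use of plain strings as consumers, and the "first consumer in the list becomes active" rule are all assumptions made for illustration. Only the names `closeFuture`, `disconnectAllConsumers()`, `pickAndScheduleActiveConsumer()` and `resetCloseFuture()` come from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for a single-active-consumer dispatcher.
// Consumers are plain strings; this is not the actual Pulsar class.
class DispatcherSketch {
    private final List<String> consumers = new ArrayList<>();
    private String activeConsumer;
    private CompletableFuture<Void> closeFuture; // null until a disconnect is requested

    // Assumption for this sketch: the first connected consumer becomes active.
    private void pickAndScheduleActiveConsumer() {
        activeConsumer = consumers.get(0);
    }

    void addConsumer(String name) {
        consumers.add(name);
        pickAndScheduleActiveConsumer();
    }

    void removeConsumer(String name) {
        consumers.remove(name);
        if (consumers.isEmpty()) {
            activeConsumer = null;
        }
        // Guard described in the PR: a new active consumer is picked
        // only while closeFuture is still null.
        if (closeFuture == null && !consumers.isEmpty()) {
            pickAndScheduleActiveConsumer();
        }
    }

    // Called as part of a cursor reset (and on topic unload or delete).
    CompletableFuture<Void> disconnectAllConsumers() {
        closeFuture = new CompletableFuture<>();
        consumers.clear();
        activeConsumer = null;
        closeFuture.complete(null);
        return closeFuture; // the field stays non-null from here on
    }

    // The fix: clear closeFuture once the cursor reset has completed.
    void resetCloseFuture() {
        closeFuture = null;
    }

    String getActiveConsumer() {
        return activeConsumer;
    }

    // Replays the reproduction steps and returns the active consumer
    // that the dispatcher reports at the end.
    static String run(boolean applyFix) {
        DispatcherSketch d = new DispatcherSketch();
        d.addConsumer("c1");
        d.addConsumer("c2");
        d.disconnectAllConsumers();            // cursor reset disconnects everyone
        if (applyFix) {
            d.resetCloseFuture();
        }
        d.addConsumer("c1");                   // clients reconnect
        d.addConsumer("c2");
        d.removeConsumer(d.getActiveConsumer()); // the active consumer is closed
        return d.getActiveConsumer();
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + run(false)); // c1 (already closed)
        System.out.println("with fix:    " + run(true));  // c2
    }
}
```

In this model, skipping `resetCloseFuture()` leaves the closed consumer `c1` recorded as active, which matches the stale `activeConsumerName` in the stats output. Calling it after the reset lets the remaining consumer `c2` take over.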