poorbarcode opened a new pull request, #17526: URL: https://github.com/apache/pulsar/pull/17526
### Motivation

With the transaction feature enabled, we sent and received messages while executing `admin API: unload namespace` 1000 times. The following problem then occurred: the consumer could not receive any messages, and no error was logged. We then called `admin API: get topic stats`; the response showed that only producers were registered on the topic and no consumers were, yet the consumer state in the client was `Ready`. This means the state of the consumer is inconsistent between the broker and the client.

#### Locating the problem

We then found the cause: two `PersistentTopic` instances with the same name were registered on one broker node. The consumer was registered on one (aka `topic-c`), and the producer on the other (aka `topic-p`). In this state, when we send messages, the data flows like this:

```text
client: producer sends a message
broker: handle cmd-send
broker: find the topic by name, it is "topic-p"
broker: find all subscriptions registered on "topic-p"
broker: found one subscription, but it has no consumers registered
broker: no need to send the message to the client
```

But the consumer is actually registered on the other topic, `topic-c`, so the consumer cannot receive any messages.

#### Reproduce

> *How can two topics end up registered on the same broker node?*

Make `transaction buffer recover`, `admin unload namespace`, `client create consumer`, and `client create producer` execute at the same time. The process flows like this (the problem occurs at step 11):

| Time | `transaction buffer recover` | `admin unload namespace` | `client create consumer` | `client create producer` |
| ----------- | ----------- | ----------- | ----------- | ----------- |
| 1 | TB recover | | | |
| 2 | TB recover failure | topic.unload | | |
| 3 | topic.close(false) | topic.close(true) | | |
| 4 | brokerService.topics.remove(topicName) | | | |
| 5 | remove topic finish | | lookup | |
| 6 | | | create `topic-c` | |
| 7 | | | consumer registered on `topic-c` | |
| 8 | | brokerService.topics.remove(topic) | | |
| 9 | | remove `topic-c` finish | | lookup |
| 10 | | | | create `topic-p` |
| 11 | | | | producer registered on `topic-p` |

- Each column represents an individual process, e.g. `client create consumer`, `client create producer`.
- Multiple processes run at the same time, and all of them affect `brokerService.topics`.
- The `Time` column only indicates the order of the steps, not the actual time.
- The important steps are explained below:

> step 3

Even if the persistent topic property `isClosingOrDeleting` has already been set to `true`, close can still be executed once more; see line 1247:

https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1240-L1249

Whether close can be executed depends on two predicates: it proceeds if the topic is not already closing, or if `@param closeWithoutWaitingClientDisconnect` is `true`. This means that the method `topic.close` can be executed reentrantly when `closeWithoutWaitingClientDisconnect` is `true`, and in the implementation of `admin API: unload namespace` this parameter is exactly `true`.

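
The reentrancy described in step 3 can be sketched in isolation. The class below is a hypothetical stand-in, not Pulsar's `PersistentTopic`: `SketchTopic`, `closeReentrant`, `closeOnce`, and the `cacheRemovals` counter are illustrative names, and `closeOnce` is only one possible shape of a non-reentrant guard, not necessarily what PR #17524 does.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ReentrantCloseSketch {

    /** Hypothetical stand-in for a topic; it only models the closing flag. */
    static class SketchTopic {
        private boolean isClosingOrDeleting = false;
        // Counts how often the "remove topic from broker cache" step would run.
        final AtomicInteger cacheRemovals = new AtomicInteger();

        /** Guard as described in step 3: proceeds if not closing, or if the flag is true. */
        synchronized boolean closeReentrant(boolean closeWithoutWaitingClientDisconnect) {
            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
                isClosingOrDeleting = true;
                cacheRemovals.incrementAndGet();
                return true;
            }
            return false;
        }

        /** Assumed non-reentrant variant: a second close is rejected regardless of the flag. */
        synchronized boolean closeOnce(boolean closeWithoutWaitingClientDisconnect) {
            if (isClosingOrDeleting) {
                return false;
            }
            isClosingOrDeleting = true;
            cacheRemovals.incrementAndGet();
            return true;
        }
    }

    public static void main(String[] args) {
        SketchTopic reentrant = new SketchTopic();
        reentrant.closeReentrant(false); // transaction buffer recover failure path
        reentrant.closeReentrant(true);  // admin unload namespace path
        System.out.println("reentrant guard removals: " + reentrant.cacheRemovals.get());

        SketchTopic guarded = new SketchTopic();
        guarded.closeOnce(false);
        guarded.closeOnce(true);
        System.out.println("non-reentrant guard removals: " + guarded.cacheRemovals.get());
    }
}
```

With the reentrant guard the cache-removal step runs twice for the same topic; with the assumed non-reentrant guard it runs once.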
The call in `NamespaceService` made by `admin unload namespace`:

https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L723-L725

So when `transaction buffer recover fail` and `admin unload namespace` execute at the same time, and `transaction buffer recover fail` runs before `admin unload namespace`, the topic is removed from `brokerService.topics` twice.

> step-4 / step-8

The current implementation of `BrokerService.removeTopicFromCache` uses `map.remove(key)` rather than `map.remove(key, value)`, so it can remove any value stored in the map under that key, even if it is not the intended one.

https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1956

To sum up, we should make these two changes:

- Make the method `topic.close` non-reentrant. Also prevent reentrancy between `topic.close` and `topic.delete`.
- Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache`. This change applies to both scenarios: `topic.close` and `topic.delete`.

### Modifications

- Make the method `topic.close` non-reentrant. Also prevent reentrancy between `topic.close` and `topic.delete`.
  - fixed by PR #17524
- Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService`.
  - fixed by current PR

### Documentation

- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc`
- [ ] `doc-complete`
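
As an illustration of the `map.remove(key)` versus `map.remove(key, value)` change described for step-4 / step-8, here is a minimal single-threaded sketch. It assumes a plain `java.util.concurrent.ConcurrentHashMap` with `Object` values as a stand-in for `brokerService.topics`; the class name, the helper methods, and the topic name are hypothetical and do not come from the Pulsar code base.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RemoveByKeyVsValueSketch {

    /**
     * Replays the removals from the timeline in order and reports whether the
     * newly created topic-c is still cached after the stale second removal.
     */
    static boolean topicCSurvives(boolean removeByKeyAndValue) {
        ConcurrentMap<String, Object> topics = new ConcurrentHashMap<>();
        String name = "persistent://tenant/ns/topic"; // hypothetical topic name
        Object oldTopic = new Object();               // the topic being closed
        topics.put(name, oldTopic);

        // Steps 4-5: the first close removes the old topic from the cache.
        remove(topics, name, oldTopic, removeByKeyAndValue);

        // Steps 6-7: the consumer lookup creates and caches topic-c.
        Object topicC = new Object();
        topics.put(name, topicC);

        // Steps 8-9: the second close, still referring to the old topic, removes again.
        remove(topics, name, oldTopic, removeByKeyAndValue);

        return topics.get(name) == topicC;
    }

    static void remove(ConcurrentMap<String, Object> topics, String name,
                       Object expected, boolean removeByKeyAndValue) {
        if (removeByKeyAndValue) {
            topics.remove(name, expected); // removes only if the cached value equals expected
        } else {
            topics.remove(name);           // removes whatever is cached under the name
        }
    }

    public static void main(String[] args) {
        System.out.println("remove(key): topic-c still cached = " + topicCSurvives(false));
        System.out.println("remove(key, value): topic-c still cached = " + topicCSurvives(true));
    }
}
```

With `remove(key)` the stale removal evicts `topic-c`, so a later producer lookup would create a second topic object; with `remove(key, value)` the stale removal is a no-op because the cached value is no longer the old topic.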