poorbarcode opened a new pull request, #17526:
URL: https://github.com/apache/pulsar/pull/17526

   ### Motivation
   
   With the transaction feature enabled, we sent and received messages while executing `admin API: unload namespace` 1000 times. Then the following problem occurred: the consumer could not receive any messages, and there was no error log. We then called `admin API: get topic stats`, and the response showed that only producers were registered on the topic and no consumers, although the consumer state was `Ready` in the client. This means that the state of the consumer is inconsistent between the broker and the client.
   
   #### Locating the problem
   
   Then we found the cause: two `PersistentTopic` instances with the same name were registered on one broker node. The consumer was registered on one of them (aka `topic-c`), and the producer on the other (aka `topic-p`). In this state, when we send messages, the data flows like this:
   
   ```text
   client: producer sends a message
   
   broker: handle cmd-send
   
   broker: find the topic by name, it is "topic-p"
   
   broker: find all subscriptions registered on "topic-p"
   
   broker: found one subscription, but it has no consumers registered
   
   broker: no need to send the message to the client
   ``` 
   
   But the consumer is actually registered on the other topic, `topic-c`, so it never receives any messages.
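   The data flow above can be modelled in a few lines to show why nothing fails loudly: the broker dispatches only to the consumers of the topic instance it finds by name. This is a minimal sketch, assuming hypothetical `TopicStub`/`SubscriptionStub` classes and a plain `HashMap` as the name-to-topic cache; none of these are Pulsar's real types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitTopicDispatchDemo {

    // Hypothetical stand-ins: a subscription holds consumers, a topic holds subscriptions.
    static final class SubscriptionStub {
        final List<String> consumers = new ArrayList<>();
    }

    static final class TopicStub {
        final Map<String, SubscriptionStub> subscriptions = new HashMap<>();
    }

    // Models "handle cmd-send": find the topic by name, then count the consumers
    // across its subscriptions that would receive the message.
    static int dispatch(Map<String, TopicStub> topicsByName, String topicName) {
        TopicStub topic = topicsByName.get(topicName); // resolves to topic-p
        int receivers = 0;
        for (SubscriptionStub sub : topic.subscriptions.values()) {
            receivers += sub.consumers.size();
        }
        return receivers; // 0 means nothing is sent, and no error is raised
    }

    static int demo() {
        String name = "persistent://tenant/ns/topic"; // hypothetical topic name

        // topic-c: the instance the consumer registered on.
        TopicStub topicC = new TopicStub();
        SubscriptionStub subOnC = new SubscriptionStub();
        subOnC.consumers.add("consumer-1");
        topicC.subscriptions.put("sub", subOnC);

        // topic-p: same name and same subscription, but no consumers.
        TopicStub topicP = new TopicStub();
        topicP.subscriptions.put("sub", new SubscriptionStub());

        // The cache only knows topic-p, so topic-c is unreachable by name.
        Map<String, TopicStub> topicsByName = new HashMap<>();
        topicsByName.put(name, topicP);

        return dispatch(topicsByName, name);
    }

    public static void main(String[] args) {
        System.out.println("consumers reached: " + demo());
    }
}
```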
   
   #### Reproduce
   
   > *How can two topics with the same name end up registered on the same broker node?*
   
   Execute `transaction buffer recover`, `admin unload namespace`, `client create consumer`, and `client create producer` concurrently. The processes interleave like this (the problem occurs at step 11):
   
   | Time | `transaction buffer recover` | `admin unload namespace` | `client create consumer` | `client create producer` |
   | ----------- | ----------- | ----------- | ----------- | ----------- |
   | 1 | TB recover |  |  |  |
   | 2 | TB recover failure | topic.unload |  |  |
   | 3 | topic.close(false) | topic.close(true) |  |  |
   | 4 | brokerService.topics.remove(topicName) |  |  |  |
   | 5 | remove topic finish |  | lookup |  |
   | 6 |  |  | create `topic-c` |  |
   | 7 |  |  | consumer registered on `topic-c` |  |
   | 8 |  | brokerService.topics.remove(topic) |  |  |
   | 9 |  | remove `topic-c` finish |  | lookup |
   | 10 |  |  |  | create `topic-p`  |
   | 11 |  |  |  | producer registered on `topic-p`  |
   
   - Each column represents an independent process, e.g. `client create consumer`, `client create producer`.
   - Multiple processes run at the same time, and all of them affect `brokerService.topics`.
   - Column `Time` only indicates the order of the steps, not the actual time.
   - The important steps are explained below:
   
   > step 3
   
   Even if the persistent topic property `isClosingOrDeleting` has already been set to `true`, `close` can still be executed a second time; see line 1247:
   
   
https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1240-L1249
 
   
   `close` proceeds when either of two predicates holds: the topic is not already closing, or `@param closeWithoutWaitingClientDisconnect` is `true`. This means that `topic.close` can be re-entered whenever `closeWithoutWaitingClientDisconnect` is `true`, and in the implementation of `admin API: unload namespace` this parameter is exactly `true`.
   
   
https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L723-L725
   
   So when a transaction buffer recovery failure and `admin unload namespace` happen at the same time, and the recovery failure closes the topic before the namespace unload does, the topic is removed from `brokerService.topics` twice.
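   The reentrancy in step 3 can be reproduced with a stripped-down model of the close guard. This is a minimal sketch, assuming the guard described above has the shape `!isClosingOrDeleting || closeWithoutWaitingClientDisconnect`; `TopicModel` and `fenceCount` are hypothetical names, and everything `close` really does after the guard is reduced to a counter.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantCloseDemo {

    // Hypothetical stand-in for PersistentTopic; only the close guard is modelled.
    static class TopicModel {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private boolean isClosingOrDeleting = false;
        int fenceCount = 0; // how many callers got past the guard

        // Returns true if this call went on to fence and close the topic.
        boolean close(boolean closeWithoutWaitingClientDisconnect) {
            lock.writeLock().lock();
            try {
                // Proceed if not already closing, OR if the caller asked to close
                // without waiting for clients: the second clause allows re-entry.
                if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
                    isClosingOrDeleting = true;
                    fenceCount++;
                    return true;
                }
                return false; // already closing and not forced: rejected
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel();
        boolean first = topic.close(false); // TB recovery failure path
        boolean second = topic.close(true); // admin unload namespace path
        System.out.println(first + " " + second + " fenceCount=" + topic.fenceCount);
    }
}
```

   Both calls get past the guard, so each one later goes on to remove the topic name from the cache.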
   
   > step-4 / step-8
   
   Because the current implementation of `BrokerService.removeTopicFromCache` uses `map.remove(key)` rather than `map.remove(key, value)`, it removes whatever value is currently mapped to the topic name, even if that value is not the topic being closed.
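   The difference between the two removal styles is easiest to see by replaying steps 4 to 11 against a concurrent map. This is a minimal sketch, assuming the cache is simplified to a `ConcurrentHashMap<String, TopicStub>` and that lookups create topics via `computeIfAbsent`; the broker's actual map type, value type, and creation path differ, and `TopicStub`/`removeFromCache` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;

public class TopicCacheRaceDemo {

    // Hypothetical stand-in for a topic instance; only object identity matters here.
    static final class TopicStub {
        final String label;

        TopicStub(String label) {
            this.label = label;
        }
    }

    // Models removeTopicFromCache: unconditional remove(key) vs conditional remove(key, value).
    static void removeFromCache(ConcurrentHashMap<String, TopicStub> topics, String name,
                                TopicStub closing, boolean conditional) {
        if (conditional) {
            topics.remove(name, closing); // removes only if the entry is still `closing`
        } else {
            topics.remove(name);          // removes whatever is mapped, even a newer topic
        }
    }

    // Replays steps 4-11 of the table; returns true if the producer and the
    // consumer end up on the same topic instance.
    static boolean producerAndConsumerShareTopic(boolean conditional) {
        String name = "persistent://tenant/ns/topic"; // hypothetical topic name
        ConcurrentHashMap<String, TopicStub> topics = new ConcurrentHashMap<>();
        TopicStub original = new TopicStub("original");
        topics.put(name, original);

        // Step 4: the TB-recovery close removes the original topic.
        removeFromCache(topics, name, original, conditional);

        // Steps 5-7: consumer lookup creates topic-c and registers on it.
        TopicStub consumerTopic = topics.computeIfAbsent(name, n -> new TopicStub("topic-c"));

        // Step 8: the unload close, still acting for the original topic, removes again.
        removeFromCache(topics, name, original, conditional);

        // Steps 9-11: producer lookup reuses the cached topic or creates topic-p.
        TopicStub producerTopic = topics.computeIfAbsent(name, n -> new TopicStub("topic-p"));

        return producerTopic == consumerTopic;
    }

    public static void main(String[] args) {
        System.out.println("remove(key):        shared=" + producerAndConsumerShareTopic(false));
        System.out.println("remove(key, value): shared=" + producerAndConsumerShareTopic(true));
    }
}
```

   `ConcurrentHashMap.remove(key, value)` is atomic and compares the current value with `equals`; because `TopicStub` does not override `equals`, the comparison here is by identity, so the second removal leaves `topic-c` in place.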
   
   
https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1956
   
   To sum up, we should make these two changes:
   
   - Make method `topic.close` non-reentrant, and also prevent reentrance between `topic.close` and `topic.delete`.
   - Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache`. This change applies to both scenarios: `topic.close` and `topic.delete`.
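   One way to make `close` non-reentrant, and to exclude `close` and `delete` from each other, is to let both race on a single `compareAndSet` so that only the first caller does the work and later callers receive the same future. This is a hypothetical sketch for illustration only, not the approach taken in PR #17524; `TopicModel`, `startOnce`, and `closeBodyRuns` are invented names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class NonReentrantCloseSketch {

    // Hypothetical topic model: close() and delete() share one in-flight future,
    // so whichever runs first wins and later callers just observe its result.
    static class TopicModel {
        private final AtomicReference<CompletableFuture<Void>> closeOrDeleteFuture =
                new AtomicReference<>();
        int closeBodyRuns = 0; // how often the real close/delete work started

        // In this model the flag does not affect whether close re-enters.
        CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
            return startOnce();
        }

        CompletableFuture<Void> delete() {
            return startOnce();
        }

        private CompletableFuture<Void> startOnce() {
            CompletableFuture<Void> mine = new CompletableFuture<>();
            if (!closeOrDeleteFuture.compareAndSet(null, mine)) {
                return closeOrDeleteFuture.get(); // already closing/deleting: no re-entry
            }
            closeBodyRuns++; // only the CAS winner reaches this line
            // Real work (fence, disconnect clients, close ledger, remove from cache)
            // would go here; the model completes immediately.
            mine.complete(null);
            return mine;
        }
    }

    public static void main(String[] args) {
        TopicModel t = new TopicModel();
        CompletableFuture<Void> a = t.close(false);
        CompletableFuture<Void> b = t.close(true);
        CompletableFuture<Void> c = t.delete();
        System.out.println("same future: " + (a == b && b == c) + ", runs=" + t.closeBodyRuns);
    }
}
```

   Because every later caller gets the winner's future, the cache removal inside the close/delete work runs at most once per topic instance.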
   
   ### Modifications
   
   - Make method `topic.close` non-reentrant, and also prevent reentrance between `topic.close` and `topic.delete`.
     - fixed by PR #17524
   - Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.`
     - fixed by this PR
   
   
   ### Documentation
   
   - [ ] `doc-required` 
     
   - [ ] `doc-not-needed` 
     
   - [ ] `doc` 
   
   - [ ] `doc-complete`

