TakaHiR07 opened a new issue, #24977:
URL: https://github.com/apache/pulsar/issues/24977

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   pulsar-version: 3.0.x
   
   ### Issue Description
   
   When restarting the broker, I noticed that the number of subscriptions on tenant-1/namespace-1/__change_events increases unexpectedly.
   
   The situation is almost the same as the one described in https://github.com/apache/pulsar/pull/24658. However, I cannot find any log showing that a reader failed to close. After dumping the broker heap, I confirmed that the thousands of readers retained in SystemTopicBasedTopicPoliciesService have never been closed since they were created successfully.
   
   
   After diving into the code, I found that the root cause is a concurrency error introduced by https://github.com/apache/pulsar/pull/21231. That PR introduces two problems.
    
   Problem 1: cleanCacheAndCloseReader() is executed twice when an exception is thrown
   
   In SystemTopicBasedTopicPoliciesService#prepareInitPoliciesCacheAsync, there are three steps: 1. createReader, 2. initPolicesCache (read messages from the earliest position), 3. readMorePoliciesAsync (leave a background job that reads new policies). If an exception occurs in any of the three steps, we should clean up the failed reader.
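
As a minimal sketch of "one cleanup per creation attempt" across these three steps, the cleanup can be guarded by a per-attempt flag so that several failure paths collapse into a single run. `InitAttempt`, `cleanupOnce()` and the three future parameters are hypothetical stand-ins for illustration, not the actual Pulsar code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-attempt state: one instance per reader creation.
class InitAttempt {
    private final AtomicBoolean cleaned = new AtomicBoolean(false);
    final AtomicInteger cleanupRuns = new AtomicInteger(); // demo counter only

    // Runs the cleanup body at most once, however many failure paths call it.
    void cleanupOnce() {
        if (cleaned.compareAndSet(false, true)) {
            cleanupRuns.incrementAndGet(); // stand-in for closing the reader and clearing caches
        }
    }

    // Wires the three steps; each argument stands in for the real asynchronous step.
    CompletableFuture<Void> start(CompletableFuture<String> createReader,
                                  CompletableFuture<Void> initPoliciesCache,
                                  CompletableFuture<Void> readMorePolicies) {
        CompletableFuture<Void> init = createReader.thenCompose(reader -> initPoliciesCache);
        init.whenComplete((ignored, error) -> {
            if (error != null) {
                cleanupOnce(); // step 1 or step 2 failed
            }
        });
        // Step 3 runs in the background; its failure goes through the same guard.
        readMorePolicies.whenComplete((ignored, error) -> {
            if (error != null) {
                cleanupOnce();
            }
        });
        return init;
    }

    public static void main(String[] args) {
        InitAttempt attempt = new InitAttempt();
        CompletableFuture<Void> initCache = new CompletableFuture<>();
        CompletableFuture<Void> background = new CompletableFuture<>();
        attempt.start(CompletableFuture.completedFuture("reader1"), initCache, background);
        initCache.completeExceptionally(new RuntimeException("read error"));
        background.completeExceptionally(new RuntimeException("read error"));
        System.out.println("cleanup runs: " + attempt.cleanupRuns.get());
    }
}
```

The guard only prevents a second run for the same attempt. It does not by itself stop a cleanup from touching map entries that belong to a later attempt.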
   
   However, when cleanCacheAndCloseReader() executes twice, the following interleaving produces a concurrency error:
   
   ```text
   Request-1: policyCacheInitMap put future1
   Request-1: create reader1
   Request-1: readerCaches put reader1
   reader1 read error
   Request-1: first time cleanCacheAndCloseReader(), include:
           remove reader1 in readerCaches
           close reader1
           remove future1 in policyCacheInitMap
   
   Request-2: policyCacheInitMap put future2
   Request-1: second time cleanCacheAndCloseReader(), only remove future2 in policyCacheInitMap
   Request-2: create reader2
   Request-2: readerCaches put reader2
   
   Request-3: policyCacheInitMap put future3
   Request-3: create reader3
   Request-3: readerCaches put reader3
   ```
   
   Now you can see that reader2 becomes an orphan and can never be cleaned up. Each reader creation should correspond to exactly one call of cleanCacheAndCloseReader().
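
The interleaving above can be replayed deterministically in a single thread. The sketch below is a simplified model, not the broker code: the `Reader` class, the `registerInit()` helper and the plain `put` into `readerCaches` are assumptions made for illustration, and only the map names follow the issue text.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class OrphanReaderReplay {
    // Hypothetical stand-in for a system topic reader.
    static final class Reader {
        final String name;
        boolean closed;
        Reader(String name) { this.name = name; }
    }

    final Map<String, CompletableFuture<Void>> policyCacheInitMap = new ConcurrentHashMap<>();
    final Map<String, Reader> readerCaches = new ConcurrentHashMap<>();

    // Returns true if this request registered a new init future for the namespace.
    boolean registerInit(String ns) {
        return policyCacheInitMap.putIfAbsent(ns, new CompletableFuture<>()) == null;
    }

    // Simplified cleanup: removes whatever is currently registered, regardless of owner.
    void cleanCacheAndCloseReader(String ns) {
        Reader reader = readerCaches.remove(ns);
        if (reader != null) {
            reader.closed = true;
        }
        policyCacheInitMap.remove(ns);
    }

    // Replays the steps from the issue and returns reader2 for inspection.
    Reader replay(String ns) {
        registerInit(ns);                            // Request-1: put future1
        readerCaches.put(ns, new Reader("reader1")); // Request-1: put reader1
        cleanCacheAndCloseReader(ns);                // Request-1: first cleanup
        registerInit(ns);                            // Request-2: put future2
        cleanCacheAndCloseReader(ns);                // Request-1: second cleanup removes future2
        Reader reader2 = new Reader("reader2");
        readerCaches.put(ns, reader2);               // Request-2: put reader2
        if (registerInit(ns)) {                      // Request-3: future2 is gone, so this succeeds
            readerCaches.put(ns, new Reader("reader3")); // overwrites reader2
        }
        return reader2;
    }

    public static void main(String[] args) {
        OrphanReaderReplay demo = new OrphanReaderReplay();
        String ns = "tenant-1/namespace-1";
        Reader reader2 = demo.replay(ns);
        System.out.println("reader2 closed: " + reader2.closed);
        System.out.println("reader2 still tracked: " + (demo.readerCaches.get(ns) == reader2));
    }
}
```

In this model reader2 ends up neither closed nor referenced by either map, which matches the orphaned readers seen in the heap dump.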
   
   
   Problem 2: there is a recursive update in ConcurrentHashMap. In policyCacheInitMap.computeIfAbsent(), we put in a future. When this future completes exceptionally, it calls cleanCacheAndCloseReader(), which updates policyCacheInitMap again. This second update can happen in the same thread.
   
   In general, it is not appropriate to do complex operations inside computeIfAbsent(), especially a second update of the same map.
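
One possible way to avoid the nested update, shown here only as a sketch under assumptions: insert the future with `putIfAbsent`, attach callbacks after the map call has returned, and clean up with the two-argument `remove(key, value)` so that an attempt can only delete its own entry. `OwnedInitRegistry`, `prepareInit()` and the `initWork` supplier are hypothetical names and not the actual Pulsar API. The Javadoc of `ConcurrentHashMap.computeIfAbsent` itself asks for a short, simple computation that does not update other mappings of the map, which is the constraint this shape tries to respect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class OwnedInitRegistry {
    final ConcurrentHashMap<String, CompletableFuture<Void>> policyCacheInitMap =
            new ConcurrentHashMap<>();

    CompletableFuture<Void> prepareInit(String ns, Supplier<CompletableFuture<Void>> initWork) {
        CompletableFuture<Void> mine = new CompletableFuture<>();
        // Plain insert: no user callback runs inside the map operation.
        CompletableFuture<Void> existing = policyCacheInitMap.putIfAbsent(ns, mine);
        if (existing != null) {
            return existing; // another request owns the initialization
        }
        // Callbacks are attached only after the map operation has returned.
        initWork.get().whenComplete((ignored, error) -> {
            if (error != null) {
                // Two-argument remove: deletes the entry only if it is still ours.
                policyCacheInitMap.remove(ns, mine);
                mine.completeExceptionally(error);
            } else {
                mine.complete(null);
            }
        });
        return mine;
    }

    public static void main(String[] args) {
        OwnedInitRegistry registry = new OwnedInitRegistry();
        String ns = "tenant-1/namespace-1";
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("read error"));
        CompletableFuture<Void> first = registry.prepareInit(ns, () -> failed);
        CompletableFuture<Void> second =
                registry.prepareInit(ns, () -> new CompletableFuture<Void>());
        // A late, duplicate cleanup from the first attempt does not touch the second.
        boolean removed = registry.policyCacheInitMap.remove(ns, first);
        System.out.println("stale cleanup removed an entry: " + removed);
        System.out.println("second attempt still registered: "
                + (registry.policyCacheInitMap.get(ns) == second));
    }
}
```

The sketch leaves out reader handling and the case where `initWork.get()` throws synchronously. A real change would need to cover both.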
   
   
   
   I notice that https://github.com/apache/pulsar/pull/24658 is relevant to problem 1, but it makes the logic more complex, and a situation like request-3 modifying a resource created by request-2 can still happen. It also does not seem to fix this case: if closing the reader throws an exception in cleanCacheAndCloseReader, the reader is still orphaned.
   
   https://github.com/apache/pulsar/pull/24939 is relevant to problem 2, but it only uses a different thread to execute the cleanCacheAndCloseReader() call triggered in one place. readMorePoliciesAsync() also triggers cleanCacheAndCloseReader(), although not in the same thread. In any case, I think it is not appropriate to keep the double-update code in ConcurrentHashMap.
   
   ### Error messages
   
   ```text
   
   ```
   
   ### Reproducing the issue
   
   Restart the broker. The issue may then occur.
   
   ### Additional information
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

