Shawyeok commented on issue #14779:
URL: https://github.com/apache/pulsar/issues/14779#issuecomment-1554473113

   My hypothesis is **true**. Here are the steps to reproduce the problem in a 
multi-node cluster:
   1. Create a topic and identify its owning broker, referred to as 'Node1'.
   2. Delete the topic and unload the associated bundle, resulting in the 
bundle being relocated to another node, denoted as 'Node2'.
   3. Recreate the topic, and initiate a consumer associated with it. At this 
point, the consumer will establish a connection with 'Node2'.
   4. Unload the topic's bundle once more, triggering a return to 'Node1'. 
Concurrently, the consumer will disconnect from 'Node2' and establish a new 
connection with 'Node1', generating a 'BadVersion' error at this stage.
   
   The root cause is in step 2: the broker doesn't invalidate `childrenCache` 
in `AbstractMetadataStore` when the topic is deleted. `childrenCache` is a 
Caffeine cache with a `refreshAfterWrite` policy, which returns the old value 
for the first request after an entry becomes stale and only then refreshes it. 
Hence in step 4, `store.getCursors` returns an empty list even though there is 
a cursor in ZooKeeper.
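
To make the stale read concrete, here is a minimal, single-threaded sketch of refresh-after-write behaviour. It is not Caffeine and not Pulsar code: `RefreshAfterWriteModel`, `firstReadAfterChange`, the manual tick clock, the path, and the cursor name are all hypothetical, and a plain `HashMap` stands in for ZooKeeper. It only illustrates why the first read after the refresh interval can still see the old children list unless the entry is invalidated on delete.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model of refresh-after-write semantics (NOT Caffeine itself):
// a stale read returns the old value and reloads the entry for later readers.
class RefreshAfterWriteModel<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writtenAt;
        Entry(V value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader;
    private final long refreshAfterTicks;
    private long now = 0; // manual clock, so the demo is deterministic

    RefreshAfterWriteModel(Function<K, V> loader, long refreshAfterTicks) {
        this.loader = loader;
        this.refreshAfterTicks = refreshAfterTicks;
    }

    void advance(long ticks) { now += ticks; }

    V get(K key) {
        Entry<V> cached = entries.get(key);
        if (cached == null) {
            // Miss: load from the backing store and return the fresh value.
            V loaded = loader.apply(key);
            entries.put(key, new Entry<>(loaded, now));
            return loaded;
        }
        if (now - cached.writtenAt >= refreshAfterTicks) {
            // Stale: reload for later readers; this caller still gets the old value.
            entries.put(key, new Entry<>(loader.apply(key), now));
        }
        return cached.value;
    }

    void invalidate(K key) { entries.remove(key); }

    // Replays the scenario: cache an empty children list, let the store change
    // behind the cache's back, then read once more after the refresh interval.
    static List<String> firstReadAfterChange(boolean invalidateOnDelete) {
        String path = "/managed-ledgers/demo-topic";       // hypothetical path
        Map<String, List<String>> store = new HashMap<>(); // stands in for ZooKeeper
        store.put(path, List.of());
        RefreshAfterWriteModel<String, List<String>> childrenCache =
                new RefreshAfterWriteModel<>(store::get, 10);

        childrenCache.get(path);                 // caches the empty list
        store.remove(path);                      // topic is deleted
        if (invalidateOnDelete) {
            childrenCache.invalidate(path);      // what the proposed fix adds
        }
        store.put(path, List.of("my-sub"));      // topic recreated, cursor added
        childrenCache.advance(11);               // entry is past its refresh time
        return childrenCache.get(path);
    }

    public static void main(String[] args) {
        System.out.println("without invalidate: " + firstReadAfterChange(false)); // []
        System.out.println("with invalidate:    " + firstReadAfterChange(true));  // [my-sub]
    }
}
```

In this model the uninvalidated cache hands back the empty list once before catching up, which corresponds to the empty `getCursors` result in step 4.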
   
   The fix is quite straightforward: just invalidate `childrenCache` for the 
deleted path in `org.apache.pulsar.metadata.impl.AbstractMetadataStore#delete`:
   ```diff
   Index: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
   --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java	(revision 850389a511e5ec86b772d5b501c0e96708901310)
   +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java	(date 1684490211421)
   @@ -191,6 +191,7 @@
            return storeDelete(path, expectedVersion)
                    .thenRun(() -> {
                        existsCache.synchronous().invalidate(path);
   +                    childrenCache.synchronous().invalidate(path);
                        String parent = parent(path);
                        if (parent != null) {
                            childrenCache.synchronous().invalidate(parent);
   ```

