shibd opened a new pull request #13725: URL: https://github.com/apache/pulsar/pull/13725
### Motivation

#13663 Flaky-test: org.apache.pulsar.metadata.LockManagerTest.updateValue

The root cause is that the `MetadataCacheImpl#refresh` method is not safe to call from multiple concurrent threads.

https://github.com/apache/pulsar/blob/2a7515f9593a76b294bfe2835621a0ab8a904957/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java#L262-L269

The `AbstractMetadataStore.put` method causes two threads to `refresh` the cache in parallel:

1. (`thread 1`) The callback of `storePut` returns:
   https://github.com/apache/pulsar/blob/2a7515f9593a76b294bfe2835621a0ab8a904957/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java#L279-L281
2. (`thread 2`) The notification issued inside the `storePut` implementation (ZK, RocksDB, LocalMemory):
   https://github.com/apache/pulsar/blob/2a7515f9593a76b294bfe2835621a0ab8a904957/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java#L183-L193

The client can only observe the result of thread 1 and cannot wait for thread 2. If thread 2 has not finished its previous refresh when the next update starts, the old value may be returned.

For reference, the `caffeine` Javadoc notes:

**LoadingCache#refresh**

```
/**
 * Loads a new value for the {@code key}, asynchronously. While the new value is loading the
 * previous value (if any) will continue to be returned by {@code get(key)} unless it is evicted.
 * If the new value is loaded successfully it will replace the previous value in the cache; if an
 * exception is thrown while refreshing the previous value will remain, <i>and the exception will
 * be logged (using {@link java.util.logging.Logger}) and swallowed</i>.
 * <p>
 * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache
 * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading
 * is asynchronous by delegating to the default executor.
 *
 * @param key key with which a value may be associated
 * @throws NullPointerException if the specified key is null
 */
void refresh(@NonNull K key);
```

**Cache#invalidate**

```
/**
 * Discards any cached value for the {@code key}. The behavior of this operation is undefined for
 * an entry that is being loaded (or reloaded) and is otherwise not present.
 *
 * @param key the key whose mapping is to be removed from the cache
 * @throws NullPointerException if the specified key is null
 */
void invalidate(@NonNull @CompatibleWith("K") Object key);
```

In this unit test, the first call to the following method triggers a cache update, and thread 2 may not have finished that update yet.

https://github.com/apache/pulsar/blob/2a7515f9593a76b294bfe2835621a0ab8a904957/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java#L142-L144

When the value is updated again, the earlier cache update may not have completed, so the previously cached value can be returned. In that case `getValue` may still equal "lock-1".

https://github.com/apache/pulsar/blob/2a7515f9593a76b294bfe2835621a0ab8a904957/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java#L146-L148

I fixed it directly with a synchronized lock. After many test runs, the problem no longer reproduces. If there is a better implementation, I am happy to discuss it. Thank you.

### Modifications

- Add `synchronized` to `MetadataCacheImpl#refresh`, `MetadataCacheImpl#invalidate`, and `MetadataCacheImpl#invalidateAll`

### Documentation

- [x] `no-need-doc`
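
To make the modification easier to picture, here is a minimal sketch of the locking shape it describes. `SynchronizedCacheSketch`, its `store` map, and its `get` method are hypothetical stand-ins, not Pulsar or Caffeine code. The reload here is synchronous, whereas Caffeine's `refresh` is asynchronous, so the sketch only shows how `synchronized` serializes `refresh`, `invalidate`, and `invalidateAll` against each other.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a metadata cache; not the real MetadataCacheImpl. */
final class SynchronizedCacheSketch {
    // Assumption: a plain map plays the role of the backing metadata store.
    private final Map<String, String> store;
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    SynchronizedCacheSketch(Map<String, String> store) {
        this.store = store;
    }

    /** Returns the cached value, loading it from the store on a miss. */
    String get(String path) {
        // computeIfAbsent records no mapping when the store has no value.
        return cache.computeIfAbsent(path, store::get);
    }

    /** Reloads one entry; synchronized so two refreshes cannot interleave. */
    synchronized void refresh(String path) {
        String latest = store.get(path);
        if (latest == null) {
            cache.remove(path);
        } else {
            cache.put(path, latest);
        }
    }

    /** Drops one entry; shares the same monitor as refresh. */
    synchronized void invalidate(String path) {
        cache.remove(path);
    }

    /** Drops every entry; shares the same monitor as refresh. */
    synchronized void invalidateAll() {
        cache.clear();
    }
}
```

With this shape, a `refresh` triggered by the put callback and one triggered by the store notification run one after the other on the same instance, so the later one always sees the store state left by the completed write.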
