Demogorgon314 opened a new pull request #14283:
URL: https://github.com/apache/pulsar/pull/14283


   Fixes #14110 #14096 #14219
   
   ### Motivation
   Currently, the metadata cache refresh has race condition.
   
   The main reason is `MetadataCacheImpl#refresh` is not atomic operations, and 
it will block the async method.
   
   ```java
   // 1. Caffeine.asyncLoad already begin and reads an old value but is not 
done yet.
   if (objCache.getIfPresent(path) != null) {
     objCache.synchronous().invalidate(path);
     // 2. Caffeine.asyncLoad done, the old value will store to cache.
     objCache.synchronous().refresh(path);
   }
   ```
   
   
   
   I added some log in MetadataCacheImpl in order to observe:
   
   ```java
   @Override
   public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, 
Executor executor) {
     log.info("Async load for objCache {}", key);
     CompletableFuture<Optional<CacheGetResult<T>>> future = 
readValueFromStore(key);
     future.whenComplete((op, ex) -> {
       if (ex == null){
         op.ifPresent(tCacheGetResult -> log.info("Async load for objCache 
complete {} - {}", key, tCacheGetResult));
       }
     });
     return future;
   }
   ---------------------
   @Override
   public void refresh(String path) {
     log.info("1. Refresh path {}", path);
     // Refresh object of path if only it is cached before.
     if (objCache.getIfPresent(path) != null) {
       objCache.synchronous().invalidate(path);
       objCache.synchronous().refresh(path);
       log.info("2. Refresh path complete {}:{}", path, get(path).join().get());
     } else {
       log.info("0. Skip Refresh path {}", path);
     }
   }
   ```
   
   
   
   *Cache inconsistency*, you can see the async load for objCache complete in 
mid of refresh :
   
   ```shell
   2022-02-15T08:33:06,669+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@272 
- 1. Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,669+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@279 
- 0. Skip Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,669+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@80 - 
Async load for objCache /key-33552641536875 {}
   2022-02-15T08:33:06,669+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@84 - 
Async load for objCache complete /key-33552641536875 - 
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=1), 
stat=Stat(path=/key-33552641536875, version=0, creationTimestamp=1644885186668, 
modificationTimestamp=1644885186668, ephemeral=false, createdBySelf=true)) {}
   2022-02-15T08:33:06,669+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@272 - 1. Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,669+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@80 - Async load for objCache /key-33552641536875 
{}
   2022-02-15T08:33:06,670+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@272 
- 1. Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,670+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@279 
- 0. Skip Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,670+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@272 
- 1. Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,670+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@279 
- 0. Skip Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,671+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@84 - Async load for objCache complete 
/key-33552641536875 - CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=1), 
stat=Stat(path=/key-33552641536875, version=0, creationTimestamp=1644885186668, 
modificationTimestamp=1644885186668, ephemeral=false, createdBySelf=true)) {}
   2022-02-15T08:33:06,671+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@277 - 2. Refresh path complete 
/key-33552641536875:MetadataCacheTest.MyClass(a=a, b=1) {}
   2022-02-15T08:33:06,671+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@272 - 1. Refresh path /key-33552641536875 {}
   2022-02-15T08:33:06,671+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@80 - Async load for objCache /key-33552641536875 
{}
   2022-02-15T08:33:06,671+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@84 - Async load for objCache complete 
/key-33552641536875 - CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=2), 
stat=Stat(path=/key-33552641536875, version=1, creationTimestamp=1644885186668, 
modificationTimestamp=1644885186669, ephemeral=false, createdBySelf=true)) {}
   2022-02-15T08:33:06,671+0800 INFO  [metadata-store-2437-1] 
o.a.p.m.c.i.MetadataCacheImpl@277 - 2. Refresh path complete 
/key-33552641536875:MetadataCacheTest.MyClass(a=a, b=2) {}
   
   ```
   
   
   
   *Correct* log:
   
   ```shell
   2022-02-15T08:33:06,659+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@272 
- 1. Refresh path /key-33552632155000 {}
   2022-02-15T08:33:06,659+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@279 
- 0. Skip Refresh path /key-33552632155000 {}
   2022-02-15T08:33:06,659+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@80 - 
Async load for objCache /key-33552632155000 {}
   2022-02-15T08:33:06,659+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@84 - 
Async load for objCache complete /key-33552632155000 - 
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=1), 
stat=Stat(path=/key-33552632155000, version=0, creationTimestamp=1644885186659, 
modificationTimestamp=1644885186659, ephemeral=false, createdBySelf=true)) {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@272 
- 1. Refresh path /key-33552632155000 {}
   2022-02-15T08:33:06,661+0800 INFO  [metadata-store-2436-1] 
o.a.p.m.c.i.MetadataCacheImpl@272 - 1. Refresh path /key-33552632155000 {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@80 - 
Async load for objCache /key-33552632155000 {}
   2022-02-15T08:33:06,661+0800 INFO  [metadata-store-2436-1] 
o.a.p.m.c.i.MetadataCacheImpl@279 - 0. Skip Refresh path /key-33552632155000 {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@84 - 
Async load for objCache complete /key-33552632155000 - 
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=2), 
stat=Stat(path=/key-33552632155000, version=1, creationTimestamp=1644885186659, 
modificationTimestamp=1644885186659, ephemeral=false, createdBySelf=true)) {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@277 
- 2. Refresh path complete /key-33552632155000:MetadataCacheTest.MyClass(a=a, 
b=2) {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@272 
- 1. Refresh path /key-33552632155000 {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@80 - 
Async load for objCache /key-33552632155000 {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@84 - 
Async load for objCache complete /key-33552632155000 - 
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=2), 
stat=Stat(path=/key-33552632155000, version=1, creationTimestamp=1644885186659, 
modificationTimestamp=1644885186659, ephemeral=false, createdBySelf=true)) {}
   2022-02-15T08:33:06,661+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@277 
- 2. Refresh path complete /key-33552632155000:MetadataCacheTest.MyClass(a=a, 
b=2) {}
   2022-02-15T08:33:06,662+0800 INFO  [main] o.a.p.m.c.i.MetadataCacheImpl@80 - 
Async load for objCache /key-33552635148375 {}
   ```
   
   ### Modifications
   
   Make metadata cache refresh async.
   
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   Need to update docs? 
   
   - [x] `no-need-doc` 
     
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to