Demogorgon314 opened a new pull request #14283:
URL: https://github.com/apache/pulsar/pull/14283
Fixes #14110 #14096 #14219
### Motivation
Currently, the metadata cache refresh has race condition.
The main reason is `MetadataCacheImpl#refresh` is not atomic operations, and
it will block the async method.
```java
// 1. Caffeine.asyncLoad already begin and reads an old value but is not
done yet.
if (objCache.getIfPresent(path) != null) {
objCache.synchronous().invalidate(path);
// 2. Caffeine.asyncLoad done, the old value will store to cache.
objCache.synchronous().refresh(path);
}
```
I added some log in MetadataCacheImpl in order to observe:
```java
@Override
public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key,
Executor executor) {
log.info("Async load for objCache {}", key);
CompletableFuture<Optional<CacheGetResult<T>>> future =
readValueFromStore(key);
future.whenComplete((op, ex) -> {
if (ex == null){
op.ifPresent(tCacheGetResult -> log.info("Async load for objCache
complete {} - {}", key, tCacheGetResult));
}
});
return future;
}
---------------------
@Override
public void refresh(String path) {
log.info("1. Refresh path {}", path);
// Refresh object of path if only it is cached before.
if (objCache.getIfPresent(path) != null) {
objCache.synchronous().invalidate(path);
objCache.synchronous().refresh(path);
log.info("2. Refresh path complete {}:{}", path, get(path).join().get());
} else {
log.info("0. Skip Refresh path {}", path);
}
}
```
*Cache inconsistency*, you can see the async load for objCache complete in
mid of refresh :
```shell
2022-02-15T08:33:06,669+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@272
- 1. Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,669+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@279
- 0. Skip Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,669+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@80 -
Async load for objCache /key-33552641536875 {}
2022-02-15T08:33:06,669+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@84 -
Async load for objCache complete /key-33552641536875 -
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=1),
stat=Stat(path=/key-33552641536875, version=0, creationTimestamp=1644885186668,
modificationTimestamp=1644885186668, ephemeral=false, createdBySelf=true)) {}
2022-02-15T08:33:06,669+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@272 - 1. Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,669+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@80 - Async load for objCache /key-33552641536875
{}
2022-02-15T08:33:06,670+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@272
- 1. Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,670+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@279
- 0. Skip Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,670+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@272
- 1. Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,670+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@279
- 0. Skip Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,671+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@84 - Async load for objCache complete
/key-33552641536875 - CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=1),
stat=Stat(path=/key-33552641536875, version=0, creationTimestamp=1644885186668,
modificationTimestamp=1644885186668, ephemeral=false, createdBySelf=true)) {}
2022-02-15T08:33:06,671+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@277 - 2. Refresh path complete
/key-33552641536875:MetadataCacheTest.MyClass(a=a, b=1) {}
2022-02-15T08:33:06,671+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@272 - 1. Refresh path /key-33552641536875 {}
2022-02-15T08:33:06,671+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@80 - Async load for objCache /key-33552641536875
{}
2022-02-15T08:33:06,671+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@84 - Async load for objCache complete
/key-33552641536875 - CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=2),
stat=Stat(path=/key-33552641536875, version=1, creationTimestamp=1644885186668,
modificationTimestamp=1644885186669, ephemeral=false, createdBySelf=true)) {}
2022-02-15T08:33:06,671+0800 INFO [metadata-store-2437-1]
o.a.p.m.c.i.MetadataCacheImpl@277 - 2. Refresh path complete
/key-33552641536875:MetadataCacheTest.MyClass(a=a, b=2) {}
```
*Correct* log:
```shell
2022-02-15T08:33:06,659+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@272
- 1. Refresh path /key-33552632155000 {}
2022-02-15T08:33:06,659+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@279
- 0. Skip Refresh path /key-33552632155000 {}
2022-02-15T08:33:06,659+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@80 -
Async load for objCache /key-33552632155000 {}
2022-02-15T08:33:06,659+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@84 -
Async load for objCache complete /key-33552632155000 -
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=1),
stat=Stat(path=/key-33552632155000, version=0, creationTimestamp=1644885186659,
modificationTimestamp=1644885186659, ephemeral=false, createdBySelf=true)) {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@272
- 1. Refresh path /key-33552632155000 {}
2022-02-15T08:33:06,661+0800 INFO [metadata-store-2436-1]
o.a.p.m.c.i.MetadataCacheImpl@272 - 1. Refresh path /key-33552632155000 {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@80 -
Async load for objCache /key-33552632155000 {}
2022-02-15T08:33:06,661+0800 INFO [metadata-store-2436-1]
o.a.p.m.c.i.MetadataCacheImpl@279 - 0. Skip Refresh path /key-33552632155000 {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@84 -
Async load for objCache complete /key-33552632155000 -
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=2),
stat=Stat(path=/key-33552632155000, version=1, creationTimestamp=1644885186659,
modificationTimestamp=1644885186659, ephemeral=false, createdBySelf=true)) {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@277
- 2. Refresh path complete /key-33552632155000:MetadataCacheTest.MyClass(a=a,
b=2) {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@272
- 1. Refresh path /key-33552632155000 {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@80 -
Async load for objCache /key-33552632155000 {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@84 -
Async load for objCache complete /key-33552632155000 -
CacheGetResult(value=MetadataCacheTest.MyClass(a=a, b=2),
stat=Stat(path=/key-33552632155000, version=1, creationTimestamp=1644885186659,
modificationTimestamp=1644885186659, ephemeral=false, createdBySelf=true)) {}
2022-02-15T08:33:06,661+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@277
- 2. Refresh path complete /key-33552632155000:MetadataCacheTest.MyClass(a=a,
b=2) {}
2022-02-15T08:33:06,662+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@80 -
Async load for objCache /key-33552635148375 {}
```
### Modifications
Make metadata cache refresh async.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
### Documentation
Need to update docs?
- [x] `no-need-doc`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]