lifepuzzlefun commented on code in PR #20130:
URL: https://github.com/apache/pulsar/pull/20130#discussion_r1186782667
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -117,17 +118,19 @@ public CompletableFuture<List<String>> asyncReload(String
key, List<String> oldV
.buildAsync(new AsyncCacheLoader<String, Boolean>() {
@Override
public CompletableFuture<Boolean> asyncLoad(String key,
Executor executor) {
- return existsFromStore(key);
+ // force jump to forkJoinThread to avoid callback
execute on the metadataThread
+ return
existsFromStore(key).thenApplyAsync(Function.identity());
}
@Override
public CompletableFuture<Boolean> asyncReload(String key,
Boolean oldValue,
Executor executor) {
if (isConnected) {
- return existsFromStore(key);
+ // force jump to forkJoinThread to avoid callback
execute on the metadataThread
+ return
existsFromStore(key).thenApplyAsync(Function.identity());
} else {
// Do not refresh if we're not connected
- return CompletableFuture.completedFuture(oldValue);
+ return CompletableFuture.supplyAsync(() ->
oldValue);
Review Comment:
hi @Technoboy- after read the Caffeine `asyncReload` method javadoc. i
change back to the origin ·CompletableFuture.completedFuture` because the
`asyncReload` method will be trigger on fork-join-pool which is safe to
complete.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -117,17 +118,19 @@ public CompletableFuture<List<String>> asyncReload(String
key, List<String> oldV
.buildAsync(new AsyncCacheLoader<String, Boolean>() {
@Override
public CompletableFuture<Boolean> asyncLoad(String key,
Executor executor) {
- return existsFromStore(key);
+ // force jump to forkJoinThread to avoid callback
execute on the metadataThread
+ return
existsFromStore(key).thenApplyAsync(Function.identity());
}
@Override
public CompletableFuture<Boolean> asyncReload(String key,
Boolean oldValue,
Executor executor) {
if (isConnected) {
- return existsFromStore(key);
+ // force jump to forkJoinThread to avoid callback
execute on the metadataThread
+ return
existsFromStore(key).thenApplyAsync(Function.identity());
} else {
// Do not refresh if we're not connected
- return CompletableFuture.completedFuture(oldValue);
+ return CompletableFuture.supplyAsync(() ->
oldValue);
Review Comment:
hi @Technoboy- after read the Caffeine `asyncReload` method javadoc. i
change back to the origin `CompletableFuture.completedFuture` because the
`asyncReload` method will be trigger on fork-join-pool which is safe to
complete.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]