lifepuzzlefun commented on code in PR #20130:
URL: https://github.com/apache/pulsar/pull/20130#discussion_r1186782667


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -117,17 +118,19 @@ public CompletableFuture<List<String>> asyncReload(String key, List<String> oldV
                 .buildAsync(new AsyncCacheLoader<String, Boolean>() {
                     @Override
                     public CompletableFuture<Boolean> asyncLoad(String key, Executor executor) {
-                        return existsFromStore(key);
+                        // force jump to forkJoinThread to avoid callback execute on the metadataThread
+                        return existsFromStore(key).thenApplyAsync(Function.identity());
                     }
 
                     @Override
                     public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue,
                             Executor executor) {
                         if (isConnected) {
-                            return existsFromStore(key);
+                            // force jump to forkJoinThread to avoid callback execute on the metadataThread
+                            return existsFromStore(key).thenApplyAsync(Function.identity());
                         } else {
                             // Do not refresh if we're not connected
-                            return CompletableFuture.completedFuture(oldValue);
+                            return CompletableFuture.supplyAsync(() -> oldValue);

Review Comment:
   Hi @Technoboy-, after reading the Javadoc for Caffeine's `asyncReload`
method, I changed this back to the original
`CompletableFuture.completedFuture`, because `asyncReload` is triggered on
the fork-join pool, where it is safe to complete the future.
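
For context, here is a minimal, self-contained sketch of the threading behaviour this change relies on. It is not code from this PR: the class `CallbackThreadSketch`, the method `observeCallbackThreads`, and the thread name `metadata-thread` are hypothetical stand-ins, and the latch exists only to make the ordering deterministic. It shows that a callback attached before completion runs on the completing thread, that `thenApplyAsync(Function.identity())` moves later callbacks off that thread, and that a callback attached to an already completed future runs on the attaching thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class CallbackThreadSketch {
    // Hypothetical name standing in for the metadata store's own thread.
    static final String METADATA_THREAD = "metadata-thread";

    /**
     * Returns the names of the threads that ran three callbacks:
     * [0] attached directly to a future completed by the metadata thread,
     * [1] attached behind thenApplyAsync(Function.identity()),
     * [2] attached to an already completed future.
     */
    static String[] observeCallbackThreads() throws Exception {
        ExecutorService metadataThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, METADATA_THREAD));
        try {
            // Stand-in for the future returned by existsFromStore(key).
            CompletableFuture<Boolean> source = new CompletableFuture<>();
            CountDownLatch callbacksAttached = new CountDownLatch(1);

            // The metadata thread completes the future only after the
            // callbacks below have been attached.
            metadataThread.execute(() -> {
                try {
                    callbacksAttached.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                source.complete(true);
            });

            // No hop: the callback runs on the thread that completes `source`.
            CompletableFuture<String> direct =
                    source.thenApply(v -> Thread.currentThread().getName());

            // With the hop: the identity stage runs on the default async
            // executor, so the dependent callback leaves the metadata thread.
            CompletableFuture<String> hopped = source
                    .thenApplyAsync(Function.identity())
                    .thenApply(v -> Thread.currentThread().getName());

            callbacksAttached.countDown();

            // Already completed: the callback runs on the attaching thread.
            String alreadyDone = CompletableFuture.completedFuture(true)
                    .thenApply(v -> Thread.currentThread().getName())
                    .get();

            return new String[] {direct.get(), hopped.get(), alreadyDone};
        } finally {
            metadataThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = observeCallbackThreads();
        System.out.println("direct callback ran on:        " + names[0]);
        System.out.println("hopped callback ran on:        " + names[1]);
        System.out.println("completedFuture callback ran on: " + names[2]);
    }
}
```

The third case is the one relevant to the not-connected branch: if `asyncReload` is already invoked off the metadata thread, returning `completedFuture(oldValue)` does not pull any callback onto it.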
   





