nodece commented on pull request #14320:
URL: https://github.com/apache/pulsar/pull/14320#issuecomment-1042715597


   The unavailability of the Admin API is not caused by the HTTP server thread; 
the root cause is that the ZK callback thread is blocked. 
   
   When an admin API calls the ZK metadata store API, the ZK data is returned 
to the caller through a `CompletableFuture`. Note that we do not use an executor 
to run the callback in 
[ZKMetadataStore.java#L171](https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java#L171),
 so the caller's continuation runs on the ZK callback thread. Once the caller 
converts an async call to a sync call on that thread, with code like 
`metadata.getAsync().get(30, TimeUnit.SECONDS)`, the ZK callback thread is 
blocked and cannot deliver the result that `get` is waiting for.
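   
   The blocking pattern described above can be sketched without ZooKeeper. 
This is only an illustration using the JDK: `CALLBACK_THREAD`, `getAsync` and 
`nestedSyncGetTimesOut` are hypothetical names, and a single-thread executor 
stands in for the ZK callback thread.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   class CallbackThreadBlockDemo {
       // Hypothetical stand-in for the ZK callback thread: one thread runs every callback.
       static final ExecutorService CALLBACK_THREAD = Executors.newSingleThreadExecutor(r -> {
           Thread t = new Thread(r, "fake-zk-callback");
           t.setDaemon(true);
           return t;
       });

       // Hypothetical async read whose future is completed on the callback thread.
       static CompletableFuture<String> getAsync(String path) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CALLBACK_THREAD.execute(() -> future.complete("data:" + path));
           return future;
       }

       // Returns true when the sync get issued inside the callback times out.
       static boolean nestedSyncGetTimesOut() throws Exception {
           CompletableFuture<Boolean> timedOut = new CompletableFuture<>();
           CompletableFuture<String> first = new CompletableFuture<>();
           // Registered before completion, so it runs on the thread that completes `first`.
           first.whenComplete((data, error) -> {
               try {
                   // Async-to-sync conversion on the callback thread: the only thread
                   // that can complete this second future is the one waiting here.
                   getAsync("/b").get(200, TimeUnit.MILLISECONDS);
                   timedOut.complete(false);
               } catch (TimeoutException e) {
                   timedOut.complete(true);
               } catch (Exception e) {
                   timedOut.completeExceptionally(e);
               }
           });
           CALLBACK_THREAD.execute(() -> first.complete("data:/a"));
           return timedOut.get(5, TimeUnit.SECONDS);
       }

       public static void main(String[] args) throws Exception {
           System.out.println("nested sync get timed out: " + nestedSyncGetTimesOut());
       }
   }
   ```

   The second read is queued behind the callback that is waiting for it, so 
the wait can only end by timeout.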
   
   How to solve this problem?
   1. In the ZK callback, use an executor to run the callback that passes the 
data to Pulsar, so that it does not run on the ZK callback thread
   2. Don't convert async calls to sync calls
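   
   Both options can be sketched with the JDK alone. This is not the Pulsar 
implementation: `CALLBACK_THREAD`, `APP_EXECUTOR`, `getWithHandOff` and 
`getOnCallbackThread` are hypothetical names, and a single-thread executor 
again stands in for the ZK callback thread.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.TimeUnit;

   class CallbackHandOffDemo {
       static ThreadFactory daemon(String name) {
           return r -> {
               Thread t = new Thread(r, name);
               t.setDaemon(true);
               return t;
           };
       }

       // Hypothetical stand-in for the ZK callback thread.
       static final ExecutorService CALLBACK_THREAD =
               Executors.newSingleThreadExecutor(daemon("fake-zk-callback"));
       // Hypothetical executor that runs caller-facing callbacks.
       static final ExecutorService APP_EXECUTOR =
               Executors.newCachedThreadPool(daemon("app-callback"));

       // Option 1: the callback thread only hands the result off to APP_EXECUTOR.
       static CompletableFuture<String> getWithHandOff(String path) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CALLBACK_THREAD.execute(() ->
                   APP_EXECUTOR.execute(() -> future.complete("data:" + path)));
           return future;
       }

       // Unfixed variant: the future is completed on the callback thread itself.
       static CompletableFuture<String> getOnCallbackThread(String path) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CALLBACK_THREAD.execute(() -> future.complete("data:" + path));
           return future;
       }

       // With option 1, a sync get inside the callback never runs on the callback thread.
       static String syncGetAfterHandOff() throws Exception {
           CompletableFuture<String> result = new CompletableFuture<>();
           getWithHandOff("/a").whenComplete((a, error) -> {
               try {
                   String b = getWithHandOff("/b").get(2, TimeUnit.SECONDS);
                   result.complete(a + "," + b);
               } catch (Exception e) {
                   result.completeExceptionally(e);
               }
           });
           return result.get(5, TimeUnit.SECONDS);
       }

       // Option 2: chain the second read with thenCompose; no callback blocks.
       static String composedWithoutBlocking() throws Exception {
           return getOnCallbackThread("/a")
                   .thenCompose(a -> getOnCallbackThread("/b").thenApply(b -> a + "," + b))
                   .get(5, TimeUnit.SECONDS);
       }

       public static void main(String[] args) throws Exception {
           System.out.println(syncGetAfterHandOff());
           System.out.println(composedWithoutBlocking());
       }
   }
   ```

   Option 1 costs an extra thread hop per callback but tolerates callers that 
block; option 2 has no extra hop but only works if every caller stays async.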
   
   
   How to reproduce the blocked ZK callback thread:
   ```
   docker run -d -p 2181:2181 --name test-zookeeper zookeeper
   ```
   ```java
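   import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
   import com.github.benmanes.caffeine.cache.Caffeine;
   import com.google.common.collect.Lists;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import org.apache.zookeeper.Op;
   import org.apache.zookeeper.ZooKeeper;
   
   public class Main {
       private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
   
       public static void printThread(String name) {
           System.out.println(name + " thread name -> " + Thread.currentThread().getName());
       }
   
       public static void main(String[] args) throws Exception {
           ZooKeeper zkc = new ZooKeeper("localhost:2181", 60_000, null);
   
           // Poll until the ZK session is connected, or give up after 5 seconds.
           System.out.println("Check the zk connect");
           CountDownLatch zkLatch = new CountDownLatch(1);
           Thread connectWatcher = new Thread(() -> {
               while (true) {
                   if (zkc.getState().isConnected()) {
                       zkLatch.countDown();
                       break;
                   }
               }
           });
           // Daemon thread, so a failed connect does not keep the JVM alive.
           connectWatcher.setDaemon(true);
           connectWatcher.start();
           if (!zkLatch.await(5, TimeUnit.SECONDS)) {
               throw new Exception("zk connect failed");
           }
   
           // The loader completes the future directly on the ZK callback thread.
           AsyncLoadingCache<String, byte[]> objCache = Caffeine.newBuilder()
                   .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
                   .buildAsync((key, executor) -> {
                       CompletableFuture<byte[]> future = new CompletableFuture<>();
                       zkc.multi(Lists.newArrayList(Op.getData("/")), (rc, path, ctx, opResults) -> {
                           printThread("zk callback");
                           future.complete(null);
                       }, null);
                       return future;
                   });
   
           CountDownLatch countDownLatch = new CountDownLatch(1);
   
           // Reproduce the blocked ZK callback thread
           System.out.println("async get start");
           objCache.get("/").whenComplete((unused, ignored) -> {
               printThread("async get done");
               try {
                   System.out.println("zk thread will be blocked by the sync get");
                   System.out.println("sync get start");
                   // The loader for "/1" needs this same thread to run its callback,
                   // so this call is expected to time out.
                   objCache.get("/1").get(5, TimeUnit.SECONDS);
                   printThread("sync get done");
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
                   // Count down exactly once, whether the sync get succeeded or failed.
                   countDownLatch.countDown();
               }
           });
   
           countDownLatch.await();
           zkc.close();
       }
   }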
   ```
   
   
   

