nodece edited a comment on pull request #14320:
URL: https://github.com/apache/pulsar/pull/14320#issuecomment-1042715597


   The unavailability of the Admin API is not caused by the HTTP server thread; the root cause is that the ZK callback thread is blocked.
   
   When an Admin API handler calls the ZK metadata store API, it receives the ZK data through a `CompletableFuture`. Note that we do not use an executor to run `CompletableFuture#complete()` in [ZKMetadataStore.java#L171](https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java#L171), so `complete()`, together with every dependent stage already registered on the future, runs on the ZK callback thread. If code running there converts an async call into a sync one, for example `metadata.getAsync().get(30, TimeUnit.SECONDS)`, the ZK callback thread blocks. While it is blocked it cannot deliver any other ZK response, including the one that `get()` is waiting for, so the call can only time out.
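   A minimal, JDK-only sketch of this mechanism follows. It assumes that a single-thread executor (hypothetically named `zk-callback-sim`) is a fair stand-in for the ZK client's callback thread; the class and method names are illustrative, and no ZooKeeper or Pulsar classes are involved. A dependent stage registered before `complete()` runs inline on the completing thread, and a blocking `get()` inside it waits for a completion that is queued behind it on that same thread:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class BlockedCallbackDemo {

       // Returns "<thread>: <outcome>" for a dependent stage that performs a
       // blocking get() while running on the simulated callback thread.
       public static String nestedSyncGet() throws Exception {
           // Hypothetical stand-in for the ZK client's single callback thread.
           ExecutorService callbackThread = Executors.newSingleThreadExecutor(
                   r -> new Thread(r, "zk-callback-sim"));
           try {
               CompletableFuture<String> first = new CompletableFuture<>();
               // Registered before completion, so it runs inline inside complete().
               CompletableFuture<String> outcome = first.thenApply(data -> {
                   String thread = Thread.currentThread().getName();
                   // Second read: its complete() is queued behind the current
                   // task on the same (busy) callback thread.
                   CompletableFuture<String> second = new CompletableFuture<>();
                   callbackThread.execute(() -> second.complete("/1"));
                   try {
                       second.get(1, TimeUnit.SECONDS); // async-to-sync conversion
                       return thread + ": completed";
                   } catch (TimeoutException e) {
                       return thread + ": timed out";
                   } catch (Exception e) {
                       throw new RuntimeException(e);
                   }
               });
               // complete() is called directly on the callback thread, no hand-off.
               callbackThread.execute(() -> first.complete("/"));
               return outcome.get(5, TimeUnit.SECONDS);
           } finally {
               callbackThread.shutdownNow();
           }
       }

       public static void main(String[] args) throws Exception {
           System.out.println(nestedSyncGet());
       }
   }
   ```

   With this setup the nested `get()` runs on `zk-callback-sim` and times out instead of completing, which is the same starvation described above.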
   
   How can we solve this problem?
   1. Use an executor to run the callback that passes the data to Pulsar, instead of running it on the ZK callback thread.
   2. Don't convert async calls into sync calls.
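   Both fixes can be sketched with the same JDK-only setup. Here `CallbackFixDemo`, `getAsync`, the worker pool, and the `data@<path>` values are all hypothetical; the `completionExecutor` parameter decides where `complete()` runs, and passing `Runnable::run` reproduces the original behavior with no hand-off:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   public class CallbackFixDemo {

       // Hypothetical async read: the "response" arrives on the single callback
       // thread, which then runs complete() through completionExecutor.
       static CompletableFuture<String> getAsync(ExecutorService callbackThread,
                                                 Executor completionExecutor,
                                                 String path) {
           CompletableFuture<String> future = new CompletableFuture<>();
           callbackThread.execute(() ->
                   completionExecutor.execute(() -> future.complete("data@" + path)));
           return future;
       }

       // Fix 1: hand complete() off to a worker pool, so a blocking dependent
       // stage ties up a worker instead of the callback thread.
       public static String withExecutorHandoff() throws Exception {
           ExecutorService callbackThread = Executors.newSingleThreadExecutor();
           ExecutorService workers = Executors.newFixedThreadPool(2);
           try {
               return getAsync(callbackThread, workers, "/")
                       .thenApply(first -> {
                           try {
                               // Still a sync call, but not on the callback thread.
                               return getAsync(callbackThread, workers, "/1")
                                       .get(5, TimeUnit.SECONDS);
                           } catch (Exception e) {
                               throw new RuntimeException(e);
                           }
                       })
                       .get(10, TimeUnit.SECONDS);
           } finally {
               callbackThread.shutdownNow();
               workers.shutdownNow();
           }
       }

       // Fix 2: stay async; thenCompose chains the second read without blocking
       // any thread, even though complete() runs on the callback thread.
       public static String withThenCompose() throws Exception {
           ExecutorService callbackThread = Executors.newSingleThreadExecutor();
           try {
               Executor direct = Runnable::run; // original behavior: no hand-off
               return getAsync(callbackThread, direct, "/")
                       .thenCompose(first -> getAsync(callbackThread, direct, "/1"))
                       .get(5, TimeUnit.SECONDS);
           } finally {
               callbackThread.shutdownNow();
           }
       }

       public static void main(String[] args) throws Exception {
           System.out.println("executor hand-off: " + withExecutorHandoff());
           System.out.println("thenCompose: " + withThenCompose());
       }
   }
   ```

   Fix 1 only moves the blocking onto the worker pool: if every worker ends up blocked in a sync call, the same starvation can reappear there, which is arguably why fix 2 is the more robust option.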
   
   
   How to reproduce the blocked ZK callback thread:
   ```
   docker run -d -p 2181:2181 --name test-zookeeper zookeeper
   ```
   ```java
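   import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
   import com.github.benmanes.caffeine.cache.Caffeine;
   import java.util.Collections;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import org.apache.zookeeper.Op;
   import org.apache.zookeeper.Watcher;
   import org.apache.zookeeper.ZooKeeper;

   public class Main {
       private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);

       public static void printThread(String name) {
           System.out.println(name + " thread name -> " + Thread.currentThread().getName());
       }

       public static void main(String[] args) throws Exception {
           System.out.println("Check the zk connect");
           CountDownLatch zkLatch = new CountDownLatch(1);
           // Count down once the session is established, instead of busy-polling
           // getState() from a thread that spins forever if the connect fails.
           ZooKeeper zkc = new ZooKeeper("localhost:2181", 60_000, event -> {
               if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                   zkLatch.countDown();
               }
           });
           if (!zkLatch.await(5, TimeUnit.SECONDS)) {
               throw new Exception("zk connect failed");
           }

           AsyncLoadingCache<String, byte[]> objCache = Caffeine.newBuilder()
                   .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
                   .buildAsync((key, executor) -> {
                       CompletableFuture<byte[]> future = new CompletableFuture<>();
                       // The future is completed directly in the ZK callback,
                       // i.e. on the ZK client's event thread, with no executor.
                       zkc.multi(Collections.singletonList(Op.getData("/")),
                               (rc, path, ctx, opResults) -> {
                                   printThread("zk callback");
                                   future.complete(null);
                               }, null);
                       return future;
                   });

           CountDownLatch countDownLatch = new CountDownLatch(1);

           // Reproduce the blocked ZK callback thread
           System.out.println("async get start");
           objCache.get("/").whenComplete((unused, ignored) -> {
               // Normally runs on the ZK event thread that completed the future
               printThread("async get done");
               try {
                   System.out.println("zk thread will be blocked by the sync get");
                   System.out.println("sync get start");
                   // The load for "/1" can only complete on the ZK event thread,
                   // which is blocked right here, so this times out after 5 seconds.
                   objCache.get("/1").get(5, TimeUnit.SECONDS);
                   // Not reached: the get() above throws TimeoutException
                   printThread("sync get done");
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
                   countDownLatch.countDown();
               }
           });

           countDownLatch.await();
           zkc.close();
       }
   }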
   ```
   
   
   

