LB-Yu opened a new issue, #1445:
URL: https://github.com/apache/fluss/issues/1445

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/alibaba/fluss/issues) 
and found nothing similar.
   
   
   ### Description
   
   Currently, the operation to update LeaderAndIsr in tryProcessAdjustIsr() is 
executed synchronously and sequentially. In large clusters, if ISR flapping 
occurs, it may result in a large number of AdjustIsrReceivedEvent events, which 
can in turn cause the Coordinator event processing thread to become blocked. We 
need to optimize the performance of tryProcessAdjustIsr() so that it can return 
more quickly.
   
   I tried both the batch and async API of Curator, and I foud that the batch 
API performs better.
   ```java
       public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> 
leaderANdIsrList)
               throws Exception {
           if (leaderANdIsrList.isEmpty()) {
               return;
           }
   
           List<CuratorOp> ops = new ArrayList<>(leaderANdIsrList.size());
           for (Map.Entry<TableBucket, LeaderAndIsr> entry : 
leaderANdIsrList.entrySet()) {
               TableBucket tableBucket = entry.getKey();
               LeaderAndIsr leaderAndIsr = entry.getValue();
   
               String path = LeaderAndIsrZNode.path(tableBucket);
               byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
               CuratorOp updateOp = 
zkClient.transactionOp().setData().forPath(path, data);
               ops.add(updateOp);
               if (ops.size() == MAX_BATCH_SIZE) {
                   zkClient.transaction().forOperations(ops);
                   ops.clear();
               }
           }
           if (!ops.isEmpty()) {
               zkClient.transaction().forOperations(ops);
           }
       }
   
       public void asyncUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> 
leaderANdIsrList)
               throws Exception {
           if (leaderANdIsrList.isEmpty()) {
               return;
           }
   
           CountDownLatch latch = new CountDownLatch(leaderANdIsrList.size());
           for (Map.Entry<TableBucket, LeaderAndIsr> entry : 
leaderANdIsrList.entrySet()) {
               TableBucket tableBucket = entry.getKey();
               LeaderAndIsr leaderAndIsr = entry.getValue();
   
               String path = LeaderAndIsrZNode.path(tableBucket);
               byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
               zkClient.setData()
                       .inBackground(
                               (client, event) -> {
                                   latch.countDown();
                               })
                       .forPath(path, data);
           }
           latch.await();
       }
   
       @Test
       void testBatchUpdateLeaderAndIsr() throws Exception {
           int totalCount = 100000;
   
           Map<TableBucket, LeaderAndIsr> leaderAndIsrList = new HashMap<>();
           for (int i = 0; i < totalCount; i++) {
               TableBucket tableBucket = new TableBucket(1, i);
               LeaderAndIsr leaderAndIsr =
                       new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 
3), 100, 1000);
               leaderAndIsrList.put(tableBucket, leaderAndIsr);
               zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
           }
   
           long startTime = System.currentTimeMillis();
           zookeeperClient.batchUpdateLeaderAndIsr(leaderAndIsrList);
           long endTime = System.currentTimeMillis();
           System.out.println("batchUpdateLeaderAndIsr cost: " + (endTime - 
startTime) + "ms");
   
           startTime = System.currentTimeMillis();
           zookeeperClient.asyncUpdateLeaderAndIsr(leaderAndIsrList);
           endTime = System.currentTimeMillis();
           System.out.println("asyncUpdateLeaderAndIsr cost: " + (endTime - 
startTime) + "ms");
   
           startTime = System.currentTimeMillis();
           for (Map.Entry<TableBucket, LeaderAndIsr> entry : 
leaderAndIsrList.entrySet()) {
               zookeeperClient.updateLeaderAndIsr(entry.getKey(), 
entry.getValue());
           }
           endTime = System.currentTimeMillis();
           System.out.println("updateLeaderAndIsr cost: " + (endTime - 
startTime) + "ms");
       }
   ```
   
   In my local test, I get the result like below. So I will use batch operation 
to optimize it.
   ```
   batchUpdateLeaderAndIsr cost: 777ms
   asyncUpdateLeaderAndIsr cost: 3528ms
   updateLeaderAndIsr cost: 12294ms
   ```
   
   ### Willingness to contribute
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to