LB-Yu opened a new issue, #1445: URL: https://github.com/apache/fluss/issues/1445
### Search before asking

- [x] I searched in the [issues](https://github.com/alibaba/fluss/issues) and found nothing similar.

### Description

Currently, the operation that updates LeaderAndIsr in `tryProcessAdjustIsr()` is executed synchronously and sequentially. In large clusters, ISR flapping can produce a large number of `AdjustIsrReceivedEvent` events, which in turn can block the Coordinator event processing thread. We need to optimize `tryProcessAdjustIsr()` so that it returns more quickly.

I tried both the batch and the async API of Curator, and I found that the batch API performs better.

```java
public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
        throws Exception {
    if (leaderAndIsrList.isEmpty()) {
        return;
    }

    List<CuratorOp> ops = new ArrayList<>(leaderAndIsrList.size());
    for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
        TableBucket tableBucket = entry.getKey();
        LeaderAndIsr leaderAndIsr = entry.getValue();

        String path = LeaderAndIsrZNode.path(tableBucket);
        byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
        CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);
        ops.add(updateOp);

        // Flush a full batch as a single ZooKeeper transaction.
        if (ops.size() == MAX_BATCH_SIZE) {
            zkClient.transaction().forOperations(ops);
            ops.clear();
        }
    }

    // Flush the remaining operations of the last, partially filled batch.
    if (!ops.isEmpty()) {
        zkClient.transaction().forOperations(ops);
    }
}

public void asyncUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
        throws Exception {
    if (leaderAndIsrList.isEmpty()) {
        return;
    }

    // One count per bucket; each background callback counts down once.
    CountDownLatch latch = new CountDownLatch(leaderAndIsrList.size());
    for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
        TableBucket tableBucket = entry.getKey();
        LeaderAndIsr leaderAndIsr = entry.getValue();

        String path = LeaderAndIsrZNode.path(tableBucket);
        byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
        zkClient.setData()
                .inBackground(
                        (client, event) -> {
                            // Note: the result code of the event is not checked here.
                            latch.countDown();
                        })
                .forPath(path, data);
    }
    // Block until every background update has completed.
    latch.await();
}

@Test
void testBatchUpdateLeaderAndIsr() throws Exception {
    int totalCount = 100000;

    // Register all buckets first so that the updates below hit existing znodes.
    Map<TableBucket, LeaderAndIsr> leaderAndIsrList = new HashMap<>();
    for (int i = 0; i < totalCount; i++) {
        TableBucket tableBucket = new TableBucket(1, i);
        LeaderAndIsr leaderAndIsr =
                new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
        leaderAndIsrList.put(tableBucket, leaderAndIsr);
        zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
    }

    // 1. Batch (transaction) update.
    long startTime = System.currentTimeMillis();
    zookeeperClient.batchUpdateLeaderAndIsr(leaderAndIsrList);
    long endTime = System.currentTimeMillis();
    System.out.println("batchUpdateLeaderAndIsr cost: " + (endTime - startTime) + "ms");

    // 2. Async (background) update.
    startTime = System.currentTimeMillis();
    zookeeperClient.asyncUpdateLeaderAndIsr(leaderAndIsrList);
    endTime = System.currentTimeMillis();
    System.out.println("asyncUpdateLeaderAndIsr cost: " + (endTime - startTime) + "ms");

    // 3. Existing synchronous, one-by-one update.
    startTime = System.currentTimeMillis();
    for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
        zookeeperClient.updateLeaderAndIsr(entry.getKey(), entry.getValue());
    }
    endTime = System.currentTimeMillis();
    System.out.println("updateLeaderAndIsr cost: " + (endTime - startTime) + "ms");
}
```

In my local test, I got the results below, so I will use the batch operation to optimize it.

```
batchUpdateLeaderAndIsr cost: 777ms
asyncUpdateLeaderAndIsr cost: 3528ms
updateLeaderAndIsr cost: 12294ms
```

### Willingness to contribute

- [x] I'm willing to submit a PR!
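
For reference, the flush-every-N pattern that `batchUpdateLeaderAndIsr` relies on can be sketched without Curator or ZooKeeper. This is only an illustration under assumptions: the class `BatchFlushSketch`, the method `flushInBatches`, and the batch size of 100 are hypothetical and not part of Fluss (the issue does not state the value of `MAX_BATCH_SIZE`). In the real code, the `flush` callback would correspond to one `zkClient.transaction().forOperations(ops)` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, library-free sketch of the "flush every N items" loop.
public class BatchFlushSketch {

    /**
     * Passes the items to {@code flush} in chunks of at most {@code maxBatchSize}
     * and returns how many times {@code flush} was called.
     */
    public static <T> int flushInBatches(
            Iterable<T> items, int maxBatchSize, Consumer<List<T>> flush) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<T> pending = new ArrayList<>(maxBatchSize);
        int flushes = 0;
        for (T item : items) {
            pending.add(item);
            if (pending.size() == maxBatchSize) {
                // Hand over a copy so that clearing 'pending' cannot affect the callee.
                flush.accept(new ArrayList<>(pending));
                pending.clear();
                flushes++;
            }
        }
        // Flush the last, partially filled batch, if any.
        if (!pending.isEmpty()) {
            flush.accept(new ArrayList<>(pending));
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        List<Integer> bucketIds = new ArrayList<>();
        for (int i = 0; i < 1050; i++) {
            bucketIds.add(i);
        }
        List<Integer> batchSizes = new ArrayList<>();
        // Assumed batch size of 100, chosen only for illustration.
        int flushes = flushInBatches(bucketIds, 100, batch -> batchSizes.add(batch.size()));
        // Prints "11 flushes, last batch size 50".
        System.out.println(
                flushes + " flushes, last batch size " + batchSizes.get(batchSizes.size() - 1));
    }
}
```

With this shape, the number of round trips drops from one per bucket to roughly the bucket count divided by the batch size, which is consistent with the batch variant being the fastest in the measurements above.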
