This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d53d3618b9 [fix][broker] fix broker may lost rack information (#23331)
1d53d3618b9 is described below

commit 1d53d3618b9f72f8c66dad6fc2f6248d6d79d2c1
Author: ken <[email protected]>
AuthorDate: Sat Feb 15 07:30:56 2025 +0800

    [fix][broker] fix broker may lost rack information (#23331)
    
    Co-authored-by: fanjianye <[email protected]>
---
 .../metadata/bookkeeper/PulsarRegistrationClient.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index be945d988fb..89dbf2be990 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -181,8 +181,13 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
     @Override
     public CompletableFuture<Void> watchWritableBookies(RegistrationListener 
registrationListener) {
         writableBookiesWatchers.add(registrationListener);
+        // trigger all listeners in writableBookiesWatchers one by one. It 
aims to keep a sync way
+        // to make sure the previous listener has finished when a new listener 
is register.
+        // Though it would bring duplicate trigger listener problem, but since 
watchWritableBookies
+        // is only executed when bookieClient construct, the duplicate problem 
is acceptable.
         return getWritableBookies()
-                .thenAcceptAsync(registrationListener::onBookiesChanged, 
executor);
+                .thenAcceptAsync(bookies ->
+                        writableBookiesWatchers.forEach(w -> 
w.onBookiesChanged(bookies)), executor);
     }
 
     @Override
@@ -193,8 +198,13 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
     @Override
     public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener 
registrationListener) {
         readOnlyBookiesWatchers.add(registrationListener);
+        // trigger all listeners in readOnlyBookiesWatchers one by one. It 
aims to keep a sync way
+        // to make sure the previous listener has finished when a new listener 
is register.
+        // Though it would bring duplicate trigger listener problem, but since 
watchReadOnlyBookies
+        // is only executed when bookieClient construct, the duplicate problem 
is acceptable.
         return getReadOnlyBookies()
-                .thenAcceptAsync(registrationListener::onBookiesChanged, 
executor);
+                .thenAcceptAsync(bookies ->
+                        readOnlyBookiesWatchers.forEach(w -> 
w.onBookiesChanged(bookies)), executor);
     }
 
     @Override

Reply via email to