codelipenghui commented on code in PR #20642:
URL: https://github.com/apache/pulsar/pull/20642#discussion_r1243173969


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener 
registrationListener) {
         readOnlyBookiesWatchers.remove(registrationListener);
     }
 
-    private void handleDeletedBookieNode(Notification n) {
-        if (n.getType() == NotificationType.Deleted) {
-            BookieId bookieId = stripBookieIdFromPath(n.getPath());
-            if (bookieId != null) {
-                log.info("Bookie {} disappeared", bookieId);
-                bookieServiceInfoCache.remove(bookieId);
-            }
+    /**
+     * This method will receive metadata store notifications and then update 
the
+     * local cache in background sequentially.
+     */
+    private void updatedBookies(Notification n) {
+        // make the notification callback run sequential in background.
+        final String path = n.getPath();
+        if (!path.startsWith(bookieReadonlyRegistrationPath) || 
!path.startsWith(bookieRegistrationPath)) {

Review Comment:
   Should this be `&&`? Otherwise we will miss updates under 
`bookieRegistrationPath`, because 
`!path.startsWith(bookieReadonlyRegistrationPath)` will be true for those paths 
and the method will return early.



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -91,38 +96,58 @@ public void close() {
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
-        return getChildren(bookieRegistrationPath);
+        return getBookiesThenFreshCache(bookieRegistrationPath);
     }
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
         // this method is meant to return all the known bookies, even the 
bookies
         // that are not in a running state
-        return getChildren(bookieAllRegistrationPath);
+        return getBookiesThenFreshCache(bookieAllRegistrationPath);
     }
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
-        return getChildren(bookieReadonlyRegistrationPath);
+        return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
     }
 
-    private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String 
path) {
+    /**
+     * @throws IllegalArgumentException if parameter path is null or empty.
+     */
+    private CompletableFuture<Versioned<Set<BookieId>>> 
getBookiesThenFreshCache(String path) {
+        if (path == null || path.isEmpty()) {
+            return failedFuture(
+                    new IllegalArgumentException(String.format("parameter 
[path] can not be null or empty.")));
+        }
         return store.getChildren(path)
                 .thenComposeAsync(children -> {
-                    Set<BookieId> bookieIds = 
PulsarRegistrationClient.convertToBookieAddresses(children);
-                    List<CompletableFuture<?>> bookieInfoUpdated =
-                            new ArrayList<>(bookieIds.size());
+                    final Set<BookieId> bookieIds = 
PulsarRegistrationClient.convertToBookieAddresses(children);
+                    final List<CompletableFuture<?>> bookieInfoUpdated = new 
ArrayList<>(bookieIds.size());
                     for (BookieId id : bookieIds) {
                         // update the cache for new bookies
-                        if (!bookieServiceInfoCache.containsKey(id)) {
+                        if (path.equals(bookieRegistrationPath) && 
writableBookieInfo.get(id) == null) {
                             
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+                            continue;
+                        }
+                        if (path.equals(bookieReadonlyRegistrationPath) && 
readOnlyBookieInfo.get(id) == null) {
+                            
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+                            continue;
+                        }
+                        if (path.equals(bookieAllRegistrationPath)) {
+                            if (writableBookieInfo.get(id) != null || 
readOnlyBookieInfo != null) {

Review Comment:
   ```suggestion
                               if (writableBookieInfo.get(id) != null || 
readOnlyBookieInfo.get(id) != null) {
   ```



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -91,38 +96,58 @@ public void close() {
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
-        return getChildren(bookieRegistrationPath);
+        return getBookiesThenFreshCache(bookieRegistrationPath);
     }
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
         // this method is meant to return all the known bookies, even the 
bookies
         // that are not in a running state
-        return getChildren(bookieAllRegistrationPath);
+        return getBookiesThenFreshCache(bookieAllRegistrationPath);
     }
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
-        return getChildren(bookieReadonlyRegistrationPath);
+        return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
     }
 
-    private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String 
path) {
+    /**
+     * @throws IllegalArgumentException if parameter path is null or empty.
+     */
+    private CompletableFuture<Versioned<Set<BookieId>>> 
getBookiesThenFreshCache(String path) {
+        if (path == null || path.isEmpty()) {
+            return failedFuture(
+                    new IllegalArgumentException(String.format("parameter 
[path] can not be null or empty.")));
+        }
         return store.getChildren(path)
                 .thenComposeAsync(children -> {
-                    Set<BookieId> bookieIds = 
PulsarRegistrationClient.convertToBookieAddresses(children);
-                    List<CompletableFuture<?>> bookieInfoUpdated =
-                            new ArrayList<>(bookieIds.size());
+                    final Set<BookieId> bookieIds = 
PulsarRegistrationClient.convertToBookieAddresses(children);
+                    final List<CompletableFuture<?>> bookieInfoUpdated = new 
ArrayList<>(bookieIds.size());
                     for (BookieId id : bookieIds) {
                         // update the cache for new bookies
-                        if (!bookieServiceInfoCache.containsKey(id)) {
+                        if (path.equals(bookieRegistrationPath) && 
writableBookieInfo.get(id) == null) {
                             
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+                            continue;
+                        }
+                        if (path.equals(bookieReadonlyRegistrationPath) && 
readOnlyBookieInfo.get(id) == null) {
+                            
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+                            continue;
+                        }
+                        if (path.equals(bookieAllRegistrationPath)) {
+                            if (writableBookieInfo.get(id) != null || 
readOnlyBookieInfo != null) {
+                                // jump to next bookie id
+                                continue;
+                            }
+                            
bookieInfoUpdated.add(readBookieServiceInfoAsync(id) // check writable first
+                                    .thenCompose(updated ->
+                                            updated ? completedFuture(null)
+                                                    : 
readBookieInfoAsReadonlyBookie(id))); // check read-only then

Review Comment:
   The code would be easier to read if the methods `readBookieServiceInfoAsync` 
and `readBookieInfoAsReadonlyBookie` returned `Optional<BookieServiceInfo>`, 
so that we can check here whether the BookieServiceInfo is present or not. 
Using the boolean value `updated` here is a little confusing.



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener 
registrationListener) {
         readOnlyBookiesWatchers.remove(registrationListener);
     }
 
-    private void handleDeletedBookieNode(Notification n) {
-        if (n.getType() == NotificationType.Deleted) {
-            BookieId bookieId = stripBookieIdFromPath(n.getPath());
-            if (bookieId != null) {
-                log.info("Bookie {} disappeared", bookieId);
-                bookieServiceInfoCache.remove(bookieId);
-            }
+    /**
+     * This method will receive metadata store notifications and then update 
the
+     * local cache in background sequentially.
+     */
+    private void updatedBookies(Notification n) {
+        // make the notification callback run sequential in background.
+        final String path = n.getPath();
+        if (!path.startsWith(bookieReadonlyRegistrationPath) || 
!path.startsWith(bookieRegistrationPath)) {
+            // ignore unknown path
+            return;
         }
-    }
-
-    private void handleUpdatedBookieNode(Notification n) {
-        BookieId bookieId = stripBookieIdFromPath(n.getPath());
+        final BookieId bookieId = stripBookieIdFromPath(n.getPath());
         if (bookieId != null) {
-            log.info("Bookie {} info updated", bookieId);
-            readBookieServiceInfoAsync(bookieId);
+            log.info("Bookie {} do {}. path: {}", bookieId, n.getType(), 
n.getPath());
         }
-    }
-
-    private void updatedBookies(Notification n) {
-        if (n.getType() == NotificationType.Created || n.getType() == 
NotificationType.Deleted) {
-            if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
-                getReadOnlyBookies().thenAccept(bookies -> {
-                    readOnlyBookiesWatchers.forEach(w -> executor.execute(() 
-> w.onBookiesChanged(bookies)));
-                });
-                handleDeletedBookieNode(n);
-            } else if (n.getPath().startsWith(bookieRegistrationPath)) {
-                  getWritableBookies().thenAccept(bookies ->
-                        writableBookiesWatchers.forEach(w -> 
executor.execute(() -> w.onBookiesChanged(bookies))));
-                handleDeletedBookieNode(n);
-            }
-        } else if (n.getType() == NotificationType.Modified) {
-            if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
-                || n.getPath().startsWith(bookieRegistrationPath)) {
-                handleUpdatedBookieNode(n);
+        sequencer.sequential(() -> {
+            switch (n.getType()) {
+                case Created:
+                    if (path.equals(bookieReadonlyRegistrationPath)) {

Review Comment:
   It should be `.startsWith`, not `.equals`? 



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -227,35 +267,37 @@ public CompletableFuture<Versioned<BookieServiceInfo>> 
getBookieServiceInfo(Book
         // this is because there are a few cases in which some operations on 
the main thread
         // wait for the result. This is due to the fact that resolving the 
address of a bookie
         // is needed in many code paths.
-        Versioned<BookieServiceInfo> resultFromCache = 
bookieServiceInfoCache.get(bookieId);
+        Versioned<BookieServiceInfo> info;
+        if ((info = writableBookieInfo.get(bookieId)) == null) {
+            info = readOnlyBookieInfo.get(bookieId);
+        }
         if (log.isDebugEnabled()) {
-            log.debug("getBookieServiceInfo {} -> {}", bookieId, 
resultFromCache);
+            log.debug("getBookieServiceInfo {} -> {}", bookieId, info);
         }
-        if (resultFromCache != null) {
-            return CompletableFuture.completedFuture(resultFromCache);
+        if (info != null) {
+            return completedFuture(info);
         } else {
             return FutureUtils.exception(new 
BKException.BKBookieHandleNotAvailableException());
         }
     }
 
-    public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId 
bookieId) {
+    public CompletableFuture<Boolean> readBookieServiceInfoAsync(BookieId 
bookieId) {
         String asWritable = bookieRegistrationPath + "/" + bookieId;
         return bookieServiceInfoMetadataCache.get(asWritable)
-                .thenCompose((Optional<BookieServiceInfo> getResult) -> {
-                    if (getResult.isPresent()) {
-                        Versioned<BookieServiceInfo> res =
-                                new Versioned<>(getResult.get(), new 
LongVersion(-1));
-                        log.info("Update BookieInfoCache (writable bookie) {} 
-> {}", bookieId, getResult.get());
-                        bookieServiceInfoCache.put(bookieId, res);
-                        return CompletableFuture.completedFuture(null);
-                    } else {
-                        return readBookieInfoAsReadonlyBookie(bookieId);
-                    }
-                }
-        );
+                .thenApply((Optional<BookieServiceInfo> getResult) -> {
+                            if (getResult.isPresent()) {
+                                Versioned<BookieServiceInfo> res =
+                                        new Versioned<>(getResult.get(), new 
LongVersion(-1));

Review Comment:
   Can we remove the `Versioned` wrapper? It will always be `-1`, right?



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener 
registrationListener) {
         readOnlyBookiesWatchers.remove(registrationListener);
     }
 
-    private void handleDeletedBookieNode(Notification n) {
-        if (n.getType() == NotificationType.Deleted) {
-            BookieId bookieId = stripBookieIdFromPath(n.getPath());
-            if (bookieId != null) {
-                log.info("Bookie {} disappeared", bookieId);
-                bookieServiceInfoCache.remove(bookieId);
-            }
+    /**
+     * This method will receive metadata store notifications and then update 
the
+     * local cache in background sequentially.
+     */
+    private void updatedBookies(Notification n) {
+        // make the notification callback run sequential in background.
+        final String path = n.getPath();
+        if (!path.startsWith(bookieReadonlyRegistrationPath) || 
!path.startsWith(bookieRegistrationPath)) {
+            // ignore unknown path
+            return;
         }
-    }
-
-    private void handleUpdatedBookieNode(Notification n) {
-        BookieId bookieId = stripBookieIdFromPath(n.getPath());
+        final BookieId bookieId = stripBookieIdFromPath(n.getPath());
         if (bookieId != null) {
-            log.info("Bookie {} info updated", bookieId);
-            readBookieServiceInfoAsync(bookieId);
+            log.info("Bookie {} do {}. path: {}", bookieId, n.getType(), 
n.getPath());
         }
-    }
-
-    private void updatedBookies(Notification n) {
-        if (n.getType() == NotificationType.Created || n.getType() == 
NotificationType.Deleted) {
-            if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
-                getReadOnlyBookies().thenAccept(bookies -> {
-                    readOnlyBookiesWatchers.forEach(w -> executor.execute(() 
-> w.onBookiesChanged(bookies)));
-                });
-                handleDeletedBookieNode(n);
-            } else if (n.getPath().startsWith(bookieRegistrationPath)) {
-                  getWritableBookies().thenAccept(bookies ->
-                        writableBookiesWatchers.forEach(w -> 
executor.execute(() -> w.onBookiesChanged(bookies))));
-                handleDeletedBookieNode(n);
-            }
-        } else if (n.getType() == NotificationType.Modified) {
-            if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
-                || n.getPath().startsWith(bookieRegistrationPath)) {
-                handleUpdatedBookieNode(n);
+        sequencer.sequential(() -> {
+            switch (n.getType()) {
+                case Created:
+                    if (path.equals(bookieReadonlyRegistrationPath)) {
+                        return getReadOnlyBookies().thenAccept(bookies ->
+                                readOnlyBookiesWatchers.forEach(w ->
+                                        executor.execute(() -> 
w.onBookiesChanged(bookies))));
+                    }
+                    return getWritableBookies().thenAccept(bookies ->
+                            writableBookiesWatchers.forEach(w ->
+                                    executor.execute(() -> 
w.onBookiesChanged(bookies))));
+                case Modified:
+                    if (bookieId == null) {
+                        return completedFuture(null);
+                    }
+                    if (path.equals(bookieReadonlyRegistrationPath)) {
+                        return 
readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null);
+                    }
+                    return readBookieServiceInfoAsync(bookieId).thenApply(__ 
-> null);

Review Comment:
   Would it also be better to remove the cached entry first, since it has been updated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to