mattisonchao commented on code in PR #20642:
URL: https://github.com/apache/pulsar/pull/20642#discussion_r1243210715
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -91,38 +96,58 @@ public void close() {
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
- return getChildren(bookieRegistrationPath);
+ return getBookiesThenFreshCache(bookieRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
// this method is meant to return all the known bookies, even the bookies
// that are not in a running state
- return getChildren(bookieAllRegistrationPath);
+ return getBookiesThenFreshCache(bookieAllRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
- return getChildren(bookieReadonlyRegistrationPath);
+ return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
}
- private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String
path) {
+ /**
+ * @throws IllegalArgumentException if parameter path is null or empty.
+ */
+ private CompletableFuture<Versioned<Set<BookieId>>>
getBookiesThenFreshCache(String path) {
+ if (path == null || path.isEmpty()) {
+ return failedFuture(
+ new IllegalArgumentException(String.format("parameter
[path] can not be null or empty.")));
+ }
return store.getChildren(path)
.thenComposeAsync(children -> {
- Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
- List<CompletableFuture<?>> bookieInfoUpdated =
- new ArrayList<>(bookieIds.size());
+ final Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
+ final List<CompletableFuture<?>> bookieInfoUpdated = new
ArrayList<>(bookieIds.size());
for (BookieId id : bookieIds) {
// update the cache for new bookies
- if (!bookieServiceInfoCache.containsKey(id)) {
+ if (path.equals(bookieRegistrationPath) &&
writableBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+ continue;
+ }
+ if (path.equals(bookieReadonlyRegistrationPath) &&
readOnlyBookieInfo.get(id) == null) {
+
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+ continue;
+ }
+ if (path.equals(bookieAllRegistrationPath)) {
+ if (writableBookieInfo.get(id) != null ||
readOnlyBookieInfo != null) {
Review Comment:
fixed.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -91,38 +96,58 @@ public void close() {
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
- return getChildren(bookieRegistrationPath);
+ return getBookiesThenFreshCache(bookieRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
// this method is meant to return all the known bookies, even the bookies
// that are not in a running state
- return getChildren(bookieAllRegistrationPath);
+ return getBookiesThenFreshCache(bookieAllRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
- return getChildren(bookieReadonlyRegistrationPath);
+ return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
}
- private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String
path) {
+ /**
+ * @throws IllegalArgumentException if parameter path is null or empty.
+ */
+ private CompletableFuture<Versioned<Set<BookieId>>>
getBookiesThenFreshCache(String path) {
+ if (path == null || path.isEmpty()) {
+ return failedFuture(
+ new IllegalArgumentException(String.format("parameter
[path] can not be null or empty.")));
+ }
return store.getChildren(path)
.thenComposeAsync(children -> {
- Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
- List<CompletableFuture<?>> bookieInfoUpdated =
- new ArrayList<>(bookieIds.size());
+ final Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
+ final List<CompletableFuture<?>> bookieInfoUpdated = new
ArrayList<>(bookieIds.size());
for (BookieId id : bookieIds) {
// update the cache for new bookies
- if (!bookieServiceInfoCache.containsKey(id)) {
+ if (path.equals(bookieRegistrationPath) &&
writableBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+ continue;
+ }
+ if (path.equals(bookieReadonlyRegistrationPath) &&
readOnlyBookieInfo.get(id) == null) {
+
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+ continue;
+ }
+ if (path.equals(bookieAllRegistrationPath)) {
+ if (writableBookieInfo.get(id) != null ||
readOnlyBookieInfo != null) {
+ // jump to next bookie id
+ continue;
+ }
+
bookieInfoUpdated.add(readBookieServiceInfoAsync(id) // check writable first
+ .thenCompose(updated ->
+ updated ? completedFuture(null)
+ :
readBookieInfoAsReadonlyBookie(id))); // check read-only then
Review Comment:
fixed
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener
registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}
- private void handleDeletedBookieNode(Notification n) {
- if (n.getType() == NotificationType.Deleted) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
- if (bookieId != null) {
- log.info("Bookie {} disappeared", bookieId);
- bookieServiceInfoCache.remove(bookieId);
- }
+ /**
+ * This method will receive metadata store notifications and then update the
+ * local cache in background sequentially.
+ */
+ private void updatedBookies(Notification n) {
+ // make the notification callback run sequential in background.
+ final String path = n.getPath();
+ if (!path.startsWith(bookieReadonlyRegistrationPath) ||
!path.startsWith(bookieRegistrationPath)) {
Review Comment:
fixed
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener
registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}
- private void handleDeletedBookieNode(Notification n) {
- if (n.getType() == NotificationType.Deleted) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
- if (bookieId != null) {
- log.info("Bookie {} disappeared", bookieId);
- bookieServiceInfoCache.remove(bookieId);
- }
+ /**
+ * This method will receive metadata store notifications and then update the
+ * local cache in background sequentially.
+ */
+ private void updatedBookies(Notification n) {
+ // make the notification callback run sequential in background.
+ final String path = n.getPath();
+ if (!path.startsWith(bookieReadonlyRegistrationPath) ||
!path.startsWith(bookieRegistrationPath)) {
+ // ignore unknown path
+ return;
}
- }
-
- private void handleUpdatedBookieNode(Notification n) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
+ final BookieId bookieId = stripBookieIdFromPath(n.getPath());
if (bookieId != null) {
- log.info("Bookie {} info updated", bookieId);
- readBookieServiceInfoAsync(bookieId);
+ log.info("Bookie {} do {}. path: {}", bookieId, n.getType(),
n.getPath());
}
- }
-
- private void updatedBookies(Notification n) {
- if (n.getType() == NotificationType.Created || n.getType() ==
NotificationType.Deleted) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
- getReadOnlyBookies().thenAccept(bookies -> {
- readOnlyBookiesWatchers.forEach(w -> executor.execute(()
-> w.onBookiesChanged(bookies)));
- });
- handleDeletedBookieNode(n);
- } else if (n.getPath().startsWith(bookieRegistrationPath)) {
- getWritableBookies().thenAccept(bookies ->
- writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
- handleDeletedBookieNode(n);
- }
- } else if (n.getType() == NotificationType.Modified) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
- || n.getPath().startsWith(bookieRegistrationPath)) {
- handleUpdatedBookieNode(n);
+ sequencer.sequential(() -> {
+ switch (n.getType()) {
+ case Created:
+ if (path.equals(bookieReadonlyRegistrationPath)) {
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]