codelipenghui commented on code in PR #20642:
URL: https://github.com/apache/pulsar/pull/20642#discussion_r1243173969
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener
registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}
- private void handleDeletedBookieNode(Notification n) {
- if (n.getType() == NotificationType.Deleted) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
- if (bookieId != null) {
- log.info("Bookie {} disappeared", bookieId);
- bookieServiceInfoCache.remove(bookieId);
- }
+ /**
+ * This method will receive metadata store notifications and then update
the
+ * local cache in background sequentially.
+ */
+ private void updatedBookies(Notification n) {
+ // make the notification callback run sequential in background.
+ final String path = n.getPath();
+ if (!path.startsWith(bookieReadonlyRegistrationPath) ||
!path.startsWith(bookieRegistrationPath)) {
Review Comment:
It should be `&&` here? Otherwise, we will miss the update from the
`bookieRegistrationPath` because
`!path.startsWith(bookieReadonlyRegistrationPath)` will be true.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -91,38 +96,58 @@ public void close() {
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
- return getChildren(bookieRegistrationPath);
+ return getBookiesThenFreshCache(bookieRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
// this method is meant to return all the known bookies, even the
bookies
// that are not in a running state
- return getChildren(bookieAllRegistrationPath);
+ return getBookiesThenFreshCache(bookieAllRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
- return getChildren(bookieReadonlyRegistrationPath);
+ return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
}
- private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String
path) {
+ /**
+ * @throws IllegalArgumentException if parameter path is null or empty.
+ */
+ private CompletableFuture<Versioned<Set<BookieId>>>
getBookiesThenFreshCache(String path) {
+ if (path == null || path.isEmpty()) {
+ return failedFuture(
+ new IllegalArgumentException(String.format("parameter
[path] can not be null or empty.")));
+ }
return store.getChildren(path)
.thenComposeAsync(children -> {
- Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
- List<CompletableFuture<?>> bookieInfoUpdated =
- new ArrayList<>(bookieIds.size());
+ final Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
+ final List<CompletableFuture<?>> bookieInfoUpdated = new
ArrayList<>(bookieIds.size());
for (BookieId id : bookieIds) {
// update the cache for new bookies
- if (!bookieServiceInfoCache.containsKey(id)) {
+ if (path.equals(bookieRegistrationPath) &&
writableBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+ continue;
+ }
+ if (path.equals(bookieReadonlyRegistrationPath) &&
readOnlyBookieInfo.get(id) == null) {
+
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+ continue;
+ }
+ if (path.equals(bookieAllRegistrationPath)) {
+ if (writableBookieInfo.get(id) != null ||
readOnlyBookieInfo != null) {
Review Comment:
```suggestion
if (writableBookieInfo.get(id) != null ||
readOnlyBookieInfo.get(id) != null) {
```
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -91,38 +96,58 @@ public void close() {
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
- return getChildren(bookieRegistrationPath);
+ return getBookiesThenFreshCache(bookieRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
// this method is meant to return all the known bookies, even the
bookies
// that are not in a running state
- return getChildren(bookieAllRegistrationPath);
+ return getBookiesThenFreshCache(bookieAllRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
- return getChildren(bookieReadonlyRegistrationPath);
+ return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
}
- private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String
path) {
+ /**
+ * @throws IllegalArgumentException if parameter path is null or empty.
+ */
+ private CompletableFuture<Versioned<Set<BookieId>>>
getBookiesThenFreshCache(String path) {
+ if (path == null || path.isEmpty()) {
+ return failedFuture(
+ new IllegalArgumentException(String.format("parameter
[path] can not be null or empty.")));
+ }
return store.getChildren(path)
.thenComposeAsync(children -> {
- Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
- List<CompletableFuture<?>> bookieInfoUpdated =
- new ArrayList<>(bookieIds.size());
+ final Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
+ final List<CompletableFuture<?>> bookieInfoUpdated = new
ArrayList<>(bookieIds.size());
for (BookieId id : bookieIds) {
// update the cache for new bookies
- if (!bookieServiceInfoCache.containsKey(id)) {
+ if (path.equals(bookieRegistrationPath) &&
writableBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+ continue;
+ }
+ if (path.equals(bookieReadonlyRegistrationPath) &&
readOnlyBookieInfo.get(id) == null) {
+
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+ continue;
+ }
+ if (path.equals(bookieAllRegistrationPath)) {
+ if (writableBookieInfo.get(id) != null ||
readOnlyBookieInfo != null) {
+ // jump to next bookie id
+ continue;
+ }
+
bookieInfoUpdated.add(readBookieServiceInfoAsync(id) // check writable first
+ .thenCompose(updated ->
+ updated ? completedFuture(null)
+ :
readBookieInfoAsReadonlyBookie(id))); // check read-only then
Review Comment:
It will be easier to read the code by changing the return type to
`Optional<BookieServiceInfo>` for methods `readBookieServiceInfoAsync` and
`readBookieInfoAsReadonlyBookie`. So that we can check if the BookieServiceInfo
is present or not here. It's a little confusing to use a BOOL value `updated`
here.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener
registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}
- private void handleDeletedBookieNode(Notification n) {
- if (n.getType() == NotificationType.Deleted) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
- if (bookieId != null) {
- log.info("Bookie {} disappeared", bookieId);
- bookieServiceInfoCache.remove(bookieId);
- }
+ /**
+ * This method will receive metadata store notifications and then update
the
+ * local cache in background sequentially.
+ */
+ private void updatedBookies(Notification n) {
+ // make the notification callback run sequential in background.
+ final String path = n.getPath();
+ if (!path.startsWith(bookieReadonlyRegistrationPath) ||
!path.startsWith(bookieRegistrationPath)) {
+ // ignore unknown path
+ return;
}
- }
-
- private void handleUpdatedBookieNode(Notification n) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
+ final BookieId bookieId = stripBookieIdFromPath(n.getPath());
if (bookieId != null) {
- log.info("Bookie {} info updated", bookieId);
- readBookieServiceInfoAsync(bookieId);
+ log.info("Bookie {} do {}. path: {}", bookieId, n.getType(),
n.getPath());
}
- }
-
- private void updatedBookies(Notification n) {
- if (n.getType() == NotificationType.Created || n.getType() ==
NotificationType.Deleted) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
- getReadOnlyBookies().thenAccept(bookies -> {
- readOnlyBookiesWatchers.forEach(w -> executor.execute(()
-> w.onBookiesChanged(bookies)));
- });
- handleDeletedBookieNode(n);
- } else if (n.getPath().startsWith(bookieRegistrationPath)) {
- getWritableBookies().thenAccept(bookies ->
- writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
- handleDeletedBookieNode(n);
- }
- } else if (n.getType() == NotificationType.Modified) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
- || n.getPath().startsWith(bookieRegistrationPath)) {
- handleUpdatedBookieNode(n);
+ sequencer.sequential(() -> {
+ switch (n.getType()) {
+ case Created:
+ if (path.equals(bookieReadonlyRegistrationPath)) {
Review Comment:
It should be `.startsWith`, not `.equals`?
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -227,35 +267,37 @@ public CompletableFuture<Versioned<BookieServiceInfo>>
getBookieServiceInfo(Book
// this is because there are a few cases in which some operations on
the main thread
// wait for the result. This is due to the fact that resolving the
address of a bookie
// is needed in many code paths.
- Versioned<BookieServiceInfo> resultFromCache =
bookieServiceInfoCache.get(bookieId);
+ Versioned<BookieServiceInfo> info;
+ if ((info = writableBookieInfo.get(bookieId)) == null) {
+ info = readOnlyBookieInfo.get(bookieId);
+ }
if (log.isDebugEnabled()) {
- log.debug("getBookieServiceInfo {} -> {}", bookieId,
resultFromCache);
+ log.debug("getBookieServiceInfo {} -> {}", bookieId, info);
}
- if (resultFromCache != null) {
- return CompletableFuture.completedFuture(resultFromCache);
+ if (info != null) {
+ return completedFuture(info);
} else {
return FutureUtils.exception(new
BKException.BKBookieHandleNotAvailableException());
}
}
- public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId
bookieId) {
+ public CompletableFuture<Boolean> readBookieServiceInfoAsync(BookieId
bookieId) {
String asWritable = bookieRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asWritable)
- .thenCompose((Optional<BookieServiceInfo> getResult) -> {
- if (getResult.isPresent()) {
- Versioned<BookieServiceInfo> res =
- new Versioned<>(getResult.get(), new
LongVersion(-1));
- log.info("Update BookieInfoCache (writable bookie) {}
-> {}", bookieId, getResult.get());
- bookieServiceInfoCache.put(bookieId, res);
- return CompletableFuture.completedFuture(null);
- } else {
- return readBookieInfoAsReadonlyBookie(bookieId);
- }
- }
- );
+ .thenApply((Optional<BookieServiceInfo> getResult) -> {
+ if (getResult.isPresent()) {
+ Versioned<BookieServiceInfo> res =
+ new Versioned<>(getResult.get(), new
LongVersion(-1));
Review Comment:
Can we remove the Versioned wrapper? I will always be `-1`, right?
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java:
##########
@@ -153,42 +178,57 @@ public void unwatchReadOnlyBookies(RegistrationListener
registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}
- private void handleDeletedBookieNode(Notification n) {
- if (n.getType() == NotificationType.Deleted) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
- if (bookieId != null) {
- log.info("Bookie {} disappeared", bookieId);
- bookieServiceInfoCache.remove(bookieId);
- }
+ /**
+ * This method will receive metadata store notifications and then update
the
+ * local cache in background sequentially.
+ */
+ private void updatedBookies(Notification n) {
+ // make the notification callback run sequential in background.
+ final String path = n.getPath();
+ if (!path.startsWith(bookieReadonlyRegistrationPath) ||
!path.startsWith(bookieRegistrationPath)) {
+ // ignore unknown path
+ return;
}
- }
-
- private void handleUpdatedBookieNode(Notification n) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
+ final BookieId bookieId = stripBookieIdFromPath(n.getPath());
if (bookieId != null) {
- log.info("Bookie {} info updated", bookieId);
- readBookieServiceInfoAsync(bookieId);
+ log.info("Bookie {} do {}. path: {}", bookieId, n.getType(),
n.getPath());
}
- }
-
- private void updatedBookies(Notification n) {
- if (n.getType() == NotificationType.Created || n.getType() ==
NotificationType.Deleted) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
- getReadOnlyBookies().thenAccept(bookies -> {
- readOnlyBookiesWatchers.forEach(w -> executor.execute(()
-> w.onBookiesChanged(bookies)));
- });
- handleDeletedBookieNode(n);
- } else if (n.getPath().startsWith(bookieRegistrationPath)) {
- getWritableBookies().thenAccept(bookies ->
- writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
- handleDeletedBookieNode(n);
- }
- } else if (n.getType() == NotificationType.Modified) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
- || n.getPath().startsWith(bookieRegistrationPath)) {
- handleUpdatedBookieNode(n);
+ sequencer.sequential(() -> {
+ switch (n.getType()) {
+ case Created:
+ if (path.equals(bookieReadonlyRegistrationPath)) {
+ return getReadOnlyBookies().thenAccept(bookies ->
+ readOnlyBookiesWatchers.forEach(w ->
+ executor.execute(() ->
w.onBookiesChanged(bookies))));
+ }
+ return getWritableBookies().thenAccept(bookies ->
+ writableBookiesWatchers.forEach(w ->
+ executor.execute(() ->
w.onBookiesChanged(bookies))));
+ case Modified:
+ if (bookieId == null) {
+ return completedFuture(null);
+ }
+ if (path.equals(bookieReadonlyRegistrationPath)) {
+ return
readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null);
+ }
+ return readBookieServiceInfoAsync(bookieId).thenApply(__
-> null);
Review Comment:
Is it also better to remove the cache first? since it has been updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]