BewareMyPower commented on code in PR #23686:
URL: https://github.com/apache/pulsar/pull/23686#discussion_r1883181262
##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java:
##########
@@ -35,6 +35,7 @@ public class Backoff {
private long next;
private long mandatoryStop;
+ @Getter
Review Comment:
We already have `@Data`
```suggestion
```
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java:
##########
@@ -321,22 +330,33 @@ public void accept(Notification t) {
}
}
- private CompletableFuture<T>
executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
- CompletableFuture<T> result = new CompletableFuture<>();
+ private void execute(Supplier<CompletableFuture<T>> op, String key,
CompletableFuture<T> result, Backoff backoff) {
op.get().thenAccept(result::complete).exceptionally((ex) -> {
if (ex.getCause() instanceof BadVersionException) {
// if resource is updated by other than metadata-cache then
metadata-cache will get bad-version
// exception. so, try to invalidate the cache and try one more
time.
objCache.synchronous().invalidate(key);
- op.get().thenAccept(result::complete).exceptionally((ex1) -> {
- result.completeExceptionally(ex1.getCause());
+ if (backoff.isMandatoryStopMade()) {
+ result.completeExceptionally(new
TimeoutException(String.format("Timeout to update key %s", key)));
return null;
- });
+ }
+ final var next = backoff.next();
+ log.info("Update key {} conflicts. Retrying in {} ms.
Mandatory stop: {} ms. Elapsed time: {} ms", key,
+ next, backoff.isMandatoryStopMade(),
Review Comment:
`isMandatoryStopMade()` is a boolean but you logs `Mandatory stop: {} ms`.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java:
##########
@@ -58,6 +63,9 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
private final MetadataStore store;
private final MetadataStoreExtended storeExtended;
private final MetadataSerde<T> serde;
+ private final ScheduledExecutorService backoffExecutor =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("metadata-cache-backoff"));
Review Comment:
Actually I'm wondering if we need one thread per `MetadataCache`. It means
each time a `MetadataCache` instance is created, a new thread will be created,
which might be unnecessary.
I think a better design is to add a common thread pool like
`ForkJoinPool#commonPool` and a config in `MetadataCacheConfig` to specify a
customized executor if users want.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java:
##########
@@ -321,22 +330,33 @@ public void accept(Notification t) {
}
}
- private CompletableFuture<T>
executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
- CompletableFuture<T> result = new CompletableFuture<>();
+ private void execute(Supplier<CompletableFuture<T>> op, String key,
CompletableFuture<T> result, Backoff backoff) {
op.get().thenAccept(result::complete).exceptionally((ex) -> {
if (ex.getCause() instanceof BadVersionException) {
// if resource is updated by other than metadata-cache then
metadata-cache will get bad-version
// exception. so, try to invalidate the cache and try one more
time.
objCache.synchronous().invalidate(key);
- op.get().thenAccept(result::complete).exceptionally((ex1) -> {
- result.completeExceptionally(ex1.getCause());
+ if (backoff.isMandatoryStopMade()) {
+ result.completeExceptionally(new
TimeoutException(String.format("Timeout to update key %s", key)));
Review Comment:
It's better to print the elapsed time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]