RobertIndie commented on code in PR #23686:
URL: https://github.com/apache/pulsar/pull/23686#discussion_r1875254225
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java:
##########
@@ -321,22 +325,25 @@ public void accept(Notification t) {
}
}
- private CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
- CompletableFuture<T> result = new CompletableFuture<>();
+ private void execute(Supplier<CompletableFuture<T>> op, String key, CompletableFuture<T> result, Backoff backoff) {
op.get().thenAccept(result::complete).exceptionally((ex) -> {
if (ex.getCause() instanceof BadVersionException) {
// if resource is updated by other than metadata-cache then metadata-cache will get bad-version
// exception. so, try to invalidate the cache and try one more time.
objCache.synchronous().invalidate(key);
- op.get().thenAccept(result::complete).exceptionally((ex1) -> {
- result.completeExceptionally(ex1.getCause());
- return null;
- });
+ backoffExecutor.schedule(() -> execute(op, key, result, backoff), backoff.next(),
+ TimeUnit.MILLISECONDS);
return null;
}
result.completeExceptionally(ex.getCause());
return null;
});
+ }
+
+ private CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
+ final var backoff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
Review Comment:
The question now is whether we should throw the
ConcurrentModificationException/BadVersionException to the caller at all. From
a design perspective, and according to [the readModifyUpdate
documentation](https://github.com/apache/pulsar/blob/216b83008deb469e0fc55ed8117f0c393ebcb0ac/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java#L108),
it seems that MetadataCacheImpl should be responsible for retrying and should
not throw this exception.
Since readModifyUpdate does not require users to provide a version, it does
not seem reasonable to throw a
BadVersionException/ConcurrentModificationException to the caller. If the
caller needs to ensure that data is not accidentally overwritten, throwing a
BadVersionException/ConcurrentModificationException will not solve that
problem. Therefore, unless we can explicitly pass in a version, we cannot
achieve these semantics.
Besides, I found that introducing unlimited retries in MetadataCache is also
not reasonable. We can use `metadataStoreOperationTimeoutSeconds` as the
hard deadline for retries and throw a TimeoutException if every retry fails
before that deadline. We can also reduce the maximum timeout, following
@BewareMyPower's
[suggestion](https://github.com/streamnative/ursa-storage/pull/548#discussion_r1873286488).
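To make the deadline idea concrete, here is a rough, self-contained sketch of
retrying with a hard stop time. It is not the code in this PR: the class
`BoundedRetrySketch`, the nested `BadVersionException` stand-in, and the
parameters `initialDelayMs`, `maxDelayMs` and `timeoutMs` are all hypothetical
names for illustration, and the doubling delay only approximates what the real
`Backoff` class does. In MetadataCacheImpl, `timeoutMs` would presumably be
derived from `metadataStoreOperationTimeoutSeconds`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class BoundedRetrySketch {

    // Hypothetical stand-in for the metadata store's BadVersionException.
    static class BadVersionException extends RuntimeException {
        BadVersionException(String message) {
            super(message);
        }
    }

    // Runs op, retrying on BadVersionException with a doubling delay until timeoutMs has elapsed.
    static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
                                                     ScheduledExecutorService scheduler,
                                                     long initialDelayMs,
                                                     long maxDelayMs,
                                                     long timeoutMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        attempt(op, scheduler, result, initialDelayMs, maxDelayMs, deadlineNanos);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op,
                                    ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result,
                                    long delayMs,
                                    long maxDelayMs,
                                    long deadlineNanos) {
        op.get().whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
                return;
            }
            // Unwrap CompletionException so callers see the underlying failure.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (!(cause instanceof BadVersionException)) {
                // Not a version conflict: fail immediately without retrying.
                result.completeExceptionally(cause);
                return;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= TimeUnit.MILLISECONDS.toNanos(delayMs)) {
                // The next retry would start at or after the deadline: give up with a timeout.
                TimeoutException timeout =
                        new TimeoutException("Retries did not succeed before the deadline");
                timeout.initCause(cause);
                result.completeExceptionally(timeout);
                return;
            }
            long nextDelayMs = Math.min(delayMs * 2, maxDelayMs);
            scheduler.schedule(
                    () -> attempt(op, scheduler, result, nextDelayMs, maxDelayMs, deadlineNanos),
                    delayMs, TimeUnit.MILLISECONDS);
        });
    }
}
```

With this shape the caller never sees a BadVersionException: it gets either
the value, a non-retryable failure, or a TimeoutException whose cause is the
last version conflict. The sketch leaves out the cache invalidation before
each retry, which the real implementation would still need.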
WDYT? @codelipenghui @BewareMyPower