lhotari commented on code in PR #17524:
URL: https://github.com/apache/pulsar/pull/17524#discussion_r1447088522
##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -69,6 +70,32 @@ public static <T> CompletableFuture<List<T>>
waitForAll(Stream<CompletableFuture
})));
}
+ /**
+ * Make {@param dest} completed as {@param src}.
Review Comment:
Please add a better description.
`completeAs` might not be the best name for this method;
`completeAfter` would be a better name.
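To make the suggestion concrete, here is a minimal sketch of how the renamed method and its Javadoc could look. The class name `CompleteAfterSketch` is hypothetical and only keeps the example self-contained. The body mirrors the `completeAs` implementation in this PR; the Javadoc wording is just one possible phrasing.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder class; in the PR the method lives in FutureUtil.
final class CompleteAfterSketch {
    private CompleteAfterSketch() {
    }

    /**
     * Completes {@code dest} with the outcome of {@code src} once {@code src} completes.
     * If {@code src} completes normally, {@code dest} is completed with the same value;
     * if {@code src} completes exceptionally, {@code dest} is completed with the same exception.
     * If {@code dest} is already completed when {@code src} finishes, this call has no effect.
     */
    static <T> void completeAfter(final CompletableFuture<T> dest, CompletableFuture<T> src) {
        src.whenComplete((v, ex) -> {
            if (ex != null) {
                dest.completeExceptionally(ex);
            } else {
                dest.complete(v);
            }
        });
    }
}
```

The name `completeAfter` reads naturally at the call site: `completeAfter(dest, src)` says that `dest` is completed after `src`.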
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1479,6 +1486,10 @@ public void deleteLedgerComplete(Object ctx) {
unfenceTopicToResume();
}
});
+
+ FutureUtil.completeAs(closeWithoutWaitingClientFuture, res);
+ FutureUtil.completeAs(fullyCloseFuture, res);
Review Comment:
What's the difference between `closeWithoutWaitingClientFuture` and
`fullyCloseFuture`? Are they the same in this case?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1376,8 +1380,11 @@ private CompletableFuture<Void> delete(boolean
failIfHasSubscriptions,
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
+ // Mark the progress of close to prevent close calling concurrently.
+ this.closeWithoutWaitingClientFuture = new CompletableFuture<>();
+ this.fullyCloseFuture = new CompletableFuture<>();
Review Comment:
Could the fields `closeWithoutWaitingClientFuture` and `fullyCloseFuture`
be final, holding `CompletableFuture` instances that are created when
PersistentTopic is constructed?
The main motivation is to ensure that thread safety doesn't break later
if a future change accesses these fields without the protection that the
lock currently provides.
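A rough sketch of what I have in mind, under some assumptions: `TopicCloseStateSketch`, `closeStarted` and `tryStartClose()` are hypothetical names, and the `AtomicBoolean` is my assumption for how "close is in progress" could be tracked once a non-null future no longer signals it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the relevant part of PersistentTopic.
final class TopicCloseStateSketch {
    // Created once at construction and never reassigned, so any thread can
    // read these references safely without holding the lock.
    private final CompletableFuture<Void> closeWithoutWaitingClientFuture = new CompletableFuture<>();
    private final CompletableFuture<Void> fullyCloseFuture = new CompletableFuture<>();

    // Assumption: a separate flag marks that a close has started, since the
    // futures are no longer null before the close begins.
    private final AtomicBoolean closeStarted = new AtomicBoolean(false);

    /** Returns true only for the first caller, which then performs the close. */
    boolean tryStartClose() {
        return closeStarted.compareAndSet(false, true);
    }

    CompletableFuture<Void> closeWithoutWaitingClientFuture() {
        return closeWithoutWaitingClientFuture;
    }

    CompletableFuture<Void> fullyCloseFuture() {
        return fullyCloseFuture;
    }
}
```

One caveat: a `CompletableFuture` can only be completed once, so this sketch does not cover a close or delete that fails and is retried later. That case would need separate handling.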
##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -69,6 +70,32 @@ public static <T> CompletableFuture<List<T>>
waitForAll(Stream<CompletableFuture
})));
}
+ /**
+ * Make {@param dest} completed as {@param src}.
+ */
+ public static <T> void completeAs(final CompletableFuture<T> dest, CompletableFuture<T> src){
+ src.whenComplete((v, ex) -> {
+ if (ex != null){
+ dest.completeExceptionally(ex);
+ } else {
+ dest.complete(v);
+ }
+ });
+ }
+
+ /**
+ * Make {@param dest} completed as {@param src}.
+ */
+ public static <T> void completeAs(final CompletableFuture<Void> dest, CompletableFuture<T>...src){
Review Comment:
Please add a better description and rename this method to `completeAfterAll`.
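For illustration, a minimal sketch of the renamed overload. The method body is not visible in this hunk, so the use of `CompletableFuture.allOf` is my assumption about the intended behavior (complete `dest` only after every source has completed). The class name `CompleteAfterAllSketch` is hypothetical, and I used `CompletableFuture<?>...` instead of `CompletableFuture<T>...` to avoid a generic varargs warning.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder class; in the PR the method lives in FutureUtil.
final class CompleteAfterAllSketch {
    private CompleteAfterAllSketch() {
    }

    /**
     * Completes {@code dest} after all of the {@code src} futures have completed.
     * If every source completes normally, {@code dest} is completed with {@code null}.
     * If any source completes exceptionally, {@code dest} is completed exceptionally
     * once all sources have finished.
     */
    static void completeAfterAll(final CompletableFuture<Void> dest, CompletableFuture<?>... src) {
        CompletableFuture.allOf(src).whenComplete((ignore, ex) -> {
            if (ex != null) {
                // allOf reports a failure as a CompletionException wrapping the cause.
                dest.completeExceptionally(ex);
            } else {
                dest.complete(null);
            }
        });
    }
}
```

With this name, a call such as `completeAfterAll(dest, a, b)` makes it clear that `dest` waits for both `a` and `b`.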