mattisonchao commented on a change in pull request #14367:
URL: https://github.com/apache/pulsar/pull/14367#discussion_r817403838
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -343,41 +349,71 @@ public void healthCheck(@Suspended AsyncResponse
asyncResponse,
String messageStr = UUID.randomUUID().toString();
// create non-partitioned topic manually and close the previous reader
if present.
return pulsar().getBrokerService().getTopic(topicName, true)
- // check and clean all subscriptions
.thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get
topic {}. because get null value.",
clientAppId(), topicName);
- throw new RestException(Status.NOT_FOUND, "Topic [{}]
not found after create.");
+ throw new RestException(Status.NOT_FOUND,
+ String.format("Topic [%s] not found after
create.", topicName));
}
- Topic topic = topicOptional.get();
- // clean all subscriptions
- return
FutureUtil.waitForAll(topic.getSubscriptions().values()
-
.stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
- .thenApply(__ -> topic);
- }).thenCompose(topic -> {
try {
PulsarClient client = pulsar().getClient();
return
client.newProducer(Schema.STRING).topic(topicName).createAsync()
-
.thenCombine(client.newReader(Schema.STRING).topic(topicName)
+
.thenCombine(client.newReader(Schema.STRING).topic(topicName)
.startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-
producer.sendAsync(messageStr).thenCompose(__ ->
-
healthCheckRecursiveReadNext(reader, messageStr))
- .thenCompose(__ -> {
-
List<CompletableFuture<Void>> closeFutures =
- new
ArrayList<>();
-
closeFutures.add(producer.closeAsync());
-
closeFutures.add(reader.closeAsync());
- return
FutureUtil.waitForAll(closeFutures);
- })
- ).thenAccept(ignore -> {});
+
producer.sendAsync(messageStr).thenCompose(__ ->
+
healthCheckRecursiveReadNext(reader, messageStr))
+ .thenCompose(__ -> {
Review comment:
@eolivelli
Yes, I totally agree with you. I will change here to use ``whenComplete``.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]