codelipenghui commented on a change in pull request #8818:
URL: https://github.com/apache/pulsar/pull/8818#discussion_r548529963
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -715,6 +720,11 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllo
.get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
+ // Since CompletableFuture wrappers exception, so we will lost the Status
+ // rebuild the Status if exception message contains "Topic not exist."
+ if (e.getCause().getMessage().contains("Topic not exist.")) {
+ throw new org.apache.pulsar.broker.web.RestException(Response.Status.NOT_FOUND, "Topic not exist.");
+ }
Review comment:
Same as the above comment.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -702,6 +702,11 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarSe
return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
+ // Since CompletableFuture wrappers exception, so we will lost the Status
+ // rebuild the Status if exception message contains "Topic not exist."
+ if (e.getCause().getMessage().contains("Topic not exist.")) {
+ throw new org.apache.pulsar.broker.web.RestException(Response.Status.NOT_FOUND, "Topic not exist.");
+ }
Review comment:
Could you please clarify why `throw (RestException) e.getCause()` does
not work here? If `e.getCause()` is an instance of `RestException`, I think we
can throw it directly. I might be missing something here.
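To make the question concrete, here is a minimal sketch of rethrowing the cause directly. It assumes a hypothetical `StatusException` as a stand-in for Pulsar's `RestException`, and a hypothetical helper `getOrRethrow`. It only illustrates that `CompletableFuture.get()` wraps the failure in an `ExecutionException` whose cause is the original exception object, fields included; it does not show why the status was lost in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class RethrowCauseSketch {

    // Hypothetical stand-in for RestException: an unchecked exception with an HTTP status.
    static class StatusException extends RuntimeException {
        final int status;

        StatusException(int status, String message) {
            super(message);
            this.status = status;
        }
    }

    // Blocks on the future and rethrows the original cause when it is a StatusException.
    static <T> T getOrRethrow(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof StatusException) {
                // The cause is the same object that failed the future, so its status is intact.
                throw (StatusException) e.getCause();
            }
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new StatusException(404, "Topic not exist."));
        try {
            getOrRethrow(failed);
        } catch (StatusException e) {
            System.out.println(e.status + " " + e.getMessage()); // prints "404 Topic not exist."
        }
    }
}
```

If the real `RestException` keeps its `Response` in the same way, rethrowing the cause would avoid matching on the message text.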
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -456,10 +456,17 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "This topic already exists");
+ PartitionedTopicMetadata partitionMetadata = null;
+ try {
+ partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
+ if (partitionMetadata.partitions > 0) {
+ log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(), topicName);
+ throw new RestException(Status.CONFLICT, "This topic already exists");
+ }
+ } catch (RestException restException) {
+ if (Status.NOT_FOUND.getStatusCode() != restException.getResponse().getStatus()) {
+ throw restException;
Review comment:
I don't think this handling is correct. If we get a 404 here, it means
the partitioned metadata does not exist, right? If the partitioned metadata
does not exist, why should we prevent the non-partitioned topic creation?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -1174,13 +1174,19 @@ public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception {
}
private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicName) {
- return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenCompose(meta -> {
+ CompletableFuture<List<String>> future = new CompletableFuture<>();
+ pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).whenComplete((meta, t) -> {
+ if (t != null) {
+ future.completeExceptionally(t);
+ return;
+ }
List<String> result = Lists.newArrayList();
for (int i = 0; i < meta.partitions; i++) {
result.add(topicName.getPartition(i).toString());
}
- return CompletableFuture.completedFuture(result);
+ future.complete(result);
});
+ return future;
Review comment:
What is the difference between these changes? Why did we change
`thenCompose` to `whenComplete`?
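One observable difference between the two styles can be sketched as follows. The sketch assumes a source future that was failed directly with `completeExceptionally`; the class name, the helper names, and the `-partition-` name format are made up for illustration. On success both versions return the same list. On failure, callbacks attached to the chained future see a `CompletionException` wrapping the original error, while callbacks attached to the manually completed future see the original error itself. Whether this was the motivation for the change is not stated in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ComposeVsWhenCompleteSketch {

    // Builds partition names; the "-partition-" format is an assumption for illustration.
    static List<String> partitionNames(String topic, int partitions) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            result.add(topic + "-partition-" + i);
        }
        return result;
    }

    // Chained style: a failure of the source propagates automatically to the returned stage.
    static CompletableFuture<List<String>> chained(CompletableFuture<Integer> meta, String topic) {
        return meta.thenCompose(n -> CompletableFuture.completedFuture(partitionNames(topic, n)));
    }

    // Manual style: a failure of the source is forwarded explicitly to a new future.
    static CompletableFuture<List<String>> manual(CompletableFuture<Integer> meta, String topic) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        meta.whenComplete((n, t) -> {
            if (t != null) {
                future.completeExceptionally(t);
                return;
            }
            future.complete(partitionNames(topic, n));
        });
        return future;
    }

    // Returns the throwable that a callback attached to the future would receive, or null.
    static Throwable seenByCallback(CompletableFuture<?> future) {
        return future.handle((value, t) -> t).join();
    }

    public static void main(String[] args) {
        IllegalStateException boom = new IllegalStateException("Topic not exist.");
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(boom);

        Throwable fromChained = seenByCallback(chained(failed, "t"));
        Throwable fromManual = seenByCallback(manual(failed, "t"));

        System.out.println(fromChained instanceof CompletionException); // prints "true"
        System.out.println(fromChained.getCause() == boom);             // prints "true"
        System.out.println(fromManual == boom);                         // prints "true"
    }
}
```

If the source future is itself a dependent stage, the throwable passed to `whenComplete` may already be a `CompletionException`, so the manual version does not guarantee an unwrapped error in every case.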