congbobo184 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654312778
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
"Number of partitions should be less than or equal to " +
maxPartitions));
return;
}
+
+ List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+ CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+ createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
Review comment:
If the topic already exists locally but the remote cluster does not have it, do we need to handle that case in this logic?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
"Number of partitions should be less than or equal to " +
maxPartitions));
return;
}
+
+ List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+ CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+ createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
- } else {
-
- try {
- String path = ZkAdminPaths.partitionedTopicPath(topicName);
- namespaceResources().getPartitionedTopicResources()
- .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
- log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
- tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
- log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
- topicName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(e -> {
- log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
- topicName);
- // The partitioned topic is created but there are some partitions create failed
- asyncResponse.resume(new RestException(e));
- return null;
- });
- }).exceptionally(ex -> {
- if (ex.getCause() instanceof AlreadyExistsException) {
- log.warn("[{}] Failed to create already existing partitioned topic {}",
- clientAppId(), topicName);
- asyncResponse.resume(
- new RestException(Status.CONFLICT, "Partitioned topic already exists"));
- } else if (ex.getCause() instanceof BadVersionException) {
- log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
- clientAppId(), topicName);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
- } else {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
- ex.getCause());
- asyncResponse.resume(new RestException(ex.getCause()));
- }
- return null;
- });
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- }
+ return;
}
+
+ provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+ .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+ .whenComplete((ignored, ex) -> {
+ if (ex != null) {
+ createLocalFuture.completeExceptionally(ex);
+ return;
+ }
+ createLocalFuture.complete(null);
Review comment:
`createFutureList.add(tryCreatePartitionsAsync(numPartitions))`
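To illustrate the suggestion, here is a minimal, self-contained sketch of adding the returned future straight to the list instead of bridging it through a manually completed `createLocalFuture`. The class name, the stubbed `tryCreatePartitionsAsync`, and the `createAll` helper are hypothetical stand-ins rather than the actual `AdminResource` code, and the sketch leaves out the `provisionPartitionedTopicPath` step that the PR chains in front of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: this class and the stub below are hypothetical, not Pulsar code.
class CreateFutureListSketch {

    // Hypothetical stub standing in for tryCreatePartitionsAsync(numPartitions).
    // It fails for a non-positive partition count so the error path can be observed.
    static CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (numPartitions > 0) {
            future.complete(null);
        } else {
            future.completeExceptionally(
                    new IllegalArgumentException("numPartitions must be positive"));
        }
        return future;
    }

    // Adds the returned future to the list directly; no intermediate future
    // has to be completed by hand in a whenComplete callback.
    static CompletableFuture<Void> createAll(int numPartitions) {
        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
        createFutureList.add(tryCreatePartitionsAsync(numPartitions));
        // allOf fails if any future in the list fails, so errors still propagate.
        return CompletableFuture.allOf(createFutureList.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        createAll(4).join();
        System.out.println("all creation futures completed");
    }
}
```

Because `CompletableFuture.allOf` already propagates a failure from any member of the list, the manual `complete`/`completeExceptionally` bridging becomes unnecessary under these assumptions.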