codelipenghui commented on a change in pull request #13845:
URL: https://github.com/apache/pulsar/pull/13845#discussion_r788346085
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ future.thenAccept(__ ->{
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+ if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+ internalUnloadTransactionCoordinatorAsync(asyncResponse,
authoritative);
+ } else {
+ internalUnloadNonPartitionedTopicAsync(asyncResponse,
authoritative);
+ }
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenAccept(meta -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
Review comment:
Since the number of partitions is already known here, we should create the
list with that capacity (e.g. `Lists.newArrayListWithCapacity(meta.partitions)`)
to avoid resizing it while the futures are added.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ future.thenAccept(__ ->{
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+ if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+ internalUnloadTransactionCoordinatorAsync(asyncResponse,
authoritative);
+ } else {
+ internalUnloadNonPartitionedTopicAsync(asyncResponse,
authoritative);
+ }
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenAccept(meta -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+
+ for (int i = 0; i < meta.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+ topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload topic
{}", clientAppId(),
+ topicNamePartition, e);
+ asyncResponse.resume(new
RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable th = exception.getCause();
+ if (th instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
+ } else if (th instanceof
WebApplicationException) {
+ asyncResponse.resume(th);
+ } else {
+ log.error("[{}] Failed to unload
topic {}", clientAppId(), topicName,
+ exception);
+ asyncResponse.resume(new
RestException(exception));
+ }
+ } else {
+
asyncResponse.resume(Response.noContent().build());
+ }
+ return null;
+ });
+ } else {
+
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+ }
+ }).exceptionally(t -> {
+ log.error("[{}] Failed to unload topic {}",
clientAppId(), topicName, t);
+ if (t instanceof WebApplicationException) {
+ asyncResponse.resume(t);
+ } else {
+ asyncResponse.resume(new RestException(t));
+ }
+ return null;
+ });
+ }
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, cause);
Review comment:
Should be `Failed to validate the global namespace ownership while unloading topic {}`
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ future.thenAccept(__ ->{
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+ if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+ internalUnloadTransactionCoordinatorAsync(asyncResponse,
authoritative);
+ } else {
+ internalUnloadNonPartitionedTopicAsync(asyncResponse,
authoritative);
+ }
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenAccept(meta -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+
+ for (int i = 0; i < meta.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+ topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload topic
{}", clientAppId(),
+ topicNamePartition, e);
+ asyncResponse.resume(new
RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable th = exception.getCause();
+ if (th instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
+ } else if (th instanceof
WebApplicationException) {
+ asyncResponse.resume(th);
+ } else {
+ log.error("[{}] Failed to unload
topic {}", clientAppId(), topicName,
+ exception);
+ asyncResponse.resume(new
RestException(exception));
+ }
+ } else {
+
asyncResponse.resume(Response.noContent().build());
+ }
+ return null;
+ });
+ } else {
+
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+ }
+ }).exceptionally(t -> {
+ log.error("[{}] Failed to unload topic {}",
clientAppId(), topicName, t);
Review comment:
Should be `Failed to get partitioned metadata while unloading topic {}`
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -969,50 +972,42 @@ protected void internalUnloadTopic(AsyncResponse
asyncResponse, boolean authorit
});
}
- private void internalUnloadNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- try {
- validateTopicOperation(topicName, TopicOperation.UNLOAD);
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {},{}", clientAppId(),
topicName, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
-
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> getTopicReferenceAsync(topicName))
- .thenCompose(topic -> topic.close(false))
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded topic {}",
clientAppId(), topicName);
- asyncResponse.resume(Response.noContent().build());
- })
+ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse
asyncResponse, boolean authoritative) {
+ validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
+ .thenCompose(unused -> validateTopicOwnershipAsync(topicName,
authoritative)
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.close(false))
+ .thenRun(() -> {
+ log.info("[{}] Successfully unloaded topic {}",
clientAppId(), topicName);
+ asyncResponse.resume(Response.noContent().build());
+ }))
.exceptionally(ex -> {
- log.error("[{}] Failed to unload topic {}, {}",
clientAppId(), topicName, ex.getMessage());
- asyncResponse.resume(ex.getCause());
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to unload topic {}, {}",
clientAppId(), topicName, cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}
- private void internalUnloadTransactionCoordinator(AsyncResponse
asyncResponse, boolean authoritative) {
- try {
- validateTopicOperation(topicName, TopicOperation.UNLOAD);
- } catch (Exception e) {
- log.error("[{}] Failed to unload tc {},{}", clientAppId(),
topicName.getPartitionIndex(), e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(v -> pulsar()
- .getTransactionMetadataStoreService()
-
.removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex())))
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded tc {}",
clientAppId(), topicName.getPartitionIndex());
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- log.error("[{}] Failed to unload tc {}, {}",
clientAppId(), topicName.getPartitionIndex(),
- ex.getMessage());
- asyncResponse.resume(ex.getCause());
- return null;
- });
+ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse
asyncResponse, boolean authoritative) {
+ validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
Review comment:
Hmm, it looks like we were checking the wrong permission before: for the
transaction coordinator, only the superuser should be allowed to unload it.
@congbobo184 Could you please help confirm?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -969,50 +972,42 @@ protected void internalUnloadTopic(AsyncResponse
asyncResponse, boolean authorit
});
}
- private void internalUnloadNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- try {
- validateTopicOperation(topicName, TopicOperation.UNLOAD);
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {},{}", clientAppId(),
topicName, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
-
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> getTopicReferenceAsync(topicName))
- .thenCompose(topic -> topic.close(false))
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded topic {}",
clientAppId(), topicName);
- asyncResponse.resume(Response.noContent().build());
- })
+ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse
asyncResponse, boolean authoritative) {
+ validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
+ .thenCompose(unused -> validateTopicOwnershipAsync(topicName,
authoritative)
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.close(false))
+ .thenRun(() -> {
+ log.info("[{}] Successfully unloaded topic {}",
clientAppId(), topicName);
+ asyncResponse.resume(Response.noContent().build());
+ }))
.exceptionally(ex -> {
- log.error("[{}] Failed to unload topic {}, {}",
clientAppId(), topicName, ex.getMessage());
- asyncResponse.resume(ex.getCause());
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to unload topic {}, {}",
clientAppId(), topicName, cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}
- private void internalUnloadTransactionCoordinator(AsyncResponse
asyncResponse, boolean authoritative) {
- try {
- validateTopicOperation(topicName, TopicOperation.UNLOAD);
- } catch (Exception e) {
- log.error("[{}] Failed to unload tc {},{}", clientAppId(),
topicName.getPartitionIndex(), e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(v -> pulsar()
- .getTransactionMetadataStoreService()
-
.removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex())))
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded tc {}",
clientAppId(), topicName.getPartitionIndex());
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- log.error("[{}] Failed to unload tc {}, {}",
clientAppId(), topicName.getPartitionIndex(),
- ex.getMessage());
- asyncResponse.resume(ex.getCause());
- return null;
- });
+ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse
asyncResponse, boolean authoritative) {
+ validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
Review comment:
It's OK to push a separate PR to fix this one, so that this PR stays
focused on what it is meant to do.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
Review comment:
```suggestion
CompletableFuture<Void> future = null;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ future.thenAccept(__ ->{
Review comment:
```suggestion
future.thenAccept(__ -> {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]