mattisonchao commented on a change in pull request #13845:
URL: https://github.com/apache/pulsar/pull/13845#discussion_r788363349
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ future.thenAccept(__ ->{
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+ if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+ internalUnloadTransactionCoordinatorAsync(asyncResponse,
authoritative);
+ } else {
+ internalUnloadNonPartitionedTopicAsync(asyncResponse,
authoritative);
+ }
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenAccept(meta -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+
+ for (int i = 0; i < meta.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+ topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload topic
{}", clientAppId(),
+ topicNamePartition, e);
+ asyncResponse.resume(new
RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable th = exception.getCause();
+ if (th instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
+ } else if (th instanceof
WebApplicationException) {
+ asyncResponse.resume(th);
+ } else {
+ log.error("[{}] Failed to unload
topic {}", clientAppId(), topicName,
+ exception);
+ asyncResponse.resume(new
RestException(exception));
+ }
+ } else {
+
asyncResponse.resume(Response.noContent().build());
+ }
+ return null;
+ });
+ } else {
+
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+ }
+ }).exceptionally(t -> {
+ log.error("[{}] Failed to unload topic {}",
clientAppId(), topicName, t);
Review comment:
fixed
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ future.thenAccept(__ ->{
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+ if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+ internalUnloadTransactionCoordinatorAsync(asyncResponse,
authoritative);
+ } else {
+ internalUnloadNonPartitionedTopicAsync(asyncResponse,
authoritative);
+ }
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenAccept(meta -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
Review comment:
fixed
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -714,70 +714,73 @@ protected void
internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- try {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
- internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- } else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(meta -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
-
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().unloadAsync(
- topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof
WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to unload topic
{}", clientAppId(), topicName,
- exception);
- asyncResponse.resume(new
RestException(exception));
- }
- } else {
-
asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
- } else {
- internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
- }
- }).exceptionally(t -> {
- log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, t);
- if (t instanceof WebApplicationException) {
- asyncResponse.resume(t);
- } else {
- asyncResponse.resume(new RestException(t));
- }
- return null;
- });
- }
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ future.thenAccept(__ ->{
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+ if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+ internalUnloadTransactionCoordinatorAsync(asyncResponse,
authoritative);
+ } else {
+ internalUnloadNonPartitionedTopicAsync(asyncResponse,
authoritative);
+ }
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenAccept(meta -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+
+ for (int i = 0; i < meta.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().unloadAsync(
+ topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload topic
{}", clientAppId(),
+ topicNamePartition, e);
+ asyncResponse.resume(new
RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable th = exception.getCause();
+ if (th instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
+ } else if (th instanceof
WebApplicationException) {
+ asyncResponse.resume(th);
+ } else {
+ log.error("[{}] Failed to unload
topic {}", clientAppId(), topicName,
+ exception);
+ asyncResponse.resume(new
RestException(exception));
+ }
+ } else {
+
asyncResponse.resume(Response.noContent().build());
+ }
+ return null;
+ });
+ } else {
+
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
+ }
+ }).exceptionally(t -> {
+ log.error("[{}] Failed to unload topic {}",
clientAppId(), topicName, t);
+ if (t instanceof WebApplicationException) {
+ asyncResponse.resume(t);
+ } else {
+ asyncResponse.resume(new RestException(t));
+ }
+ return null;
+ });
+ }
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, cause);
Review comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]