This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a8d11f10633547614e6885376cab63807e24fa32
Author: ltamber <[email protected]>
AuthorDate: Sun Feb 16 13:27:50 2020 +0800

    [Issue 5904]Support `unload` all partitions of a partitioned topic (#6187)
    
    Fixes #5904
    
    ### Motivation
    Pulsar supports unloading a non-partitioned topic or a single partition of a 
partitioned topic. If a partitioned topic has many partitions, users have to 
list all of its partitions and unload them one by one. We need to support 
unloading all partitions of a partitioned topic at once.
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 26 ++++++++
 .../broker/admin/impl/PersistentTopicsBase.java    | 71 ++++++++++++++++------
 .../broker/admin/v1/NonPersistentTopics.java       | 18 +++---
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  7 ++-
 .../broker/admin/v2/NonPersistentTopics.java       | 13 ++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 11 +++-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 44 +++++++++++---
 7 files changed, 146 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2aa5ae7..79fe998 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -551,6 +551,32 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return pulsar().getConfigurationCache().failureDomainListCache();
     }
 
+    protected CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadataAsync(
+            TopicName topicName, boolean authoritative, boolean 
checkAllowAutoCreation) {
+        validateClusterOwnership(topicName.getCluster());
+        // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
+        // serve/redirect request else fail partitioned-metadata-request so, 
client fails while creating
+        // producer/consumer
+        validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
+
+        try {
+            checkConnect(topicName);
+        } catch (WebApplicationException e) {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            // unknown error marked as internal server error
+            log.warn("Unexpected error while authorizing lookup. topic={}, 
role={}. Error: {}", topicName,
+                    clientAppId(), e.getMessage(), e);
+            return FutureUtil.failedFuture(e);
+        }
+
+        if (checkAllowAutoCreation) {
+            return 
pulsar().getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
+        } else {
+            return 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
+        }
+    }
+
     protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName 
topicName,
             boolean authoritative, boolean checkAllowAutoCreation) {
         validateClusterOwnership(topicName.getCluster());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0add56e..4a1021f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -655,12 +655,63 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
-    protected void internalUnloadTopic(boolean authoritative) {
+    protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        unloadTopic(topicName, authoritative);
+
+        getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
+            if (meta.partitions > 0) {
+                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                for (int i = 0; i < meta.partitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        
futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString()));
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicNamePartition, e);
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+                }
+
+                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    if (exception != null) {
+                        Throwable th = exception.getCause();
+                        if (th instanceof NotFoundException) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
+                        } else {
+                            log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicName, exception);
+                            asyncResponse.resume(new RestException(exception));
+                        }
+                        return null;
+                    }
+
+                    asyncResponse.resume(Response.noContent().build());
+                    return null;
+                });
+            } else {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+
+                Topic topic = getTopicReference(topicName);
+                topic.close(false).whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex.getMessage(), ex);
+                        asyncResponse.resume(new RestException(ex));
+
+                    } else {
+                        log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                });
+            }
+        }).exceptionally(t -> {
+            Throwable th = t.getCause();
+            asyncResponse.resume(new RestException(th));
+            return null;
+        });
     }
 
     protected void internalDeleteTopic(boolean authoritative, boolean force) {
@@ -1893,22 +1944,6 @@ public class PersistentTopicsBase extends AdminResource {
         return result;
     }
 
-    protected void unloadTopic(TopicName topicName, boolean authoritative) {
-        validateSuperUserAccess();
-        validateTopicOwnership(topicName, authoritative);
-        try {
-            Topic topic = getTopicReference(topicName);
-            topic.close(false).get();
-            log.info("[{}] Successfully unloaded topic {}", clientAppId(), 
topicName);
-        } catch (NullPointerException e) {
-            log.error("[{}] topic {} not found", clientAppId(), topicName);
-            throw new RestException(Status.NOT_FOUND, "Topic does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to unload topic {}, {}", clientAppId(), 
topicName, e.getMessage(), e);
-            throw new RestException(e);
-        }
-    }
-
     // as described at : (PR: #836) CPP-client old client lib should not be 
allowed to connect on partitioned-topic.
     // So, all requests from old-cpp-client (< v1.21) must be rejected.
     // Pulsar client-java lib always passes user-agent as X-Java-$version.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 0179847..12b9622 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -161,16 +161,18 @@ public class NonPersistentTopics extends PersistentTopics 
{
     @ApiOperation(hidden = true, value = "Unload a topic")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
-    public void unloadTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
+    public void unloadTopic(@Suspended final AsyncResponse asyncResponse, 
@PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalUnloadTopic(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
         }
-        unloadTopic(topicName, authoritative);
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f4fbbe0..9944ca3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -213,11 +213,12 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     @ApiOperation(hidden = true, value = "Unload a topic")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
-    public void unloadTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
+    public void unloadTopic(@Suspended final AsyncResponse asyncResponse, 
@PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUnloadTopic(authoritative);
+        internalUnloadTopic(asyncResponse, authoritative);
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index a41db33..add815d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -214,6 +214,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration"),
     })
     public void unloadTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -222,12 +223,14 @@ public class NonPersistentTopics extends PersistentTopics 
{
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalUnloadTopic(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
         }
-        unloadTopic(topicName, authoritative);
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 08411dd..57dd7e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -365,6 +365,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration") })
     public void unloadTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -373,8 +374,14 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalUnloadTopic(authoritative);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalUnloadTopic(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index a4cd325..1825d31 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -276,18 +276,46 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testUnloadTopic() {
         final String topicName = "standard-topic-to-be-unload";
+        final String partitionTopicName = "partition-topic-to-be-unload";
+
+        // 1) not exist topic
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.unloadTopic(response, testTenant, testNamespace, 
"topic-not-exist", true);
+        ArgumentCaptor<RestException> errCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+
+        // 2) create non partitioned topic and unload
+        response = mock(AsyncResponse.class);
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
topicName, true);
-        persistentTopics.unloadTopic(testTenant, testNamespace, topicName, 
true);
+        persistentTopics.unloadTopic(response, testTenant, testNamespace, 
topicName, true);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+
+        // 3) create partitioned topic and unload
+        response = mock(AsyncResponse.class);
+        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
partitionTopicName, 6);
+        persistentTopics.unloadTopic(response, testTenant, testNamespace, 
partitionTopicName, true);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+
+        // 4) delete partitioned topic
+        response = mock(AsyncResponse.class);
+        persistentTopics.deletePartitionedTopic(response, testTenant, 
testNamespace, partitionTopicName, true, true);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
     }
 
-    @Test(expectedExceptions = RestException.class)
+    @Test
     public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
-        try {
-            persistentTopics.unloadTopic(testTenant, 
testNamespace,"non-existent-topic", true);
-        } catch (RestException e) {
-            Assert.assertEquals(e.getResponse().getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
-            throw e;
-        }
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.unloadTopic(response, testTenant, 
testNamespace,"non-existent-topic", true);
+        ArgumentCaptor<RestException> responseCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
     }
 
     @Test

Reply via email to