This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a8d11f10633547614e6885376cab63807e24fa32 Author: ltamber <[email protected]> AuthorDate: Sun Feb 16 13:27:50 2020 +0800 [Issue 5904]Support `unload` all partitions of a partitioned topic (#6187) Fixes #5904 ### Motivation Pulsar supports unload a non-partitioned-topic or a partition of a partitioned topic. If there has a partitioned topic with too many partitions, users need to get all partition and unload them one by one. We need to support unload all partition of a partitioned topic. --- .../apache/pulsar/broker/admin/AdminResource.java | 26 ++++++++ .../broker/admin/impl/PersistentTopicsBase.java | 71 ++++++++++++++++------ .../broker/admin/v1/NonPersistentTopics.java | 18 +++--- .../pulsar/broker/admin/v1/PersistentTopics.java | 7 ++- .../broker/admin/v2/NonPersistentTopics.java | 13 ++-- .../pulsar/broker/admin/v2/PersistentTopics.java | 11 +++- .../pulsar/broker/admin/PersistentTopicsTest.java | 44 +++++++++++--- 7 files changed, 146 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 2aa5ae7..79fe998 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -551,6 +551,32 @@ public abstract class AdminResource extends PulsarWebResource { return pulsar().getConfigurationCache().failureDomainListCache(); } + protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync( + TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) { + validateClusterOwnership(topicName.getCluster()); + // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can + // serve/redirect request else fail partitioned-metadata-request so, client fails while creating + // producer/consumer + validateGlobalNamespaceOwnership(topicName.getNamespaceObject()); + + try { + checkConnect(topicName); + } catch (WebApplicationException e) { + validateAdminAccessForTenant(topicName.getTenant()); + } catch (Exception e) { + // unknown error marked as internal server error + log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName, + clientAppId(), e.getMessage(), e); + return FutureUtil.failedFuture(e); + } + + if (checkAllowAutoCreation) { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName); + } + } + protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) { validateClusterOwnership(topicName.getCluster()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0add56e..4a1021f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -655,12 +655,63 @@ public class PersistentTopicsBase extends AdminResource { }); } - protected void internalUnloadTopic(boolean authoritative) { + protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) { log.info("[{}] Unloading topic {}", clientAppId(), topicName); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - unloadTopic(topicName, authoritative); + + getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> { + if (meta.partitions > 0) { + final List<CompletableFuture<Void>> futures = Lists.newArrayList(); + + for (int i = 0; i < meta.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString())); + } catch (Exception e) { + log.error("[{}] Failed to unload topic {}", clientAppId(), topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable th = exception.getCause(); + if (th instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage())); + } else { + log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception); + asyncResponse.resume(new RestException(exception)); + } + return null; + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + validateAdminAccessForTenant(topicName.getTenant()); + validateTopicOwnership(topicName, authoritative); + + Topic topic = getTopicReference(topicName); + topic.close(false).whenComplete((r, ex) -> { + if (ex != null) { + log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex); + asyncResponse.resume(new RestException(ex)); + + } else { + log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + }).exceptionally(t -> { + Throwable th = t.getCause(); + asyncResponse.resume(new RestException(th)); + return null; + }); } protected void internalDeleteTopic(boolean authoritative, boolean force) { @@ -1893,22 +1944,6 @@ public class PersistentTopicsBase extends AdminResource { return result; } - protected void unloadTopic(TopicName topicName, boolean authoritative) { - validateSuperUserAccess(); - validateTopicOwnership(topicName, authoritative); - try { - Topic topic = getTopicReference(topicName); - topic.close(false).get(); - log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); - } catch (NullPointerException e) { - log.error("[{}] topic {} not found", clientAppId(), topicName); - throw new RestException(Status.NOT_FOUND, "Topic does not exist"); - } catch (Exception e) { - log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getMessage(), e); - throw new RestException(e); - } - } - // as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic. // So, all requests from old-cpp-client (< v1.21) must be rejected. // Pulsar client-java lib always passes user-agent as X-Java-$version. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 0179847..12b9622 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -161,16 +161,18 @@ public class NonPersistentTopics extends PersistentTopics { @ApiOperation(hidden = true, value = "Unload a topic") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist") }) - public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(property, cluster, namespace, encodedTopic); - log.info("[{}] Unloading topic {}", clientAppId(), topicName); - - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalUnloadTopic(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); } - unloadTopic(topicName, authoritative); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index f4fbbe0..9944ca3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -213,11 +213,12 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiOperation(hidden = true, value = "Unload a topic") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist") }) - public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(property, cluster, namespace, encodedTopic); - internalUnloadTopic(authoritative); + internalUnloadTopic(asyncResponse, authoritative); } @DELETE diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index a41db33..add815d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -214,6 +214,7 @@ public class NonPersistentTopics extends PersistentTopics { @ApiResponse(code = 503, message = "Failed to validate global cluster configuration"), }) public void unloadTopic( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -222,12 +223,14 @@ public class NonPersistentTopics extends PersistentTopics { @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); - log.info("[{}] Unloading topic {}", clientAppId(), topicName); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalUnloadTopic(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); } - unloadTopic(topicName, authoritative); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 08411dd..57dd7e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -365,6 +365,7 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) public void unloadTopic( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -373,8 +374,14 @@ public class PersistentTopics extends PersistentTopicsBase { @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); - internalUnloadTopic(authoritative); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalUnloadTopic(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @DELETE diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index a4cd325..1825d31 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -276,18 +276,46 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { @Test public void testUnloadTopic() { final String topicName = "standard-topic-to-be-unload"; + final String partitionTopicName = "partition-topic-to-be-unload"; + + // 1) not exist topic + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.unloadTopic(response, testTenant, testNamespace, "topic-not-exist", true); + ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(errCaptor.capture()); + Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + + // 2) create non partitioned topic and unload + response = mock(AsyncResponse.class); persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); - persistentTopics.unloadTopic(testTenant, testNamespace, topicName, true); + persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 3) create partitioned topic and unload + response = mock(AsyncResponse.class); + persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6); + persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // 4) delete partitioned topic + response = mock(AsyncResponse.class); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); } - @Test(expectedExceptions = RestException.class) + @Test public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() { - try { - persistentTopics.unloadTopic(testTenant, testNamespace,"non-existent-topic", true); - } catch (RestException e) { - Assert.assertEquals(e.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - throw e; - } + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.unloadTopic(response, testTenant, testNamespace,"non-existent-topic", true); + ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); } @Test
