This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e28f69e [Enhancement] avoid duplicate deletion of schema (#11640)
e28f69e is described below
commit e28f69ecf82d40b20aed16ad03f7831bbca721b4
Author: Zhanpeng Wu <[email protected]>
AuthorDate: Sat Aug 21 19:32:38 2021 +0800
[Enhancement] avoid duplicate deletion of schema (#11640)
### Motivation
Currently when I need to delete a namespace forcedly, the
`NamespacesBase#internalDeleteNamespaceForcefully` will firstly list all the
topics under the namespace and then delete them concurrently without
distinguishing wheather the topic is partitioned or non-partitioned. This
behaivor will cause duplicate deletion of schema when the topic list contains
partitioned topics, because all the partitions of the partitioned topic share
the same schema.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 34 +++++++++++--
.../apache/pulsar/broker/admin/AdminApiTest2.java | 56 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4048b4a..eb27521 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -415,14 +415,42 @@ public abstract class NamespacesBase extends
AdminResource {
try {
// firstly remove all topics including system topics
if (!topics.isEmpty()) {
+ Set<String> partitionedTopics = new HashSet<>();
+ Set<String> nonPartitionedTopics = new HashSet<>();
+
for (String topic : topics) {
try {
-
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ TopicName topicName = TopicName.get(topic);
+ if (topicName.isPartitioned()) {
+ String partitionedTopic =
topicName.getPartitionedTopicName();
+ if (!partitionedTopics.contains(partitionedTopic))
{
+ // Distinguish partitioned topic to avoid
duplicate deletion of the same schema
+
futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
+ partitionedTopic, true, true));
+ partitionedTopics.add(partitionedTopic);
+ }
+ } else {
+
futures.add(pulsar().getAdminClient().topics().deleteAsync(
+ topic, true, true));
+ nonPartitionedTopics.add(topic);
+ }
} catch (Exception e) {
- log.error("[{}] Failed to force delete topic {}",
clientAppId(), topic, e);
- asyncResponse.resume(new RestException(e));
+ String errorMessage = String.format("Failed to force
delete topic %s, "
+ + "but the previous deletion command
of partitioned-topics:%s "
+ + "and non-partitioned-topics:%s have
been sent out asynchronously. "
+ + "Reason: %s",
+ topic, partitionedTopics,
nonPartitionedTopics, e.getCause());
+ log.error("[{}] {}", clientAppId(), errorMessage, e);
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, errorMessage));
+ return;
}
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Successfully send deletion command of
partitioned-topics:{} "
+ + "and non-partitioned-topics:{} in
namespace:{}.",
+ partitionedTopics, nonPartitionedTopics,
namespaceName);
+ }
}
// forcefully delete namespace bundles
NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index ce6814c..556305d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,6 +45,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response.Status;
import lombok.Cleanup;
@@ -1473,6 +1475,60 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws
Exception {
+ conf.setForceDeleteNamespaceAllowed(true);
+ final String ns = "prop-xyz/distinguish-topic-type-ns";
+ final String exNs = "prop-xyz/ex-distinguish-topic-type-ns";
+ admin.namespaces().createNamespace(ns, 2);
+ admin.namespaces().createNamespace(exNs, 2);
+
+ final String p1 = "persistent://" + ns + "/p1";
+ final String p5 = "persistent://" + ns + "/p5";
+ final String np = "persistent://" + ns + "/np";
+
+ admin.topics().createPartitionedTopic(p1, 1);
+ admin.topics().createPartitionedTopic(p5, 5);
+ admin.topics().createNonPartitionedTopic(np);
+
+ final String exNp = "persistent://" + exNs + "/np";
+ admin.topics().createNonPartitionedTopic(exNp);
+ // insert an invalid topic name
+ pulsar.getLocalMetadataStore().put(
+ "/managed-ledgers/" + exNs + "/persistent/", "".getBytes(),
Optional.empty()).join();
+
+ List<String> topics =
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get();
+ List<String> exTopics =
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get();
+
+ // ensure that the topic list contains all the topics
+ List<String> allTopics = new ArrayList<>(Arrays.asList(np,
TopicName.get(p1).getPartition(0).toString()));
+ for (int i = 0; i < 5; i++) {
+ allTopics.add(TopicName.get(p5).getPartition(i).toString());
+ }
+ Assert.assertEquals(allTopics.stream().filter(t ->
!topics.contains(t)).count(), 0);
+ Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/"));
+ // partition num = p1 + p5 + np
+ Assert.assertEquals(topics.size(), 1 + 5 + 1);
+ Assert.assertEquals(exTopics.size(), 1 + 1);
+
+ admin.namespaces().deleteNamespace(ns, true);
+ Arrays.asList(p1, p5, np).forEach(t -> {
+ try {
+ admin.schemas().getSchemaInfo(t);
+ } catch (PulsarAdminException e) {
+ // all the normal topics' schemas have been deleted
+ Assert.assertEquals(e.getStatusCode(), 404);
+ }
+ });
+
+ try {
+ admin.namespaces().deleteNamespace(exNs, true);
+ fail("Should fail due to invalid topic");
+ } catch (Exception e) {
+ //ok
+ }
+ }
+
+ @Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster =
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
String clusterName = "test2";