This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0475d4212ae53fbc62bc786c6cfa4974abc95103 Author: Qiang Zhao <[email protected]> AuthorDate: Sat Jan 14 11:55:18 2023 +0800 [fix][broker] Support deleting partitioned topics with the keyword `-partition-` (#19230) (cherry picked from commit fc4bca6823282c4f5375e5d48552c1f0e114b5ee) --- .../pulsar/broker/admin/v2/PersistentTopics.java | 2 +- .../persistent/MessageRedeliveryController.java | 3 +- .../PartitionKeywordCompatibilityTest.java | 77 ++++++++++++++++++++++ 3 files changed, 79 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index c9c5ad7720c..e6bf99c4118 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -825,7 +825,7 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiParam(value = "Delete the topic's schema storage") @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) { try { - validatePartitionedTopicName(tenant, namespace, encodedTopic); + validateTopicName(tenant, namespace, encodedTopic); internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema); } catch (WebApplicationException wae) { asyncResponse.resume(wae); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index e6febc722de..a8f6c14c537 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -46,8 +46,7 @@ public class MessageRedeliveryController { if (!allowOutOfOrderDelivery) { this.hashesToBeBlocked = ConcurrentLongLongPairHashMap .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build(); - this.hashesRefCount = ConcurrentLongLongHashMap - .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build(); + this.hashesRefCount = new ConcurrentLongLongHashMap(128, 2); } else { this.hashesToBeBlocked = null; this.hashesRefCount = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java new file mode 100644 index 00000000000..58b9dcee628 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + + +import lombok.Cleanup; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.util.List; + +@Test +public class PartitionKeywordCompatibilityTest extends BrokerTestBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + baseSetup(); + setupDefaultTenantAndNamespace(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + public void testAutoCreatePartitionTopicWithKeywordAndDeleteIt() + throws PulsarAdminException, PulsarClientException { + AutoTopicCreationOverride override = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("partitioned") + .defaultNumPartitions(1) + .build(); + admin.namespaces().setAutoTopicCreation("public/default", override); + String topicName = "persistent://public/default/XXX-partition-0-dd"; + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .subscriptionType(SubscriptionType.Exclusive) + .subscribe(); + List<String> topics = admin.topics().getList("public/default"); + List<String> partitionedTopicList = admin.topics().getPartitionedTopicList("public/default"); + Assert.assertTrue(topics.contains(topicName)); + Assert.assertTrue(partitionedTopicList.contains(topicName)); + consumer.close(); + admin.topics().deletePartitionedTopic(topicName); + topics = admin.topics().getList("public/default"); + partitionedTopicList = admin.topics().getPartitionedTopicList("public/default"); + Assert.assertFalse(topics.contains(topicName)); + Assert.assertFalse(partitionedTopicList.contains(topicName)); + } +}
