This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fed8b8c324685d907e9867eac5a14ed8a5fefa78 Author: Jiwei Guo <techno...@apache.org> AuthorDate: Sun Apr 3 21:50:45 2022 +0800 [fix][broker] Fix creating system namespace topic failure. (#14949) (cherry picked from commit f3b87b65c6946eb197c1eece22cff8ff04e16fcb) --- .../pulsar/broker/service/BrokerService.java | 3 +- .../systopic/PartitionedSystemTopicTest.java | 33 +++++++++++++++++++++- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9f14da56653..261d006ebd0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -24,7 +24,6 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic; -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -2606,7 +2605,7 @@ public class BrokerService implements Closeable { public boolean isAllowAutoTopicCreation(final TopicName topicName) { //System topic can always be created automatically - if (pulsar.getConfiguration().isSystemTopicEnabled() && checkTopicIsEventsNames(topicName)) { + if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) { return true; } AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index bbd3cae7117..ff45c140f56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -19,21 +19,28 @@ package org.apache.pulsar.broker.systopic; import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.commons.lang.RandomStringUtils; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; @Test(groups = "broker") public class PartitionedSystemTopicTest extends BrokerTestBase { @@ -104,4 +111,28 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { } } + @Test + public void testProduceAndConsumeUnderSystemNamespace() throws Exception { + TenantInfo tenantInfo = TenantInfo + .builder() + .adminRoles(Sets.newHashSet("admin")) + .allowedClusters(Sets.newHashSet("test")) + .build(); + admin.tenants().createTenant("pulsar", tenantInfo); + admin.namespaces().createNamespace("pulsar/system", 2); + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic("pulsar/system/__topic-1").create(); + producer.send("test".getBytes(StandardCharsets.UTF_8)); + @Cleanup + Consumer<byte[]> consumer = pulsarClient + .newConsumer() + .topic("pulsar/system/__topic-1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("sub1") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + Message<byte[]> receive = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(receive); + } + }