This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 9897c7d Fix master broker while subscribe to non-persistent
partitioned topic without topic auto-creation (#9107)
9897c7d is described below
commit 9897c7db5b8788e25710fd3dc7a2b693d4bc9b41
Author: lipenghui <[email protected]>
AuthorDate: Sun Jan 3 10:48:55 2021 +0800
Fix master broker while subscribe to non-persistent partitioned topic
without topic auto-creation (#9107)
### Motivation
After #9029 merged, the non-persistent topic can be created when enabling
the topic auto-creation, Otherwise, the client will get a `Topic does not
exist` exception. This looks like a concurrent merge related issue, but I'm not
able to find another PR related to this issue.
This PR is fixing the test that wants to subscribe to a partitioned
non-persistent topic but disabled the topic auto-creation. Currently, the fix
is enabling the topic auto-creation for the test. For non-persistent topics, we
don't persist any metadata for it in the metadata server, so for users who want
to use the non-persistent topic, they must enable the topic auto-creation.
(cherry picked from commit 21a3a3b5376fee8afeb01c5decfbdee49ae26a0c)
---
.../pulsar/broker/service/BrokerService.java | 27 ++++++++++++----------
.../pulsar/client/api/PartitionCreationTest.java | 14 +++++------
2 files changed, 22 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 32bf07e..6330e36 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1558,19 +1558,22 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
synchronized (multiLayerTopicsMap) {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
Topic>> namespaceMap = multiLayerTopicsMap
.get(namespaceName);
- ConcurrentOpenHashMap<String, Topic> bundleMap =
namespaceMap.get(bundleName);
- bundleMap.remove(topic);
- if (bundleMap.isEmpty()) {
- namespaceMap.remove(bundleName);
- }
+ if (namespaceMap != null) {
+ ConcurrentOpenHashMap<String, Topic> bundleMap =
namespaceMap.get(bundleName);
+ bundleMap.remove(topic);
+ if (bundleMap.isEmpty()) {
+ namespaceMap.remove(bundleName);
+ }
- if (namespaceMap.isEmpty()) {
- multiLayerTopicsMap.remove(namespaceName);
- final ClusterReplicationMetrics clusterReplicationMetrics
= pulsarStats
- .getClusterReplicationMetrics();
- replicationClients.forEach((cluster, client) -> {
-
clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName,
cluster));
- });
+ if (namespaceMap.isEmpty()) {
+ multiLayerTopicsMap.remove(namespaceName);
+ final ClusterReplicationMetrics
clusterReplicationMetrics = pulsarStats
+ .getClusterReplicationMetrics();
+ replicationClients.forEach((cluster, client) -> {
+
clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName,
+ cluster));
+ });
+ }
}
}
} catch (Exception e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index a217d05..dfdb25f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -63,15 +63,15 @@ public class PartitionCreationTest extends
ProducerConsumerBase {
super.internalCleanup();
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void
testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain
domain) throws PulsarAdminException, PulsarClientException {
- conf.setAllowAutoTopicCreation(false);
+
conf.setAllowAutoTopicCreation(domain.equals(TopicDomain.non_persistent));
final String topic = domain.value() +
"://public/default/testCreateConsumerWhenDisableTopicAutoCreation";
admin.topics().createPartitionedTopic(topic, 3);
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void
testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain
domain) throws PulsarClientException {
conf.setAllowAutoTopicCreation(false);
final String topic = domain.value() +
"://public/default/testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation";
@@ -88,7 +88,7 @@ public class PartitionCreationTest extends
ProducerConsumerBase {
}
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void
testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain
domain) throws PulsarAdminException, PulsarClientException {
conf.setAllowAutoTopicCreation(true);
final String topic = domain.value() +
"://public/default/testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation";
@@ -96,14 +96,14 @@ public class PartitionCreationTest extends
ProducerConsumerBase {
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void
testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain
domain) throws PulsarClientException {
conf.setAllowAutoTopicCreation(true);
final String topic = domain.value() +
"://public/default/testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation";
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
}
- @Test
+ @Test(timeOut = 60000)
public void
testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation()
throws Exception {
conf.setAllowAutoTopicCreation(false);
final String topic =
"testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation-" +
System.currentTimeMillis();
@@ -118,7 +118,7 @@ public class PartitionCreationTest extends
ProducerConsumerBase {
Assert.assertEquals(consumer.getConsumers().size(), 5);
}
- @Test
+ @Test(timeOut = 60000)
public void testCreateMissedPartitions() throws JsonProcessingException,
KeeperException, InterruptedException, PulsarAdminException,
PulsarClientException {
conf.setAllowAutoTopicCreation(false);
final String topic = "testCreateMissedPartitions";