This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 7869cd098b1 Fix can not enable system topic if
`AutoUpdateSchemaEnabled=false`. (#15759)
7869cd098b1 is described below
commit 7869cd098b103948495f8f133456b1ed40bed681
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 24 19:53:01 2022 -0700
Fix can not enable system topic if `AutoUpdateSchemaEnabled=false`. (#15759)
---
.../apache/pulsar/broker/service/AbstractTopic.java | 3 +++
.../apache/pulsar/broker/service/BrokerService.java | 2 +-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 20 ++++++++++++++++++++
3 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d93a20a5390..c279320a964 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -348,6 +348,9 @@ public abstract class AbstractTopic implements Topic {
}
private boolean allowAutoUpdateSchema() {
+ if (brokerService.isSystemTopic(topic)) {
+ return true;
+ }
if (isAllowAutoUpdateSchema == null) {
return
brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c45a0ccb6a4..a5e013230c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2710,7 +2710,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
log.debug("No autoSubscriptionCreateOverride policy found for {}",
topicName);
return null;
}
- private boolean isSystemTopic(String topic) {
+ public boolean isSystemTopic(String topic) {
return SystemTopicClient.isSystemTopic(TopicName.get(topic));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 8b5f4203c84..b818b56dec7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -49,12 +49,14 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -376,4 +378,22 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
}
});
}
+
+ @Test
+ public void testAutoCreateSchema() throws Exception {
+ String namespace = "system-topic/ns2";
+ String topic = namespace + "/test";
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setIsAllowAutoUpdateSchema(namespace, false);
+ admin.topics().createNonPartitionedTopic(topic);
+ TopicName changeEventTopicName =
+ NamespaceEventsSystemTopicFactory.getSystemTopicName(
+ TopicName.get(topic).getNamespaceObject(),
EventType.TOPIC_POLICY);
+ Awaitility.await().untilAsserted(() -> {
+ SchemaInfo schemaInfo = admin
+ .schemas()
+ .getSchemaInfo(changeEventTopicName.toString());
+ Assert.assertNotNull(schemaInfo);
+ });
+ }
}