This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 7869cd098b1 Fix can not enable system topic if `AutoUpdateSchemaEnabled=false`. (#15759)
7869cd098b1 is described below

commit 7869cd098b103948495f8f133456b1ed40bed681
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 24 19:53:01 2022 -0700

    Fix can not enable system topic if `AutoUpdateSchemaEnabled=false`. (#15759)
---
 .../apache/pulsar/broker/service/AbstractTopic.java  |  3 +++
 .../apache/pulsar/broker/service/BrokerService.java  |  2 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java    | 20 ++++++++++++++++++++
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d93a20a5390..c279320a964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -348,6 +348,9 @@ public abstract class AbstractTopic implements Topic {
     }
 
     private boolean allowAutoUpdateSchema() {
+        if (brokerService.isSystemTopic(topic)) {
+            return true;
+        }
         if (isAllowAutoUpdateSchema == null) {
             return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c45a0ccb6a4..a5e013230c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2710,7 +2710,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
         return null;
     }
-    private boolean isSystemTopic(String topic) {
+    public boolean isSystemTopic(String topic) {
         return SystemTopicClient.isSystemTopic(TopicName.get(topic));
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 8b5f4203c84..b818b56dec7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -49,12 +49,14 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -376,4 +378,22 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
             }
         });
     }
+
+    @Test
+    public void testAutoCreateSchema() throws Exception {
+        String namespace = "system-topic/ns2";
+        String topic = namespace + "/test";
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setIsAllowAutoUpdateSchema(namespace, false);
+        admin.topics().createNonPartitionedTopic(topic);
+        TopicName changeEventTopicName =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                        TopicName.get(topic).getNamespaceObject(), EventType.TOPIC_POLICY);
+        Awaitility.await().untilAsserted(() -> {
+            SchemaInfo schemaInfo = admin
+                    .schemas()
+                    .getSchemaInfo(changeEventTopicName.toString());
+            Assert.assertNotNull(schemaInfo);
+        });
+    }
 }

Reply via email to