This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1aecf92ea2cbe1b8e15785257af8d6ee84499489 Author: Matteo Merli <[email protected]> AuthorDate: Wed Nov 3 22:10:39 2021 -0700 Allow to configure schema compatibility policy for system topics (#12598) (cherry picked from commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120) --- conf/broker.conf | 3 +++ .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++++++ .../pulsar/broker/service/AbstractTopic.java | 8 ++++++- .../NamespaceEventsSystemTopicServiceTest.java | 25 ++++++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2d7df90..8e518ab 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -534,6 +534,9 @@ zookeeperSessionExpiredPolicy=shutdown # Enable or disable system topic systemTopicEnabled=false +# The schema compatibility strategy to use for system topics +systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE + # Enable or disable topic level policies, topic level policies depends on the system topic # Please enable the system topic first. topicLevelPoliciesEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0d578f4..ce8ab98 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1044,6 +1044,13 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean systemTopicEnabled = false; @FieldContext( + category = CATEGORY_SCHEMA, + doc = "The schema compatibility strategy to use for system topics" + ) + private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy = + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE; + + @FieldContext( category = CATEGORY_SERVER, doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " + "please enable the system topic first.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 183fb05..be4f904 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; @@ -92,6 +93,8 @@ public abstract class AbstractTopic implements Topic { // Whether messages published must be encrypted or not in this topic protected volatile boolean isEncryptionRequired = false; + + @Getter protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; protected volatile Boolean isAllowAutoUpdateSchema; @@ -537,7 +540,10 @@ public abstract class AbstractTopic implements Topic { } protected void setSchemaCompatibilityStrategy(Policies policies) { - if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { + if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { + schemaCompatibilityStrategy = + brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); + } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { schemaCompatibilityStrategy = brokerService.pulsar() .getConfig().getSchemaCompatibilityStrategy(); if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index b524e1a..2daca67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -19,9 +19,14 @@ package org.apache.pulsar.broker.systopic; import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.EventsTopicNames; @@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.slf4j.Logger; @@ -65,6 +71,25 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa } @Test + public void testSchemaCompatibility() throws Exception { + TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1)); + String topicName = systemTopicClientForNamespace1.getTopicName().toString(); + @Cleanup + Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create(); + + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName, false) + .join().get(); + + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + } + + @Test public void testSendAndReceiveNamespaceEvents() throws Exception { TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
