This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7aea58d Allow to configure schema compatibility policy for system topics (#12598)
7aea58d is described below
commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 3 22:10:39 2021 -0700
Allow to configure schema compatibility policy for system topics (#12598)
---
conf/broker.conf | 3 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 7 ++++++
.../pulsar/broker/service/AbstractTopic.java | 8 ++++++-
.../NamespaceEventsSystemTopicServiceTest.java | 25 ++++++++++++++++++++++
4 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index bd37eb9..13e955b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -534,6 +534,9 @@ zookeeperSessionExpiredPolicy=shutdown
# Enable or disable system topic
systemTopicEnabled=false
+# The schema compatibility strategy to use for system topics
+systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE
+
# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a6205c3..832a389 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1042,6 +1042,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
private boolean systemTopicEnabled = false;
@FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The schema compatibility strategy to use for system topics"
+ )
+ private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
+ SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;
+
+ @FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
"please enable the system topic first.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 951d602..edf922e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -93,6 +94,8 @@ public abstract class AbstractTopic implements Topic {
// Whether messages published must be encrypted or not in this topic
protected volatile boolean isEncryptionRequired = false;
+
+ @Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
@@ -521,7 +524,10 @@ public abstract class AbstractTopic implements Topic {
}
protected void setSchemaCompatibilityStrategy(Policies policies) {
- if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
+ if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
+ schemaCompatibilityStrategy =
+ brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
+ } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index b524e1a..2daca67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -19,9 +19,14 @@
package org.apache.pulsar.broker.systopic;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
@@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
@@ -65,6 +71,25 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
}
@Test
+ public void testSchemaCompatibility() throws Exception {
+ TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
+ .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
+ String topicName = systemTopicClientForNamespace1.getTopicName().toString();
+ @Cleanup
+ Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
+ .topic(topicName)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ PersistentTopic topic =
+ (PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topicName, false)
+ .join().get();
+
+ Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
+ }
+
+ @Test
public void testSendAndReceiveNamespaceEvents() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));