This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 772ddaec02b44e7f7ba909f258ec79f6897e23a6 Author: congbo <[email protected]> AuthorDate: Wed Sep 1 16:07:04 2021 +0800 [Schema] Schema compatibility strategy in broker level. (#11856) ## Motivation link #11849 Schema compatibility strategy config in broker level. ## implement If namespace schema compatibility strategy is `UNDEFINED`, use broker schema compatibility strategy (cherry picked from commit b079c1e047975179260b64aa9f70c8f384580aba) --- conf/broker.conf | 7 ++++++ .../apache/pulsar/broker/ServiceConfiguration.java | 10 +++++++++ .../pulsar/broker/admin/impl/NamespacesBase.java | 7 ++++-- .../broker/admin/impl/SchemasResourceBase.java | 8 +++++-- .../pulsar/broker/service/AbstractTopic.java | 10 ++++++--- .../SchemaTypeCompatibilityCheckTest.java | 25 ++++++++++++++++++++++ 6 files changed, 60 insertions(+), 7 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 6b20908..0144cdc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1208,6 +1208,13 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe # if you enable this setting, it will cause non-java clients failed to produce. isSchemaValidationEnforced=false +# The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`, +# broker will use it in broker level. If schemaCompatibilityStrategy is `UNDEFINED` will use `FULL`. +# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, +# FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE +# default : UNDEFINED +schemaCompatibilityStrategy= + ### --- Ledger Offloading --- ### # The directory for all the offloader implementations diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1043a8a..49e0e19 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.sasl.SaslConstants; @@ -1936,11 +1937,13 @@ public class ServiceConfiguration implements PulsarConfiguration { + " if you enable this setting, it will cause non-java clients failed to produce." ) private boolean isSchemaValidationEnforced = false; + @FieldContext( category = CATEGORY_SCHEMA, doc = "The schema storage implementation used by this broker" ) private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"; + @FieldContext( category = CATEGORY_SCHEMA, doc = "The list compatibility checkers to be used in schema registry" @@ -1951,6 +1954,13 @@ public class ServiceConfiguration implements PulsarConfiguration { "org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck" ); + @FieldContext( + category = CATEGORY_SCHEMA, + doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`" + + ", schema compatibility strategy check will use it in broker level." + ) + private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED; + /**** --- WebSocket --- ****/ @FieldContext( category = CATEGORY_WEBSOCKET, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 04f13b8..5d78216 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2357,8 +2357,11 @@ public abstract class NamespacesBase extends AdminResource { Policies policies = getNamespacePolicies(namespaceName); SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); + schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); + if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy + .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); + } } return schemaCompatibilityStrategy; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index 0899b92..cab4ee8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -138,8 +138,12 @@ public class SchemasResourceBase extends AdminResource { getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); + schemaCompatibilityStrategy = + pulsar().getConfig().getSchemaCompatibilityStrategy(); + if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy + .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); + } } byte[] data; if (SchemaType.KEY_VALUE.name().equals(payload.getType())) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ff08aba..cb66734 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -524,10 +524,14 @@ public abstract class AbstractTopic implements Topic { PUBLISH_LATENCY.observe(latency, unit); } - protected void setSchemaCompatibilityStrategy (Policies policies) { + protected void setSchemaCompatibilityStrategy(Policies policies) { if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( - policies.schema_auto_update_compatibility_strategy); + schemaCompatibilityStrategy = brokerService.pulsar() + .getConfig().getSchemaCompatibilityStrategy(); + if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + } } else { schemaCompatibilityStrategy = policies.schema_compatibility_strategy; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java index 9b9c600..345c9c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -79,6 +80,30 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes } @Test + public void testSchemaCompatibilityStrategyInBrokerLevel() throws PulsarClientException { + conf.setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + + String topicName = TopicName.get( + TopicDomain.persistent.value(), + PUBLIC_TENANT, + namespace, + "testSchemaCompatibilityStrategyInBrokerLevel" + ).toString(); + + pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne>builder(). + withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build())) + .topic(topicName) + .create(); + + ProducerBuilder<Schemas.PersonThree> producerBuilder = pulsarClient.newProducer(Schema.AVRO(SchemaDefinition + .<Schemas.PersonThree>builder().withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build())) + .topic(topicName); + + Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); + assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema")); + } + + @Test public void structTypeProducerProducerUndefinedCompatible() throws Exception { admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
