This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dab6014b5213adc04713b31eafd1bc5afe1390b4 Author: Zixuan Liu <[email protected]> AuthorDate: Wed Jan 26 15:46:18 2022 +0800 [Broker] Fix read schema compatibility strategy priority (#13938) Signed-off-by: Zixuan Liu <[email protected]> When we defined the `schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE` in `broker.conf`, and a namespace policies so like: ``` schema_auto_update_compatibility_strategy = SchemaAutoUpdateCompatibilityStrategy.Full schema_compatibility_strategy = null ``` We should get `SchemaCompatibilityStrategy.FULL` by `pulsar-admin namespaces get-schema-compatibility-strategy <ns>`, but got `SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE`, this is incorrect response. - Use `null` as `Policies#schema_auto_update_compatibility_strategy` value - Use `FULL` as `schemaCompatibilityStrategy` value in `broker.conf` and update configuration code - Only return `Policies#schema_compatibility_strategy` when get schema compatibility strategy of namespace - Change the read schema compatibility strategy priority (cherry picked from commit c18d64526708c6dd051217feba3c31a73507620c) --- conf/broker.conf | 8 ++-- .../apache/pulsar/broker/ServiceConfiguration.java | 11 +++-- .../apache/pulsar/broker/admin/AdminResource.java | 15 ++++++ .../pulsar/broker/admin/impl/NamespacesBase.java | 11 +---- .../broker/admin/impl/SchemasResourceBase.java | 42 +++++------------ .../pulsar/broker/service/AbstractTopic.java | 18 ++++---- .../broker/admin/AdminApiSchemaAutoUpdateTest.java | 21 +++------ .../pulsar/broker/admin/AdminApiSchemaTest.java | 53 ++++++++++++++++++++++ .../SchemaCompatibilityCheckTest.java | 6 +-- .../SchemaTypeCompatibilityCheckTest.java | 8 ++-- .../pulsar/common/policies/data/Policies.java | 3 +- .../policies/data/SchemaCompatibilityStrategy.java | 5 +- 12 files changed, 121 insertions(+), 80 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index a51a5f7..ae5dd2e 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1215,12 +1215,10 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe # if you enable this setting, it will cause non-java clients failed to produce. isSchemaValidationEnforced=false -# The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`, -# broker will use it in broker level. If schemaCompatibilityStrategy is `UNDEFINED` will use `FULL`. -# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, +# The schema compatibility strategy in broker level. +# SchemaCompatibilityStrategy : ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, # FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE -# default : UNDEFINED -schemaCompatibilityStrategy= +schemaCompatibilityStrategy=FULL ### --- Ledger Offloading --- ### diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 21c0e5f..904fcec 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1971,10 +1971,9 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SCHEMA, - doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`" - + ", schema compatibility strategy check will use it in broker level." + doc = "The schema compatibility strategy in broker level" ) - private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED; + private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; /**** --- WebSocket --- ****/ @FieldContext( @@ -2396,4 +2395,10 @@ public class ServiceConfiguration implements PulsarConfiguration { } } + public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() { + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + return SchemaCompatibilityStrategy.FULL; + } + return schemaCompatibilityStrategy; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 6e601af..5a31524 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -798,6 +799,20 @@ public abstract class AdminResource extends PulsarWebResource { } } + protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() { + return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> { + SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); + } + } + return schemaCompatibilityStrategy; + }); + } + @CanIgnoreReturnValue public static <T> T checkNotNull(T reference) { return com.google.common.base.Preconditions.checkNotNull(reference); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index c2ad1d8..9b45cb0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2349,15 +2349,8 @@ public abstract class NamespacesBase extends AdminResource { validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); Policies policies = getNamespacePolicies(namespaceName); - SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } - } - return schemaCompatibilityStrategy; + + return policies.schema_compatibility_strategy; } @Deprecated diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index d2decdd..a4c3f5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -37,8 +37,6 @@ import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataExcep import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; @@ -135,16 +133,7 @@ public class SchemasResourceBase extends AdminResource { public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) { validateDestinationAndAdminOperation(authoritative); - getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { - SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = - pulsar().getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } - } + getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> { byte[] data; if (SchemaType.KEY_VALUE.name().equals(payload.getType())) { data = DefaultImplementation @@ -192,26 +181,17 @@ public class SchemasResourceBase extends AdminResource { validateDestinationAndAdminOperation(authoritative); String schemaId = getSchemaId(); - Policies policies = getNamespacePolicies(namespaceName); - - SchemaCompatibilityStrategy schemaCompatibilityStrategy; - if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } else { - schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - } - pulsar().getSchemaRegistryService() - .isCompatible(schemaId, - SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false) - .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType())) - .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(), - schemaCompatibilityStrategy) - .thenAccept(isCompatible -> response.resume(Response.accepted() - .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible) - .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build()) - .build())) + getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar() + .getSchemaRegistryService().isCompatible(schemaId, + SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false) + .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType())) + .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(), + schemaCompatibilityStrategy) + .thenAccept(isCompatible -> response.resume(Response.accepted() + .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible) + .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build()) + .build()))) .exceptionally(error -> { response.resume(new RestException(error)); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 65296a7..d93a20a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -540,17 +540,19 @@ public abstract class AbstractTopic implements Topic { if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); - } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = brokerService.pulsar() - .getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( - policies.schema_auto_update_compatibility_strategy); + return; + } + + schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy(); } - } else { - schemaCompatibilityStrategy = policies.schema_compatibility_strategy; } } + private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-") .quantile(0.0) .quantile(0.50) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java index ec2b1e8..1e4d41c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java @@ -36,9 +36,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -47,9 +44,6 @@ import org.testng.annotations.Test; @Slf4j @Test(groups = "broker") public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { - - private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class); - @BeforeMethod @Override public void setup() throws Exception { @@ -72,8 +66,8 @@ public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { } private void testAutoUpdateBackward(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Backward); @@ -96,8 +90,8 @@ public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { } private void testAutoUpdateForward(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Forward); @@ -119,8 +113,7 @@ public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { } private void testAutoUpdateFull(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) { p.send(new V1Data("test1", 1)); @@ -147,8 +140,8 @@ public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { } private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java index dce60e0..64e4251 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.US_ASCII; +import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; @@ -45,9 +46,11 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -61,6 +64,8 @@ import org.testng.annotations.Test; public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { final String cluster = "test"; + private final String schemaCompatibilityNamespace = "schematest/test-schema-compatibility-ns"; + @BeforeMethod @Override public void setup() throws Exception { @@ -72,6 +77,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { admin.tenants().createTenant("schematest", tenantInfo); admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test")); admin.namespaces().createNamespace("schematest/"+cluster+"/test", Sets.newHashSet("test")); + admin.namespaces().createNamespace(schemaCompatibilityNamespace, Sets.newHashSet("test")); } @AfterMethod(alwaysRun = true) @@ -349,4 +355,51 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { assertEquals(ledgerInfo.entries, entryId + 1); assertEquals(ledgerInfo.size, length); } + + @Test + public void testGetSchemaCompatibilityStrategy() throws PulsarAdminException { + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + } + + @Test + public void testGetSchemaAutoUpdateCompatibilityStrategy() throws PulsarAdminException { + assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace)); + } + + @Test + public void testGetSchemaCompatibilityStrategyWhenSetSchemaAutoUpdateCompatibilityStrategy() + throws PulsarAdminException { + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace, + SchemaAutoUpdateCompatibilityStrategy.Forward); + Awaitility.await().untilAsserted(() -> assertEquals(SchemaAutoUpdateCompatibilityStrategy.Forward, + admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace) + )); + + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + + admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, + SchemaCompatibilityStrategy.BACKWARD); + Awaitility.await().untilAsserted(() -> assertEquals(SchemaCompatibilityStrategy.BACKWARD, + admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace))); + } + + @Test + public void testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpdateCompatibilityStrategy() + throws PulsarAdminException { + pulsar.getConfiguration().setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.FORWARD); + + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace, + SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible); + Awaitility.await().untilAsserted(() -> assertEquals( + admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 80168b9..5b12f37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -240,7 +240,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { ); assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), - SchemaCompatibilityStrategy.FULL); + SchemaCompatibilityStrategy.UNDEFINED); admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy); admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo()); @@ -320,7 +320,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { ); assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), - SchemaCompatibilityStrategy.FULL); + SchemaCompatibilityStrategy.UNDEFINED); admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy); admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo()); @@ -399,7 +399,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { ); assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), - SchemaCompatibilityStrategy.FULL); + SchemaCompatibilityStrategy.UNDEFINED); byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class) .getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes(); SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java index 345c9c7..45f0076 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java @@ -37,8 +37,8 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.schema.Schemas; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collections; @@ -57,7 +57,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes private static final String namespace = "test-namespace"; private static final String namespaceName = PUBLIC_TENANT + "/" + namespace; - @BeforeClass + @BeforeMethod @Override public void setup() throws Exception { super.internalSetup(); @@ -73,7 +73,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes } - @AfterClass(alwaysRun = true) + @AfterMethod(alwaysRun = true) @Override public void cleanup() throws Exception { super.internalCleanup(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 8622715..43d266f 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -100,8 +100,7 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") @Deprecated - public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy = - SchemaAutoUpdateCompatibilityStrategy.Full; + public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy = null; @SuppressWarnings("checkstyle:MemberName") public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java index 9a4f74c..f3b4569 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java @@ -71,10 +71,13 @@ public enum SchemaCompatibilityStrategy { FULL_TRANSITIVE; + public static boolean isUndefined(SchemaCompatibilityStrategy strategy) { + return strategy == null || strategy == SchemaCompatibilityStrategy.UNDEFINED; + } public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) { if (strategy == null) { - return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE; + return null; } switch (strategy) { case Backward:
