This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dab6014b5213adc04713b31eafd1bc5afe1390b4
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Jan 26 15:46:18 2022 +0800

    [Broker] Fix read schema compatibility strategy priority (#13938)
    
    Signed-off-by: Zixuan Liu <[email protected]>
    
    When we defined the `schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE` in 
`broker.conf`, and a namespace policies so like:
    ```
    schema_auto_update_compatibility_strategy = 
SchemaAutoUpdateCompatibilityStrategy.Full
    schema_compatibility_strategy = null
    ```
    We should get `SchemaCompatibilityStrategy.FULL` by `pulsar-admin 
namespaces get-schema-compatibility-strategy <ns>`, but got 
`SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE`, this is incorrect response.
    
    - Use `null` as `Policies#schema_auto_update_compatibility_strategy` value
    - Use `FULL` as `schemaCompatibilityStrategy` value in `broker.conf` and 
update configuration code
    - Only return `Policies#schema_compatibility_strategy` when get schema 
compatibility strategy of namespace
    - Change the read schema compatibility strategy priority
    
    (cherry picked from commit c18d64526708c6dd051217feba3c31a73507620c)
---
 conf/broker.conf                                   |  8 ++--
 .../apache/pulsar/broker/ServiceConfiguration.java | 11 +++--
 .../apache/pulsar/broker/admin/AdminResource.java  | 15 ++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 11 +----
 .../broker/admin/impl/SchemasResourceBase.java     | 42 +++++------------
 .../pulsar/broker/service/AbstractTopic.java       | 18 ++++----
 .../broker/admin/AdminApiSchemaAutoUpdateTest.java | 21 +++------
 .../pulsar/broker/admin/AdminApiSchemaTest.java    | 53 ++++++++++++++++++++++
 .../SchemaCompatibilityCheckTest.java              |  6 +--
 .../SchemaTypeCompatibilityCheckTest.java          |  8 ++--
 .../pulsar/common/policies/data/Policies.java      |  3 +-
 .../policies/data/SchemaCompatibilityStrategy.java |  5 +-
 12 files changed, 121 insertions(+), 80 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index a51a5f7..ae5dd2e 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1215,12 +1215,10 @@ 
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
 #   if you enable this setting, it will cause non-java clients failed to 
produce.
 isSchemaValidationEnforced=false
 
-# The schema compatibility strategy in broker level. If this config in 
namespace policy is `UNDEFINED`,
-# broker will use it in broker level. If schemaCompatibilityStrategy is 
`UNDEFINED` will use `FULL`.
-# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, 
ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
+# The schema compatibility strategy in broker level.
+# SchemaCompatibilityStrategy : ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, 
BACKWARD, FORWARD,
 # FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
-# default : UNDEFINED
-schemaCompatibilityStrategy=
+schemaCompatibilityStrategy=FULL
 
 ### --- Ledger Offloading --- ###
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 21c0e5f..904fcec 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1971,10 +1971,9 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 
     @FieldContext(
             category = CATEGORY_SCHEMA,
-            doc = "The schema compatibility strategy in broker level. If this 
config in namespace policy is `UNDEFINED`"
-                    + ", schema compatibility strategy check will use it in 
broker level."
+            doc = "The schema compatibility strategy in broker level"
     )
-    private SchemaCompatibilityStrategy schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.UNDEFINED;
+    private SchemaCompatibilityStrategy schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.FULL;
 
     /**** --- WebSocket --- ****/
     @FieldContext(
@@ -2396,4 +2395,10 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         }
     }
 
+    public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
+        if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+            return SchemaCompatibilityStrategy.FULL;
+        }
+        return schemaCompatibilityStrategy;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 6e601af..5a31524 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -57,6 +57,7 @@ import 
org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -798,6 +799,20 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
+    protected CompletableFuture<SchemaCompatibilityStrategy> 
getSchemaCompatibilityStrategyAsync() {
+        return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
+            SchemaCompatibilityStrategy schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
+            if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+                schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                        policies.schema_auto_update_compatibility_strategy);
+                if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+                    schemaCompatibilityStrategy = 
pulsar().getConfig().getSchemaCompatibilityStrategy();
+                }
+            }
+            return schemaCompatibilityStrategy;
+        });
+    }
+
     @CanIgnoreReturnValue
     public static <T> T checkNotNull(T reference) {
         return com.google.common.base.Preconditions.checkNotNull(reference);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c2ad1d8..9b45cb0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2349,15 +2349,8 @@ public abstract class NamespacesBase extends 
AdminResource {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
                 PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
-        SchemaCompatibilityStrategy schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
-        if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = 
pulsar().getConfig().getSchemaCompatibilityStrategy();
-            if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-                schemaCompatibilityStrategy = SchemaCompatibilityStrategy
-                        
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
-            }
-        }
-        return schemaCompatibilityStrategy;
+
+        return policies.schema_compatibility_strategy;
     }
 
     @Deprecated
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index d2decdd..a4c3f5e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -37,8 +37,6 @@ import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataExcep
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -135,16 +133,7 @@ public class SchemasResourceBase extends AdminResource {
     public void postSchema(PostSchemaPayload payload, boolean authoritative, 
AsyncResponse response) {
         validateDestinationAndAdminOperation(authoritative);
 
-        getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
-            SchemaCompatibilityStrategy schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
-            if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-                schemaCompatibilityStrategy =
-                        pulsar().getConfig().getSchemaCompatibilityStrategy();
-                if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-                    schemaCompatibilityStrategy = SchemaCompatibilityStrategy
-                            
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
-                }
-            }
+        
getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> 
{
             byte[] data;
             if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
                 data = DefaultImplementation
@@ -192,26 +181,17 @@ public class SchemasResourceBase extends AdminResource {
         validateDestinationAndAdminOperation(authoritative);
 
         String schemaId = getSchemaId();
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        SchemaCompatibilityStrategy schemaCompatibilityStrategy;
-        if (policies.schema_compatibility_strategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = SchemaCompatibilityStrategy
-                    
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
-        } else {
-            schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
-        }
 
-        pulsar().getSchemaRegistryService()
-                .isCompatible(schemaId,
-                        
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
-                                
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
-                                .user(defaultIfEmpty(clientAppId(), 
"")).props(payload.getProperties()).build(),
-                        schemaCompatibilityStrategy)
-                .thenAccept(isCompatible -> response.resume(Response.accepted()
-                        
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
-                                
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
-                        .build()))
+        
getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy 
-> pulsar()
+                        .getSchemaRegistryService().isCompatible(schemaId,
+                                
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
+                                        
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
+                                        .user(defaultIfEmpty(clientAppId(), 
"")).props(payload.getProperties()).build(),
+                                schemaCompatibilityStrategy)
+                        .thenAccept(isCompatible -> 
response.resume(Response.accepted()
+                                
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
+                                        
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
+                                .build())))
                 .exceptionally(error -> {
                     response.resume(new RestException(error));
                     return null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 65296a7..d93a20a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -540,17 +540,19 @@ public abstract class AbstractTopic implements Topic {
         if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
             schemaCompatibilityStrategy =
                     
brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
-        } else if (policies.schema_compatibility_strategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = brokerService.pulsar()
-                    .getConfig().getSchemaCompatibilityStrategy();
-            if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-                schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
-                        policies.schema_auto_update_compatibility_strategy);
+            return;
+        }
+
+        schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
+        if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+            schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                    policies.schema_auto_update_compatibility_strategy);
+            if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+                schemaCompatibilityStrategy = 
brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy();
             }
-        } else {
-            schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
         }
     }
+
     private static final Summary PUBLISH_LATENCY = 
Summary.build("pulsar_broker_publish_latency", "-")
             .quantile(0.0)
             .quantile(0.50)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
index ec2b1e8..1e4d41c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -36,9 +36,6 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -47,9 +44,6 @@ import org.testng.annotations.Test;
 @Slf4j
 @Test(groups = "broker")
 public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class);
-
     @BeforeMethod
     @Override
     public void setup() throws Exception {
@@ -72,8 +66,8 @@ public class AdminApiSchemaAutoUpdateTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void testAutoUpdateBackward(String namespace, String topicName) 
throws Exception {
-        
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
-                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));
+
         admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
                                                                     
SchemaAutoUpdateCompatibilityStrategy.Backward);
 
@@ -96,8 +90,8 @@ public class AdminApiSchemaAutoUpdateTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void testAutoUpdateForward(String namespace, String topicName) 
throws Exception {
-        
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
-                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));
+
         admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
                                                                     
SchemaAutoUpdateCompatibilityStrategy.Forward);
 
@@ -119,8 +113,7 @@ public class AdminApiSchemaAutoUpdateTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void testAutoUpdateFull(String namespace, String topicName) throws 
Exception {
-        
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
-                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));
 
         try (Producer<V1Data> p = 
pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
             p.send(new V1Data("test1", 1));
@@ -147,8 +140,8 @@ public class AdminApiSchemaAutoUpdateTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void testAutoUpdateDisabled(String namespace, String topicName) 
throws Exception {
-        
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
-                            SchemaAutoUpdateCompatibilityStrategy.Full);
+        
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));
+
         admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
                 SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index dce60e0..64e4251 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
 
 import static java.lang.String.format;
 import static java.nio.charset.StandardCharsets.US_ASCII;
+import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
@@ -45,9 +46,11 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -61,6 +64,8 @@ import org.testng.annotations.Test;
 public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
 
     final String cluster = "test";
+    private final String schemaCompatibilityNamespace = 
"schematest/test-schema-compatibility-ns";
+
     @BeforeMethod
     @Override
     public void setup() throws Exception {
@@ -72,6 +77,7 @@ public class AdminApiSchemaTest extends 
MockedPulsarServiceBaseTest {
         admin.tenants().createTenant("schematest", tenantInfo);
         admin.namespaces().createNamespace("schematest/test", 
Sets.newHashSet("test"));
         admin.namespaces().createNamespace("schematest/"+cluster+"/test", 
Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(schemaCompatibilityNamespace, 
Sets.newHashSet("test"));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -349,4 +355,51 @@ public class AdminApiSchemaTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(ledgerInfo.entries, entryId + 1);
         assertEquals(ledgerInfo.size, length);
     }
+
+    @Test
+    public void testGetSchemaCompatibilityStrategy() throws 
PulsarAdminException {
+        
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
+                SchemaCompatibilityStrategy.UNDEFINED);
+    }
+
+    @Test
+    public void testGetSchemaAutoUpdateCompatibilityStrategy() throws 
PulsarAdminException {
+        
assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace));
+    }
+
+    @Test
+    public void 
testGetSchemaCompatibilityStrategyWhenSetSchemaAutoUpdateCompatibilityStrategy()
+            throws PulsarAdminException {
+        
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
+                SchemaCompatibilityStrategy.UNDEFINED);
+
+        
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace,
+                SchemaAutoUpdateCompatibilityStrategy.Forward);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(SchemaAutoUpdateCompatibilityStrategy.Forward,
+                
admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace)
+        ));
+
+        
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
+                SchemaCompatibilityStrategy.UNDEFINED);
+
+        
admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace,
+                SchemaCompatibilityStrategy.BACKWARD);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(SchemaCompatibilityStrategy.BACKWARD,
+                
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace)));
+    }
+
+    @Test
+    public void 
testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpdateCompatibilityStrategy()
+            throws PulsarAdminException {
+        
pulsar.getConfiguration().setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.FORWARD);
+
+        
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
+                SchemaCompatibilityStrategy.UNDEFINED);
+
+        
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace,
+                SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible);
+        Awaitility.await().untilAsserted(() -> assertEquals(
+                
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
+                SchemaCompatibilityStrategy.UNDEFINED));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 80168b9..5b12f37 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -240,7 +240,7 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         );
 
         
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
-                SchemaCompatibilityStrategy.FULL);
+                SchemaCompatibilityStrategy.UNDEFINED);
 
         
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), 
schemaCompatibilityStrategy);
         admin.schemas().createSchema(fqtn, 
Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
@@ -320,7 +320,7 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         );
 
         
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
-                SchemaCompatibilityStrategy.FULL);
+                SchemaCompatibilityStrategy.UNDEFINED);
 
         
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), 
schemaCompatibilityStrategy);
         admin.schemas().createSchema(fqtn, 
Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
@@ -399,7 +399,7 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         );
 
         
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
-                SchemaCompatibilityStrategy.FULL);
+                SchemaCompatibilityStrategy.UNDEFINED);
         byte[] changeSchemaBytes = (new 
String(Schema.AVRO(Schemas.PersonOne.class)
                 .getSchemaInfo().getSchema(), UTF_8) + "/n   /n   
/n").getBytes();
         SchemaInfo schemaInfo = 
SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index 345c9c7..45f0076 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -37,8 +37,8 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.schema.Schemas;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
@@ -57,7 +57,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
     private static final String namespace = "test-namespace";
     private static final String namespaceName = PUBLIC_TENANT + "/" + 
namespace;
 
-    @BeforeClass
+    @BeforeMethod
     @Override
     public void setup() throws Exception {
         super.internalSetup();
@@ -73,7 +73,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
 
     }
 
-    @AfterClass(alwaysRun = true)
+    @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 8622715..43d266f 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -100,8 +100,7 @@ public class Policies {
 
     @SuppressWarnings("checkstyle:MemberName")
     @Deprecated
-    public SchemaAutoUpdateCompatibilityStrategy 
schema_auto_update_compatibility_strategy =
-        SchemaAutoUpdateCompatibilityStrategy.Full;
+    public SchemaAutoUpdateCompatibilityStrategy 
schema_auto_update_compatibility_strategy = null;
 
     @SuppressWarnings("checkstyle:MemberName")
     public SchemaCompatibilityStrategy schema_compatibility_strategy = 
SchemaCompatibilityStrategy.UNDEFINED;
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java
index 9a4f74c..f3b4569 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java
@@ -71,10 +71,13 @@ public enum SchemaCompatibilityStrategy {
     FULL_TRANSITIVE;
 
 
+    public static boolean isUndefined(SchemaCompatibilityStrategy strategy) {
+        return strategy == null || strategy == 
SchemaCompatibilityStrategy.UNDEFINED;
+    }
 
     public static SchemaCompatibilityStrategy 
fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) {
         if (strategy == null) {
-            return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE;
+            return null;
         }
         switch (strategy) {
             case Backward:

Reply via email to