nodece commented on a change in pull request #13297:
URL: https://github.com/apache/pulsar/pull/13297#discussion_r782734029



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -181,11 +211,48 @@ protected void 
updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
         
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
         topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
                 
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
+
+        updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
+
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
                 type -> this.topicPolicies.getBackLogQuotaMap().get(type)
                         
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, 
type)));
     }
 
+    private void updateSchemaCompatibilityStrategyNamespaceValue(Policies 
namespacePolicies){
+        SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+                
formatSchemaCompatibilityStrategy(namespacePolicies.schema_compatibility_strategy);
+        if (this.isSystemTopic()) {
+            
topicPolicies.getSchemaCompatibilityStrategy().updateNamespaceValue(schemaCompatibilityStrategy);
+            return;
+        }
+        
topicPolicies.getSchemaCompatibilityStrategy().updateNamespaceValue(schemaCompatibilityStrategy);
+
+        // If the broker config is null, use 
schema_auto_update_compatibility_strategy from namespace policies as the
+        // broker value.
+        // This is the initial broker config.
+        SchemaCompatibilityStrategy brokerSchemaCompatibilityStrategyFromCache 
=
+                
topicPolicies.getSchemaCompatibilityStrategy().getBrokerValue();
+        SchemaCompatibilityStrategy 
brokerSchemaCompatibilityStrategyFromPulsarConfig =
+                
brokerService.pulsar().getConfiguration().getSchemaCompatibilityStrategy();
+        if (brokerSchemaCompatibilityStrategyFromPulsarConfig == 
SchemaCompatibilityStrategy.UNDEFINED) {
+            brokerSchemaCompatibilityStrategyFromPulsarConfig = null;
+        }
+
+        // When the initial broker config is null or the initial broker config 
is not equal to the current broker

Review comment:
       Backward compatibility, set schema_auto_update_compatibility_strategy 
value as broker value, it is the lowest priority:
   - When the broker value is null in topic policies, set 
schema_auto_update_compatibility_strategy as broker value
   - When the broker value from topic policies does not equal from Pulsar 
config, need to update the schema_auto_update_compatibility_strategy to broker 
value




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to