nodece commented on a change in pull request #13297:
URL: https://github.com/apache/pulsar/pull/13297#discussion_r789355170



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -205,6 +217,33 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
                 type -> this.topicPolicies.getBackLogQuotaMap().get(type)
                         .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
+        updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
+    }
+
+    private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespacePolicies){
+        if (this.isSystemTopic()) {
+            return;
+        }
+
+        topicPolicies.getSchemaCompatibilityStrategy().updateNamespaceValue(
+                formatSchemaCompatibilityStrategy(namespacePolicies.schema_compatibility_strategy));
+
+        // Backward compatibility: set schema_auto_update_compatibility_strategy value as broker value,
+        // it is the lowest priority.
+        // When the broker value is null in topic policies, set schema_auto_update_compatibility_strategy
+        // as broker value.
+        // When the broker value from topic policies does not equal from Pulsar config, need to update the
+        // schema_auto_update_compatibility_strategy to broker value.
+        SchemaCompatibilityStrategy strategyFromCache = topicPolicies.getSchemaCompatibilityStrategy().getBrokerValue();
+        SchemaCompatibilityStrategy strategyFromPulsarConfig = formatSchemaCompatibilityStrategy(
+                brokerService.pulsar().getConfiguration().getSchemaCompatibilityStrategy());
+
+        if (strategyFromCache == null || strategyFromCache != strategyFromPulsarConfig) {

Review comment:
       There are two cases:
   1. When the broker value in the cache (`topicPolicies.getSchemaCompatibilityStrategy()`) is null, we set `schema_auto_update_compatibility_strategy` as the broker value.
   2. When the broker value in `broker.conf` does not equal the broker value in the cache (`topicPolicies.getSchemaCompatibilityStrategy()`), it means that `schema_auto_update_compatibility_strategy` has been changed, so we need to update the cached broker value.
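
   For illustration, the two cases can be sketched in isolation roughly as follows. This is a minimal, self-contained sketch and not the code in this PR: `Strategy`, `PolicyEntry`, and `refreshBrokerValue` are hypothetical stand-ins for `SchemaCompatibilityStrategy`, the cached topic policy entry, and the check in the diff above.

```java
public class BrokerValueRefreshSketch {
    // Hypothetical stand-in for SchemaCompatibilityStrategy; only a few values are listed.
    enum Strategy { FULL, BACKWARD, FORWARD, ALWAYS_COMPATIBLE }

    // Hypothetical stand-in for the topic policy entry that caches the broker-level value.
    static final class PolicyEntry {
        private Strategy brokerValue; // null until the first update

        Strategy getBrokerValue() {
            return brokerValue;
        }

        void updateBrokerValue(Strategy value) {
            this.brokerValue = value;
        }
    }

    /** Refreshes the cached broker value if needed; returns true when it was updated. */
    static boolean refreshBrokerValue(PolicyEntry entry, Strategy fromConfig) {
        Strategy cached = entry.getBrokerValue();
        // Case 1: nothing cached yet. Case 2: the cached value differs from the configured one.
        if (cached == null || cached != fromConfig) {
            entry.updateBrokerValue(fromConfig);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        PolicyEntry entry = new PolicyEntry();
        System.out.println(refreshBrokerValue(entry, Strategy.FULL));     // case 1: cache is empty
        System.out.println(refreshBrokerValue(entry, Strategy.FULL));     // cache already matches
        System.out.println(refreshBrokerValue(entry, Strategy.BACKWARD)); // case 2: config changed
    }
}
```

   In this sketch a null cached value can only equal the configured value when that value is also null, so the explicit null check matters only if the configured value may itself be null.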




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


