nodece commented on a change in pull request #13297:
URL: https://github.com/apache/pulsar/pull/13297#discussion_r782734029
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -181,11 +211,48 @@ protected void
updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
+
+ updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
+
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map,
type)));
}
+ private void updateSchemaCompatibilityStrategyNamespaceValue(Policies
namespacePolicies){
+ SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+
formatSchemaCompatibilityStrategy(namespacePolicies.schema_compatibility_strategy);
+ if (this.isSystemTopic()) {
+
topicPolicies.getSchemaCompatibilityStrategy().updateNamespaceValue(schemaCompatibilityStrategy);
+ return;
+ }
+
topicPolicies.getSchemaCompatibilityStrategy().updateNamespaceValue(schemaCompatibilityStrategy);
+
+ // If the broker config is null, use
schema_auto_update_compatibility_strategy from namespace policies as the
+ // broker value.
+ // This is the initial broker config.
+ SchemaCompatibilityStrategy brokerSchemaCompatibilityStrategyFromCache
=
+
topicPolicies.getSchemaCompatibilityStrategy().getBrokerValue();
+ SchemaCompatibilityStrategy
brokerSchemaCompatibilityStrategyFromPulsarConfig =
+
brokerService.pulsar().getConfiguration().getSchemaCompatibilityStrategy();
+ if (brokerSchemaCompatibilityStrategyFromPulsarConfig ==
SchemaCompatibilityStrategy.UNDEFINED) {
+ brokerSchemaCompatibilityStrategyFromPulsarConfig = null;
+ }
+
+ // When the initial broker config is null or the initial broker config
is not equal to the current broker
Review comment:
Backward compatibility, set schema_auto_update_compatibility_strategy
value as broker value, it is the lowest priority:
- When the broker value is null in topic policies, set
schema_auto_update_compatibility_strategy as broker value
- When the broker value from topic policies does not equal from Pulsar
config, need to update the schema_auto_update_compatibility_strategy to broker
value
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]