This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8a388ad924 [feature][broker] Support schemaValidationEnforced on 
topic level (#15712)
d8a388ad924 is described below

commit d8a388ad9240d783d203f202136990b1f29b243f
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Tue May 24 06:52:19 2022 -0700

    [feature][broker] Support schemaValidationEnforced on topic level (#15712)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 20 +++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  2 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 51 ++++++++++++++++++++++
 .../pulsar/broker/namespace/NamespaceService.java  |  2 +-
 .../pulsar/broker/service/AbstractTopic.java       |  8 ++--
 .../service/nonpersistent/NonPersistentTopic.java  |  2 -
 .../broker/service/persistent/PersistentTopic.java |  4 --
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 16 +++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 30 +++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 38 ++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 33 ++++++++++++++
 .../policies/data/HierarchyTopicPolicies.java      |  3 ++
 .../pulsar/common/policies/data/TopicPolicies.java |  6 +++
 .../apache/pulsar/common/protocol/Commands.java    |  2 +-
 14 files changed, 204 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index eab7ac0fe5b..2f3312c4ae2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5019,4 +5019,24 @@ public class PersistentTopicsBase extends AdminResource {
                 .filter(topic -> includeSystemTopic ? true : 
!pulsar().getBrokerService().isSystemTopic(topic))
                 .collect(Collectors.toList());
     }
+
+    protected CompletableFuture<Boolean> 
internalGetSchemaValidationEnforced(boolean applied) {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> 
op.map(TopicPolicies::getSchemaValidationEnforced).orElseGet(() -> {
+                    if (applied) {
+                        boolean namespacePolicy = 
getNamespacePolicies(namespaceName).schema_validation_enforced;
+                        return namespacePolicy || 
pulsar().getConfiguration().isSchemaValidationEnforced();
+                    }
+                    return false;
+                }));
+    }
+
+    protected CompletableFuture<Void> 
internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    
topicPolicies.setSchemaValidationEnforced(schemaValidationEnforced);
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+                });
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 7dcf97020fb..2162fa3ee87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1822,7 +1822,7 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or Namespace doesn't 
exist"),
             @ApiResponse(code = 412, message = "schemaValidationEnforced value 
is not valid")})
-    public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant,
+    public void setSchemaValidationEnforced(@PathParam("tenant") String tenant,
                                            @PathParam("namespace") String 
namespace,
                                            @ApiParam(value =
                                                    "Flag of whether validation 
is enforced on the specified namespace",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 87bb1fed581..4cd405d3e9a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3911,5 +3911,56 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/schemaValidationEnforced")
+    @ApiOperation(value = "Get schema validation enforced flag for topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenants or Namespace doesn't 
exist") })
+    public void getSchemaValidationEnforced(@Suspended AsyncResponse 
asyncResponse,
+                                            @ApiParam(value = "Specify the 
tenant", required = true)
+                                            @PathParam("tenant") String tenant,
+                                            @ApiParam(value = "Specify the 
namespace", required = true)
+                                            @PathParam("namespace") String 
namespace,
+                                            @ApiParam(value = "Specify topic 
name", required = true)
+                                            @PathParam("topic") @Encoded 
String encodedTopic,
+                                            @QueryParam("applied") 
@DefaultValue("false") boolean applied,
+                                            @ApiParam(value = "Is 
authentication required to perform this operation")
+                                            @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> 
internalGetSchemaValidationEnforced(applied))
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("getSchemaValidationEnforced", 
ex, asyncResponse);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/schemaValidationEnforced")
+    @ApiOperation(value = "Set schema validation enforced flag on topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or Namespace doesn't 
exist"),
+            @ApiResponse(code = 412, message = "schemaValidationEnforced value 
is not valid")})
+    public void setSchemaValidationEnforced(@Suspended AsyncResponse 
asyncResponse,
+                                            @ApiParam(value = "Specify the 
tenant", required = true)
+                                            @PathParam("tenant") String tenant,
+                                            @ApiParam(value = "Specify the 
namespace", required = true)
+                                            @PathParam("namespace") String 
namespace,
+                                            @ApiParam(value = "Specify topic 
name", required = true)
+                                            @PathParam("topic") @Encoded 
String encodedTopic,
+                                            @ApiParam(value = "Is 
authentication required to perform this operation")
+                                            @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
+                                            @ApiParam(required = true) boolean 
schemaValidationEnforced) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> 
internalSetSchemaValidationEnforced(schemaValidationEnforced))
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("setSchemaValidationEnforced", 
ex, asyncResponse);
+                    return null;
+                });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fd51837692a..38fda886793 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1215,7 +1215,7 @@ public class NamespaceService implements AutoCloseable {
         return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, 
namespaceName)
                 .thenCompose(peerClusterData -> {
                     // if peer-cluster-data is present it means namespace is 
owned by that peer-cluster and request
-                    // should be redirect to the peer-cluster
+                    // should redirect to the peer-cluster
                     if (peerClusterData != null) {
                         return 
getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
                     } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ad512949168..edc7bc89647 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -108,8 +108,6 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
     protected volatile boolean isEncryptionRequired = false;
 
     protected volatile Boolean isAllowAutoUpdateSchema;
-    // schema validation enforced flag
-    protected volatile boolean schemaValidationEnforced = false;
 
     protected volatile PublishRateLimiter topicPublishRateLimiter;
 
@@ -221,7 +219,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
             DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
         
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
         
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
-
+        
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
         this.subscriptionPolicies = data.getSubscriptionPolicies();
     }
 
@@ -269,6 +267,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
             brokerService.getPulsar().getConfig().getClusterName());
         updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
         updateNamespaceDispatchRate(namespacePolicies, 
brokerService.getPulsar().getConfig().getClusterName());
+        
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
     }
 
     private void updateNamespaceDispatchRate(Policies namespacePolicies, 
String cluster) {
@@ -362,6 +361,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         topicPolicies.getSchemaCompatibilityStrategy()
                 
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
         
topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
+        
topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced());
     }
 
     private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) 
{
@@ -570,7 +570,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     @Override
     public boolean getSchemaValidationEnforced() {
-        return schemaValidationEnforced;
+        return topicPolicies.getSchemaValidationEnforced().get();
     }
 
     public void markBatchMessagePublished() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 58dd9302cf3..b8f41fa4c40 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -169,7 +169,6 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                         updateTopicPolicyByNamespacePolicy(policies);
                         isEncryptionRequired = policies.encryption_required;
                         isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
-                        schemaValidationEnforced = 
policies.schema_validation_enforced;
                     }
                     updatePublishDispatcher();
                     updateResourceGroupLimiter(optPolicies);
@@ -1008,7 +1007,6 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
         isEncryptionRequired = data.encryption_required;
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
-        schemaValidationEnforced = data.schema_validation_enforced;
 
         List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
         producers.values().forEach(producer -> producerCheckFutures.add(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d8ac3e0032..dec5d80cb5f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -341,8 +341,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     this.isEncryptionRequired = policies.encryption_required;
 
                     isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
-
-                    schemaValidationEnforced = 
policies.schema_validation_enforced;
                 }).exceptionally(ex -> {
                     log.warn("[{}] Error getting policies {} and 
isEncryptionRequired will be set to false",
                             topic, ex.getMessage());
@@ -2398,8 +2396,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
 
-        schemaValidationEnforced = data.schema_validation_enforced;
-
         initializeDispatchRateLimiterIfNeeded();
 
         updateSubscribeRateLimiter();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 87d4f83f7de..fa1b7ba1657 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2472,4 +2472,20 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         assertEquals(topicStats.getPublishers().size(), 2);
         topicStats.getPublishers().forEach(p -> 
assertTrue(p.isSupportsPartialProducer()));
     }
+
+    @Test(dataProvider = "topicType")
+    public void testSchemaValidationEnforced(String topicType) throws 
Exception {
+        final String topic = topicType + 
"://prop-xyz/ns1/test-schema-validation-enforced";
+        admin.topics().createPartitionedTopic(topic, 1);
+        @Cleanup
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
+                .create();
+        boolean schemaValidationEnforced = 
admin.topics().getSchemaValidationEnforced(topic, false);
+        assertEquals(schemaValidationEnforced, false);
+        admin.topics().setSchemaValidationEnforced(topic, true);
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(admin.topics().getSchemaValidationEnforced(topic, 
false), true)
+        );
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index ae5e90a287c..48ef03cd67e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -4013,4 +4013,34 @@ public interface Topics {
      * @return a map of replicated subscription status on a topic
      */
     CompletableFuture<Map<String, Boolean>> 
getReplicatedSubscriptionStatusAsync(String topic, String subName);
+
+    /**
+     * Get schema validation enforced for a topic.
+     *
+     * @param topic topic name
+     * @return whether the schema validation enforced is set or not
+     */
+    boolean getSchemaValidationEnforced(String topic, boolean applied) throws 
PulsarAdminException;
+
+    /**
+     * Get schema validation enforced for a topic.
+     *
+     * @param topic topic name
+     */
+    void setSchemaValidationEnforced(String topic, boolean enable) throws 
PulsarAdminException;
+
+    /**
+     * Get schema validation enforced for a topic asynchronously.
+     *
+     * @param topic topic name
+     * @return whether the schema validation enforced is set or not
+     */
+    CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String topic, 
boolean applied);
+
+    /**
+     * Get schema validation enforced for a topic asynchronously.
+     *
+     * @param topic topic name
+     */
+    CompletableFuture<Void> setSchemaValidationEnforcedAsync(String topic, 
boolean enable);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 0b2aad296ff..5314872fc00 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2970,6 +2970,44 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         return future;
     }
 
+    @Override
+    public boolean getSchemaValidationEnforced(String topic, boolean applied) 
throws PulsarAdminException {
+        return sync(() -> getSchemaValidationEnforcedAsync(topic, applied));
+    }
+
+    @Override
+    public void setSchemaValidationEnforced(String topic, boolean enable) 
throws PulsarAdminException {
+        sync(() -> setSchemaValidationEnforcedAsync(topic, enable));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String 
topic, boolean applied) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "schemaValidationEnforced");
+        path = path.queryParam("applied", applied);
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Boolean>() {
+                    @Override
+                    public void completed(Boolean enforced) {
+                        future.complete(enforced);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> setSchemaValidationEnforcedAsync(String 
topic, boolean schemaValidationEnforced) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "schemaValidationEnforced");
+        return asyncPostRequest(path, Entity.entity(schemaValidationEnforced, 
MediaType.APPLICATION_JSON));
+    }
+
     @Override
     public Set<String> getReplicationClusters(String topic, boolean applied) 
throws PulsarAdminException {
         return sync(() -> getReplicationClustersAsync(topic, applied));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index c4476472338..3c2d40c43cd 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -242,6 +242,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("set-replication-clusters", new 
SetReplicationClusters());
         jcommander.addCommand("remove-replication-clusters", new 
RemoveReplicationClusters());
 
+        jcommander.addCommand("get-schema-validation-enforce", new 
GetSchemaValidationEnforced());
+        jcommander.addCommand("set-schema-validation-enforce", new 
SetSchemaValidationEnforced());
+
         initDeprecatedCommands();
     }
 
@@ -2840,4 +2843,34 @@ public class CmdTopics extends CmdBase {
 
         }
     }
+
+    @Parameters(commandDescription = "Get the schema validation enforced")
+    private class GetSchemaValidationEnforced extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the 
applied policy of the topic")
+        private boolean applied = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topic = validateTopicName(params);
+            
System.out.println(getAdmin().topics().getSchemaValidationEnforced(topic, 
applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the schema whether open schema 
validation enforced")
+    private class SetSchemaValidationEnforced extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable schema 
validation enforced")
+        private boolean enable = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topic = validateTopicName(params);
+            getAdmin().topics().setSchemaValidationEnforced(topic, enable);
+        }
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 0532744bec3..66c21a11716 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -58,6 +58,8 @@ public class HierarchyTopicPolicies {
     final PolicyHierarchyValue<SchemaCompatibilityStrategy> 
schemaCompatibilityStrategy;
     final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;
 
+    final PolicyHierarchyValue<Boolean> schemaValidationEnforced;
+
     public HierarchyTopicPolicies() {
         replicationClusters = new PolicyHierarchyValue<>();
         retentionPolicies = new PolicyHierarchyValue<>();
@@ -86,5 +88,6 @@ public class HierarchyTopicPolicies {
         subscriptionDispatchRate = new PolicyHierarchyValue<>();
         schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
         dispatchRate = new PolicyHierarchyValue<>();
+        schemaValidationEnforced = new PolicyHierarchyValue<>();
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index ae04c2e2178..07e4dff56bd 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -78,6 +78,8 @@ public class TopicPolicies {
     @Builder.Default
     private Map<String/*subscription*/, SubscriptionPolicies> 
subscriptionPolicies = new HashMap<>();
 
+    private Boolean schemaValidationEnforced;
+
     public boolean isGlobalPolicies() {
         return isGlobal != null && isGlobal;
     }
@@ -174,6 +176,10 @@ public class TopicPolicies {
         return subscribeRate != null;
     }
 
+    public boolean isSchemaValidationEnforced() {
+        return schemaValidationEnforced != null;
+    }
+
     public Set<String> getReplicationClustersSet() {
         return replicationClusters != null ? 
Sets.newTreeSet(this.replicationClusters) : null;
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2d8e043058d..285a12321c1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1856,7 +1856,7 @@ public class Commands {
         case ExclusiveWithFencing:
             return 
org.apache.pulsar.common.api.proto.ProducerAccessMode.ExclusiveWithFencing;
         default:
-            throw new IllegalArgumentException("Unknonw access mode: " + 
accessMode);
+            throw new IllegalArgumentException("Unknown access mode: " + 
accessMode);
         }
     }
 

Reply via email to