This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b62b341  [Issue#2734][pulsar-brokers]Add namespace level enforcement 
on schema validation (#4157)
b62b341 is described below

commit b62b341e8265c9859caf04605d8a526d3206d67c
Author: tuteng <[email protected]>
AuthorDate: Tue Apr 30 10:18:57 2019 +0800

    [Issue#2734][pulsar-brokers]Add namespace level enforcement on schema 
validation (#4157)
    
    Fixes #2734
    Master Issue: #2734
    
    ### Motivation
    In 2.2 we have a broker-level setting on enforcing schema validation. in 
2.3, we need to add the enforcement to namespace level and eventually get rid 
of broker-level settings.
    
    ### Modifications
    
    * Add command line arguments for namespace
    * Add rest api interface for namespace
    * Add schema_validation_enforced to class Policies
    * Add unit test
    
    ### Verifying this change
    
    Unit test pass
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  17 ++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  32 ++++
 .../apache/pulsar/broker/service/ServerCnx.java    |   3 +-
 .../org/apache/pulsar/broker/service/Topic.java    |   2 +
 .../service/nonpersistent/NonPersistentTopic.java  |   7 +
 .../broker/service/persistent/PersistentTopic.java |  10 ++
 .../admin/AdminApiSchemaValidationEnforced.java    | 185 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java |  32 ++++
 .../client/admin/internal/NamespacesImpl.java      |  24 +++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  38 +++++
 .../pulsar/common/policies/data/Policies.java      |  11 +-
 11 files changed, 357 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 26958e9..98fb11b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1787,6 +1787,23 @@ public abstract class NamespacesBase extends 
AdminResource {
             "schemaAutoUpdateCompatibilityStrategy");
     }
 
+    protected boolean internalGetSchemaValidationEnforced() {
+        validateSuperUserAccess();
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).schema_validation_enforced;
+    }
+
+    protected void internalSetSchemaValidationEnforced(boolean 
schemaValidationEnforced) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        mutatePolicy((policies) -> {
+                policies. schema_validation_enforced = 
schemaValidationEnforced;
+                return policies;
+            }, (policies) -> policies. schema_validation_enforced,
+            "schemaValidationEnforced");
+    }
+
 
     private <T> void mutatePolicy(Function<Policies, Policies> 
policyTransformation,
                                   Function<Policies, T> getter,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index b116003..860ab84 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -843,5 +843,37 @@ public class Namespaces extends NamespacesBase {
         internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/schemaValidationEnforced")
+    @ApiOperation(value = "Get schema validation enforced flag for namespace.",
+                  notes = "If the flag is set to true, when a producer without 
a schema attempts to produce to a topic"
+                          + " with schema in this namespace, the producer will 
be failed to connect. PLEASE be"
+                          + " carefully on using this, since non-java clients 
don't support schema.if you enable"
+                          + " this setting, it will cause non-java clients 
failed to produce.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Tenants or 
Namespace doesn't exist") })
+    public boolean getSchemaValidtionEnforced(@PathParam("tenant") String 
tenant,
+                                           @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetSchemaValidationEnforced();
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/schemaValidationEnforced")
+    @ApiOperation(value = "Set schema validation enforced flag on namespace.",
+                  notes = "If the flag is set to true, when a producer without 
a schema attempts to produce to a topic"
+                          + " with schema in this namespace, the producer will 
be failed to connect. PLEASE be"
+                          + " carefully on using this, since non-java clients 
don't support schema.if you enable"
+                          + " this setting, it will cause non-java clients 
failed to produce.")
+            @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't 
have admin permission"),
+                            @ApiResponse(code = 404, message = "Tenant or 
Namespace doesn't exist"),
+                            @ApiResponse(code = 412, message = 
"schemaValidationEnforced value is not valid") })
+    public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant,
+                                             @PathParam("namespace") String 
namespace,
+                                             boolean schemaValidationEnforced) 
{
+        validateNamespaceName(tenant, namespace);
+        internalSetSchemaValidationEnforced(schemaValidationEnforced);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d2a5268..6fe3864 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -61,6 +61,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotRea
 import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -911,7 +912,7 @@ public class ServerCnx extends PulsarHandler {
                                         log.info("[{}]-{} {} configured with 
schema {}", remoteAddress, producerId,
                                                 topicName, hasSchema);
                                         CompletableFuture<SchemaVersion> 
result = new CompletableFuture<>();
-                                        if (hasSchema && 
schemaValidationEnforced) {
+                                        if (hasSchema && 
(schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
                                             result.completeExceptionally(new 
IncompatibleSchemaException(
                                                 "Producers cannot connect 
without a schema to topics with a schema"));
                                         } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 4209af7..00c76e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -117,6 +117,8 @@ public interface Topic {
 
     boolean isEncryptionRequired();
 
+    boolean getSchemaValidationEnforced();
+
     boolean isReplicated();
 
     BacklogQuota getBacklogQuota();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b0cbf1c..dc74bea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -139,6 +139,8 @@ public class NonPersistentTopic implements Topic {
     private volatile boolean isEncryptionRequired = false;
     private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
         SchemaCompatibilityStrategy.FULL;
+    // schema validation enforced flag
+    private volatile boolean schemaValidationEnforced = false;
 
     private static class TopicStats {
         public double averageMsgSize;
@@ -183,6 +185,7 @@ public class NonPersistentTopic implements Topic {
             isEncryptionRequired = policies.encryption_required;
             schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                     policies.schema_auto_update_compatibility_strategy);
+            schemaValidationEnforced = policies.schema_validation_enforced;
 
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and isEncryptionRequired 
will be set to false", topic, e.getMessage());
@@ -955,6 +958,7 @@ public class NonPersistentTopic implements Topic {
         isEncryptionRequired = data.encryption_required;
         schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                 data.schema_auto_update_compatibility_strategy);
+        schemaValidationEnforced = data.schema_validation_enforced;
 
         producers.forEach(producer -> {
             producer.checkPermissions();
@@ -990,6 +994,9 @@ public class NonPersistentTopic implements Topic {
     }
 
     @Override
+    public boolean getSchemaValidationEnforced() { return 
schemaValidationEnforced; }
+
+    @Override
     public boolean isReplicated() {
         return replicators.size() > 1;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 19bc174..23ca67a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,6 +184,8 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
     private volatile boolean isEncryptionRequired = false;
     private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
         SchemaCompatibilityStrategy.FULL;
+    // schema validation enforced flag
+    private volatile boolean schemaValidationEnforced = false;
 
     private static final FastThreadLocal<TopicStatsHelper> 
threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
         @Override
@@ -262,6 +264,8 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
 
             schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                     policies.schema_auto_update_compatibility_strategy);
+
+            schemaValidationEnforced = policies.schema_validation_enforced;
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and isEncryptionRequired 
will be set to false", topic, e.getMessage());
             isEncryptionRequired = false;
@@ -1621,6 +1625,9 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                 data.schema_auto_update_compatibility_strategy);
 
+        schemaValidationEnforced = data.schema_validation_enforced;
+
+
         initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
 
         producers.forEach(producer -> {
@@ -1691,6 +1698,9 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
     }
 
     @Override
+    public boolean getSchemaValidationEnforced() { return 
schemaValidationEnforced; }
+
+    @Override
     public boolean isReplicated() {
         return !replicators.isEmpty();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
new file mode 100644
index 0000000..b193d3a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import java.util.Map;
+
+@Slf4j
+public class AdminApiSchemaValidationEnforced extends 
MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AdminApiSchemaValidationEnforced.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new 
ClusterData("http://127.0.0.1"; + ":" + BROKER_WEBSERVICE_PORT));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schema-validation-enforced", tenantInfo);
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testDisableSchemaValidationEnforcedNoSchema() throws Exception 
{
+        
admin.namespaces().createNamespace("schema-validation-enforced/default-no-schema");
+        String namespace = "schema-validation-enforced/default-no-schema";
+        String topicName = 
"persistent://schema-validation-enforced/default-no-schema/test";
+        
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace), 
false);
+        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        try {
+            admin.schemas().getSchemaInfo(topicName);
+        } catch (PulsarAdminException.NotFoundException e) {
+            Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+        }
+        try (Producer p = 
pulsarClient.newProducer().topic(topicName).create()) {
+            p.send("test schemaValidationEnforced".getBytes());
+        }
+    }
+
+    @Test
+    public void testDisableSchemaValidationEnforcedHasSchema() throws 
Exception {
+        
admin.namespaces().createNamespace("schema-validation-enforced/default-has-schema");
+        String namespace = "schema-validation-enforced/default-has-schema";
+        String topicName = 
"persistent://schema-validation-enforced/default-has-schema/test";
+        
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace), 
false);
+        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        try {
+            admin.schemas().getSchemaInfo(topicName);
+        } catch (PulsarAdminException.NotFoundException e) {
+            Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+        }
+        Map<String, String> properties = Maps.newHashMap();
+        SchemaInfo schemaInfo = new SchemaInfo();
+        schemaInfo.setType(SchemaType.STRING);
+        schemaInfo.setProperties(properties);
+        schemaInfo.setName("test");
+        schemaInfo.setSchema("".getBytes());
+        PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", 
"", properties);
+        admin.schemas().createSchema(topicName, postSchemaPayload);
+        try (Producer p = 
pulsarClient.newProducer().topic(topicName).create()) {
+            p.send("test schemaValidationEnforced".getBytes());
+        }
+        Assert.assertEquals(admin.schemas().getSchemaInfo(topicName), 
schemaInfo);
+    }
+
+
+    @Test
+    public void testEnableSchemaValidationEnforcedNoSchema() throws Exception {
+        
admin.namespaces().createNamespace("schema-validation-enforced/enable-no-schema");
+        String namespace = "schema-validation-enforced/enable-no-schema";
+        String topicName = 
"persistent://schema-validation-enforced/enable-no-schema/test";
+        
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace), 
false);
+        admin.namespaces().setSchemaValidationEnforced(namespace,true);
+        try {
+            admin.schemas().getSchemaInfo(topicName);
+        } catch (PulsarAdminException.NotFoundException e) {
+            Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+        }
+        try (Producer p = 
pulsarClient.newProducer().topic(topicName).create()) {
+            p.send("test schemaValidationEnforced".getBytes());
+        }
+    }
+
+    @Test
+    public void testEnableSchemaValidationEnforcedHasSchemaMismatch() throws 
Exception {
+        
admin.namespaces().createNamespace("schema-validation-enforced/enable-has-schema-mismatch");
+        String namespace = 
"schema-validation-enforced/enable-has-schema-mismatch";
+        String topicName = 
"persistent://schema-validation-enforced/enable-has-schema-mismatch/test";
+        
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace), 
false);
+        admin.namespaces().setSchemaValidationEnforced(namespace,true);
+        
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace), 
true);
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().getStats(topicName);
+        try {
+            admin.schemas().getSchemaInfo(topicName);
+        } catch (PulsarAdminException.NotFoundException e) {
+            Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+        }
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key1", "value1");
+        SchemaInfo schemaInfo = new SchemaInfo();
+        schemaInfo.setType(SchemaType.STRING);
+        schemaInfo.setProperties(properties);
+        schemaInfo.setName("test");
+        schemaInfo.setSchema("".getBytes());
+        PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", 
"{'key':'value'}", properties);
+        admin.schemas().createSchema(topicName, postSchemaPayload);
+        try (Producer p = 
pulsarClient.newProducer().topic(topicName).create()) {
+            Assert.fail("Client no schema, but topic has schema, should fail");
+        }  catch (PulsarClientException e) {
+            
Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+        
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getName(), 
schemaInfo.getName());
+        
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getType(), 
schemaInfo.getType());
+    }
+
+    @Test
+    public void testEnableSchemaValidationEnforcedHasSchemaMatch() throws 
Exception {
+        
admin.namespaces().createNamespace("schema-validation-enforced/enable-has-schema-match");
+        String namespace = 
"schema-validation-enforced/enable-has-schema-match";
+        String topicName = 
"persistent://schema-validation-enforced/enable-has-schema-match/test";
+        
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace), 
false);
+        try {
+            admin.schemas().getSchemaInfo(topicName);
+        } catch (PulsarAdminException.NotFoundException e) {
+            Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+        }
+        admin.namespaces().setSchemaValidationEnforced(namespace,true);
+        Map<String, String> properties = Maps.newHashMap();
+        SchemaInfo schemaInfo = new SchemaInfo();
+        schemaInfo.setType(SchemaType.STRING);
+        schemaInfo.setProperties(properties);
+        schemaInfo.setName("test");
+        schemaInfo.setSchema("".getBytes());
+        PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", 
"", properties);
+        admin.schemas().createSchema(topicName, postSchemaPayload);
+        try (Producer<String> p = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+            p.send("test schemaValidationEnforced");
+        }
+        
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getName(), 
schemaInfo.getName());
+        
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getType(), 
schemaInfo.getType());
+    }
+
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 86da063..6b8c22c 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1361,4 +1361,36 @@ public interface Namespaces {
     void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
                                                   
SchemaAutoUpdateCompatibilityStrategy strategy)
             throws PulsarAdminException;
+
+    /**
+     * Get schema validation enforced for namespace.
+     * @return the schema validation enforced flag
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Tenant or Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+
+    boolean getSchemaValidationEnforced(String namespace)
+            throws PulsarAdminException;
+    /**
+     * Set schema validation enforced for namespace.
+     * if a producer without a schema attempts to produce to a topic with 
schema in this the namespace, the
+     * producer will be failed to connect. PLEASE be carefully on using this, 
since non-java clients don't
+     * support schema. if you enable this setting, it will cause non-java 
clients failed to produce.
+     *
+     * @param namespace pulsar namespace name
+     * @param schemaValidationEnforced flag to enable or disable schema 
validation for the given namespace
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Tenant or Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+
+    void setSchemaValidationEnforced(String namespace, boolean 
schemaValidationEnforced)
+            throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6146e11..b80d238 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -810,6 +810,30 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         }
     }
 
+    @Override
+    public boolean getSchemaValidationEnforced(String namespace)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+            return request(path).get(Boolean.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setSchemaValidationEnforced(String namespace, boolean 
schemaValidationEnforced)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+            request(path).post(Entity.entity(schemaValidationEnforced, 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : 
adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 8b1b147..d45df9c 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1013,6 +1013,41 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the schema validation enforced")
+    private class GetSchemaValidationEnforced extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+
+            
System.out.println(admin.namespaces().getSchemaValidationEnforced(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the schema whether open schema 
validation enforced")
+    private class SetSchemaValidationEnforced extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable schema 
validation enforced")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable 
schema validation enforced")
+        private boolean disable = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+
+            if (enable == disable) {
+                throw new ParameterException("Need to specify either --enable 
or --disable");
+            }
+            admin.namespaces().setSchemaValidationEnforced(namespace, enable);
+        }
+    }
+
     public CmdNamespaces(PulsarAdmin admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -1093,5 +1128,8 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-schema-autoupdate-strategy", new 
GetSchemaAutoUpdateStrategy());
         jcommander.addCommand("set-schema-autoupdate-strategy", new 
SetSchemaAutoUpdateStrategy());
+
+        jcommander.addCommand("get-schema-validation-enforce", new 
GetSchemaValidationEnforced());
+        jcommander.addCommand("set-schema-validation-enforce", new 
SetSchemaValidationEnforced());
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index f7f8ff6..23728dd 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -65,6 +65,8 @@ public class Policies {
     public SchemaAutoUpdateCompatibilityStrategy 
schema_auto_update_compatibility_strategy =
         SchemaAutoUpdateCompatibilityStrategy.Full;
 
+    public boolean schema_validation_enforced = false;
+
     @Override
     public int hashCode() {
         return Objects.hash(auth_policies, replication_clusters,
@@ -77,7 +79,8 @@ public class Policies {
                 max_consumers_per_topic, max_consumers_per_subscription,
                 compaction_threshold, offload_threshold,
                 offload_deletion_lag_ms,
-                schema_auto_update_compatibility_strategy);
+                schema_auto_update_compatibility_strategy,
+                schema_validation_enforced);
     }
 
     @Override
@@ -104,7 +107,8 @@ public class Policies {
                     && compaction_threshold == other.compaction_threshold
                     && offload_threshold == other.offload_threshold
                     && offload_deletion_lag_ms == other.offload_deletion_lag_ms
-                    && schema_auto_update_compatibility_strategy == 
other.schema_auto_update_compatibility_strategy;
+                    && schema_auto_update_compatibility_strategy == 
other.schema_auto_update_compatibility_strategy
+                    && schema_validation_enforced == 
other.schema_validation_enforced;
         }
 
         return false;
@@ -146,6 +150,7 @@ public class Policies {
                 .add("compaction_threshold", compaction_threshold)
                 .add("offload_threshold", offload_threshold)
                 .add("offload_deletion_lag_ms", offload_deletion_lag_ms)
-                .add("schema_auto_update_compatibility_strategy", 
schema_auto_update_compatibility_strategy).toString();
+                .add("schema_auto_update_compatibility_strategy", 
schema_auto_update_compatibility_strategy)
+                .add("schema_validation_enforced", 
schema_validation_enforced).toString();
     }
 }

Reply via email to