This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b62b341 [Issue#2734][pulsar-brokers]Add namespace level enforcement
on schema validation (#4157)
b62b341 is described below
commit b62b341e8265c9859caf04605d8a526d3206d67c
Author: tuteng <[email protected]>
AuthorDate: Tue Apr 30 10:18:57 2019 +0800
[Issue#2734][pulsar-brokers]Add namespace level enforcement on schema
validation (#4157)
Fixes #2734
Master Issue: #2734
### Motivation
In 2.2 we have a broker-level setting for enforcing schema validation. In
2.3, we need to add the enforcement at the namespace level and eventually get
rid of the broker-level setting.
### Modifications
* Add command-line commands for namespaces
* Add REST API endpoints for namespaces
* Add `schema_validation_enforced` to class `Policies`
* Add unit tests
### Verifying this change
Unit tests pass.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 17 ++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 32 ++++
.../apache/pulsar/broker/service/ServerCnx.java | 3 +-
.../org/apache/pulsar/broker/service/Topic.java | 2 +
.../service/nonpersistent/NonPersistentTopic.java | 7 +
.../broker/service/persistent/PersistentTopic.java | 10 ++
.../admin/AdminApiSchemaValidationEnforced.java | 185 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 32 ++++
.../client/admin/internal/NamespacesImpl.java | 24 +++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 38 +++++
.../pulsar/common/policies/data/Policies.java | 11 +-
11 files changed, 357 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 26958e9..98fb11b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1787,6 +1787,23 @@ public abstract class NamespacesBase extends
AdminResource {
"schemaAutoUpdateCompatibilityStrategy");
}
+ protected boolean internalGetSchemaValidationEnforced() {
+ validateSuperUserAccess();
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ return getNamespacePolicies(namespaceName).schema_validation_enforced;
+ }
+
+ protected void internalSetSchemaValidationEnforced(boolean
schemaValidationEnforced) {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+
+ mutatePolicy((policies) -> {
+ policies. schema_validation_enforced =
schemaValidationEnforced;
+ return policies;
+ }, (policies) -> policies. schema_validation_enforced,
+ "schemaValidationEnforced");
+ }
+
private <T> void mutatePolicy(Function<Policies, Policies>
policyTransformation,
Function<Policies, T> getter,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index b116003..860ab84 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -843,5 +843,37 @@ public class Namespaces extends NamespacesBase {
internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
}
+ @GET
+ @Path("/{tenant}/{namespace}/schemaValidationEnforced")
+ @ApiOperation(value = "Get schema validation enforced flag for namespace.",
+ notes = "If the flag is set to true, when a producer without
a schema attempts to produce to a topic"
+ + " with schema in this namespace, the producer will
be failed to connect. PLEASE be"
+ + " carefully on using this, since non-java clients
don't support schema.if you enable"
+ + " this setting, it will cause non-java clients
failed to produce.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenants or
Namespace doesn't exist") })
+ public boolean getSchemaValidtionEnforced(@PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetSchemaValidationEnforced();
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/schemaValidationEnforced")
+ @ApiOperation(value = "Set schema validation enforced flag on namespace.",
+ notes = "If the flag is set to true, when a producer without
a schema attempts to produce to a topic"
+ + " with schema in this namespace, the producer will
be failed to connect. PLEASE be"
+ + " carefully on using this, since non-java clients
don't support schema.if you enable"
+ + " this setting, it will cause non-java clients
failed to produce.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't
have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or
Namespace doesn't exist"),
+ @ApiResponse(code = 412, message =
"schemaValidationEnforced value is not valid") })
+ public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace,
+ boolean schemaValidationEnforced)
{
+ validateNamespaceName(tenant, namespace);
+ internalSetSchemaValidationEnforced(schemaValidationEnforced);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(Namespaces.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d2a5268..6fe3864 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -61,6 +61,7 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotRea
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
@@ -911,7 +912,7 @@ public class ServerCnx extends PulsarHandler {
log.info("[{}]-{} {} configured with
schema {}", remoteAddress, producerId,
topicName, hasSchema);
CompletableFuture<SchemaVersion>
result = new CompletableFuture<>();
- if (hasSchema &&
schemaValidationEnforced) {
+ if (hasSchema &&
(schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
result.completeExceptionally(new
IncompatibleSchemaException(
"Producers cannot connect
without a schema to topics with a schema"));
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 4209af7..00c76e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -117,6 +117,8 @@ public interface Topic {
boolean isEncryptionRequired();
+ boolean getSchemaValidationEnforced();
+
boolean isReplicated();
BacklogQuota getBacklogQuota();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b0cbf1c..dc74bea 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -139,6 +139,8 @@ public class NonPersistentTopic implements Topic {
private volatile boolean isEncryptionRequired = false;
private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
+ // schema validation enforced flag
+ private volatile boolean schemaValidationEnforced = false;
private static class TopicStats {
public double averageMsgSize;
@@ -183,6 +185,7 @@ public class NonPersistentTopic implements Topic {
isEncryptionRequired = policies.encryption_required;
schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
+ schemaValidationEnforced = policies.schema_validation_enforced;
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired
will be set to false", topic, e.getMessage());
@@ -955,6 +958,7 @@ public class NonPersistentTopic implements Topic {
isEncryptionRequired = data.encryption_required;
schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
data.schema_auto_update_compatibility_strategy);
+ schemaValidationEnforced = data.schema_validation_enforced;
producers.forEach(producer -> {
producer.checkPermissions();
@@ -990,6 +994,9 @@ public class NonPersistentTopic implements Topic {
}
@Override
+ public boolean getSchemaValidationEnforced() { return
schemaValidationEnforced; }
+
+ @Override
public boolean isReplicated() {
return replicators.size() > 1;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 19bc174..23ca67a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,6 +184,8 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
private volatile boolean isEncryptionRequired = false;
private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
+ // schema validation enforced flag
+ private volatile boolean schemaValidationEnforced = false;
private static final FastThreadLocal<TopicStatsHelper>
threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
@Override
@@ -262,6 +264,8 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
+
+ schemaValidationEnforced = policies.schema_validation_enforced;
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired
will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
@@ -1621,6 +1625,9 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
data.schema_auto_update_compatibility_strategy);
+ schemaValidationEnforced = data.schema_validation_enforced;
+
+
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
producers.forEach(producer -> {
@@ -1691,6 +1698,9 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
}
@Override
+ public boolean getSchemaValidationEnforced() { return
schemaValidationEnforced; }
+
+ @Override
public boolean isReplicated() {
return !replicators.isEmpty();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
new file mode 100644
index 0000000..b193d3a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import java.util.Map;
+
+@Slf4j
+public class AdminApiSchemaValidationEnforced extends
MockedPulsarServiceBaseTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AdminApiSchemaValidationEnforced.class);
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+
+ admin.clusters().createCluster("test", new
ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("schema-validation-enforced", tenantInfo);
+ }
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testDisableSchemaValidationEnforcedNoSchema() throws Exception
{
+
admin.namespaces().createNamespace("schema-validation-enforced/default-no-schema");
+ String namespace = "schema-validation-enforced/default-no-schema";
+ String topicName =
"persistent://schema-validation-enforced/default-no-schema/test";
+
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace),
false);
+ admin.namespaces().setSchemaValidationEnforced(namespace, false);
+ try {
+ admin.schemas().getSchemaInfo(topicName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+ }
+ try (Producer p =
pulsarClient.newProducer().topic(topicName).create()) {
+ p.send("test schemaValidationEnforced".getBytes());
+ }
+ }
+
+ @Test
+ public void testDisableSchemaValidationEnforcedHasSchema() throws
Exception {
+
admin.namespaces().createNamespace("schema-validation-enforced/default-has-schema");
+ String namespace = "schema-validation-enforced/default-has-schema";
+ String topicName =
"persistent://schema-validation-enforced/default-has-schema/test";
+
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace),
false);
+ admin.namespaces().setSchemaValidationEnforced(namespace, false);
+ try {
+ admin.schemas().getSchemaInfo(topicName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+ }
+ Map<String, String> properties = Maps.newHashMap();
+ SchemaInfo schemaInfo = new SchemaInfo();
+ schemaInfo.setType(SchemaType.STRING);
+ schemaInfo.setProperties(properties);
+ schemaInfo.setName("test");
+ schemaInfo.setSchema("".getBytes());
+ PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING",
"", properties);
+ admin.schemas().createSchema(topicName, postSchemaPayload);
+ try (Producer p =
pulsarClient.newProducer().topic(topicName).create()) {
+ p.send("test schemaValidationEnforced".getBytes());
+ }
+ Assert.assertEquals(admin.schemas().getSchemaInfo(topicName),
schemaInfo);
+ }
+
+
+ @Test
+ public void testEnableSchemaValidationEnforcedNoSchema() throws Exception {
+
admin.namespaces().createNamespace("schema-validation-enforced/enable-no-schema");
+ String namespace = "schema-validation-enforced/enable-no-schema";
+ String topicName =
"persistent://schema-validation-enforced/enable-no-schema/test";
+
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace),
false);
+ admin.namespaces().setSchemaValidationEnforced(namespace,true);
+ try {
+ admin.schemas().getSchemaInfo(topicName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+ }
+ try (Producer p =
pulsarClient.newProducer().topic(topicName).create()) {
+ p.send("test schemaValidationEnforced".getBytes());
+ }
+ }
+
+ @Test
+ public void testEnableSchemaValidationEnforcedHasSchemaMismatch() throws
Exception {
+
admin.namespaces().createNamespace("schema-validation-enforced/enable-has-schema-mismatch");
+ String namespace =
"schema-validation-enforced/enable-has-schema-mismatch";
+ String topicName =
"persistent://schema-validation-enforced/enable-has-schema-mismatch/test";
+
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace),
false);
+ admin.namespaces().setSchemaValidationEnforced(namespace,true);
+
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace),
true);
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().getStats(topicName);
+ try {
+ admin.schemas().getSchemaInfo(topicName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+ }
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key1", "value1");
+ SchemaInfo schemaInfo = new SchemaInfo();
+ schemaInfo.setType(SchemaType.STRING);
+ schemaInfo.setProperties(properties);
+ schemaInfo.setName("test");
+ schemaInfo.setSchema("".getBytes());
+ PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING",
"{'key':'value'}", properties);
+ admin.schemas().createSchema(topicName, postSchemaPayload);
+ try (Producer p =
pulsarClient.newProducer().topic(topicName).create()) {
+ Assert.fail("Client no schema, but topic has schema, should fail");
+ } catch (PulsarClientException e) {
+
Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+ }
+
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getName(),
schemaInfo.getName());
+
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getType(),
schemaInfo.getType());
+ }
+
+ @Test
+ public void testEnableSchemaValidationEnforcedHasSchemaMatch() throws
Exception {
+
admin.namespaces().createNamespace("schema-validation-enforced/enable-has-schema-match");
+ String namespace =
"schema-validation-enforced/enable-has-schema-match";
+ String topicName =
"persistent://schema-validation-enforced/enable-has-schema-match/test";
+
Assert.assertEquals(admin.namespaces().getSchemaValidationEnforced(namespace),
false);
+ try {
+ admin.schemas().getSchemaInfo(topicName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ Assert.assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+ }
+ admin.namespaces().setSchemaValidationEnforced(namespace,true);
+ Map<String, String> properties = Maps.newHashMap();
+ SchemaInfo schemaInfo = new SchemaInfo();
+ schemaInfo.setType(SchemaType.STRING);
+ schemaInfo.setProperties(properties);
+ schemaInfo.setName("test");
+ schemaInfo.setSchema("".getBytes());
+ PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING",
"", properties);
+ admin.schemas().createSchema(topicName, postSchemaPayload);
+ try (Producer<String> p =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+ p.send("test schemaValidationEnforced");
+ }
+
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getName(),
schemaInfo.getName());
+
Assert.assertEquals(admin.schemas().getSchemaInfo(topicName).getType(),
schemaInfo.getType());
+ }
+
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 86da063..6b8c22c 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1361,4 +1361,36 @@ public interface Namespaces {
void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
SchemaAutoUpdateCompatibilityStrategy strategy)
throws PulsarAdminException;
+
+ /**
+ * Get schema validation enforced for namespace.
+ * @return the schema validation enforced flag
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Tenant or Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+
+ boolean getSchemaValidationEnforced(String namespace)
+ throws PulsarAdminException;
+ /**
+ * Set schema validation enforced for namespace.
+ * if a producer without a schema attempts to produce to a topic with
schema in this the namespace, the
+ * producer will be failed to connect. PLEASE be carefully on using this,
since non-java clients don't
+ * support schema. if you enable this setting, it will cause non-java
clients failed to produce.
+ *
+ * @param namespace pulsar namespace name
+ * @param schemaValidationEnforced flag to enable or disable schema
validation for the given namespace
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Tenant or Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+
+ void setSchemaValidationEnforced(String namespace, boolean
schemaValidationEnforced)
+ throws PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6146e11..b80d238 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -810,6 +810,30 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
}
+ @Override
+ public boolean getSchemaValidationEnforced(String namespace)
+ throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+ return request(path).get(Boolean.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void setSchemaValidationEnforced(String namespace, boolean
schemaValidationEnforced)
+ throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+ request(path).post(Entity.entity(schemaValidationEnforced,
MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
private WebTarget namespacePath(NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Namespaces :
adminNamespaces;
WebTarget namespacePath = base.path(namespace.toString());
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 8b1b147..d45df9c 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1013,6 +1013,41 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the schema validation enforced")
+ private class GetSchemaValidationEnforced extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+
+
System.out.println(admin.namespaces().getSchemaValidationEnforced(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Set the schema whether open schema
validation enforced")
+ private class SetSchemaValidationEnforced extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable", "-e" }, description = "Enable schema
validation enforced")
+ private boolean enable = false;
+
+ @Parameter(names = { "--disable", "-d" }, description = "Disable
schema validation enforced")
+ private boolean disable = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+
+ if (enable == disable) {
+ throw new ParameterException("Need to specify either --enable
or --disable");
+ }
+ admin.namespaces().setSchemaValidationEnforced(namespace, enable);
+ }
+ }
+
public CmdNamespaces(PulsarAdmin admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -1093,5 +1128,8 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-schema-autoupdate-strategy", new
GetSchemaAutoUpdateStrategy());
jcommander.addCommand("set-schema-autoupdate-strategy", new
SetSchemaAutoUpdateStrategy());
+
+ jcommander.addCommand("get-schema-validation-enforce", new
GetSchemaValidationEnforced());
+ jcommander.addCommand("set-schema-validation-enforce", new
SetSchemaValidationEnforced());
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index f7f8ff6..23728dd 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -65,6 +65,8 @@ public class Policies {
public SchemaAutoUpdateCompatibilityStrategy
schema_auto_update_compatibility_strategy =
SchemaAutoUpdateCompatibilityStrategy.Full;
+ public boolean schema_validation_enforced = false;
+
@Override
public int hashCode() {
return Objects.hash(auth_policies, replication_clusters,
@@ -77,7 +79,8 @@ public class Policies {
max_consumers_per_topic, max_consumers_per_subscription,
compaction_threshold, offload_threshold,
offload_deletion_lag_ms,
- schema_auto_update_compatibility_strategy);
+ schema_auto_update_compatibility_strategy,
+ schema_validation_enforced);
}
@Override
@@ -104,7 +107,8 @@ public class Policies {
&& compaction_threshold == other.compaction_threshold
&& offload_threshold == other.offload_threshold
&& offload_deletion_lag_ms == other.offload_deletion_lag_ms
- && schema_auto_update_compatibility_strategy ==
other.schema_auto_update_compatibility_strategy;
+ && schema_auto_update_compatibility_strategy ==
other.schema_auto_update_compatibility_strategy
+ && schema_validation_enforced ==
other.schema_validation_enforced;
}
return false;
@@ -146,6 +150,7 @@ public class Policies {
.add("compaction_threshold", compaction_threshold)
.add("offload_threshold", offload_threshold)
.add("offload_deletion_lag_ms", offload_deletion_lag_ms)
- .add("schema_auto_update_compatibility_strategy",
schema_auto_update_compatibility_strategy).toString();
+ .add("schema_auto_update_compatibility_strategy",
schema_auto_update_compatibility_strategy)
+ .add("schema_validation_enforced",
schema_validation_enforced).toString();
}
}