This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d8a388ad924 [feature][broker] Support schemaValidationEnforced on topic level (#15712) d8a388ad924 is described below commit d8a388ad9240d783d203f202136990b1f29b243f Author: Jiwei Guo <techno...@apache.org> AuthorDate: Tue May 24 06:52:19 2022 -0700 [feature][broker] Support schemaValidationEnforced on topic level (#15712) --- .../broker/admin/impl/PersistentTopicsBase.java | 20 +++++++++ .../apache/pulsar/broker/admin/v2/Namespaces.java | 2 +- .../pulsar/broker/admin/v2/PersistentTopics.java | 51 ++++++++++++++++++++++ .../pulsar/broker/namespace/NamespaceService.java | 2 +- .../pulsar/broker/service/AbstractTopic.java | 8 ++-- .../service/nonpersistent/NonPersistentTopic.java | 2 - .../broker/service/persistent/PersistentTopic.java | 4 -- .../apache/pulsar/broker/admin/AdminApi2Test.java | 16 +++++++ .../org/apache/pulsar/client/admin/Topics.java | 30 +++++++++++++ .../pulsar/client/admin/internal/TopicsImpl.java | 38 ++++++++++++++++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 33 ++++++++++++++ .../policies/data/HierarchyTopicPolicies.java | 3 ++ .../pulsar/common/policies/data/TopicPolicies.java | 6 +++ .../apache/pulsar/common/protocol/Commands.java | 2 +- 14 files changed, 204 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index eab7ac0fe5b..2f3312c4ae2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5019,4 +5019,24 @@ public class PersistentTopicsBase extends AdminResource { .filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic)) .collect(Collectors.toList()); } + + protected CompletableFuture<Boolean> internalGetSchemaValidationEnforced(boolean applied) { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getSchemaValidationEnforced).orElseGet(() -> { + if (applied) { + boolean namespacePolicy = getNamespacePolicies(namespaceName).schema_validation_enforced; + return namespacePolicy || pulsar().getConfiguration().isSchemaValidationEnforced(); + } + return false; + })); + } + + protected CompletableFuture<Void> internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) { + return getTopicPoliciesAsyncWithRetry(topicName) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setSchemaValidationEnforced(schemaValidationEnforced); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 7dcf97020fb..2162fa3ee87 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1822,7 +1822,7 @@ public class Namespaces extends NamespacesBase { @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or Namespace doesn't exist"), @ApiResponse(code = 412, message = "schemaValidationEnforced value is not valid")}) - public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant, + public void setSchemaValidationEnforced(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @ApiParam(value = "Flag of whether validation is enforced on the specified namespace", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 87bb1fed581..4cd405d3e9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3911,5 +3911,56 @@ public class PersistentTopics extends PersistentTopicsBase { }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/schemaValidationEnforced") + @ApiOperation(value = "Get schema validation enforced flag for topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") }) + public void getSchemaValidationEnforced(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") @DefaultValue("false") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetSchemaValidationEnforced(applied)) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getSchemaValidationEnforced", ex, asyncResponse); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/schemaValidationEnforced") + @ApiOperation(value = "Set schema validation enforced flag on topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or Namespace doesn't exist"), + @ApiResponse(code = 412, message = "schemaValidationEnforced value is not valid")}) + public void setSchemaValidationEnforced(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(required = true) boolean schemaValidationEnforced) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetSchemaValidationEnforced(schemaValidationEnforced)) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setSchemaValidationEnforced", ex, asyncResponse); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fd51837692a..38fda886793 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1215,7 +1215,7 @@ public class NamespaceService implements AutoCloseable { return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) .thenCompose(peerClusterData -> { // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request - // should be redirect to the peer-cluster + // should redirect to the peer-cluster if (peerClusterData != null) { return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ad512949168..edc7bc89647 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -108,8 +108,6 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP protected volatile boolean isEncryptionRequired = false; protected volatile Boolean isAllowAutoUpdateSchema; - // schema validation enforced flag - protected volatile boolean schemaValidationEnforced = false; protected volatile PublishRateLimiter topicPublishRateLimiter; @@ -221,7 +219,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP DispatchRateImpl.normalize(data.getSubscriptionDispatchRate())); topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold()); topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate())); - + topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced()); this.subscriptionPolicies = data.getSubscriptionPolicies(); } @@ -269,6 +267,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP brokerService.getPulsar().getConfig().getClusterName()); updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); + topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced); } private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) { @@ -362,6 +361,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP topicPolicies.getSchemaCompatibilityStrategy() .updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy)); topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config)); + topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced()); } private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) { @@ -570,7 +570,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP @Override public boolean getSchemaValidationEnforced() { - return schemaValidationEnforced; + return topicPolicies.getSchemaValidationEnforced().get(); } public void markBatchMessagePublished() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 58dd9302cf3..b8f41fa4c40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -169,7 +169,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol updateTopicPolicyByNamespacePolicy(policies); isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; - schemaValidationEnforced = policies.schema_validation_enforced; } updatePublishDispatcher(); updateResourceGroupLimiter(optPolicies); @@ -1008,7 +1007,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol isEncryptionRequired = data.encryption_required; isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; - schemaValidationEnforced = data.schema_validation_enforced; List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size()); producers.values().forEach(producer -> producerCheckFutures.add( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4d8ac3e0032..dec5d80cb5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -341,8 +341,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal this.isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; - - schemaValidationEnforced = policies.schema_validation_enforced; }).exceptionally(ex -> { log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, ex.getMessage()); @@ -2398,8 +2396,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; - schemaValidationEnforced = data.schema_validation_enforced; - initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 87d4f83f7de..fa1b7ba1657 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -2472,4 +2472,20 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { assertEquals(topicStats.getPublishers().size(), 2); topicStats.getPublishers().forEach(p -> assertTrue(p.isSupportsPartialProducer())); } + + @Test(dataProvider = "topicType") + public void testSchemaValidationEnforced(String topicType) throws Exception { + final String topic = topicType + "://prop-xyz/ns1/test-schema-validation-enforced"; + admin.topics().createPartitionedTopic(topic, 1); + @Cleanup + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0) + .create(); + boolean schemaValidationEnforced = admin.topics().getSchemaValidationEnforced(topic, false); + assertEquals(schemaValidationEnforced, false); + admin.topics().setSchemaValidationEnforced(topic, true); + Awaitility.await().untilAsserted(() -> + assertEquals(admin.topics().getSchemaValidationEnforced(topic, false), true) + ); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index ae5e90a287c..48ef03cd67e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -4013,4 +4013,34 @@ public interface Topics { * @return a map of replicated subscription status on a topic */ CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusAsync(String topic, String subName); + + /** + * Get schema validation enforced for a topic. + * + * @param topic topic name + * @return whether the schema validation enforced is set or not + */ + boolean getSchemaValidationEnforced(String topic, boolean applied) throws PulsarAdminException; + + /** + * Get schema validation enforced for a topic. + * + * @param topic topic name + */ + void setSchemaValidationEnforced(String topic, boolean enable) throws PulsarAdminException; + + /** + * Get schema validation enforced for a topic asynchronously. + * + * @param topic topic name + * @return whether the schema validation enforced is set or not + */ + CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String topic, boolean applied); + + /** + * Get schema validation enforced for a topic asynchronously. + * + * @param topic topic name + */ + CompletableFuture<Void> setSchemaValidationEnforcedAsync(String topic, boolean enable); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 0b2aad296ff..5314872fc00 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2970,6 +2970,44 @@ public class TopicsImpl extends BaseResource implements Topics { return future; } + @Override + public boolean getSchemaValidationEnforced(String topic, boolean applied) throws PulsarAdminException { + return sync(() -> getSchemaValidationEnforcedAsync(topic, applied)); + } + + @Override + public void setSchemaValidationEnforced(String topic, boolean enable) throws PulsarAdminException { + sync(() -> setSchemaValidationEnforcedAsync(topic, enable)); + } + + @Override + public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String topic, boolean applied) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "schemaValidationEnforced"); + path = path.queryParam("applied", applied); + final CompletableFuture<Boolean> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Boolean>() { + @Override + public void completed(Boolean enforced) { + future.complete(enforced); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public CompletableFuture<Void> setSchemaValidationEnforcedAsync(String topic, boolean schemaValidationEnforced) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "schemaValidationEnforced"); + return asyncPostRequest(path, Entity.entity(schemaValidationEnforced, MediaType.APPLICATION_JSON)); + } + @Override public Set<String> getReplicationClusters(String topic, boolean applied) throws PulsarAdminException { return sync(() -> getReplicationClustersAsync(topic, applied)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index c4476472338..3c2d40c43cd 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -242,6 +242,9 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("set-replication-clusters", new SetReplicationClusters()); jcommander.addCommand("remove-replication-clusters", new RemoveReplicationClusters()); + jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced()); + jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced()); + initDeprecatedCommands(); } @@ -2840,4 +2843,34 @@ public class CmdTopics extends CmdBase { } } + + @Parameters(commandDescription = "Get the schema validation enforced") + private class GetSchemaValidationEnforced extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List<String> params; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + System.out.println(getAdmin().topics().getSchemaValidationEnforced(topic, applied)); + } + } + + @Parameters(commandDescription = "Set the schema whether open schema validation enforced") + private class SetSchemaValidationEnforced extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--enable", "-e" }, description = "Enable schema validation enforced") + private boolean enable = false; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + getAdmin().topics().setSchemaValidationEnforced(topic, enable); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 0532744bec3..66c21a11716 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -58,6 +58,8 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy; final PolicyHierarchyValue<DispatchRateImpl> dispatchRate; + final PolicyHierarchyValue<Boolean> schemaValidationEnforced; + public HierarchyTopicPolicies() { replicationClusters = new PolicyHierarchyValue<>(); retentionPolicies = new PolicyHierarchyValue<>(); @@ -86,5 +88,6 @@ public class HierarchyTopicPolicies { subscriptionDispatchRate = new PolicyHierarchyValue<>(); schemaCompatibilityStrategy = new PolicyHierarchyValue<>(); dispatchRate = new PolicyHierarchyValue<>(); + schemaValidationEnforced = new PolicyHierarchyValue<>(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index ae04c2e2178..07e4dff56bd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -78,6 +78,8 @@ public class TopicPolicies { @Builder.Default private Map<String/*subscription*/, SubscriptionPolicies> subscriptionPolicies = new HashMap<>(); + private Boolean schemaValidationEnforced; + public boolean isGlobalPolicies() { return isGlobal != null && isGlobal; } @@ -174,6 +176,10 @@ public class TopicPolicies { return subscribeRate != null; } + public boolean isSchemaValidationEnforced() { + return schemaValidationEnforced != null; + } + public Set<String> getReplicationClustersSet() { return replicationClusters != null ? Sets.newTreeSet(this.replicationClusters) : null; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 2d8e043058d..285a12321c1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1856,7 +1856,7 @@ public class Commands { case ExclusiveWithFencing: return org.apache.pulsar.common.api.proto.ProducerAccessMode.ExclusiveWithFencing; default: - throw new IllegalArgumentException("Unknonw access mode: " + accessMode); + throw new IllegalArgumentException("Unknown access mode: " + accessMode); } }