codelipenghui commented on a change in pull request #13355:
URL: https://github.com/apache/pulsar/pull/13355#discussion_r794355196
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
##########
@@ -538,6 +538,60 @@ public void testAuthData() throws Exception {
log.info("-- Exiting {} test --", methodName);
}
+ @Test
+ public void testPermissionForProducerCreateInitialSubscription() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ setup();
+
+ String lookupUrl = pulsar.getBrokerServiceUrl();
+
+ final String invalidRole = "invalid-role";
+ final String producerRole = "producer-role";
+ final String topic = "persistent://my-property/my-ns/my-topic";
+ final String initialSubscriptionName = "init-sub";
+ Authentication adminAuthentication = new ClientAuthentication("superUser");
+ Authentication authenticationInvalidRole = new ClientAuthentication(invalidRole);
+ Authentication authenticationProducerRole = new ClientAuthentication(producerRole);
+ @Cleanup
+ PulsarAdmin admin =
+ PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build();
+
+ admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+ admin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+ admin.topics().grantPermission(topic, invalidRole, Collections.singleton(AuthAction.produce));
+ admin.topics().grantPermission(topic, producerRole, Sets.newHashSet(AuthAction.produce, AuthAction.consume));
Review comment:
The test should also check that, if the client does not have the consume
permission, creating the producer fails.
##########
File path:
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
##########
@@ -596,4 +596,13 @@
* @return the producer builder instance
*/
ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartPartitionedProducers);
+
+ /**
+ * Use this config to automatically create an initial subscription when creating the topic.
+ * If this field is not set, the initial subscription will not be created.
+ *
+ * @param initialSubscriptionName Name of the initial subscription of the topic.
+ * @return the producer builder instance
+ */
+ ProducerBuilder<T> initialSubscriptionName(String initialSubscriptionName);
Review comment:
It would be better to keep this as an internal API and not expose it to
users.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1236,7 +1238,30 @@ protected void handleProducer(final CommandProducer cmdProducer) {
});
schemaVersionFuture.thenAccept(schemaVersion -> {
- topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
+ topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenCompose(future -> {
+ if (Strings.isNullOrEmpty(initialSubscriptionName)
+ || topic.getSubscriptions().containsKey(initialSubscriptionName)
+ || !topic.isPersistent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return isTopicOperationAllowed(
+ topicName, TopicOperation.SUBSCRIBE
+ ).thenCompose(canSubscribe -> {
+ if (!canSubscribe) {
+ log.warn(
+ "[{}] Could not create the initial subscription {} for topic {}. Client "
+ + "is not authorized to subscribe with role {}",
+ remoteAddress, initialSubscriptionName, topicName, getPrincipal());
+ return CompletableFuture.completedFuture(null);
Review comment:
Should this future complete with an exception instead of completing
normally with null?
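
To illustrate the difference, here is a minimal sketch using only
java.util.concurrent. The class InitialSubscriptionAuthSketch, its method
names, and the exception type are hypothetical stand-ins for illustration;
this is not the ServerCnx code and not a proposed patch.

```java
import java.util.concurrent.CompletableFuture;

class InitialSubscriptionAuthSketch {

    // Hypothetical stand-in for the authorization check; not a Pulsar API.
    static CompletableFuture<Boolean> isSubscribeAllowed(boolean allowed) {
        return CompletableFuture.completedFuture(allowed);
    }

    // Mirrors the behavior in the diff: an unauthorized client still gets a
    // normally completed future, so the caller cannot see the denial.
    static CompletableFuture<Void> completeSilently(boolean allowed) {
        return isSubscribeAllowed(allowed).thenCompose(canSubscribe ->
                CompletableFuture.<Void>completedFuture(null));
    }

    // Alternative raised in the comment: fail the future so the denial
    // propagates to whoever is waiting on it.
    static CompletableFuture<Void> completeExceptionally(boolean allowed) {
        return isSubscribeAllowed(allowed).thenCompose(canSubscribe -> {
            if (!canSubscribe) {
                CompletableFuture<Void> failed = new CompletableFuture<>();
                // Assumed exception type, chosen only for this sketch.
                failed.completeExceptionally(
                        new IllegalStateException("not authorized to subscribe"));
                return failed;
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
    }
}
```

With the second variant, a later stage in the chain can turn the failure
into an error response for the client rather than only logging a warning.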