codelipenghui commented on a change in pull request #13355:
URL: https://github.com/apache/pulsar/pull/13355#discussion_r794355196



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
##########
@@ -538,6 +538,60 @@ public void testAuthData() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testPermissionForProducerCreateInitialSubscription() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        setup();
+
+        String lookupUrl = pulsar.getBrokerServiceUrl();
+
+        final String invalidRole = "invalid-role";
+        final String producerRole = "producer-role";
+        final String topic = "persistent://my-property/my-ns/my-topic";
+        final String initialSubscriptionName = "init-sub";
+        Authentication adminAuthentication = new 
ClientAuthentication("superUser");
+        Authentication authenticationInvalidRole = new 
ClientAuthentication(invalidRole);
+        Authentication authenticationProducerRole = new 
ClientAuthentication(producerRole);
+        @Cleanup
+        PulsarAdmin admin =
+                
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build();
+
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+        admin.tenants().createTenant("my-property",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", 
Sets.newHashSet("test"));
+        admin.topics().grantPermission(topic, invalidRole, 
Collections.singleton(AuthAction.produce));
+        admin.topics().grantPermission(topic, producerRole, 
Sets.newHashSet(AuthAction.produce, AuthAction.consume));

Review comment:
       Should check if the client does not have the consume permission, the 
producer should fail.

##########
File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
##########
@@ -596,4 +596,13 @@
      * @return the producer builder instance
      */
     ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean 
lazyStartPartitionedProducers);
+
+    /**
+     * Use this config to automatically create an initial subscription when 
creating the topic.
+     * If this field is not set, the initial subscription will not be created.
+     *
+     * @param initialSubscriptionName Name of the initial subscription of the 
topic.
+     * @return the producer builder instance
+     */
+    ProducerBuilder<T> initialSubscriptionName(String initialSubscriptionName);

Review comment:
       It's better to keep the API as an internal API, do not expose it to 
users.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1236,7 +1238,30 @@ protected void handleProducer(final CommandProducer 
cmdProducer) {
                     });
 
                     schemaVersionFuture.thenAccept(schemaVersion -> {
-                        
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future 
-> {
+                        
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenCompose(future
 -> {
+                            if (Strings.isNullOrEmpty(initialSubscriptionName)
+                                    || 
topic.getSubscriptions().containsKey(initialSubscriptionName)
+                                    || !topic.isPersistent()) {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                            return isTopicOperationAllowed(
+                                    topicName, TopicOperation.SUBSCRIBE
+                            ).thenCompose(canSubscribe -> {
+                                if (!canSubscribe) {
+                                    log.warn(
+                                            "[{}] Could not create the initial 
subscription {} for topic {}. Client "
+                                                    + "is not authorized to 
subscribe with role {}",
+                                            remoteAddress, 
initialSubscriptionName, topicName, getPrincipal());
+                                    return 
CompletableFuture.completedFuture(null);

Review comment:
       Should complete with exception?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to