hangc0276 opened a new pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341


   ### Motivation
   When running KOP with topic policy, it get the following exception.
   ```
   17:09:46.777 
[mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN  
org.apache.pulsar.broker.service.ServerCnx - 
[/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591]
 Failed to create consumer: consumerId=0, Topic policies cache have not init.
   java.util.concurrent.CompletionException: 
org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException:
 Topic policies cache have not init.
           at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) 
~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
 ~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) 
~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) 
~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) 
~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
 ~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_172]
           at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566)
 ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at 
org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194)
 ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 [?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 [?:1.8.0_172]
           at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 [?:1.8.0_172]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_172]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_172]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
   Caused by: 
org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException:
 Topic policies cache have not init.
           at 
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148)
 ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872)
 ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648)
 ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) 
~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) 
~[?:1.8.0_172]
           ... 15 more
   17:09:46.801 
[pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN  
org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - 
R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache 
have not init.
   17:09:46.804 
[pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN  
org.apache.pulsar.client.impl.ConsumerImpl - 
[persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591]
 Failed to subscribe to topic on localhost/127.0.0.1:15002
   17:09:46.804 
[pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] 
WARN  org.apache.pulsar.client.impl.ConnectionHandler - 
[persistent://pulsar/test/localhost:15000/__change_events-partition-0] 
[multiTopicsReader-c3d8054591] Could not get connection to broker: Topic 
policies cache have not init. -- Will try again in 0.1 s
   ```
   The reason is that it doesn't catch exception in getTopicPolicies, which 
will lead to subscribe failed.
   ```Java
   public boolean checkSubscriptionTypesEnable(SubType subType) throws 
Exception {
           TopicName topicName = TopicName.get(topic);
           if 
(brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
               TopicPolicies topicPolicies =
                       
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
               if (topicPolicies == null) {
                   return checkNsAndBrokerSubscriptionTypesEnable(topicName, 
subType);
               } else {
                   if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
                       return 
checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
                   }
                   return 
topicPolicies.getSubscriptionTypesEnabled().contains(subType);
               }
           } else {
               return checkNsAndBrokerSubscriptionTypesEnable(topicName, 
subType);
           }
       }
   ```
   
   ### Changes
   1. catch the exception in `checkSubscriptionTypesEnable`
   2. expose exception stack in `servercnx#handleSubscribe`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to