hangc0276 opened a new pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341
### Motivation
When running KOP with topic-level policies enabled, subscribing fails with the following exception.
```
17:09:46.777
[mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN
org.apache.pulsar.broker.service.ServerCnx -
[/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591]
Failed to create consumer: consumerId=0, Topic policies cache have not init.
java.util.concurrent.CompletionException:
org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException:
Topic policies cache have not init.
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
~[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
~[?:1.8.0_172]
at
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566)
~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at
org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194)
~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
[?:1.8.0_172]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
[?:1.8.0_172]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_172]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_172]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
Caused by:
org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException:
Topic policies cache have not init.
at
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148)
~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872)
~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648)
~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at
org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966)
~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
~[?:1.8.0_172]
... 15 more
17:09:46.801
[pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN
org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 -
R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache
have not init.
17:09:46.804
[pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN
org.apache.pulsar.client.impl.ConsumerImpl -
[persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591]
Failed to subscribe to topic on localhost/127.0.0.1:15002
17:09:46.804
[pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102]
WARN org.apache.pulsar.client.impl.ConnectionHandler -
[persistent://pulsar/test/localhost:15000/__change_events-partition-0]
[multiTopicsReader-c3d8054591] Could not get connection to broker: Topic
policies cache have not init. -- Will try again in 0.1 s
```
The cause is that `checkSubscriptionTypesEnable` does not catch the exception thrown by
`getTopicPolicies`, so the exception propagates and the subscribe request fails.
```Java
public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
    TopicName topicName = TopicName.get(topic);
    if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
        TopicPolicies topicPolicies = brokerService.pulsar().getTopicPoliciesService()
                .getTopicPolicies(TopicName.get(topic));
        if (topicPolicies == null) {
            return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
        } else {
            if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
                return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
            }
            return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
        }
    } else {
        return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
    }
}
```
### Changes
1. Catch the exception in `checkSubscriptionTypesEnable`.
2. Expose the exception stack trace in `ServerCnx#handleSubscribe`.
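
A minimal, self-contained sketch of the first change, for illustration only. It assumes that when the topic policies cache is not yet initialized, the check should fall back to the namespace- and broker-level settings; that fallback is an assumption, not something stated above. `SubscriptionTypeCheckSketch`, `TopicPoliciesLookup`, and the `nsAndBrokerTypes` parameter are hypothetical stand-ins so the example compiles without the broker classes. They are not the actual Pulsar API.

```Java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins for the broker types named in this PR; not the real Pulsar classes.
class SubscriptionTypeCheckSketch {

    enum SubType { Exclusive, Shared, Failover, Key_Shared }

    // Stand-in for BrokerServiceException.TopicPoliciesCacheNotInitException.
    static class TopicPoliciesCacheNotInitException extends Exception {
        TopicPoliciesCacheNotInitException() {
            super("Topic policies cache have not init.");
        }
    }

    // Stand-in for the topic policies lookup: returns the subscription types enabled
    // at topic level (null if no topic policy exists) or throws if the cache is not ready.
    interface TopicPoliciesLookup {
        Set<SubType> getSubscriptionTypesEnabled(String topic)
                throws TopicPoliciesCacheNotInitException;
    }

    // nsAndBrokerTypes stands in for checkNsAndBrokerSubscriptionTypesEnable(...).
    static boolean checkSubscriptionTypesEnable(String topic, SubType subType,
            TopicPoliciesLookup lookup, Set<SubType> nsAndBrokerTypes) {
        Set<SubType> topicTypes;
        try {
            topicTypes = lookup.getSubscriptionTypesEnabled(topic);
        } catch (TopicPoliciesCacheNotInitException e) {
            // Assumed behavior: treat an uninitialized cache like "no topic policy"
            // instead of letting the exception fail the subscribe request.
            topicTypes = null;
        }
        if (topicTypes == null || topicTypes.isEmpty()) {
            return nsAndBrokerTypes.contains(subType);
        }
        return topicTypes.contains(subType);
    }

    public static void main(String[] args) {
        Set<SubType> nsAndBroker = EnumSet.of(SubType.Shared);

        // Cache not initialized: the lookup throws, the check falls back.
        TopicPoliciesLookup notReady = t -> {
            throw new TopicPoliciesCacheNotInitException();
        };
        System.out.println(checkSubscriptionTypesEnable(
                "persistent://pulsar/test/topic", SubType.Shared, notReady, nsAndBroker));

        // Cache initialized: the topic-level policy takes precedence.
        TopicPoliciesLookup ready = t -> EnumSet.of(SubType.Exclusive);
        System.out.println(checkSubscriptionTypesEnable(
                "persistent://pulsar/test/topic", SubType.Shared, ready, nsAndBroker));
    }
}
```

With this shape, a subscribe that arrives before the cache is ready is evaluated against the broader settings rather than rejected, which avoids the client retry loop seen in the log.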