This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 105192d5baf [fix][broker] Fix topic policies cannot be queried with
extensible load manager (#23326)
105192d5baf is described below
commit 105192d5baff8eb48814e89817a900a116624ac3
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 20 18:40:54 2024 +0800
[fix][broker] Fix topic policies cannot be queried with extensible load
manager (#23326)
---
.../loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java | 2 +-
.../main/java/org/apache/pulsar/broker/service/BrokerService.java | 6 ++++++
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 3 +++
3 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 3ebcd1c20ca..5893fc49244 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -267,7 +267,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
@Override
public synchronized boolean started() {
- return validateChannelState(LeaderElectionServiceStarted, false);
+ return validateChannelState(Started, true);
}
public synchronized void start() throws PulsarServerException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index aee6532716c..c7a210bc543 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1177,6 +1177,9 @@ public class BrokerService implements Closeable {
}
private CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
+ if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
return
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.DEFAULT);
}
@@ -3601,6 +3604,9 @@ public class BrokerService implements Closeable {
public @Nonnull CompletableFuture<Boolean>
isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
requireNonNull(tpName);
// Policies priority: topic level -> namespace level -> broker level
+ if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) {
+ return CompletableFuture.completedFuture(true);
+ }
return pulsar.getTopicPoliciesService()
.getTopicPoliciesAsync(tpName,
TopicPoliciesService.GetType.LOCAL_ONLY)
.thenCompose(optionalTopicPolicies -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e951ffab1e2..9c0bdc120c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4324,6 +4324,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
final var topicPoliciesService =
brokerService.pulsar().getTopicPoliciesService();
final var partitionedTopicName =
TopicName.getPartitionedTopicName(topic);
if (topicPoliciesService.registerListener(partitionedTopicName, this))
{
+ if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
+ return CompletableFuture.completedFuture(null);
+ }
return
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName,
TopicPoliciesService.GetType.DEFAULT
).thenAcceptAsync(optionalPolicies ->
optionalPolicies.ifPresent(this::onUpdate),