This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 105192d5baf [fix][broker] Fix topic policies cannot be queried with 
extensible load manager (#23326)
105192d5baf is described below

commit 105192d5baff8eb48814e89817a900a116624ac3
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 20 18:40:54 2024 +0800

    [fix][broker] Fix topic policies cannot be queried with extensible load 
manager (#23326)
---
 .../loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java | 2 +-
 .../main/java/org/apache/pulsar/broker/service/BrokerService.java   | 6 ++++++
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java    | 3 +++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 3ebcd1c20ca..5893fc49244 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -267,7 +267,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     @Override
     public synchronized boolean started() {
-        return validateChannelState(LeaderElectionServiceStarted, false);
+        return validateChannelState(Started, true);
     }
 
     public synchronized void start() throws PulsarServerException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index aee6532716c..c7a210bc543 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1177,6 +1177,9 @@ public class BrokerService implements Closeable {
     }
 
     private CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
+        if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
         return 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName,
                 TopicPoliciesService.GetType.DEFAULT);
     }
@@ -3601,6 +3604,9 @@ public class BrokerService implements Closeable {
     public @Nonnull CompletableFuture<Boolean> 
isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
         requireNonNull(tpName);
         // Policies priority: topic level -> namespace level -> broker level
+        if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) {
+            return CompletableFuture.completedFuture(true);
+        }
         return pulsar.getTopicPoliciesService()
                 .getTopicPoliciesAsync(tpName, 
TopicPoliciesService.GetType.LOCAL_ONLY)
                 .thenCompose(optionalTopicPolicies -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e951ffab1e2..9c0bdc120c4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4324,6 +4324,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         final var topicPoliciesService = 
brokerService.pulsar().getTopicPoliciesService();
         final var partitionedTopicName = 
TopicName.getPartitionedTopicName(topic);
         if (topicPoliciesService.registerListener(partitionedTopicName, this)) 
{
+            if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
+                return CompletableFuture.completedFuture(null);
+            }
             return 
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName,
                     TopicPoliciesService.GetType.DEFAULT
             ).thenAcceptAsync(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate),

Reply via email to