This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fe92ac43cf [fix][broker] Support lookup options for extensible load 
manager (#22487)
7fe92ac43cf is described below

commit 7fe92ac43cfd2f2de5576a023498aac8b46c7ac8
Author: Kai Wang <kw...@apache.org>
AuthorDate: Tue Apr 23 15:22:44 2024 +0800

    [fix][broker] Support lookup options for extensible load manager (#22487)
---
 .../pulsar/broker/loadbalance/LoadManager.java     |  3 +-
 .../extensions/ExtensibleLoadManager.java          |  5 +-
 .../extensions/ExtensibleLoadManagerImpl.java      | 53 +++++++++---------
 .../extensions/ExtensibleLoadManagerWrapper.java   | 15 +++--
 .../channel/ServiceUnitStateChannelImpl.java       |  4 +-
 .../extensions/data/BrokerLookupData.java          | 17 +++++-
 .../pulsar/broker/namespace/NamespaceService.java  |  4 +-
 .../AntiAffinityNamespaceGroupExtensionTest.java   |  4 +-
 .../ExtensibleLoadManagerImplBaseTest.java         |  4 ++
 .../extensions/ExtensibleLoadManagerImplTest.java  | 65 ++++++++++++++++++----
 .../channel/ServiceUnitStateChannelTest.java       | 14 ++---
 .../extensions/data/BrokerLookupDataTest.java      | 32 ++++++++++-
 .../loadbalance/ExtensibleLoadManagerTest.java     |  3 +-
 13 files changed, 162 insertions(+), 61 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index 2cce68b60cb..0dd5d948480 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -31,6 +31,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrap
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.Reflections;
@@ -63,7 +64,7 @@ public interface LoadManager {
     Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
 
     default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
-            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions 
options) {
         throw new UnsupportedOperationException();
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
index b7da70d1cf1..eabf6005b43 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
@@ -60,9 +60,12 @@ public interface ExtensibleLoadManager extends Closeable {
      *              (e.g. {@link 
NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
      *              So the topic is optional.
      * @param serviceUnit service unit (e.g. bundle).
+     * @param options The lookup options.
      * @return The broker lookup data.
      */
-    CompletableFuture<Optional<BrokerLookupData>> 
assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
+    CompletableFuture<Optional<BrokerLookupData>> 
assign(Optional<ServiceUnitId> topic,
+                                                         ServiceUnitId 
serviceUnit,
+                                                         LookupOptions 
options);
 
     /**
      * Check the incoming service unit is owned by the current broker.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index c8cf1c05756..a20694356b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -88,6 +88,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionS
 import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -482,7 +483,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
 
     @Override
     public CompletableFuture<Optional<BrokerLookupData>> 
assign(Optional<ServiceUnitId> topic,
-                                                                ServiceUnitId 
serviceUnit) {
+                                                                ServiceUnitId 
serviceUnit,
+                                                                LookupOptions 
options) {
 
         final String bundle = serviceUnit.toString();
 
@@ -496,7 +498,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                     if (candidateBrokerId != null) {
                         return 
CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
                     }
-                    return getOrSelectOwnerAsync(serviceUnit, 
bundle).thenApply(Optional::ofNullable);
+                    return getOrSelectOwnerAsync(serviceUnit, bundle, 
options).thenApply(Optional::ofNullable);
                 });
             }
             return getBrokerLookupData(owner, bundle);
@@ -509,18 +511,18 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
     }
 
     private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId 
serviceUnit,
-                                                            String bundle) {
+                                                            String bundle,
+                                                            LookupOptions 
options) {
         return 
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
             // If the bundle not assign yet, select and publish assign event 
to channel.
             if (broker.isEmpty()) {
-                return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+                return this.selectAsync(serviceUnit, Collections.emptySet(), 
options).thenCompose(brokerOpt -> {
                     if (brokerOpt.isPresent()) {
                         assignCounter.incrementSuccess();
                         log.info("Selected new owner broker: {} for bundle: 
{}.", brokerOpt.get(), bundle);
                         return 
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
                     }
-                    throw new IllegalStateException(
-                            "Failed to select the new owner broker for bundle: 
" + bundle);
+                    return CompletableFuture.completedFuture(null);
                 });
             }
             assignCounter.incrementSkip();
@@ -534,22 +536,19 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             String bundle) {
         return owner.thenCompose(broker -> {
             if (broker.isEmpty()) {
-                String errorMsg = String.format(
-                        "Failed to get or assign the owner for bundle:%s", 
bundle);
-                log.error(errorMsg);
-                throw new IllegalStateException(errorMsg);
-            }
-            return CompletableFuture.completedFuture(broker.get());
-        }).thenCompose(broker -> 
this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
-            if (brokerLookupData.isEmpty()) {
-                String errorMsg = String.format(
-                        "Failed to lookup broker:%s for bundle:%s, the broker 
has not been registered.",
-                        broker, bundle);
-                log.error(errorMsg);
-                throw new IllegalStateException(errorMsg);
+                return CompletableFuture.completedFuture(Optional.empty());
             }
-            return CompletableFuture.completedFuture(brokerLookupData);
-        }));
+            return 
this.getBrokerRegistry().lookupAsync(broker.get()).thenCompose(brokerLookupData 
-> {
+                if (brokerLookupData.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to lookup broker:%s for bundle:%s, the 
broker has not been registered.",
+                            broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(brokerLookupData);
+            });
+        });
     }
 
     /**
@@ -562,7 +561,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
     public CompletableFuture<NamespaceEphemeralData> 
tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
         log.info("Try acquiring ownership for bundle: {} - {}.", 
namespaceBundle, brokerRegistry.getBrokerId());
         final String bundle = namespaceBundle.toString();
-        return assign(Optional.empty(), namespaceBundle)
+        return assign(Optional.empty(), namespaceBundle, 
LookupOptions.builder().readOnly(false).build())
                 .thenApply(brokerLookupData -> {
                     if (brokerLookupData.isEmpty()) {
                         String errorMsg = String.format(
@@ -595,12 +594,12 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         }
     }
 
-    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle) {
-        return selectAsync(bundle, Collections.emptySet());
-    }
-
     public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle,
-                                                           Set<String> 
excludeBrokerSet) {
+                                                           Set<String> 
excludeBrokerSet,
+                                                           LookupOptions 
options) {
+        if (options.isReadOnly()) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
         BrokerRegistry brokerRegistry = getBrokerRegistry();
         return brokerRegistry.getAvailableBrokerLookupDataAsync()
                 .thenComposeAsync(availableBrokers -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index cd1561cb70e..25eb27bc58d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -28,10 +28,11 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
-import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 
 public class ExtensibleLoadManagerWrapper implements LoadManager {
@@ -62,9 +63,15 @@ public class ExtensibleLoadManagerWrapper implements 
LoadManager {
 
     @Override
     public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
-            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
-        return loadManager.assign(topic, bundle)
-                .thenApply(lookupData -> 
lookupData.map(BrokerLookupData::toLookupResult));
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions 
options) {
+        return loadManager.assign(topic, bundle, options)
+                .thenApply(lookupData -> lookupData.map(data -> {
+                    try {
+                        return data.toLookupResult(options);
+                    } catch (PulsarServerException ex) {
+                        throw FutureUtil.wrapToCompletionException(ex);
+                    }
+                }));
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index e355187af4b..bf6266482f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -83,6 +83,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListen
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.client.api.CompressionType;
@@ -1430,7 +1431,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private Optional<String> selectBroker(String serviceUnit, String 
inactiveBroker) {
         try {
             return loadManager.selectAsync(
-                    LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), 
Set.of(inactiveBroker))
+                    LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
+                            Set.of(inactiveBroker), 
LookupOptions.builder().build())
                     .get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
         } catch (Throwable e) {
             log.error("Failed to select a broker for serviceUnit:{}", 
serviceUnit);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
index 41f5b18e321..50a2b704040 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.data;
 
+import java.net.URI;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
@@ -79,7 +82,19 @@ public record BrokerLookupData (String webServiceUrl,
         return this.startTimestamp;
     }
 
-    public LookupResult toLookupResult() {
+    public LookupResult toLookupResult(LookupOptions options) throws 
PulsarServerException {
+        if (options.hasAdvertisedListenerName()) {
+            AdvertisedListener listener = 
advertisedListeners.get(options.getAdvertisedListenerName());
+            if (listener == null) {
+                throw new PulsarServerException("the broker do not have "
+                        + options.getAdvertisedListenerName() + " listener");
+            }
+            URI url = listener.getBrokerServiceUrl();
+            URI urlTls = listener.getBrokerServiceUrlTls();
+            return new LookupResult(webServiceUrl, webServiceUrlTls,
+                    url == null ? null : url.toString(),
+                    urlTls == null ? null : urlTls.toString(), 
LookupResult.Type.BrokerUrl, false);
+        }
         return new LookupResult(webServiceUrl, webServiceUrlTls, 
pulsarServiceUrl, pulsarServiceUrlTls,
                 LookupResult.Type.BrokerUrl, false);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 65081f2ea42..44cdd6368fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -227,7 +227,7 @@ public class NamespaceService implements AutoCloseable {
                             return 
CompletableFuture.completedFuture(optResult);
                         }
                         if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
-                            return 
loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
+                            return 
loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle, options);
                         } else {
                             // TODO: Add unit tests cover it.
                             return findBrokerServiceUrl(bundle, options);
@@ -353,7 +353,7 @@ public class NamespaceService implements AutoCloseable {
             }
             CompletableFuture<Optional<LookupResult>> future =
                     
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
-                    ? 
loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
+                    ? 
loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle, 
options) :
                     findBrokerServiceUrl(bundle, options);
 
             return future.thenApply(lookupResult -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
index d77490e1b82..cd653a964be 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
@@ -38,6 +38,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateC
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import 
org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -61,7 +62,8 @@ public class AntiAffinityNamespaceGroupExtensionTest extends 
AntiAffinityNamespa
 
     protected String selectBroker(ServiceUnitId serviceUnit, Object 
loadManager) {
         try {
-            return ((ExtensibleLoadManagerImpl) 
loadManager).assign(Optional.empty(), serviceUnit).get()
+            return ((ExtensibleLoadManagerImpl) loadManager)
+                    .assign(Optional.empty(), serviceUnit, 
LookupOptions.builder().build()).get()
                     .get().getPulsarServiceUrl();
         } catch (Throwable e) {
             throw new RuntimeException(e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 651a544a04e..4f2c1ae6607 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.loadbalance.extensions;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import com.google.common.collect.Sets;
+
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -69,6 +71,8 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
         
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
         conf.setLoadBalancerSheddingEnabled(false);
         conf.setLoadBalancerDebugModeEnabled(true);
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setBrokerServicePortTls(Optional.of(0));
         return conf;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index b72ab77e814..a385b0d3c5c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -156,10 +156,12 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
     public void testAssignInternalTopic() throws Exception {
         Optional<BrokerLookupData> brokerLookupData1 = 
primaryLoadManager.assign(
                 Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
-                getBundleAsync(pulsar1, 
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get();
+                getBundleAsync(pulsar1, 
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(),
+                LookupOptions.builder().build()).get();
         Optional<BrokerLookupData> brokerLookupData2 = 
secondaryLoadManager.assign(
                 Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
-                getBundleAsync(pulsar1, 
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get();
+                getBundleAsync(pulsar1, 
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(),
+                LookupOptions.builder().build()).get();
         assertEquals(brokerLookupData1, brokerLookupData2);
         assertTrue(brokerLookupData1.isPresent());
 
@@ -167,7 +169,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                 FieldUtils.readField(channel1, "leaderElectionService", true);
         Optional<LeaderBroker> currentLeader = 
leaderElectionService.getCurrentLeader();
         assertTrue(currentLeader.isPresent());
-        assertEquals(brokerLookupData1.get().getWebServiceUrl(), 
currentLeader.get().getServiceUrl());
+        assertEquals(brokerLookupData1.get().getWebServiceUrlTls(), 
currentLeader.get().getServiceUrl());
     }
 
     @Test
@@ -175,15 +177,17 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-assign");
         TopicName topicName = topicAndBundle.getLeft();
         NamespaceBundle bundle = topicAndBundle.getRight();
-        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle,
+                LookupOptions.builder().build()).get();
         assertTrue(brokerLookupData.isPresent());
         log.info("Assign the bundle {} to {}", bundle, brokerLookupData);
         // Should get owner info from channel.
-        Optional<BrokerLookupData> brokerLookupData1 = 
secondaryLoadManager.assign(Optional.empty(), bundle).get();
+        Optional<BrokerLookupData> brokerLookupData1 = 
secondaryLoadManager.assign(Optional.empty(), bundle,
+                LookupOptions.builder().build()).get();
         assertEquals(brokerLookupData, brokerLookupData1);
 
         Optional<LookupResult> lookupResult = pulsar2.getNamespaceService()
-                .getBrokerServiceUrlAsync(topicName, null).get();
+                .getBrokerServiceUrlAsync(topicName, 
LookupOptions.builder().build()).get();
         assertTrue(lookupResult.isPresent());
         assertEquals(lookupResult.get().getLookupData().getHttpUrl(), 
brokerLookupData.get().getWebServiceUrl());
 
@@ -193,6 +197,43 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         assertEquals(webServiceUrl.get().toString(), 
brokerLookupData.get().getWebServiceUrl());
     }
 
+    @Test
+    public void testLookupOptions() throws Exception {
+        Pair<TopicName, NamespaceBundle> topicAndBundle =
+                getBundleIsNotOwnByChangeEventTopic("test-lookup-options");
+        TopicName topicName = topicAndBundle.getLeft();
+        NamespaceBundle bundle = topicAndBundle.getRight();
+
+        admin.topics().createPartitionedTopic(topicName.toString(), 1);
+
+        // Test LookupOptions.readOnly = true when the bundle is not owned by 
any broker.
+        Optional<URL> webServiceUrlReadOnlyTrue = pulsar1.getNamespaceService()
+                .getWebServiceUrl(bundle, 
LookupOptions.builder().readOnly(true).requestHttps(false).build());
+        assertTrue(webServiceUrlReadOnlyTrue.isEmpty());
+
+        // Test LookupOptions.readOnly = false and the bundle assign to some 
broker.
+        Optional<URL> webServiceUrlReadOnlyFalse = 
pulsar1.getNamespaceService()
+                .getWebServiceUrl(bundle, 
LookupOptions.builder().readOnly(false).requestHttps(false).build());
+        assertTrue(webServiceUrlReadOnlyFalse.isPresent());
+
+        // Test LookupOptions.requestHttps = true
+        Optional<URL> webServiceUrlHttps = pulsar2.getNamespaceService()
+                .getWebServiceUrl(bundle, 
LookupOptions.builder().requestHttps(true).build());
+        assertTrue(webServiceUrlHttps.isPresent());
+        assertTrue(webServiceUrlHttps.get().toString().startsWith("https"));
+
+        // TODO: Support LookupOptions.loadTopicsInBundle = true
+
+        // Test LookupOptions.advertisedListenerName = internal but the broker 
do not have internal listener.
+        try {
+            pulsar2.getNamespaceService()
+                    .getWebServiceUrl(bundle, 
LookupOptions.builder().advertisedListenerName("internal").build());
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("the broker do not have 
internal listener"));
+        }
+    }
+
     @Test
     public void testCheckOwnershipAsync() throws Exception {
         Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-check-ownership");
@@ -210,7 +251,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
 
         // 2. Assign the bundle to a broker.
-        Optional<BrokerLookupData> lookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        Optional<BrokerLookupData> lookupData = 
primaryLoadManager.assign(Optional.empty(), bundle, 
LookupOptions.builder().build()).get();
         assertTrue(lookupData.isPresent());
         if 
(lookupData.get().getPulsarServiceUrl().equals(pulsar1.getBrokerServiceUrl())) {
             
assertTrue(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
@@ -243,7 +284,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
 
         })).when(primaryLoadManager).getBrokerFilterPipeline();
 
-        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle, 
LookupOptions.builder().build()).get();
         assertTrue(brokerLookupData.isPresent());
         assertEquals(brokerLookupData.get().getWebServiceUrl(), 
pulsar2.getWebServiceAddress());
     }
@@ -263,7 +304,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
             }
         })).when(primaryLoadManager).getBrokerFilterPipeline();
 
-        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle, 
LookupOptions.builder().build()).get();
         assertTrue(brokerLookupData.isPresent());
     }
 
@@ -272,7 +313,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         TopicName topicName =
                 TopicName.get("public/test/testUnloadUponTopicLookupFailure");
         NamespaceBundle bundle = 
pulsar1.getNamespaceService().getBundle(topicName);
-        primaryLoadManager.assign(Optional.empty(), bundle).get();
+        primaryLoadManager.assign(Optional.empty(), bundle, 
LookupOptions.builder().build()).get();
 
         CompletableFuture future1 = new CompletableFuture();
         CompletableFuture future2 = new CompletableFuture();
@@ -869,7 +910,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                 return FutureUtil.failedFuture(new 
BrokerFilterException("Test"));
             }
         })).when(primaryLoadManager).getBrokerFilterPipeline();
-        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle, 
LookupOptions.builder().build()).get();
         Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
             assertTrue(brokerLookupData.isPresent());
             assertEquals(brokerLookupData.get().getWebServiceUrl(), 
pulsar2.getWebServiceAddress());
@@ -1564,7 +1605,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         String topic = "persistent://" + defaultTestNamespace + 
"/test-get-owned-service-units";
         admin.topics().createPartitionedTopic(topic, 1);
         NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get(topic)).join();
-        CompletableFuture<Optional<BrokerLookupData>> owner = 
primaryLoadManager.assign(Optional.empty(), bundle);
+        CompletableFuture<Optional<BrokerLookupData>> owner = 
primaryLoadManager.assign(Optional.empty(), bundle, 
LookupOptions.builder().build());
         assertFalse(owner.join().isEmpty());
 
         BrokerLookupData brokerLookupData = owner.join().get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index fe8387710ee..1076f92037f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -502,7 +502,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         // recovered, check the monitor update state : Assigned -> Owned
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
-                .when(loadManager).selectAsync(any(), any());
+                .when(loadManager).selectAsync(any(), any(), any());
         FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 1 , true);
@@ -724,7 +724,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle1);
         var owner2 = channel2.getOwnerAsync(bundle2);
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
-                .when(loadManager).selectAsync(any(), any());
+                .when(loadManager).selectAsync(any(), any(), any());
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
 
@@ -1126,7 +1126,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
-                .when(loadManager).selectAsync(any(), any());
+                .when(loadManager).selectAsync(any(), any(), any());
         channel1.publishAssignEventAsync(bundle, brokerId2);
         // channel1 is broken. the assign won't be complete.
         waitUntilState(channel1, bundle);
@@ -1525,7 +1525,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         // test stable metadata state
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
-                .when(loadManager).selectAsync(any(), any());
+                .when(loadManager).selectAsync(any(), any(), any());
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
         followerChannel.handleMetadataSessionEvent(SessionReestablished);
         FieldUtils.writeDeclaredField(leaderChannel, 
"lastMetadataSessionEventTimestamp",
@@ -1590,7 +1590,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         // test stable metadata state
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
-                .when(loadManager).selectAsync(any(), any());
+                .when(loadManager).selectAsync(any(), any(), any());
         FieldUtils.writeDeclaredField(leaderChannel, 
"inFlightStateWaitingTimeInMillis",
                 -1, true);
         FieldUtils.writeDeclaredField(followerChannel, 
"inFlightStateWaitingTimeInMillis",
@@ -1645,7 +1645,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         // simulate ownership cleanup(no selected owner) by the leader channel
         doReturn(CompletableFuture.completedFuture(Optional.empty()))
-                .when(loadManager).selectAsync(any(), any());
+                .when(loadManager).selectAsync(any(), any(), any());
         var leaderChannel = channel1;
         String leader1 = channel1.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
         String leader2 = channel2.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
@@ -1669,7 +1669,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Owned, broker, null, 1));
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
-                .when(loadManager).selectAsync(any(), any());
+                .when(loadManager).selectAsync(any(), any(), any());
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
         FieldUtils.writeDeclaredField(leaderChannel, 
"lastMetadataSessionEventTimestamp",
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS 
* 1000 + 1000), true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
index 0d874e0f771..66e8c917d1f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
@@ -18,13 +18,19 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.data;
 
+import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.pulsar.broker.PulsarServerException;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.testng.annotations.Test;
 
@@ -32,12 +38,20 @@ import org.testng.annotations.Test;
 public class BrokerLookupDataTest {
 
     @Test
-    public void testConstructors() {
+    public void testConstructors() throws PulsarServerException, 
URISyntaxException {
         String webServiceUrl = "http://localhost:8080";;
         String webServiceUrlTls = "https://localhoss:8081";;
         String pulsarServiceUrl = "pulsar://localhost:6650";
         String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
-        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+        final String listenerUrl = "pulsar://gateway:7000";
+        final String listenerUrlTls = "pulsar://gateway:8000";
+        final String listener = "internal";
+        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>(){{
+            put(listener, AdvertisedListener.builder()
+                    .brokerServiceUrl(new URI(listenerUrl))
+                    .brokerServiceUrlTls(new URI(listenerUrlTls))
+                    .build());
+        }};
         Map<String, String> protocols = new HashMap<>(){{
             put("kafka", "9092");
         }};
@@ -56,10 +70,22 @@ public class BrokerLookupDataTest {
         assertEquals("3.0", lookupData.brokerVersion());
 
 
-        LookupResult lookupResult = lookupData.toLookupResult();
+        LookupResult lookupResult = 
lookupData.toLookupResult(LookupOptions.builder().build());
         assertEquals(webServiceUrl, lookupResult.getLookupData().getHttpUrl());
         assertEquals(webServiceUrlTls, 
lookupResult.getLookupData().getHttpUrlTls());
         assertEquals(pulsarServiceUrl, 
lookupResult.getLookupData().getBrokerUrl());
         assertEquals(pulsarServiceUrlTls, 
lookupResult.getLookupData().getBrokerUrlTls());
+
+        try {
+            
lookupData.toLookupResult(LookupOptions.builder().advertisedListenerName("others").build());
+            fail();
+        } catch (PulsarServerException ex) {
+            assertTrue(ex.getMessage().contains("the broker do not have others 
listener"));
+        }
+        lookupResult = 
lookupData.toLookupResult(LookupOptions.builder().advertisedListenerName(listener).build());
+        assertEquals(listenerUrl, lookupResult.getLookupData().getBrokerUrl());
+        assertEquals(listenerUrlTls, 
lookupResult.getLookupData().getBrokerUrlTls());
+        assertEquals(webServiceUrl, lookupResult.getLookupData().getHttpUrl());
+        assertEquals(webServiceUrlTls, 
lookupResult.getLookupData().getHttpUrlTls());
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 1f29e19f018..ee7497010ad 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -403,9 +403,10 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
                 () -> {
                     try {
                         admin.lookups().lookupTopicAsync(topic).get(5, 
TimeUnit.SECONDS);
+                        fail();
                     } catch (Exception ex) {
                         log.error("Failed to lookup topic: ", ex);
-                        assertThat(ex.getMessage()).contains("Failed to select 
the new owner broker for bundle");
+                        assertThat(ex.getMessage()).contains("Service 
Unavailable");
                     }
                 }
         );


Reply via email to