This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9a97c843a46 [feat][broker] PIP-368: Support lookup based on the lookup
properties (#23223)
9a97c843a46 is described below
commit 9a97c843a46e23a0811e2172991cd00a3af642c0
Author: Zike Yang <[email protected]>
AuthorDate: Wed Aug 28 09:56:05 2024 +0800
[feat][broker] PIP-368: Support lookup based on the lookup properties
(#23223)
PIP: https://github.com/apache/pulsar/pull/23075
### Motivation
This is the implementation for the PIP:
https://github.com/apache/pulsar/pull/23075
Currently, the lookup process uses only the topic name as its parameter.
However, to enhance this process, it's
beneficial for clients to provide additional information. This could be
done by introducing the `lookupProperties` field
in the client configuration. Clients can then share these properties with
the broker during lookup.
On the broker side, a broker can also carry properties that are used for the
lookup, so lookup properties are supported for the broker as well.
The broker can use these properties to
make a better decision on which broker to
return.
Here is the rack-aware lookup scenario for using the client properties for
the lookup:
Assume there are two brokers: broker-0 configures the lookup property
"rack" with "A" and broker-1 configures the
lookup property "rack" with "B". By using the lookup properties, clients
can supply rack information during the lookup,
enabling the broker to identify and connect them to the nearest broker
within the same rack. If a client that configures
the "rack" property with "A" connects to a lookup broker, the customized
load manager can determine broker-0 as the
owner broker since the broker and the client have the same rack property.
### Modifications
- Add a new configuration `lookupProperties` to the client. While looking up
the broker, the client will send the properties
to the broker through the `CommandLookupTopic` request.
- Add a `properties` field to `CommandLookupTopic`.
- Add the lookup properties to `LookupOptions` as a `properties` field. The load manager
implementation can access
the `properties` through `LookupOptions` to make a better decision on which
broker to return.
- Introduce a new broker configuration `lookupPropertyPrefix` (default: `lookup.`). Any broker
configuration property whose name starts with the value of `lookupPropertyPrefix`
will be included in the `BrokerLookupData` and persisted in the
metadata store. The broker can use these properties
during the lookup.
Co-authored-by: Yunze Xu <[email protected]>
---
.../apache/pulsar/broker/ServiceConfiguration.java | 19 ++++
.../loadbalance/extensions/BrokerRegistryImpl.java | 3 +-
.../extensions/data/BrokerLookupData.java | 3 +-
.../pulsar/broker/lookup/TopicLookupBase.java | 8 +-
.../pulsar/broker/namespace/LookupOptions.java | 2 +
.../apache/pulsar/broker/service/ServerCnx.java | 13 ++-
.../extensions/data/BrokerLookupDataTest.java | 4 +-
.../extensions/filter/BrokerFilterTestBase.java | 3 +-
.../filter/BrokerIsolationPoliciesFilterTest.java | 3 +-
.../extensions/manager/RedirectManagerTest.java | 4 +-
.../extensions/scheduler/TransferShedderTest.java | 3 +-
.../pulsar/client/api/LookupPropertiesTest.java | 110 +++++++++++++++++++++
.../common/naming/ServiceConfigurationTest.java | 14 +++
.../apache/pulsar/client/api/ClientBuilder.java | 12 +++
.../client/impl/BinaryProtoLookupService.java | 3 +-
.../pulsar/client/impl/ClientBuilderImpl.java | 6 ++
.../client/impl/conf/ClientConfigurationData.java | 11 +++
.../client/impl/BinaryProtoLookupServiceTest.java | 2 +
.../apache/pulsar/common/protocol/Commands.java | 8 +-
pulsar-common/src/main/proto/PulsarApi.proto | 2 +
20 files changed, 220 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c836879b075..6488ace991e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -21,9 +21,11 @@ package org.apache.pulsar.broker;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -2946,6 +2948,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@com.fasterxml.jackson.annotation.JsonIgnore
private Properties properties = new Properties();
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The properties whose name starts with this prefix will be
uploaded to the metadata store for "
+ + " the topic lookup"
+ )
+ private String lookupPropertyPrefix = "lookup.";
+
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
@@ -3743,4 +3752,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
public boolean isSystemTopicAndTopicLevelPoliciesEnabled() {
return topicLevelPoliciesEnabled && systemTopicEnabled;
}
+
+ public Map<String, String> lookupProperties() {
+ final var map = new HashMap<String, String>();
+ properties.forEach((key, value) -> {
+ if (key instanceof String && value instanceof String && ((String)
key).startsWith(lookupPropertyPrefix)) {
+ map.put((String) key, (String) value);
+ }
+ });
+ return map;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 18e30ddf922..5db11d40c33 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -94,7 +94,8 @@ public class BrokerRegistryImpl implements BrokerRegistry {
pulsar.getConfiguration().isEnableNonPersistentTopics(),
conf.getLoadManagerClassName(),
System.currentTimeMillis(),
- pulsar.getBrokerVersion());
+ pulsar.getBrokerVersion(),
+ pulsar.getConfig().lookupProperties());
this.state = State.Init;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
index 50a2b704040..5d982076bd6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
@@ -41,7 +41,8 @@ public record BrokerLookupData (String webServiceUrl,
boolean nonPersistentTopicsEnabled,
String loadManagerClassName,
long startTimestamp,
- String brokerVersion) implements
ServiceLookupData {
+ String brokerVersion,
+ Map<String, String> properties) implements
ServiceLookupData {
@Override
public String getWebServiceUrl() {
return this.webServiceUrl();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 9a05c3d992a..42f145d32aa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -24,6 +24,8 @@ import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.Encoded;
@@ -180,7 +182,7 @@ public class TopicLookupBase extends PulsarWebResource {
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService
pulsarService, TopicName topicName,
boolean authoritative, String clientAppId,
AuthenticationDataSource authenticationData, long requestId) {
return lookupTopicAsync(pulsarService, topicName, authoritative,
clientAppId,
- authenticationData, requestId, null);
+ authenticationData, requestId, null, Collections.emptyMap());
}
/**
@@ -208,7 +210,8 @@ public class TopicLookupBase extends PulsarWebResource {
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService
pulsarService, TopicName topicName,
boolean
authoritative, String clientAppId,
AuthenticationDataSource authenticationData,
- long requestId,
final String advertisedListenerName) {
+ long requestId,
final String advertisedListenerName,
+ Map<String,
String> properties) {
final CompletableFuture<ByteBuf> validationFuture = new
CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new
CompletableFuture<>();
@@ -299,6 +302,7 @@ public class TopicLookupBase extends PulsarWebResource {
.authoritative(authoritative)
.advertisedListenerName(advertisedListenerName)
.loadTopicsInBundle(true)
+ .properties(properties)
.build();
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
.thenAccept(lookupResult -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
index 431266682c5..be545064632 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.namespace;
+import java.util.Map;
import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
@@ -46,6 +47,7 @@ public class LookupOptions {
private final boolean requestHttps;
private final String advertisedListenerName;
+ private final Map<String, String> properties;
public boolean hasAdvertisedListenerName() {
return StringUtils.isNotBlank(advertisedListenerName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2f9e9b2a1ac..d1fe9776e07 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -47,6 +47,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@@ -544,9 +545,19 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP,
authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
+ final Map<String, String> properties;
+ if (lookup.getPropertiesCount() > 0) {
+ properties = new HashMap<>();
+ for (int i = 0; i < lookup.getPropertiesCount(); i++) {
+ final var keyValue = lookup.getPropertyAt(i);
+ properties.put(keyValue.getKey(),
keyValue.getValue());
+ }
+ } else {
+ properties = Collections.emptyMap();
+ }
lookupTopicAsync(getBrokerService().pulsar(), topicName,
authoritative,
getPrincipal(), getAuthenticationData(),
- requestId,
advertisedListenerName).handle((lookupResponse, ex) -> {
+ requestId, advertisedListenerName,
properties).handle((lookupResponse, ex) -> {
if (ex == null) {
writeAndFlush(lookupResponse);
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
index 66e8c917d1f..0a9742fd761 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
@@ -24,6 +24,7 @@ import static org.testng.AssertJUnit.assertTrue;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -58,7 +59,8 @@ public class BrokerLookupDataTest {
BrokerLookupData lookupData = new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true,
true,
- ExtensibleLoadManagerImpl.class.getName(),
System.currentTimeMillis(),"3.0");
+ ExtensibleLoadManagerImpl.class.getName(),
System.currentTimeMillis(),"3.0",
+ Collections.emptyMap());
assertEquals(webServiceUrl, lookupData.webServiceUrl());
assertEquals(webServiceUrlTls, lookupData.webServiceUrlTls());
assertEquals(pulsarServiceUrl, lookupData.pulsarServiceUrl());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
index a120ef473e9..ab0065e0aa5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -136,6 +137,6 @@ public class BrokerFilterTestBase {
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true,
true,
- loadManagerClassName, -1, version);
+ loadManagerClassName, -1, version, Collections.emptyMap());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
index 87aaf4bac7f..d3553bd25d1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.reset;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -218,7 +219,7 @@ public class BrokerIsolationPoliciesFilterTest {
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols,
persistentTopicsEnabled, nonPersistentTopicsEnabled,
- ExtensibleLoadManagerImpl.class.getName(),
System.currentTimeMillis(), "3.0.0");
+ ExtensibleLoadManagerImpl.class.getName(),
System.currentTimeMillis(), "3.0.0", Collections.emptyMap());
}
public LoadManagerContext getContext() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
index cbf77b59d5a..f2e9cf86868 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
@@ -33,6 +33,8 @@ import
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.testng.annotations.Test;
+
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -106,6 +108,6 @@ public class RedirectManagerTest {
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true,
true,
- loadManagerClassName, startTimeStamp, "3.0.0");
+ loadManagerClassName, startTimeStamp, "3.0.0",
Collections.emptyMap());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index efca2880949..48bef15b5f8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -46,6 +46,7 @@ import com.google.common.collect.Range;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -697,7 +698,7 @@ public class TransferShedderTest {
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols,
true, true,
- conf.getLoadManagerClassName(), System.currentTimeMillis(),
"3.0.0");
+ conf.getLoadManagerClassName(), System.currentTimeMillis(),
"3.0.0", Collections.emptyMap());
}
private void setIsolationPolicies(SimpleResourceAllocationPolicies
policies,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
new file mode 100644
index 00000000000..cb8b2d1e526
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class LookupPropertiesTest extends MultiBrokerBaseTest {
+
+ private static final String BROKER_KEY = "lookup.broker.id";
+ private static final String CLIENT_KEY = "broker.id";
+
+ @Override
+ protected void startBroker() throws Exception {
+ addCustomConfigs(conf, 0);
+ super.startBroker();
+ }
+
+ @Override
+ protected ServiceConfiguration createConfForAdditionalBroker(int
additionalBrokerIndex) {
+ return addCustomConfigs(getDefaultConf(), additionalBrokerIndex + 10);
+ }
+
+ private static ServiceConfiguration addCustomConfigs(ServiceConfiguration
config, int index) {
+ config.setDefaultNumberOfNamespaceBundles(16);
+ config.setLoadBalancerAutoBundleSplitEnabled(false);
+
config.setLoadManagerClassName(BrokerIdAwareLoadManager.class.getName());
+
config.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(100);
+ config.setLoadBalancerDebugModeEnabled(true);
+ config.setBrokerShutdownTimeoutMs(1000);
+ final var properties = new Properties();
+ properties.setProperty(BROKER_KEY, "broker-" + index);
+ config.setProperties(properties);
+ return config;
+ }
+
+ @Test
+ public void testLookupProperty() throws Exception {
+ final var topic = "test-lookup-property";
+ admin.topics().createPartitionedTopic(topic, 16);
+ @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .lookupProperties(
+ Collections.singletonMap(CLIENT_KEY, "broker-10")) //
broker-10 refers to additionalBrokers[0]
+ .build();
+ @Cleanup final var producer = (PartitionedProducerImpl<byte[]>)
client.newProducer().topic(topic).create();
+ Assert.assertNotNull(producer);
+ final var connections =
producer.getProducers().stream().map(ProducerImpl::getClientCnx)
+ .collect(Collectors.toSet());
+ Assert.assertEquals(connections.size(), 1);
+ final var port = ((InetSocketAddress)
connections.stream().findAny().orElseThrow().ctx().channel()
+ .remoteAddress()).getPort();
+ Assert.assertEquals(port,
additionalBrokers.get(0).getBrokerListenPort().orElseThrow());
+ }
+
+ public static class BrokerIdAwareLoadManager extends
ExtensibleLoadManagerImpl {
+ @Override
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle, Set<String> excludeBrokerSet,
+ LookupOptions
options) {
+ final var clientId = options.getProperties() == null ? null :
options.getProperties().get(CLIENT_KEY);
+ if (clientId == null) {
+ return super.selectAsync(bundle, excludeBrokerSet, options);
+ }
+ return
getBrokerRegistry().getAvailableBrokerLookupDataAsync().thenCompose(brokerLookupDataMap
-> {
+ final var optBroker =
brokerLookupDataMap.entrySet().stream().filter(entry -> {
+ final var brokerId =
entry.getValue().properties().get(BROKER_KEY);
+ return brokerId != null && brokerId.equals(clientId);
+ }).findAny();
+ return
optBroker.map(Map.Entry::getKey).map(Optional::of).map(CompletableFuture::completedFuture)
+ .orElseGet(() -> super.selectAsync(bundle,
excludeBrokerSet, options));
+ });
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 77bb36eb68d..5972c6f724d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -35,6 +35,7 @@ import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -393,4 +394,17 @@ public class ServiceConfigurationTest {
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
}
+
+ @Test
+ public void testLookupProperties() throws Exception {
+ var confFile = "lookup.key1=value1\nkey=value\nlookup.key2=value2";
+ var conf = (ServiceConfiguration) PulsarConfigurationLoader.create(
+ new ByteArrayInputStream(confFile.getBytes()),
ServiceConfiguration.class);
+ assertEquals(conf.lookupProperties(), Map.of("lookup.key1", "value1",
"lookup.key2", "value2"));
+
+ confFile = confFile + "\nlookupPropertyPrefix=lookup.key2";
+ conf = PulsarConfigurationLoader.create(new
ByteArrayInputStream(confFile.getBytes()),
+ ServiceConfiguration.class);
+ assertEquals(conf.lookupProperties(), Map.of("lookup.key2", "value2"));
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 4adf7d89b0e..73ad555165c 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -668,4 +668,16 @@ public interface ClientBuilder extends Serializable,
Cloneable {
* @return the client builder instance
*/
ClientBuilder autoCertRefreshSeconds(int autoCertRefreshSeconds);
+
+ /**
+ * Set the properties used for topic lookup.
+ * <p>
+ * When the broker performs topic lookup, these lookup properties will be
taken into consideration in a customized
+ * load manager.
+ * <p>
+ * Note: The lookup properties are only used in topic lookup when:
+ * - The protocol is binary protocol, i.e. the service URL starts with
"pulsar://" or "pulsar+ssl://"
+ * - The `loadManagerClassName` config in broker is a class that
implements the `ExtensibleLoadManager` interface
+ */
+ ClientBuilder lookupProperties(Map<String, String> properties);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 6ee6fafde1c..9dd04acce7e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -178,7 +178,8 @@ public class BinaryProtoLookupService implements
LookupService {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx
-> {
long requestId = client.newRequestId();
- ByteBuf request = Commands.newLookup(topicName.toString(),
listenerName, authoritative, requestId);
+ ByteBuf request = Commands.newLookup(topicName.toString(),
listenerName, authoritative, requestId,
+ client.getConfiguration().getLookupProperties());
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
// lookup failed
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index d9edc53b50e..69232186767 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -476,4 +476,10 @@ public class ClientBuilderImpl implements ClientBuilder {
conf.setDescription(description);
return this;
}
+
+ @Override
+ public ClientBuilder lookupProperties(Map<String, String> properties) {
+ conf.setLookupProperties(properties);
+ return this;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index e2713644af6..c1c2e759255 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -412,6 +413,8 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
)
private String description;
+ private Map<String, String> lookupProperties;
+
private transient OpenTelemetry openTelemetry;
/**
@@ -477,4 +480,12 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
public String getSocks5ProxyPassword() {
return Objects.nonNull(socks5ProxyPassword) ? socks5ProxyPassword :
System.getProperty("socks5Proxy.password");
}
+
+ public void setLookupProperties(Map<String, String> lookupProperties) {
+ this.lookupProperties = Collections.unmodifiableMap(lookupProperties);
+ }
+
+ public Map<String, String> getLookupProperties() {
+ return (lookupProperties == null) ? Collections.emptyMap() :
Collections.unmodifiableMap(lookupProperties);
+ }
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 983cd21a7a9..f691215b04e 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -70,6 +70,8 @@ public class BinaryProtoLookupServiceTest {
doReturn(cnxPool).when(client).getCnxPool();
doReturn(clientConfig).when(client).getConfiguration();
doReturn(1L).when(client).newRequestId();
+ ClientConfigurationData data = new ClientConfigurationData();
+ doReturn(data).when(client).getConfiguration();
lookup = spy(
new BinaryProtoLookupService(client,
"pulsar://localhost:6650", false, mock(ExecutorService.class)));
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 224e093baf1..3fb2fd5ad3d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -936,10 +936,11 @@ public class Commands {
}
public static ByteBuf newLookup(String topic, boolean authoritative, long
requestId) {
- return newLookup(topic, null, authoritative, requestId);
+ return newLookup(topic, null, authoritative, requestId, null);
}
- public static ByteBuf newLookup(String topic, String listenerName, boolean
authoritative, long requestId) {
+ public static ByteBuf newLookup(String topic, String listenerName, boolean
authoritative, long requestId,
+ Map<String, String> properties) {
BaseCommand cmd = localCmd(Type.LOOKUP);
CommandLookupTopic lookup = cmd.setLookupTopic()
.setTopic(topic)
@@ -948,6 +949,9 @@ public class Commands {
if (StringUtils.isNotBlank(listenerName)) {
lookup.setAdvertisedListenerName(listenerName);
}
+ if (properties != null) {
+ properties.forEach((key, value) ->
lookup.addProperty().setKey(key).setValue(value));
+ }
return serializeWithSize(cmd);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 5067ed64079..19658c5e57f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -446,6 +446,8 @@ message CommandLookupTopic {
optional string original_auth_method = 6;
//
optional string advertised_listener_name = 7;
+ // The properties used for topic lookup
+ repeated KeyValue properties = 8;
}
message CommandLookupTopicResponse {