This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 63473159c42 [fix][broker] Restore the broker id to match the format
used in existing Pulsar releases (#21937)
63473159c42 is described below
commit 63473159c42b2867acbd8defe8e72f084dc32500
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Jan 21 09:31:05 2024 -0800
[fix][broker] Restore the broker id to match the format used in existing
Pulsar releases (#21937)
---
.../org/apache/pulsar/broker/PulsarService.java | 13 +++++++++---
.../pulsar/broker/admin/impl/BrokersBase.java | 14 ++++++-------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 7 +++----
.../apache/pulsar/broker/admin/AdminApiTest.java | 8 ++------
.../org/apache/pulsar/broker/admin/AdminTest.java | 4 ++--
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 5 +----
.../AntiAffinityNamespaceGroupTest.java | 4 ++--
.../broker/loadbalance/LoadBalancerTest.java | 4 ++--
.../impl/ModularLoadManagerImplTest.java | 4 ++--
.../broker/service/AdvertisedAddressTest.java | 2 +-
.../pulsar/broker/service/BrokerServiceTest.java | 9 --------
.../pulsar/broker/service/ReplicatorTest.java | 6 ++----
.../pulsar/client/api/BrokerServiceLookupTest.java | 2 +-
.../org/apache/pulsar/client/admin/Brokers.java | 24 +++++++++++-----------
.../pulsar/client/admin/internal/BrokersImpl.java | 8 ++++----
15 files changed, 51 insertions(+), 63 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 42d43b3dcf2..83a91e6971e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -830,10 +830,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.brokerServiceUrlTls = brokerUrlTls(config);
// the broker id is used in the load manager to identify the broker
+ // it should not be used for making connections to the broker
this.brokerId =
- String.format("%s:%s", advertisedAddress,
config.getWebServicePortTls().isPresent()
- ? config.getWebServicePortTls().get()
- : config.getWebServicePort().orElseThrow());
+ String.format("%s:%s", advertisedAddress,
config.getWebServicePort()
+ .or(config::getWebServicePortTls).orElseThrow());
if (this.compactionServiceFactory == null) {
this.compactionServiceFactory = loadCompactionServiceFactory();
@@ -1702,6 +1702,13 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return brokerServiceUrlTls != null ? brokerServiceUrlTls :
brokerServiceUrl;
}
+ /**
+ * Return the broker id. The broker id is used in the load manager to
uniquely identify the broker at runtime.
+ * It should not be used for making connections to the broker. The broker
id is available after {@link #start()}
+ * has been called.
+ *
+ * @return broker id
+ */
public String getBrokerId() {
return Objects.requireNonNull(brokerId,
"brokerId is not initialized before start has been called");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index ad3d7e789e4..f056b18f3f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -85,7 +85,7 @@ public class BrokersBase extends AdminResource {
@GET
@Path("/{cluster}")
@ApiOperation(
- value = "Get the list of active brokers (web service addresses) in the
cluster."
+ value = "Get the list of active brokers (broker ids) in the cluster."
+ "If authorization is not enabled, any cluster name is
valid.",
response = String.class,
responseContainer = "Set")
@@ -115,7 +115,7 @@ public class BrokersBase extends AdminResource {
@GET
@ApiOperation(
- value = "Get the list of active brokers (web service addresses) in
the local cluster."
+ value = "Get the list of active brokers (broker ids) in the local
cluster."
+ "If authorization is not enabled",
response = String.class,
responseContainer = "Set")
@@ -155,8 +155,8 @@ public class BrokersBase extends AdminResource {
}
@GET
- @Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
- @ApiOperation(value = "Get the list of namespaces served by the specific
broker",
+ @Path("/{clusterName}/{brokerId}/ownedNamespaces")
+ @ApiOperation(value = "Get the list of namespaces served by the specific
broker id",
response = NamespaceOwnershipStatus.class, responseContainer =
"Map")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the cluster"),
@@ -164,9 +164,9 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 404, message = "Cluster doesn't exist") })
public void getOwnedNamespaces(@Suspended final AsyncResponse
asyncResponse,
@PathParam("clusterName") String cluster,
- @PathParam("broker-webserviceurl") String
broker) {
+ @PathParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
- .thenCompose(__ -> maybeRedirectToBroker(broker))
+ .thenCompose(__ -> maybeRedirectToBroker(brokerId))
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ ->
pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
.thenAccept(asyncResponse::resume)
@@ -174,7 +174,7 @@ public class BrokersBase extends AdminResource {
// If the exception is not redirect exception we need to
log it.
if (!isRedirectException(ex)) {
LOG.error("[{}] Failed to get the namespace ownership
status. cluster={}, broker={}",
- clientAppId(), cluster, broker);
+ clientAppId(), cluster, brokerId);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 9a5d25fa0c8..f0bc80fa364 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -516,8 +516,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgDropRate(), 0);
assertEquals(topicStats.getPublishers().size(), 0);
assertEquals(topicStats.getMsgDropRate(), 0);
- assertEquals(topicStats.getOwnerBroker(),
- pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getWebServicePort().get());
+ assertEquals(topicStats.getOwnerBroker(), pulsar.getBrokerId());
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(nonPersistentTopicName, false);
assertEquals(internalStats.cursors.keySet(), Set.of("my-sub"));
@@ -1310,7 +1309,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
String cluster = pulsar.getConfiguration().getClusterName();
String namespaceRegex = "other/" + cluster + "/other.*";
String brokerName = pulsar.getAdvertisedAddress();
- String brokerAddress = brokerName + ":" +
pulsar.getConfiguration().getWebServicePort().get();
+ String brokerAddress = pulsar.getBrokerId();
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
@@ -1318,7 +1317,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList(namespaceRegex))
- .primary(Collections.singletonList(brokerName + ":[0-9]*"))
+ .primary(Collections.singletonList(brokerName))
.secondary(Collections.singletonList(brokerName + ".*"))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 93cf369f7dd..b28cfc98fdb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -549,10 +549,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
}
}
- String[] parts = list.get(0).split(":");
- Assert.assertEquals(parts.length, 2);
- Map<String, NamespaceOwnershipStatus> nsMap2 =
adminTls.brokers().getOwnedNamespaces("test",
- String.format("%s:%d", parts[0],
pulsar.getListenPortHTTPS().get()));
+ Map<String, NamespaceOwnershipStatus> nsMap2 =
adminTls.brokers().getOwnedNamespaces("test", list.get(0));
Assert.assertEquals(nsMap2.size(), 2);
deleteNamespaceWithRetry("prop-xyz/ns1", false);
@@ -943,8 +940,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(topicStats.getSubscriptions().get(subName).getConsumers().size(),
1);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getPublishers().size(), 0);
- assertEquals(topicStats.getOwnerBroker(),
- pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getWebServicePortTls().get());
+ assertEquals(topicStats.getOwnerBroker(), pulsar.getBrokerId());
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(persistentTopicName, false);
assertEquals(internalStats.cursors.keySet(),
Set.of(Codec.encode(subName)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 8a83682c1d2..2894903c0d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -302,7 +302,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
NamespaceIsolationDataImpl policyData =
NamespaceIsolationDataImpl.builder()
.namespaces(Collections.singletonList("dummy/colo/ns"))
- .primary(Collections.singletonList("localhost" + ":" +
pulsar.getListenPortHTTP()))
+
.primary(Collections.singletonList(pulsar.getAdvertisedAddress()))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
@@ -722,7 +722,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
assertTrue(res instanceof Set);
Set<String> activeBrokers = (Set<String>) res;
assertEquals(activeBrokers.size(), 1);
- assertEquals(activeBrokers, Set.of(pulsar.getAdvertisedAddress() + ":"
+ pulsar.getListenPortHTTP().get()));
+ assertEquals(activeBrokers, Set.of(pulsar.getBrokerId()));
Object leaderBrokerRes = asyncRequests(ctx ->
brokers.getLeaderBroker(ctx));
assertTrue(leaderBrokerRes instanceof BrokerInfo);
BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 34bc1fa9a6a..f2faa98636b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -460,10 +460,7 @@ public class V1_AdminApiTest extends
MockedPulsarServiceBaseTest {
}
}
- String[] parts = list.get(0).split(":");
- Assert.assertEquals(parts.length, 2);
- Map<String, NamespaceOwnershipStatus> nsMap2 =
adminTls.brokers().getOwnedNamespaces("use",
- String.format("%s:%d", parts[0],
pulsar.getListenPortHTTPS().get()));
+ Map<String, NamespaceOwnershipStatus> nsMap2 =
adminTls.brokers().getOwnedNamespaces("use", list.get(0));
Assert.assertEquals(nsMap2.size(), 2);
admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 560cfa9216a..5fbda961c0e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -103,14 +103,14 @@ public class AntiAffinityNamespaceGroupTest extends
MockedPulsarServiceBaseTest
setupConfigs(conf);
super.internalSetup(conf);
pulsar1 = pulsar;
- primaryHost = String.format("%s:%d", "localhost",
pulsar1.getListenPortHTTP().get());
+ primaryHost = pulsar1.getBrokerId();
admin1 = admin;
var config2 = getDefaultConf();
setupConfigs(config2);
additionalPulsarTestContext =
createAdditionalPulsarTestContext(config2);
pulsar2 = additionalPulsarTestContext.getPulsarService();
- secondaryHost = String.format("%s:%d", "localhost",
pulsar2.getListenPortHTTP().get());
+ secondaryHost = pulsar2.getBrokerId();
primaryLoadManager = getField(pulsar1.getLoadManager().get(),
"loadManager");
secondaryLoadManager = getField(pulsar2.getLoadManager().get(),
"loadManager");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index e4a66b1201c..7a2314b01a3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -729,7 +729,7 @@ public class LoadBalancerTest {
// set up policy that use this broker as secondary
policyData = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("pulsar/use/secondary-ns.*"))
-
.primary(Collections.singletonList(pulsarServices[0].getWebServiceAddress()))
+
.primary(Collections.singletonList(pulsarServices[0].getAdvertisedAddress()))
.secondary(allExceptFirstBroker)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
@@ -741,7 +741,7 @@ public class LoadBalancerTest {
// set up policy that do not use this broker (neither primary nor
secondary)
policyData = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("pulsar/use/shared-ns.*"))
-
.primary(Collections.singletonList(pulsarServices[0].getWebServiceAddress()))
+
.primary(Collections.singletonList(pulsarServices[0].getAdvertisedAddress()))
.secondary(allExceptFirstBroker)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index b924a59bf7d..824291c52da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -181,7 +181,7 @@ public class ModularLoadManagerImplTest {
pulsar1 = new PulsarService(config1);
pulsar1.start();
- primaryBrokerId = String.format("%s:%d", "localhost",
pulsar1.getListenPortHTTPS().get());
+ primaryBrokerId = pulsar1.getBrokerId();
url1 = new URL(pulsar1.getWebServiceAddress());
admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
@@ -215,7 +215,7 @@ public class ModularLoadManagerImplTest {
config.setBrokerServicePortTls(Optional.of(0));
pulsar3 = new PulsarService(config);
- secondaryBrokerId = String.format("%s:%d", "localhost",
pulsar2.getListenPortHTTPS().get());
+ secondaryBrokerId = pulsar2.getBrokerId();
url2 = new URL(pulsar2.getWebServiceAddress());
admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 554c663850f..19e40ebf996 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -75,7 +75,7 @@ public class AdvertisedAddressTest {
Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress
);
Assert.assertEquals( pulsar.getBrokerServiceUrl(),
String.format("pulsar://%s:%d", advertisedAddress,
pulsar.getBrokerListenPort().get()) );
Assert.assertEquals( pulsar.getSafeWebServiceAddress(),
String.format("http://%s:%d", advertisedAddress,
pulsar.getListenPortHTTP().get()) );
- String brokerZkPath = String.format("/loadbalance/brokers/%s:%d",
pulsar.getAdvertisedAddress(), pulsar.getListenPortHTTP().get());
+ String brokerZkPath = String.format("/loadbalance/brokers/%s",
pulsar.getBrokerId());
String bkBrokerData = new
String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()),
StandardCharsets.UTF_8);
JsonObject jsonBkBrokerData = new Gson().fromJson(bkBrokerData,
JsonObject.class);
Assert.assertEquals(
jsonBkBrokerData.get("pulsarServiceUrl").getAsString(),
pulsar.getBrokerServiceUrl() );
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 3600850974c..b6a73274f44 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1724,13 +1724,4 @@ public class BrokerServiceTest extends BrokerTestBase {
fail("Unsubscribe failed");
}
}
-
- @Test
- public void testGetBrokerId() throws Exception {
- cleanup();
- conf.setWebServicePortTls(Optional.of(8081));
- setup();
- assertEquals(pulsar.getBrokerId(), "localhost:8081");
- resetState();
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index ef6d751548d..88a668e8745 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -239,16 +239,14 @@ public class ReplicatorTest extends ReplicatorTestBase {
pulsar1.getConfiguration().setAuthorizationEnabled(true);
//init clusterData
- String cluster2ServiceUrls =
String.format("%s,localhost:1234,localhost:5678,localhost:5677,localhost:5676",
- pulsar2.getWebServiceAddress());
- ClusterData cluster2Data =
ClusterData.builder().serviceUrl(cluster2ServiceUrls).build();
+ ClusterData cluster2Data =
ClusterData.builder().serviceUrl(pulsar2.getWebServiceAddress()).build();
String cluster2 = "activeCLuster2";
admin2.clusters().createCluster(cluster2, cluster2Data);
Awaitility.await().until(()
-> admin2.clusters().getCluster(cluster2) != null);
List<String> list = admin1.brokers().getActiveBrokers(cluster2);
- assertEquals(list.get(0), urlTls2.toString().replace("https://", ""));
+ assertEquals(list.get(0), pulsar2.getBrokerId());
//restore configuration
pulsar1.getConfiguration().setAuthorizationEnabled(false);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index cb72c7d42cd..dab4fe9087e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -760,7 +760,7 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
});
// Unload the NamespacePolicies and AntiAffinity check.
- String currentBroker = String.format("%s:%d", "localhost",
pulsar.getListenPortHTTP().get());
+ String currentBroker = pulsar.getBrokerId();
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace,"0x00000000_0xffffffff",
currentBroker));
assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace,"0x00000000_0xffffffff",
currentBroker));
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index 29c280f8ba5..dc0b7c9885a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -35,7 +35,7 @@ public interface Brokers {
/**
* Get the list of active brokers in the local cluster.
* <p/>
- * Get the list of active brokers (web service addresses) in the local
cluster.
+ * Get the list of active brokers (broker ids) in the local cluster.
* <p/>
* Response Example:
*
@@ -44,7 +44,7 @@ public interface Brokers {
* * * "prod1-broker3.messaging.use.example.com:8080"]</code>
* </pre>
*
- * @return a list of (host:port)
+ * @return a list of broker ids
* @throws NotAuthorizedException
* You don't have admin permission to get the list of active
brokers in the cluster
* @throws PulsarAdminException
@@ -55,7 +55,7 @@ public interface Brokers {
/**
* Get the list of active brokers in the local cluster asynchronously.
* <p/>
- * Get the list of active brokers (web service addresses) in the local
cluster.
+ * Get the list of active brokers (broker ids) in the local cluster.
* <p/>
* Response Example:
*
@@ -64,13 +64,13 @@ public interface Brokers {
* "prod1-broker3.messaging.use.example.com:8080"]</code>
* </pre>
*
- * @return a list of (host:port)
+ * @return a list of broker ids
*/
CompletableFuture<List<String>> getActiveBrokersAsync();
/**
* Get the list of active brokers in the cluster.
* <p/>
- * Get the list of active brokers (web service addresses) in the cluster.
+ * Get the list of active brokers (broker ids) in the cluster.
* <p/>
* Response Example:
*
@@ -81,7 +81,7 @@ public interface Brokers {
*
* @param cluster
* Cluster name
- * @return a list of (host:port)
+ * @return a list of broker ids
* @throws NotAuthorizedException
* You don't have admin permission to get the list of active
brokers in the cluster
* @throws NotFoundException
@@ -94,7 +94,7 @@ public interface Brokers {
/**
* Get the list of active brokers in the cluster asynchronously.
* <p/>
- * Get the list of active brokers (web service addresses) in the cluster.
+ * Get the list of active brokers (broker ids) in the cluster.
* <p/>
* Response Example:
*
@@ -105,7 +105,7 @@ public interface Brokers {
*
* @param cluster
* Cluster name
- * @return a list of (host:port)
+ * @return a list of broker ids
*/
CompletableFuture<List<String>> getActiveBrokersAsync(String cluster);
@@ -156,11 +156,11 @@ public interface Brokers {
* </pre>
*
* @param cluster
- * @param brokerUrl
+ * @param brokerId
* @return
* @throws PulsarAdminException
*/
- Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster,
String brokerUrl)
+ Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster,
String brokerId)
throws PulsarAdminException;
/**
@@ -176,10 +176,10 @@ public interface Brokers {
* </pre>
*
* @param cluster
- * @param brokerUrl
+ * @param brokerId
* @return
*/
- CompletableFuture<Map<String, NamespaceOwnershipStatus>>
getOwnedNamespacesAsync(String cluster, String brokerUrl);
+ CompletableFuture<Map<String, NamespaceOwnershipStatus>>
getOwnedNamespacesAsync(String cluster, String brokerId);
/**
* Update a dynamic configuration value into ZooKeeper.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 0e6296724b3..7b4ebb1778d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -75,15 +75,15 @@ public class BrokersImpl extends BaseResource implements
Brokers {
}
@Override
- public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String
cluster, String brokerUrl)
+ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String
cluster, String brokerId)
throws PulsarAdminException {
- return sync(() -> getOwnedNamespacesAsync(cluster, brokerUrl));
+ return sync(() -> getOwnedNamespacesAsync(cluster, brokerId));
}
@Override
public CompletableFuture<Map<String, NamespaceOwnershipStatus>>
getOwnedNamespacesAsync(
- String cluster, String brokerUrl) {
- WebTarget path =
adminBrokers.path(cluster).path(brokerUrl).path("ownedNamespaces");
+ String cluster, String brokerId) {
+ WebTarget path =
adminBrokers.path(cluster).path(brokerId).path("ownedNamespaces");
return asyncGetRequest(path, new FutureCallback<Map<String,
NamespaceOwnershipStatus>>(){});
}