This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 89f722fc9b67a646d54dc2318b830fab8b98a716 Author: Lari Hotari <[email protected]> AuthorDate: Sun Jan 21 09:31:05 2024 -0800 [fix][broker] Restore the broker id to match the format used in existing Pulsar releases (#21937) (cherry picked from commit 63473159c42b2867acbd8defe8e72f084dc32500) --- .../org/apache/pulsar/broker/PulsarService.java | 13 +++++++++--- .../pulsar/broker/admin/impl/BrokersBase.java | 14 ++++++------- .../apache/pulsar/broker/admin/AdminApi2Test.java | 7 +++---- .../apache/pulsar/broker/admin/AdminApiTest.java | 8 ++------ .../org/apache/pulsar/broker/admin/AdminTest.java | 4 ++-- .../pulsar/broker/admin/v1/V1_AdminApiTest.java | 5 +---- .../AntiAffinityNamespaceGroupTest.java | 4 ++-- .../broker/loadbalance/LoadBalancerTest.java | 4 ++-- .../impl/ModularLoadManagerImplTest.java | 4 ++-- .../broker/service/AdvertisedAddressTest.java | 2 +- .../pulsar/broker/service/BrokerServiceTest.java | 9 -------- .../pulsar/broker/service/ReplicatorTest.java | 6 ++---- .../pulsar/client/api/BrokerServiceLookupTest.java | 2 +- .../org/apache/pulsar/client/admin/Brokers.java | 24 +++++++++++----------- .../pulsar/client/admin/internal/BrokersImpl.java | 8 ++++---- 15 files changed, 51 insertions(+), 63 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 39a8293570b..64f4ee02881 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -830,10 +830,10 @@ public class PulsarService implements AutoCloseable, ShutdownService { this.brokerServiceUrlTls = brokerUrlTls(config); // the broker id is used in the load manager to identify the broker + // it should not be used for making connections to the broker this.brokerId = - String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent() - ? config.getWebServicePortTls().get() - : config.getWebServicePort().orElseThrow()); + String.format("%s:%s", advertisedAddress, config.getWebServicePort() + .or(config::getWebServicePortTls).orElseThrow()); if (this.compactionServiceFactory == null) { this.compactionServiceFactory = loadCompactionServiceFactory(); @@ -1749,6 +1749,13 @@ public class PulsarService implements AutoCloseable, ShutdownService { return brokerServiceUrlTls != null ? brokerServiceUrlTls : brokerServiceUrl; } + /** + * Return the broker id. The broker id is used in the load manager to uniquely identify the broker at runtime. + * It should not be used for making connections to the broker. The broker id is available after {@link #start()} + * has been called. + * + * @return broker id + */ public String getBrokerId() { return Objects.requireNonNull(brokerId, "brokerId is not initialized before start has been called"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index ad3d7e789e4..f056b18f3f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -85,7 +85,7 @@ public class BrokersBase extends AdminResource { @GET @Path("/{cluster}") @ApiOperation( - value = "Get the list of active brokers (web service addresses) in the cluster." + value = "Get the list of active brokers (broker ids) in the cluster." + "If authorization is not enabled, any cluster name is valid.", response = String.class, responseContainer = "Set") @@ -115,7 +115,7 @@ public class BrokersBase extends AdminResource { @GET @ApiOperation( - value = "Get the list of active brokers (web service addresses) in the local cluster." + value = "Get the list of active brokers (broker ids) in the local cluster." + "If authorization is not enabled", response = String.class, responseContainer = "Set") @@ -155,8 +155,8 @@ public class BrokersBase extends AdminResource { } @GET - @Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces") - @ApiOperation(value = "Get the list of namespaces served by the specific broker", + @Path("/{clusterName}/{brokerId}/ownedNamespaces") + @ApiOperation(value = "Get the list of namespaces served by the specific broker id", response = NamespaceOwnershipStatus.class, responseContainer = "Map") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"), @@ -164,9 +164,9 @@ public class BrokersBase extends AdminResource { @ApiResponse(code = 404, message = "Cluster doesn't exist") }) public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse, @PathParam("clusterName") String cluster, - @PathParam("broker-webserviceurl") String broker) { + @PathParam("brokerId") String brokerId) { validateSuperUserAccessAsync() - .thenCompose(__ -> maybeRedirectToBroker(broker)) + .thenCompose(__ -> maybeRedirectToBroker(brokerId)) .thenCompose(__ -> validateClusterOwnershipAsync(cluster)) .thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync()) .thenAccept(asyncResponse::resume) @@ -174,7 +174,7 @@ public class BrokersBase extends AdminResource { // If the exception is not redirect exception we need to log it. if (!isRedirectException(ex)) { LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", - clientAppId(), cluster, broker); + clientAppId(), cluster, brokerId); } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 723d576dea3..dcdeb9d4855 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -515,8 +515,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgDropRate(), 0); assertEquals(topicStats.getPublishers().size(), 0); assertEquals(topicStats.getMsgDropRate(), 0); - assertEquals(topicStats.getOwnerBroker(), - pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get()); + assertEquals(topicStats.getOwnerBroker(), pulsar.getBrokerId()); PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(nonPersistentTopicName, false); assertEquals(internalStats.cursors.keySet(), Set.of("my-sub")); @@ -1309,7 +1308,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { String cluster = pulsar.getConfiguration().getClusterName(); String namespaceRegex = "other/" + cluster + "/other.*"; String brokerName = pulsar.getAdvertisedAddress(); - String brokerAddress = brokerName + ":" + pulsar.getConfiguration().getWebServicePort().get(); + String brokerAddress = pulsar.getBrokerId(); Map<String, String> parameters1 = new HashMap<>(); parameters1.put("min_limit", "1"); @@ -1317,7 +1316,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder() .namespaces(Collections.singletonList(namespaceRegex)) - .primary(Collections.singletonList(brokerName + ":[0-9]*")) + .primary(Collections.singletonList(brokerName)) .secondary(Collections.singletonList(brokerName + ".*")) .autoFailoverPolicy(AutoFailoverPolicyData.builder() .policyType(AutoFailoverPolicyType.min_available) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 78dc3d2cd85..2426be2bee9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -546,10 +546,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { } } - String[] parts = list.get(0).split(":"); - Assert.assertEquals(parts.length, 2); - Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("test", - String.format("%s:%d", parts[0], pulsar.getListenPortHTTPS().get())); + Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("test", list.get(0)); Assert.assertEquals(nsMap2.size(), 2); deleteNamespaceWithRetry("prop-xyz/ns1", false); @@ -940,8 +937,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(topicStats.getSubscriptions().get(subName).getConsumers().size(), 1); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10); assertEquals(topicStats.getPublishers().size(), 0); - assertEquals(topicStats.getOwnerBroker(), - pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePortTls().get()); + assertEquals(topicStats.getOwnerBroker(), pulsar.getBrokerId()); PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false); assertEquals(internalStats.cursors.keySet(), Set.of(Codec.encode(subName))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index e9352503822..25ef8d8aee1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -302,7 +302,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest { NamespaceIsolationDataImpl policyData = NamespaceIsolationDataImpl.builder() .namespaces(Collections.singletonList("dummy/colo/ns")) - .primary(Collections.singletonList("localhost" + ":" + pulsar.getListenPortHTTP())) + .primary(Collections.singletonList(pulsar.getAdvertisedAddress())) .autoFailoverPolicy(AutoFailoverPolicyData.builder() .policyType(AutoFailoverPolicyType.min_available) .parameters(parameters1) @@ -722,7 +722,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest { assertTrue(res instanceof Set); Set<String> activeBrokers = (Set<String>) res; assertEquals(activeBrokers.size(), 1); - assertEquals(activeBrokers, Set.of(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get())); + assertEquals(activeBrokers, Set.of(pulsar.getBrokerId())); Object leaderBrokerRes = asyncRequests(ctx -> brokers.getLeaderBroker(ctx)); assertTrue(leaderBrokerRes instanceof BrokerInfo); BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index d0775b00ed8..43ec01f47f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -458,10 +458,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { } } - String[] parts = list.get(0).split(":"); - Assert.assertEquals(parts.length, 2); - Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("use", - String.format("%s:%d", parts[0], pulsar.getListenPortHTTPS().get())); + Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("use", list.get(0)); Assert.assertEquals(nsMap2.size(), 2); admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 560cfa9216a..5fbda961c0e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -103,14 +103,14 @@ public class AntiAffinityNamespaceGroupTest extends MockedPulsarServiceBaseTest setupConfigs(conf); super.internalSetup(conf); pulsar1 = pulsar; - primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get()); + primaryHost = pulsar1.getBrokerId(); admin1 = admin; var config2 = getDefaultConf(); setupConfigs(config2); additionalPulsarTestContext = createAdditionalPulsarTestContext(config2); pulsar2 = additionalPulsarTestContext.getPulsarService(); - secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get()); + secondaryHost = pulsar2.getBrokerId(); primaryLoadManager = getField(pulsar1.getLoadManager().get(), "loadManager"); secondaryLoadManager = getField(pulsar2.getLoadManager().get(), "loadManager"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index e350004a739..04a2175b1d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -743,7 +743,7 @@ public class LoadBalancerTest { // set up policy that use this broker as secondary policyData = NamespaceIsolationData.builder() .namespaces(Collections.singletonList("pulsar/use/secondary-ns.*")) - .primary(Collections.singletonList(pulsarServices[0].getWebServiceAddress())) + .primary(Collections.singletonList(pulsarServices[0].getAdvertisedAddress())) .secondary(allExceptFirstBroker) .autoFailoverPolicy(AutoFailoverPolicyData.builder() .policyType(AutoFailoverPolicyType.min_available) @@ -755,7 +755,7 @@ public class LoadBalancerTest { // set up policy that do not use this broker (neither primary nor secondary) policyData = NamespaceIsolationData.builder() .namespaces(Collections.singletonList("pulsar/use/shared-ns.*")) - .primary(Collections.singletonList(pulsarServices[0].getWebServiceAddress())) + .primary(Collections.singletonList(pulsarServices[0].getAdvertisedAddress())) .secondary(allExceptFirstBroker) .autoFailoverPolicy(AutoFailoverPolicyData.builder() .policyType(AutoFailoverPolicyType.min_available) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index c78ee257340..f6a467af41b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -180,7 +180,7 @@ public class ModularLoadManagerImplTest { pulsar1 = new PulsarService(config1); pulsar1.start(); - primaryBrokerId = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTPS().get()); + primaryBrokerId = pulsar1.getBrokerId(); url1 = new URL(pulsar1.getWebServiceAddress()); admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); @@ -214,7 +214,7 @@ public class ModularLoadManagerImplTest { config.setBrokerServicePortTls(Optional.of(0)); pulsar3 = new PulsarService(config); - secondaryBrokerId = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTPS().get()); + secondaryBrokerId = pulsar2.getBrokerId(); url2 = new URL(pulsar2.getWebServiceAddress()); admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index 554c663850f..19e40ebf996 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -75,7 +75,7 @@ public class AdvertisedAddressTest { Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress ); Assert.assertEquals( pulsar.getBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, pulsar.getBrokerListenPort().get()) ); Assert.assertEquals( pulsar.getSafeWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, pulsar.getListenPortHTTP().get()) ); - String brokerZkPath = String.format("/loadbalance/brokers/%s:%d", pulsar.getAdvertisedAddress(), pulsar.getListenPortHTTP().get()); + String brokerZkPath = String.format("/loadbalance/brokers/%s", pulsar.getBrokerId()); String bkBrokerData = new String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()), StandardCharsets.UTF_8); JsonObject jsonBkBrokerData = new Gson().fromJson(bkBrokerData, JsonObject.class); Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getBrokerServiceUrl() ); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 4799c67cdfc..2b3d73a9579 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1786,13 +1786,4 @@ public class BrokerServiceTest extends BrokerTestBase { fail("Unsubscribe failed"); } } - - @Test - public void testGetBrokerId() throws Exception { - cleanup(); - conf.setWebServicePortTls(Optional.of(8081)); - setup(); - assertEquals(pulsar.getBrokerId(), "localhost:8081"); - resetState(); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index c372e3029ab..159d49ca2e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -239,16 +239,14 @@ public class ReplicatorTest extends ReplicatorTestBase { pulsar1.getConfiguration().setAuthorizationEnabled(true); //init clusterData - String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678,localhost:5677,localhost:5676", - pulsar2.getWebServiceAddress()); - ClusterData cluster2Data = ClusterData.builder().serviceUrl(cluster2ServiceUrls).build(); + ClusterData cluster2Data = ClusterData.builder().serviceUrl(pulsar2.getWebServiceAddress()).build(); String cluster2 = "activeCLuster2"; admin2.clusters().createCluster(cluster2, cluster2Data); Awaitility.await().until(() -> admin2.clusters().getCluster(cluster2) != null); List<String> list = admin1.brokers().getActiveBrokers(cluster2); - assertEquals(list.get(0), urlTls2.toString().replace("https://", "")); + assertEquals(list.get(0), pulsar2.getBrokerId()); //restore configuration pulsar1.getConfiguration().setAuthorizationEnabled(false); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 9a68e56171d..9c50c29f924 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -754,7 +754,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { }); // Unload the NamespacePolicies and AntiAffinity check. - String currentBroker = String.format("%s:%d", "localhost", pulsar.getListenPortHTTP().get()); + String currentBroker = pulsar.getBrokerId(); assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace,"0x00000000_0xffffffff", currentBroker)); assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace,"0x00000000_0xffffffff", currentBroker)); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java index 29c280f8ba5..dc0b7c9885a 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -35,7 +35,7 @@ public interface Brokers { /** * Get the list of active brokers in the local cluster. * <p/> - * Get the list of active brokers (web service addresses) in the local cluster. + * Get the list of active brokers (broker ids) in the local cluster. * <p/> * Response Example: * @@ -44,7 +44,7 @@ public interface Brokers { * * * "prod1-broker3.messaging.use.example.com:8080"]</code> * </pre> * - * @return a list of (host:port) + * @return a list of broker ids * @throws NotAuthorizedException * You don't have admin permission to get the list of active brokers in the cluster * @throws PulsarAdminException @@ -55,7 +55,7 @@ public interface Brokers { /** * Get the list of active brokers in the local cluster asynchronously. * <p/> - * Get the list of active brokers (web service addresses) in the local cluster. + * Get the list of active brokers (broker ids) in the local cluster. * <p/> * Response Example: * @@ -64,13 +64,13 @@ public interface Brokers { * "prod1-broker3.messaging.use.example.com:8080"]</code> * </pre> * - * @return a list of (host:port) + * @return a list of broker ids */ CompletableFuture<List<String>> getActiveBrokersAsync(); /** * Get the list of active brokers in the cluster. * <p/> - * Get the list of active brokers (web service addresses) in the cluster. + * Get the list of active brokers (broker ids) in the cluster. * <p/> * Response Example: * @@ -81,7 +81,7 @@ public interface Brokers { * * @param cluster * Cluster name - * @return a list of (host:port) + * @return a list of broker ids * @throws NotAuthorizedException * You don't have admin permission to get the list of active brokers in the cluster * @throws NotFoundException @@ -94,7 +94,7 @@ public interface Brokers { /** * Get the list of active brokers in the cluster asynchronously. * <p/> - * Get the list of active brokers (web service addresses) in the cluster. + * Get the list of active brokers (broker ids) in the cluster. * <p/> * Response Example: * @@ -105,7 +105,7 @@ public interface Brokers { * * @param cluster * Cluster name - * @return a list of (host:port) + * @return a list of broker ids */ CompletableFuture<List<String>> getActiveBrokersAsync(String cluster); @@ -156,11 +156,11 @@ public interface Brokers { * </pre> * * @param cluster - * @param brokerUrl + * @param brokerId * @return * @throws PulsarAdminException */ - Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String brokerUrl) + Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String brokerId) throws PulsarAdminException; /** @@ -176,10 +176,10 @@ public interface Brokers { * </pre> * * @param cluster - * @param brokerUrl + * @param brokerId * @return */ - CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNamespacesAsync(String cluster, String brokerUrl); + CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNamespacesAsync(String cluster, String brokerId); /** * Update a dynamic configuration value into ZooKeeper. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index 0e6296724b3..7b4ebb1778d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -75,15 +75,15 @@ public class BrokersImpl extends BaseResource implements Brokers { } @Override - public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String brokerUrl) + public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String brokerId) throws PulsarAdminException { - return sync(() -> getOwnedNamespacesAsync(cluster, brokerUrl)); + return sync(() -> getOwnedNamespacesAsync(cluster, brokerId)); } @Override public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNamespacesAsync( - String cluster, String brokerUrl) { - WebTarget path = adminBrokers.path(cluster).path(brokerUrl).path("ownedNamespaces"); + String cluster, String brokerId) { + WebTarget path = adminBrokers.path(cluster).path(brokerId).path("ownedNamespaces"); return asyncGetRequest(path, new FutureCallback<Map<String, NamespaceOwnershipStatus>>(){}); }
