sijie closed pull request #2005: Added REST/CLI tools to manage bookies rack 
placement config
URL: https://github.com/apache/incubator-pulsar/pull/2005
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 3e62c9aa46..e35daef3e2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -38,11 +38,13 @@
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
@@ -136,6 +138,15 @@ public static void main(String[] args) throws Exception {
             throw new IOException("Failed to initialize BookKeeper metadata");
         }
 
+        if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
false) == null) {
+            try {
+                
localZk.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
"{}".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            } catch (NodeExistsException e) {
+                // Ignore
+            }
+        }
+
         localZk.create("/managed-ledgers", new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
new file mode 100644
index 0000000000..f15dd57ecf
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
+import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST resource exposing the bookies rack placement configuration stored in the local ZooKeeper at
+ * {@link ZkBookieRackAffinityMapping#BOOKIE_INFO_ROOT_PATH}.
+ *
+ * <p>All operations require super-user access. Modifications are written back with the z-node version
+ * that was read, so a concurrent modification makes the ZooKeeper write fail instead of being
+ * silently overwritten.
+ */
+@Path("/bookies")
+@Api(value = "/bookies", description = "Configure bookies rack placement", tags = "bookies")
+@Produces(MediaType.APPLICATION_JSON)
+public class Bookies extends AdminResource {
+
+    private static final Logger log = LoggerFactory.getLogger(Bookies.class);
+
+    /** Single deserializer shared by every read of the racks z-node. */
+    private static final Deserializer<BookiesRackConfiguration> RACKS_DESERIALIZER = (key, content) ->
+            ObjectMapperFactory.getThreadLocal().readValue(content, BookiesRackConfiguration.class);
+
+    /**
+     * Returns the rack placement of every bookie.
+     *
+     * @return the stored configuration, or an empty one when the z-node does not exist
+     */
+    @GET
+    @Path("/racks-info")
+    @ApiOperation(value = "Gets the rack placement information for all the bookies in the cluster", response = BookiesRackConfiguration.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public BookiesRackConfiguration getBookiesRackInfo() throws Exception {
+        validateSuperUserAccess();
+
+        return localZkCache().getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, RACKS_DESERIALIZER)
+                .orElseGet(BookiesRackConfiguration::new);
+    }
+
+    /**
+     * Returns the rack placement of a single bookie.
+     *
+     * @param bookieAddress address of the bookie to look up
+     * @return the rack information registered for that bookie
+     * @throws RestException with status 404 when the bookie has no rack information
+     */
+    @GET
+    @Path("/racks-info/{bookie}")
+    @ApiOperation(value = "Gets the rack placement information for a specific bookie in the cluster", response = BookieInfo.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public BookieInfo getBookieRackInfo(@PathParam("bookie") String bookieAddress) throws Exception {
+        validateSuperUserAccess();
+
+        return localZkCache().getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, RACKS_DESERIALIZER)
+                .flatMap(racks -> racks.getBookie(bookieAddress))
+                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Bookie address not found: " + bookieAddress));
+    }
+
+    /**
+     * Removes the rack placement of a single bookie and persists the change.
+     *
+     * @param bookieAddress address of the bookie to remove
+     * @throws RestException with status 404 when the bookie has no rack information
+     */
+    @DELETE
+    @Path("/racks-info/{bookie}")
+    @ApiOperation(value = "Removes the rack placement information for a specific bookie in the cluster")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public void deleteBookieRackInfo(@PathParam("bookie") String bookieAddress) throws Exception {
+        validateSuperUserAccess();
+
+        Optional<Entry<BookiesRackConfiguration, Stat>> entry = localZkCache()
+                .getEntry(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, RACKS_DESERIALIZER);
+
+        if (!entry.isPresent() || !entry.get().getKey().removeBookie(bookieAddress)) {
+            throw new RestException(Status.NOT_FOUND, "Bookie address not found: " + bookieAddress);
+        }
+
+        // Write the modified configuration back; without this the removal only affects the
+        // deserialized object and is never stored in ZooKeeper.
+        localZk().setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+                jsonMapper().writeValueAsBytes(entry.get().getKey()), entry.get().getValue().getVersion());
+        log.info("Removed {} from rack mapping info", bookieAddress);
+    }
+
+    /**
+     * Sets the rack placement of a single bookie, creating the racks z-node if it does not exist yet.
+     *
+     * @param bookieAddress address of the bookie to update
+     * @param group         name of the bookie group; mandatory
+     * @param bookieInfo    rack information taken from the request body; mandatory
+     * @throws RestException with status 412 when {@code group} or the request body is missing
+     */
+    @POST
+    @Path("/racks-info/{bookie}")
+    @ApiOperation(value = "Updates the rack placement information for a specific bookie in the cluster")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public void updateBookieRackInfo(@PathParam("bookie") String bookieAddress, @QueryParam("group") String group,
+            BookieInfo bookieInfo) throws Exception {
+        validateSuperUserAccess();
+
+        if (group == null) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Bookie 'group' parameters is missing");
+        }
+
+        // BookiesRackConfiguration.updateBookie() rejects a null BookieInfo with a NullPointerException;
+        // report the missing body as a client error instead.
+        if (bookieInfo == null) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Bookie rack info is missing in the request body");
+        }
+
+        Optional<Entry<BookiesRackConfiguration, Stat>> entry = localZkCache()
+                .getEntry(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, RACKS_DESERIALIZER);
+
+        if (entry.isPresent()) {
+            // Update the racks info
+            BookiesRackConfiguration racks = entry.get().getKey();
+            racks.updateBookie(group, bookieAddress, bookieInfo);
+
+            localZk().setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper().writeValueAsBytes(racks),
+                    entry.get().getValue().getVersion());
+            log.info("Updated rack mapping info for {}", bookieAddress);
+        } else {
+            // Creates the z-node with racks info
+            BookiesRackConfiguration racks = new BookiesRackConfiguration();
+            racks.updateBookie(group, bookieAddress, bookieInfo);
+            zkCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper().writeValueAsBytes(racks));
+            log.info("Created rack mapping info and added {}", bookieAddress);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 3126b7f599..173fb493be 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -71,11 +71,22 @@
     private final int ZOOKEEPER_PORT = 12759;
     protected final int BROKER_WEBSERVICE_PORT = 15782;
 
+    protected final int bkBasePort = 5001;
+    private final int numberOfBookies;
+
+    /**
+     * Creates the test with the default of 3 local bookies.
+     */
+    public BrokerBkEnsemblesTests() {
+        this(3);
+    }
+
+    /**
+     * Creates the test with a custom number of local bookies.
+     *
+     * @param numberOfBookies number of bookies handed to the {@code LocalBookkeeperEnsemble} created in
+     *                        {@code setup()}
+     */
+    public BrokerBkEnsemblesTests(int numberOfBookies) {
+        this.numberOfBookies = numberOfBookies;
+    }
+
     @BeforeMethod
     void setup() throws Exception {
         try {
             // start local bookie and zookeeper
-            bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+            bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 
ZOOKEEPER_PORT, 5001);
             bkEnsemble.start();
 
             // start pulsar service
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
new file mode 100644
index 0000000000..215d3503a8
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that rack placement information set through the admin API is honored when ledgers are
+ * created.
+ *
+ * <p>The parent ensemble is started with zero bookies; this test starts its own bookies, each with a
+ * distinct advertised address, so that they can be assigned to different racks.
+ */
+public class RackAwareTest extends BrokerBkEnsemblesTests {
+
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class);
+
+    private static final int NUM_BOOKIES = 6;
+    private final List<BookieServer> bookies = new ArrayList<>();
+
+    public RackAwareTest() {
+        // Start bookies manually
+        super(0);
+    }
+
+    // NOTE(review): the parent declares setup() with @BeforeMethod while this override uses
+    // @BeforeClass -- confirm that TestNG applies the intended lifecycle.
+    @SuppressWarnings("deprecation")
+    @BeforeClass
+    @Override
+    void setup() throws Exception {
+        super.setup();
+
+        // Start bookies with specific racks
+        for (int i = 0; i < NUM_BOOKIES; i++) {
+            File bkDataDir = Files.createTempDirectory("bk" + i + "test").toFile();
+            ServerConfiguration conf = new ServerConfiguration();
+
+            int bookiePort = PortManager.nextFreePort();
+            conf.setBookiePort(bookiePort);
+            conf.setZkServers("127.0.0.1:" + this.bkEnsemble.getZkServer().getClientPort());
+            conf.setJournalDirName(bkDataDir.getPath());
+            conf.setLedgerDirNames(new String[] { bkDataDir.getPath() });
+            conf.setAllowLoopback(true);
+
+            // Use different advertised addresses for each bookie, so we can place them in different
+            // racks.
+            // Eg: 1st bookie will be 10.0.0.1, 2nd 10.0.0.2 and so on
+            String addr = String.format("10.0.0.%d", i + 1);
+            conf.setAdvertisedAddress(addr);
+
+            BookieServer bs = new BookieServer(conf, NullStatsLogger.INSTANCE);
+
+            bs.start();
+            bookies.add(bs);
+            log.info("Local BK[{}] started (port: {}, address: {})", i, bookiePort, addr);
+        }
+    }
+
+    @AfterClass
+    @Override
+    void shutdown() throws Exception {
+        try {
+            super.shutdown();
+        } finally {
+            // Always stop the manually started bookies, even when the parent shutdown fails
+            for (BookieServer bs : bookies) {
+                bs.shutdown();
+            }
+
+            bookies.clear();
+        }
+    }
+
+    @Test
+    public void testPlacement() throws Exception {
+        for (int i = 0; i < NUM_BOOKIES; i++) {
+            String bookie = bookies.get(i).getLocalAddress().toString();
+
+            // Place bookie-1 in "rack-1" and the rest in "rack-2"
+            int rackId = i == 0 ? 1 : 2;
+            BookieInfo bi = new BookieInfo("rack-" + rackId, "bookie-" + (i + 1));
+            log.info("setting rack for bookie at {} -- {}", bookie, bi);
+            admin.bookies().updateBookieRackInfo(bookie, "default", bi);
+        }
+
+        // Make sure the racks cache gets updated through the ZK watch
+        Thread.sleep(1000);
+
+        BookKeeper bkc = this.pulsar.getBookKeeperClient();
+
+        // Create few ledgers and verify all of them should have a copy in the first bookie
+        BookieSocketAddress firstBookie = bookies.get(0).getLocalAddress();
+        for (int i = 0; i < 100; i++) {
+            LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new byte[0]);
+            try {
+                log.info("Ledger: {} -- Ensemble: {}", i, lh.getLedgerMetadata().getEnsembleAt(0));
+                assertTrue(lh.getLedgerMetadata().getEnsembleAt(0).contains(firstBookie),
+                        "first bookie (the only one in rack-1) not included in ensemble");
+            } finally {
+                // Close the ledger even when the assertion fails
+                lh.close();
+            }
+        }
+    }
+
+    // NOTE(review): the two methods below carry no @Test annotation; presumably they override
+    // same-named tests of the parent class to disable them here -- confirm against the parent.
+    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
+        // Ignore test
+    }
+
+    public void testSkipCorruptDataLedger() throws Exception {
+        // Ignore test
+    }
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Bookies.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Bookies.java
new file mode 100644
index 0000000000..171fc2f729
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Bookies.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+
+/**
+ * Admin interface for bookies rack placement management.
+ */
+public interface Bookies {
+
+    /**
+     * Gets the rack placement information for all the bookies in the cluster.
+     *
+     * @return the rack configuration, keyed by bookie group and then by bookie address
+     * @throws PulsarAdminException if the request fails
+     */
+    BookiesRackConfiguration getBookiesRackInfo() throws PulsarAdminException;
+
+    /**
+     * Gets the rack placement information for a specific bookie in the cluster.
+     *
+     * @param bookieAddress address of the bookie to look up
+     * @return the rack information registered for the bookie
+     * @throws PulsarAdminException if the request fails
+     */
+    BookieInfo getBookieRackInfo(String bookieAddress) throws 
PulsarAdminException;
+
+    /**
+     * Removes the rack placement information for a specific bookie in the cluster.
+     *
+     * @param bookieAddress address of the bookie whose rack information is removed
+     * @throws PulsarAdminException if the request fails
+     */
+    void deleteBookieRackInfo(String bookieAddress) throws 
PulsarAdminException;
+
+    /**
+     * Updates the rack placement information for a specific bookie in the cluster.
+     *
+     * @param bookieAddress address of the bookie to update
+     * @param group         name of the bookie group the bookie is placed in
+     * @param bookieInfo    rack information to store for the bookie
+     * @throws PulsarAdminException if the request fails
+     */
+    void updateBookieRackInfo(String bookieAddress, String group, BookieInfo 
bookieInfo) throws PulsarAdminException;
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index b2db7e2dde..48aea94289 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -32,6 +32,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.pulsar.client.admin.internal.BookiesImpl;
 import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
 import org.apache.pulsar.client.admin.internal.BrokersImpl;
 import org.apache.pulsar.client.admin.internal.ClustersImpl;
@@ -74,6 +75,7 @@
     private final Tenants tenants;
     private final Properties properties;
     private final Namespaces namespaces;
+    private final Bookies bookies;
     private final TopicsImpl topics;
     private final NonPersistentTopics nonPersistentTopics;
     private final ResourceQuotas resourceQuotas;
@@ -186,6 +188,7 @@ public PulsarAdmin(String serviceUrl, 
ClientConfigurationData clientConfigData)
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
+        this.bookies = new BookiesImpl(root, auth);
     }
 
     /**
@@ -304,6 +307,13 @@ public Topics topics() {
         return topics;
     }
 
+    /**
+     * @return the bookies management object, used to read and modify the bookies rack placement
+     *         information
+     */
+    public Bookies bookies() {
+        return bookies;
+    }
+
     /**
      * @return the persistentTopics management object
      * @deprecated Since 2.0. See {@link #topics()}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
new file mode 100644
index 0000000000..7680ce994b
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.client.admin.Bookies;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.common.policies.data.ErrorData;
+
+/**
+ * HTTP client implementation of {@link Bookies}, talking to the {@code /admin/v2/bookies} REST
+ * endpoints. Any failure while building or executing a request is converted through
+ * {@code getApiException}.
+ */
+public class BookiesImpl extends BaseResource implements Bookies {
+    private final WebTarget adminBookies;
+
+    public BookiesImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        adminBookies = web.path("/admin/v2/bookies");
+    }
+
+    @Override
+    public BookiesRackConfiguration getBookiesRackInfo() throws PulsarAdminException {
+        try {
+            WebTarget allRacks = adminBookies.path("racks-info");
+            return request(allRacks).get(BookiesRackConfiguration.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public BookieInfo getBookieRackInfo(String bookieAddress) throws PulsarAdminException {
+        try {
+            WebTarget bookieRack = bookieRackTarget(bookieAddress);
+            return request(bookieRack).get(BookieInfo.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteBookieRackInfo(String bookieAddress) throws PulsarAdminException {
+        try {
+            WebTarget bookieRack = bookieRackTarget(bookieAddress);
+            request(bookieRack).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateBookieRackInfo(String bookieAddress, String group, BookieInfo bookieInfo)
+            throws PulsarAdminException {
+        try {
+            WebTarget bookieRackInGroup = bookieRackTarget(bookieAddress).queryParam("group", group);
+            request(bookieRackInGroup).post(Entity.entity(bookieInfo, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /** Builds the target for the rack information of a single bookie. */
+    private WebTarget bookieRackTarget(String bookieAddress) {
+        return adminBookies.path("racks-info").path(bookieAddress);
+    }
+}
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a5b1581164..c1a69d5e67 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -28,6 +28,7 @@
 
 import java.util.EnumSet;
 
+import org.apache.pulsar.client.admin.Bookies;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.Brokers;
 import org.apache.pulsar.client.admin.Clusters;
@@ -43,6 +44,7 @@
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
@@ -688,6 +690,27 @@ void nonPersistentTopics() throws Exception {
 
     }
 
+    @Test
+    void bookies() throws Exception {
+        // The admin client hands out a mocked Bookies API so each CLI command can be verified against it
+        Bookies bookiesApi = mock(Bookies.class);
+        PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+        doReturn(bookiesApi).when(admin).bookies();
+
+        CmdBookies cmd = new CmdBookies(admin);
+
+        cmd.run(split("racks-placement"));
+        verify(bookiesApi).getBookiesRackInfo();
+
+        cmd.run(split("get-bookie-rack --bookie my-bookie:3181"));
+        verify(bookiesApi).getBookieRackInfo("my-bookie:3181");
+
+        cmd.run(split("delete-bookie-rack --bookie my-bookie:3181"));
+        verify(bookiesApi).deleteBookieRackInfo("my-bookie:3181");
+
+        BookieInfo expectedInfo = new BookieInfo("rack-1", "host-1");
+        cmd.run(split("set-bookie-rack --group my-group --bookie my-bookie:3181 --rack rack-1 --hostname host-1"));
+        verify(bookiesApi).updateBookieRackInfo("my-bookie:3181", "my-group", expectedInfo);
+    }
+
     String[] split(String s) {
         return s.split(" ");
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
new file mode 100644
index 0000000000..13056b4001
--- /dev/null
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * {@code bookies} command group of the admin CLI: reads and modifies the bookies rack placement
+ * information through {@code admin.bookies()}.
+ */
+@Parameters(commandDescription = "Operations about bookies rack placement")
+public class CmdBookies extends CmdBase {
+
+    /** {@code racks-placement}: prints the rack information of every bookie. */
+    @Parameters(commandDescription = "Gets the rack placement information for all the bookies in the cluster")
+    private class GetAllRacks extends CliCommand {
+
+        @Override
+        void run() throws Exception {
+            print(admin.bookies().getBookiesRackInfo());
+        }
+    }
+
+    /** {@code get-bookie-rack}: prints the rack information of one bookie. */
+    @Parameters(commandDescription = "Gets the rack placement information for a specific bookie in the cluster")
+    private class GetBookieRack extends CliCommand {
+
+        @Parameter(names = { "-b", "--bookie" }, description = "bookie address", required = true)
+        private String bookieAddress;
+
+        @Override
+        void run() throws Exception {
+            print(admin.bookies().getBookieRackInfo(bookieAddress));
+        }
+    }
+
+    /** {@code delete-bookie-rack}: removes the rack information of one bookie. */
+    @Parameters(commandDescription = "Remove rack placement information for a specific bookie in the cluster")
+    private class DeleteBookieRack extends CliCommand {
+
+        @Parameter(names = { "-b", "--bookie" }, description = "bookie address", required = true)
+        private String bookieAddress;
+
+        @Override
+        void run() throws Exception {
+            admin.bookies().deleteBookieRackInfo(bookieAddress);
+        }
+    }
+
+    /** {@code set-bookie-rack}: sets the rack information of one bookie; the group defaults to "default". */
+    @Parameters(commandDescription = "Updates the rack placement information for a specific bookie in the cluster")
+    private class SetBookieRack extends CliCommand {
+        @Parameter(names = { "-g", "--group" }, description = "Bookie group name", required = false)
+        private String group = "default";
+
+        @Parameter(names = { "-b", "--bookie" }, description = "Bookie address", required = true)
+        private String bookieAddress;
+
+        @Parameter(names = { "-r", "--rack" }, description = "Bookie rack name", required = true)
+        private String rack;
+
+        @Parameter(names = { "--hostname" }, description = "Bookie host name", required = false)
+        private String hostname;
+
+        @Override
+        void run() throws Exception {
+            BookieInfo rackInfo = new BookieInfo(rack, hostname);
+            admin.bookies().updateBookieRackInfo(bookieAddress, group, rackInfo);
+        }
+    }
+
+    public CmdBookies(PulsarAdmin admin) {
+        super("bookies", admin);
+        jcommander.addCommand("racks-placement", new GetAllRacks());
+        jcommander.addCommand("get-bookie-rack", new GetBookieRack());
+        jcommander.addCommand("delete-bookie-rack", new DeleteBookieRack());
+        jcommander.addCommand("set-bookie-rack", new SetBookieRack());
+    }
+}
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 27bac1153a..60cb84e6ab 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -90,6 +90,7 @@
         commandMap.put("namespaces", CmdNamespaces.class);
         commandMap.put("topics", CmdTopics.class);
         commandMap.put("schemas", CmdSchemas.class);
+        commandMap.put("bookies", CmdBookies.class);
 
         // Hidden deprecated "persistent" and "non-persistent" subcommands
         commandMap.put("persistent", CmdPersistentTopics.class);
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/BookieInfo.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
similarity index 73%
rename from 
pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/BookieInfo.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
index 2df4a50c78..a0d4a0e890 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/BookieInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
@@ -16,26 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.zookeeper;
+package org.apache.pulsar.common.policies.data;
 
-public class BookieInfo {
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class BookieInfo {
     private String rack;
     private String hostname;
-
-    public String getRack() {
-        return rack;
-    }
-
-    public void setRack(String rack) {
-        this.rack = rack;
-    }
-
-    public String getHostname() {
-        return hostname;
-    }
-
-    public void setHostname(String hostname) {
-        this.hostname = hostname;
-    }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
new file mode 100644
index 0000000000..a2741b2763
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+public class BookiesRackConfiguration extends TreeMap<String, Map<String, 
BookieInfo>> {
+
+    public boolean removeBookie(String address) {
+        for (Map<String, BookieInfo> m : values()) {
+            if (m.remove(address) != null ) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    public Optional<BookieInfo> getBookie(String address) {
+        for (Map<String, BookieInfo> m : values()) {
+            BookieInfo bi = m.get(address);
+            if (bi != null) {
+                return Optional.of(bi);
+            }
+        }
+        return Optional.empty();
+    }
+
+    public void updateBookie(String group, String address, BookieInfo 
bookieInfo) {
+        checkNotNull(group);
+        checkNotNull(address);
+        checkNotNull(bookieInfo);
+
+        // Remove from any group first
+        removeBookie(address);
+        computeIfAbsent(group, key -> new TreeMap<>()).put(address, 
bookieInfo);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 9abca4caf3..94cd32582d 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -190,6 +190,11 @@ private void initializeZookeper() throws IOException {
             if (zkc.exists("/ledgers/available/readonly", false) == null) {
                 zkc.create("/ledgers/available/readonly", new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             }
+            if (zkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
false) == null) {
+                zkc.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
"{}".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            }
+
             // No need to create an entry for each requested bookie anymore as 
the
             // BookieServers will register themselves with ZooKeeper on 
startup.
         } catch (KeeperException e) {
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 28b38e7f26..ff50d8b863 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pulsar.zookeeper;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.bookkeeper.client.RackChangeNotifier;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
@@ -31,39 +34,43 @@
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.Configuration;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 /**
  * It provides the mapping of bookies to its rack from zookeeper.
  */
 public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
-        implements ZooKeeperCacheListener<Map<String, Map<String, 
BookieInfo>>>, RackChangeNotifier {
+        implements ZooKeeperCacheListener<BookiesRackConfiguration>, 
RackChangeNotifier {
     private static final Logger LOG = 
LoggerFactory.getLogger(ZkBookieRackAffinityMapping.class);
 
     public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
 
-    private ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>> 
bookieMappingCache = null;
+    private ZooKeeperDataCache<BookiesRackConfiguration> bookieMappingCache = 
null;
     private RackawareEnsemblePlacementPolicyImpl rackawarePolicy = null;
 
-    public static final ObjectMapper jsonMapper = ObjectMapperFactory.create();
-    public static final TypeReference<Map<String, Map<String, BookieInfo>>> 
typeRef = new TypeReference<Map<String, Map<String, BookieInfo>>>() {
-    };
+    private static final ObjectMapper jsonMapper = 
ObjectMapperFactory.create();
+
+    private volatile BookiesRackConfiguration racksWithHost = new 
BookiesRackConfiguration();
 
     @Override
     public void setConf(Configuration conf) {
         super.setConf(conf);
         bookieMappingCache = getAndSetZkCache(conf);
+
+        try {
+            racksWithHost = 
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).orElse(new 
BookiesRackConfiguration());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
-    private ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>> 
getAndSetZkCache(Configuration conf) {
+    private ZooKeeperDataCache<BookiesRackConfiguration> 
getAndSetZkCache(Configuration conf) {
         ZooKeeperCache zkCache = null;
         if (conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) != null) {
             zkCache = (ZooKeeperCache) 
conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE);
@@ -86,7 +93,7 @@ public void setConf(Configuration conf) {
                 LOG.error("No zk configurations available");
             }
         }
-        ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>> zkDataCache = 
getZkBookieRackMappingCache(
+        ZooKeeperDataCache<BookiesRackConfiguration> zkDataCache = 
getZkBookieRackMappingCache(
                 zkCache);
         if (zkDataCache != null) {
             zkDataCache.registerListener(this);
@@ -94,19 +101,37 @@ public void setConf(Configuration conf) {
         return zkDataCache;
     }
 
-    public static ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>> 
getZkBookieRackMappingCache(
+    private ZooKeeperDataCache<BookiesRackConfiguration> 
getZkBookieRackMappingCache(
             ZooKeeperCache zkCache) {
-        ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>> zkDataCache = 
new ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>>(
+        ZooKeeperDataCache<BookiesRackConfiguration> zkDataCache = new 
ZooKeeperDataCache<BookiesRackConfiguration>(
                 zkCache) {
 
             @Override
-            public Map<String, Map<String, BookieInfo>> deserialize(String 
key, byte[] content)
+            public BookiesRackConfiguration deserialize(String key, byte[] 
content)
                     throws Exception {
                 LOG.info("Reloading the bookie rack affinity mapping cache.");
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Loading the bookie mappings with bookie info 
data: {}", new String(content));
                 }
-                return jsonMapper.readValue(content, typeRef);
+                BookiesRackConfiguration racks = jsonMapper.readValue(content, 
BookiesRackConfiguration.class);
+
+                // In config z-node, the bookies are added in the `ip:port` 
notation, while BK will ask
+                // for just the IP/hostname when trying to get the rack for a 
bookie.
+                // To work around this issue, we also insert in the map the 
bookie ip/hostname with same rack-info
+                BookiesRackConfiguration racksWithHost = new 
BookiesRackConfiguration();
+                racks.forEach((group, bookies) -> {
+                    bookies.forEach((addr, bi) -> {
+                        try {
+                            BookieSocketAddress bsa = new 
BookieSocketAddress(addr);
+                            racksWithHost.updateBookie(group, 
bsa.getHostName(), bi);
+                        } catch (UnknownHostException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                });
+
+                ZkBookieRackAffinityMapping.this.racksWithHost = racksWithHost;
+                return racks;
             }
 
         };
@@ -123,27 +148,27 @@ public void setConf(Configuration conf) {
     }
 
     private String getRack(String bookieAddress) {
-        String rack = NetworkTopology.DEFAULT_RACK;
         try {
-            if (bookieMappingCache != null) {
-                Map<String, Map<String, BookieInfo>> allGroupsBookieMapping = 
bookieMappingCache
-                        .get(BOOKIE_INFO_ROOT_PATH)
-                        .orElseThrow(() -> new 
KeeperException.NoNodeException(BOOKIE_INFO_ROOT_PATH));
-                for (Map<String, BookieInfo> bookieMapping : 
allGroupsBookieMapping.values()) {
-                    BookieInfo bookieInfo = bookieMapping.get(bookieAddress);
-                    if (bookieInfo != null) {
-                        rack = bookieInfo.getRack();
-                        if (!rack.startsWith("/")) {
-                            rack = "/" + rack;
-                        }
-                        break;
-                    }
-                }
+            // Trigger load of z-node in case it didn't exist
+            Optional<BookiesRackConfiguration> racks = 
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
+            if (!racks.isPresent()) {
+                return NetworkTopology.DEFAULT_RACK;
             }
         } catch (Exception e) {
-            LOG.warn("Error getting bookie info from zk, using default rack 
node {}: {}", rack, e.getMessage());
+            throw new RuntimeException(e);
+        }
+
+
+        Optional<BookieInfo> bi = racksWithHost.getBookie(bookieAddress);
+        if (bi.isPresent()) {
+            String rack = bi.get().getRack();
+            if (!rack.startsWith("/")) {
+                rack = "/" + rack;
+            }
+            return rack;
+        } else {
+            return NetworkTopology.DEFAULT_RACK;
         }
-        return rack;
     }
 
     @Override
@@ -157,7 +182,7 @@ public void reloadCachedMappings() {
     }
 
     @Override
-    public void onUpdate(String path, Map<String, Map<String, BookieInfo>> 
data, Stat stat) {
+    public void onUpdate(String path, BookiesRackConfiguration data, Stat 
stat) {
         if (rackawarePolicy != null) {
             LOG.info("Bookie rack info updated to {}. Notifying rackaware 
policy.", data.toString());
             List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 10ba67f5de..877378f7b7 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.Configuration;
+import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.zookeeper.KeeperException;
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
index 78fed62508..258e818ae4 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
@@ -30,6 +30,7 @@
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -110,8 +111,7 @@ public void testNoBookieInfo() throws Exception {
         bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new 
ZooKeeperCache(localZkc) {
         });
         mapping.setConf(bkClientConf);
-        List<String> racks = mapping
-                .resolve(Lists.newArrayList(BOOKIE1.toString(), 
BOOKIE2.toString(), BOOKIE3.toString()));
+        List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", 
"127.0.0.2", "127.0.0.3"));
         assertEquals(racks.get(0), NetworkTopology.DEFAULT_RACK);
         assertEquals(racks.get(1), NetworkTopology.DEFAULT_RACK);
         assertEquals(racks.get(2), NetworkTopology.DEFAULT_RACK);
@@ -119,13 +119,8 @@ public void testNoBookieInfo() throws Exception {
         Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new 
HashMap<>();
         Map<BookieSocketAddress, BookieInfo> mainBookieGroup = new HashMap<>();
 
-        BookieInfo bookieInfo0 = new BookieInfo();
-        bookieInfo0.setRack("/rack0");
-        mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
-        BookieInfo bookieInfo1 = new BookieInfo();
-        bookieInfo1.setRack("/rack1");
-        mainBookieGroup.put(BOOKIE2, bookieInfo1);
+        mainBookieGroup.put(BOOKIE1, new BookieInfo("/rack0", null));
+        mainBookieGroup.put(BOOKIE2, new BookieInfo("/rack1", null));
 
         bookieMapping.put("group1", mainBookieGroup);
 
@@ -134,7 +129,7 @@ public void testNoBookieInfo() throws Exception {
 
         Thread.sleep(100);
 
-        racks = mapping.resolve(Lists.newArrayList(BOOKIE1.toString(), 
BOOKIE2.toString(), BOOKIE3.toString()));
+        racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", 
"127.0.0.3"));
         assertEquals(racks.get(0), "/rack0");
         assertEquals(racks.get(1), "/rack1");
         assertEquals(racks.get(2), NetworkTopology.DEFAULT_RACK);
@@ -147,13 +142,8 @@ public void testBookieInfoChange() throws Exception {
         Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new 
HashMap<>();
         Map<BookieSocketAddress, BookieInfo> mainBookieGroup = new HashMap<>();
 
-        BookieInfo bookieInfo0 = new BookieInfo();
-        bookieInfo0.setRack("rack0");
-        mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
-        BookieInfo bookieInfo1 = new BookieInfo();
-        bookieInfo1.setRack("rack1");
-        mainBookieGroup.put(BOOKIE2, bookieInfo1);
+        mainBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+        mainBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
 
         bookieMapping.put("group1", mainBookieGroup);
 
@@ -173,9 +163,7 @@ public void testBookieInfoChange() throws Exception {
 
         // add info for BOOKIE3 and check if the mapping picks up the change
         Map<BookieSocketAddress, BookieInfo> secondaryBookieGroup = new 
HashMap<>();
-        BookieInfo bookieInfo2 = new BookieInfo();
-        bookieInfo2.setRack("rack0");
-        secondaryBookieGroup.put(BOOKIE3, bookieInfo2);
+        secondaryBookieGroup.put(BOOKIE3, new BookieInfo("rack0", null));
 
         bookieMapping.put("group2", secondaryBookieGroup);
         localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
@@ -184,16 +172,16 @@ public void testBookieInfoChange() throws Exception {
         // wait for the zk to notify and update the mappings
         Thread.sleep(100);
 
-        racks = mapping.resolve(Lists.newArrayList(BOOKIE1.toString(), 
BOOKIE2.toString(), BOOKIE3.toString()));
+        racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", 
"127.0.0.3"));
         assertEquals(racks.get(0), "/rack0");
         assertEquals(racks.get(1), "/rack1");
         assertEquals(racks.get(2), "/rack0");
 
-        localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
+        localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
"{}".getBytes(), -1);
 
         Thread.sleep(100);
 
-        racks = mapping.resolve(Lists.newArrayList(BOOKIE1.toString(), 
BOOKIE2.toString(), BOOKIE3.toString()));
+        racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", 
"127.0.0.3"));
         assertEquals(racks.get(0), NetworkTopology.DEFAULT_RACK);
         assertEquals(racks.get(1), NetworkTopology.DEFAULT_RACK);
         assertEquals(racks.get(2), NetworkTopology.DEFAULT_RACK);
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index 4a437055c5..eef8e34da3 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -39,6 +39,7 @@
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -96,18 +97,11 @@ public void testBasic() throws Exception {
         Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
         Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
 
-        BookieInfo bookieInfo0 = new BookieInfo();
-        bookieInfo0.setRack("rack0");
-        mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
-        BookieInfo bookieInfo1 = new BookieInfo();
-        bookieInfo1.setRack("rack1");
-        mainBookieGroup.put(BOOKIE2, bookieInfo1);
+        mainBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+        mainBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
 
         Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
-        BookieInfo bookieInfo2 = new BookieInfo();
-        bookieInfo2.setRack("rack0");
-        secondaryBookieGroup.put(BOOKIE3, bookieInfo2);
+        secondaryBookieGroup.put(BOOKIE3, new BookieInfo("rack0", null));
 
         bookieMapping.put("group1", mainBookieGroup);
         bookieMapping.put("group2", secondaryBookieGroup);
@@ -146,9 +140,7 @@ public void testBasic() throws Exception {
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
         assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
 
-        BookieInfo bookieInfo3 = new BookieInfo();
-        bookieInfo3.setRack("rack0");
-        secondaryBookieGroup.put(BOOKIE4, bookieInfo3);
+        secondaryBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null));
         bookieMapping.put("group2", secondaryBookieGroup);
 
         localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
@@ -229,21 +221,10 @@ public void testBookieInfoChange() throws Exception {
         Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
         Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
 
-        BookieInfo bookieInfo0 = new BookieInfo();
-        bookieInfo0.setRack("rack0");
-        mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
-        BookieInfo bookieInfo1 = new BookieInfo();
-        bookieInfo1.setRack("rack1");
-        mainBookieGroup.put(BOOKIE2, bookieInfo1);
-
-        BookieInfo bookieInfo2 = new BookieInfo();
-        bookieInfo2.setRack("rack0");
-        secondaryBookieGroup.put(BOOKIE3, bookieInfo2);
-
-        BookieInfo bookieInfo3 = new BookieInfo();
-        bookieInfo3.setRack("rack2");
-        secondaryBookieGroup.put(BOOKIE4, bookieInfo3);
+        mainBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+        mainBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
+        secondaryBookieGroup.put(BOOKIE3, new BookieInfo("rack0", null));
+        secondaryBookieGroup.put(BOOKIE4, new BookieInfo("rack2", null));
 
         bookieMapping.put("group1", mainBookieGroup);
         bookieMapping.put("group2", secondaryBookieGroup);
@@ -272,7 +253,7 @@ public void testBookieInfoChange() throws Exception {
             // ok
         }
 
-        mainBookieGroup.put(BOOKIE3, bookieInfo2);
+        mainBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
         secondaryBookieGroup.remove(BOOKIE3);
         bookieMapping.put("group1", mainBookieGroup);
         bookieMapping.put("group2", secondaryBookieGroup);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to