sijie closed pull request #2005: Added REST/CLI tools to manage bookies rack
placement config
URL: https://github.com/apache/incubator-pulsar/pull/2005
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 3e62c9aa46..e35daef3e2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -38,11 +38,13 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
@@ -136,6 +138,15 @@ public static void main(String[] args) throws Exception {
throw new IOException("Failed to initialize BookKeeper metadata");
}
+ if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
false) == null) {
+ try {
+
localZk.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
"{}".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (NodeExistsException e) {
+ // Ignore
+ }
+ }
+
localZk.create("/managed-ledgers", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
new file mode 100644
index 0000000000..f15dd57ecf
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
+import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/bookies")
+@Api(value = "/bookies", description = "Configure bookies rack placement",
tags = "bookies")
+@Produces(MediaType.APPLICATION_JSON)
+public class Bookies extends AdminResource {
+
+ @GET
+ @Path("/racks-info")
+ @ApiOperation(value = "Gets the rack placement information for all the
bookies in the cluster", response = BookiesRackConfiguration.class)
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
+ public BookiesRackConfiguration getBookiesRackInfo() throws Exception {
+ validateSuperUserAccess();
+
+ return
localZkCache().getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+ new Deserializer<BookiesRackConfiguration>() {
+
+ @Override
+ public BookiesRackConfiguration deserialize(String key,
byte[] content) throws Exception {
+ return
ObjectMapperFactory.getThreadLocal().readValue(content,
BookiesRackConfiguration.class);
+ }
+
+ }).orElse(new BookiesRackConfiguration());
+ }
+
+ @GET
+ @Path("/racks-info/{bookie}")
+ @ApiOperation(value = "Gets the rack placement information for a specific
bookie in the cluster", response = BookieInfo.class)
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
+ public BookieInfo getBookieRackInfo(@PathParam("bookie") String
bookieAddress) throws Exception {
+ validateSuperUserAccess();
+
+ BookiesRackConfiguration racks = localZkCache()
+ .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
(key, content) -> ObjectMapperFactory
+ .getThreadLocal().readValue(content,
BookiesRackConfiguration.class))
+ .orElse(new BookiesRackConfiguration());
+
+ return racks.getBookie(bookieAddress)
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Bookie
address not found: " + bookieAddress));
+ }
+
+ @DELETE
+ @Path("/racks-info/{bookie}")
+ @ApiOperation(value = "Removed the rack placement information for a
specific bookie in the cluster")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
+ public void deleteBookieRackInfo(@PathParam("bookie") String
bookieAddress) throws Exception {
+ validateSuperUserAccess();
+
+ BookiesRackConfiguration racks = localZkCache()
+ .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
(key, content) -> ObjectMapperFactory
+ .getThreadLocal().readValue(content,
BookiesRackConfiguration.class))
+ .orElse(new BookiesRackConfiguration());
+
+ if (!racks.removeBookie(bookieAddress)) {
+ throw new RestException(Status.NOT_FOUND, "Bookie address not
found: " + bookieAddress);
+ }
+
+ log.info("Removed {} from rack mapping info", bookieAddress);
+ }
+
+ @POST
+ @Path("/racks-info/{bookie}")
+ @ApiOperation(value = "Updates the rack placement information for a
specific bookie in the cluster")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
+ public void updateBookieRackInfo(@PathParam("bookie") String
bookieAddress, @QueryParam("group") String group,
+ BookieInfo bookieInfo) throws Exception {
+ validateSuperUserAccess();
+
+ if (group == null) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Bookie
'group' parameters is missing");
+ }
+
+ Optional<Entry<BookiesRackConfiguration, Stat>> entry = localZkCache()
+ .getEntry(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
(key, content) -> ObjectMapperFactory
+ .getThreadLocal().readValue(content,
BookiesRackConfiguration.class));
+
+ if (entry.isPresent()) {
+ // Update the racks info
+ BookiesRackConfiguration racks = entry.get().getKey();
+ racks.updateBookie(group, bookieAddress, bookieInfo);
+
+
localZk().setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper().writeValueAsBytes(racks),
+ entry.get().getValue().getVersion());
+ log.info("Updated rack mapping info for {}", bookieAddress);
+ } else {
+ // Creates the z-node with racks info
+ BookiesRackConfiguration racks = new BookiesRackConfiguration();
+ racks.updateBookie(group, bookieAddress, bookieInfo);
+ zkCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper().writeValueAsBytes(racks));
+ log.info("Created rack mapping info and added {}", bookieAddress);
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(Bookies.class);
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 3126b7f599..173fb493be 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -71,11 +71,22 @@
private final int ZOOKEEPER_PORT = 12759;
protected final int BROKER_WEBSERVICE_PORT = 15782;
+ protected final int bkBasePort = 5001;
+ private final int numberOfBookies;
+
+ public BrokerBkEnsemblesTests() {
+ this(3);
+ }
+
+ public BrokerBkEnsemblesTests(int numberOfBookies) {
+ this.numberOfBookies = numberOfBookies;
+ }
+
@BeforeMethod
void setup() throws Exception {
try {
// start local bookie and zookeeper
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+ bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies,
ZOOKEEPER_PORT, 5001);
bkEnsemble.start();
// start pulsar service
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
new file mode 100644
index 0000000000..215d3503a8
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class RackAwareTest extends BrokerBkEnsemblesTests {
+
+ private static final int NUM_BOOKIES = 6;
+ private final List<BookieServer> bookies = new ArrayList<>();
+
+ public RackAwareTest() {
+ // Start bookies manually
+ super(0);
+ }
+
+ @SuppressWarnings("deprecation")
+ @BeforeClass
+ void setup() throws Exception {
+ super.setup();
+
+ // Start bookies with specific racks
+ for (int i = 0; i < NUM_BOOKIES; i++) {
+ File bkDataDir = Files.createTempDirectory("bk" +
Integer.toString(i) + "test").toFile();
+ ServerConfiguration conf = new ServerConfiguration();
+
+ int bookiePort = PortManager.nextFreePort();
+ conf.setBookiePort(bookiePort);
+ conf.setZkServers("127.0.0.1:" +
this.bkEnsemble.getZkServer().getClientPort());
+ conf.setJournalDirName(bkDataDir.getPath());
+ conf.setLedgerDirNames(new String[] { bkDataDir.getPath() });
+ conf.setAllowLoopback(true);
+
+ // Use different advertised addresses for each bookie, so we can
place them in different
+ // racks.
+ // Eg: 1st bookie will be 10.0.0.1, 2nd 10.0.0.2 and so on
+ String addr = String.format("10.0.0.%d", i + 1);
+ conf.setAdvertisedAddress(addr);
+
+ BookieServer bs = new BookieServer(conf, NullStatsLogger.INSTANCE);
+
+ bs.start();
+ bookies.add(bs);
+ log.info("Local BK[{}] started (port: {}, adddress: {})", i,
bookiePort, addr);
+ }
+
+ }
+
+ @AfterClass
+ void shutdown() throws Exception {
+ super.shutdown();
+
+ for (BookieServer bs : bookies) {
+ bs.shutdown();
+ }
+
+ bookies.clear();
+ }
+
+ @Test
+ public void testPlacement() throws Exception {
+ for (int i = 0; i < NUM_BOOKIES; i++) {
+ String bookie = bookies.get(i).getLocalAddress().toString();
+
+ // Place bookie-1 in "rack-1" and the rest in "rack-2"
+ int rackId = i == 0 ? 1 : 2;
+ BookieInfo bi = new BookieInfo("rack-" + rackId, "bookie-" + (i +
1));
+ log.info("setting rack for bookie at {} -- {}", bookie, bi);
+ admin.bookies().updateBookieRackInfo(bookie, "default", bi);
+ }
+
+ // Make sure the racks cache gets updated through the ZK watch
+ Thread.sleep(1000);
+
+ BookKeeper bkc = this.pulsar.getBookKeeperClient();
+
+ // Create few ledgers and verify all of them should have a copy in the
first bookie
+ BookieSocketAddress fistBookie = bookies.get(0).getLocalAddress();
+ for (int i = 0; i < 100; i++) {
+ LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new
byte[0]);
+ log.info("Ledger: {} -- Ensemble: {}", i,
lh.getLedgerMetadata().getEnsembleAt(0));
+
assertTrue(lh.getLedgerMetadata().getEnsembleAt(0).contains(fistBookie),
+ "first bookie in rack 0 not included in ensemble");
+ lh.close();
+ }
+ }
+
+ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
+ // Ignore test
+ }
+
+ public void testSkipCorruptDataLedger() throws Exception {
+ // Ignore test
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(RackAwareTest.class);
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Bookies.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Bookies.java
new file mode 100644
index 0000000000..171fc2f729
--- /dev/null
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Bookies.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+
+/**
+ * Admin interface for bookies rack placement management.
+ */
+public interface Bookies {
+
+ /**
+ * Gets the rack placement information for all the bookies in the cluster
+ */
+ BookiesRackConfiguration getBookiesRackInfo() throws PulsarAdminException;
+
+ /**
+ * Gets the rack placement information for a specific bookie in the cluster
+ */
+ BookieInfo getBookieRackInfo(String bookieAddress) throws
PulsarAdminException;
+
+ /**
+ * Remove rack placement information for a specific bookie in the cluster
+ */
+ void deleteBookieRackInfo(String bookieAddress) throws
PulsarAdminException;
+
+ /**
+ * Updates the rack placement information for a specific bookie in the
cluster
+ */
+ void updateBookieRackInfo(String bookieAddress, String group, BookieInfo
bookieInfo) throws PulsarAdminException;
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index b2db7e2dde..48aea94289 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -32,6 +32,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.pulsar.client.admin.internal.BookiesImpl;
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.admin.internal.BrokersImpl;
import org.apache.pulsar.client.admin.internal.ClustersImpl;
@@ -74,6 +75,7 @@
private final Tenants tenants;
private final Properties properties;
private final Namespaces namespaces;
+ private final Bookies bookies;
private final TopicsImpl topics;
private final NonPersistentTopics nonPersistentTopics;
private final ResourceQuotas resourceQuotas;
@@ -186,6 +188,7 @@ public PulsarAdmin(String serviceUrl,
ClientConfigurationData clientConfigData)
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
+ this.bookies = new BookiesImpl(root, auth);
}
/**
@@ -304,6 +307,13 @@ public Topics topics() {
return topics;
}
+ /**
+ * @return the bookies management object
+ */
+ public Bookies bookies() {
+ return bookies;
+ }
+
/**
* @return the persistentTopics management object
* @deprecated Since 2.0. See {@link #topics()}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
new file mode 100644
index 0000000000..7680ce994b
--- /dev/null
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.client.admin.Bookies;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.common.policies.data.ErrorData;
+
+public class BookiesImpl extends BaseResource implements Bookies {
+ private final WebTarget adminBookies;
+
+ public BookiesImpl(WebTarget web, Authentication auth) {
+ super(auth);
+ adminBookies = web.path("/admin/v2/bookies");
+ }
+
+ @Override
+ public BookiesRackConfiguration getBookiesRackInfo() throws
PulsarAdminException {
+ try {
+ return
request(adminBookies.path("racks-info")).get(BookiesRackConfiguration.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public BookieInfo getBookieRackInfo(String bookieAddress) throws
PulsarAdminException {
+ try {
+ return
request(adminBookies.path("racks-info").path(bookieAddress)).get(BookieInfo.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void deleteBookieRackInfo(String bookieAddress) throws
PulsarAdminException {
+ try {
+
request(adminBookies.path("racks-info").path(bookieAddress)).delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void updateBookieRackInfo(String bookieAddress, String group,
BookieInfo bookieInfo)
+ throws PulsarAdminException {
+ try {
+
request(adminBookies.path("racks-info").path(bookieAddress).queryParam("group",
group))
+ .post(Entity.entity(bookieInfo,
MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a5b1581164..c1a69d5e67 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -28,6 +28,7 @@
import java.util.EnumSet;
+import org.apache.pulsar.client.admin.Bookies;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.Brokers;
import org.apache.pulsar.client.admin.Clusters;
@@ -43,6 +44,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
@@ -688,6 +690,27 @@ void nonPersistentTopics() throws Exception {
}
+ @Test
+ void bookies() throws Exception {
+ PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+ Bookies mockBookies = mock(Bookies.class);
+ doReturn(mockBookies).when(admin).bookies();
+
+ CmdBookies bookies = new CmdBookies(admin);
+
+ bookies.run(split("racks-placement"));
+ verify(mockBookies).getBookiesRackInfo();
+
+ bookies.run(split("get-bookie-rack --bookie my-bookie:3181"));
+ verify(mockBookies).getBookieRackInfo("my-bookie:3181");
+
+ bookies.run(split("delete-bookie-rack --bookie my-bookie:3181"));
+ verify(mockBookies).deleteBookieRackInfo("my-bookie:3181");
+
+ bookies.run(split("set-bookie-rack --group my-group --bookie
my-bookie:3181 --rack rack-1 --hostname host-1"));
+ verify(mockBookies).updateBookieRackInfo("my-bookie:3181", "my-group",
new BookieInfo("rack-1", "host-1"));
+ }
+
String[] split(String s) {
return s.split(" ");
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
new file mode 100644
index 0000000000..13056b4001
--- /dev/null
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+@Parameters(commandDescription = "Operations about bookies rack placement")
+public class CmdBookies extends CmdBase {
+
+ @Parameters(commandDescription = "Gets the rack placement information for
all the bookies in the cluster")
+ private class GetAll extends CliCommand {
+
+ @Override
+ void run() throws Exception {
+ print(admin.bookies().getBookiesRackInfo());
+ }
+ }
+
+ @Parameters(commandDescription = "Gets the rack placement information for
a specific bookie in the cluster")
+ private class GetBookie extends CliCommand {
+
+ @Parameter(names = { "-b", "--bookie" }, description = "bookie
address", required = true)
+ private String bookieAddress;
+
+ @Override
+ void run() throws Exception {
+ print(admin.bookies().getBookieRackInfo(bookieAddress));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove rack placement information for a
specific bookie in the cluster")
+ private class RemoveBookie extends CliCommand {
+
+ @Parameter(names = { "-b", "--bookie" }, description = "bookie
address", required = true)
+ private String bookieAddress;
+
+ @Override
+ void run() throws Exception {
+ admin.bookies().deleteBookieRackInfo(bookieAddress);
+ }
+ }
+
+ @Parameters(commandDescription = "Updates the rack placement information
for a specific bookie in the cluster")
+ private class UpdateBookie extends CliCommand {
+ @Parameter(names = { "-g", "--group" }, description = "Bookie group
name", required = false)
+ private String group = "default";
+
+ @Parameter(names = { "-b", "--bookie" }, description = "Bookie
address", required = true)
+ private String bookieAddress;
+
+ @Parameter(names = { "-r", "--rack" }, description = "Bookie rack
name", required = true)
+ private String bookieRack;
+
+ @Parameter(names = { "--hostname" }, description = "Bookie host name",
required = false)
+ private String bookieHost;
+
+ @Override
+ void run() throws Exception {
+ admin.bookies().updateBookieRackInfo(bookieAddress, group, new
BookieInfo(bookieRack, bookieHost));
+ }
+ }
+
+ public CmdBookies(PulsarAdmin admin) {
+ super("bookies", admin);
+ jcommander.addCommand("racks-placement", new GetAll());
+ jcommander.addCommand("get-bookie-rack", new GetBookie());
+ jcommander.addCommand("delete-bookie-rack", new RemoveBookie());
+ jcommander.addCommand("set-bookie-rack", new UpdateBookie());
+ }
+}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 27bac1153a..60cb84e6ab 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -90,6 +90,7 @@
commandMap.put("namespaces", CmdNamespaces.class);
commandMap.put("topics", CmdTopics.class);
commandMap.put("schemas", CmdSchemas.class);
+ commandMap.put("bookies", CmdBookies.class);
// Hidden deprecated "persistent" and "non-persistent" subcommands
commandMap.put("persistent", CmdPersistentTopics.class);
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/BookieInfo.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
similarity index 73%
rename from
pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/BookieInfo.java
rename to
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
index 2df4a50c78..a0d4a0e890 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/BookieInfo.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
@@ -16,26 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.zookeeper;
+package org.apache.pulsar.common.policies.data;
-public class BookieInfo {
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class BookieInfo {
private String rack;
private String hostname;
-
- public String getRack() {
- return rack;
- }
-
- public void setRack(String rack) {
- this.rack = rack;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
-}
\ No newline at end of file
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
new file mode 100644
index 0000000000..a2741b2763
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+public class BookiesRackConfiguration extends TreeMap<String, Map<String,
BookieInfo>> {
+
+ public boolean removeBookie(String address) {
+ for (Map<String, BookieInfo> m : values()) {
+ if (m.remove(address) != null ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public Optional<BookieInfo> getBookie(String address) {
+ for (Map<String, BookieInfo> m : values()) {
+ BookieInfo bi = m.get(address);
+ if (bi != null) {
+ return Optional.of(bi);
+ }
+ }
+ return Optional.empty();
+ }
+
+ public void updateBookie(String group, String address, BookieInfo
bookieInfo) {
+ checkNotNull(group);
+ checkNotNull(address);
+ checkNotNull(bookieInfo);
+
+ // Remove from any group first
+ removeBookie(address);
+ computeIfAbsent(group, key -> new TreeMap<>()).put(address,
bookieInfo);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 9abca4caf3..94cd32582d 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -190,6 +190,11 @@ private void initializeZookeper() throws IOException {
if (zkc.exists("/ledgers/available/readonly", false) == null) {
zkc.create("/ledgers/available/readonly", new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
+ if (zkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
false) == null) {
+ zkc.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
"{}".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+
// No need to create an entry for each requested bookie anymore as
the
// BookieServers will register themselves with ZooKeeper on
startup.
} catch (KeeperException e) {
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 28b38e7f26..ff50d8b863 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -18,10 +18,13 @@
*/
package org.apache.pulsar.zookeeper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
@@ -31,39 +34,43 @@
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.Configuration;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
/**
* It provides the mapping of bookies to its rack from zookeeper.
*/
public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
- implements ZooKeeperCacheListener<Map<String, Map<String,
BookieInfo>>>, RackChangeNotifier {
+ implements ZooKeeperCacheListener<BookiesRackConfiguration>,
RackChangeNotifier {
private static final Logger LOG =
LoggerFactory.getLogger(ZkBookieRackAffinityMapping.class);
public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
- private ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>>
bookieMappingCache = null;
+ private ZooKeeperDataCache<BookiesRackConfiguration> bookieMappingCache =
null;
private RackawareEnsemblePlacementPolicyImpl rackawarePolicy = null;
- public static final ObjectMapper jsonMapper = ObjectMapperFactory.create();
- public static final TypeReference<Map<String, Map<String, BookieInfo>>>
typeRef = new TypeReference<Map<String, Map<String, BookieInfo>>>() {
- };
+ private static final ObjectMapper jsonMapper =
ObjectMapperFactory.create();
+
+ private volatile BookiesRackConfiguration racksWithHost = new
BookiesRackConfiguration();
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
bookieMappingCache = getAndSetZkCache(conf);
+
+ try {
+ racksWithHost =
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).orElse(new
BookiesRackConfiguration());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- private ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>>
getAndSetZkCache(Configuration conf) {
+ private ZooKeeperDataCache<BookiesRackConfiguration>
getAndSetZkCache(Configuration conf) {
ZooKeeperCache zkCache = null;
if (conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) != null) {
zkCache = (ZooKeeperCache)
conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE);
@@ -86,7 +93,7 @@ public void setConf(Configuration conf) {
LOG.error("No zk configurations available");
}
}
- ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>> zkDataCache =
getZkBookieRackMappingCache(
+ ZooKeeperDataCache<BookiesRackConfiguration> zkDataCache =
getZkBookieRackMappingCache(
zkCache);
if (zkDataCache != null) {
zkDataCache.registerListener(this);
@@ -94,19 +101,37 @@ public void setConf(Configuration conf) {
return zkDataCache;
}
- public static ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>>
getZkBookieRackMappingCache(
+ private ZooKeeperDataCache<BookiesRackConfiguration>
getZkBookieRackMappingCache(
ZooKeeperCache zkCache) {
- ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>> zkDataCache =
new ZooKeeperDataCache<Map<String, Map<String, BookieInfo>>>(
+ ZooKeeperDataCache<BookiesRackConfiguration> zkDataCache = new
ZooKeeperDataCache<BookiesRackConfiguration>(
zkCache) {
@Override
- public Map<String, Map<String, BookieInfo>> deserialize(String
key, byte[] content)
+ public BookiesRackConfiguration deserialize(String key, byte[]
content)
throws Exception {
LOG.info("Reloading the bookie rack affinity mapping cache.");
if (LOG.isDebugEnabled()) {
LOG.debug("Loading the bookie mappings with bookie info
data: {}", new String(content));
}
- return jsonMapper.readValue(content, typeRef);
+ BookiesRackConfiguration racks = jsonMapper.readValue(content,
BookiesRackConfiguration.class);
+
+ // In config z-node, the bookies are added in the `ip:port`
notation, while BK will ask
+ // for just the IP/hostname when trying to get the rack for a
bookie.
+ // To work around this issue, we also insert in the map the
bookie ip/hostname with same rack-info
+ BookiesRackConfiguration racksWithHost = new
BookiesRackConfiguration();
+ racks.forEach((group, bookies) -> {
+ bookies.forEach((addr, bi) -> {
+ try {
+ BookieSocketAddress bsa = new
BookieSocketAddress(addr);
+ racksWithHost.updateBookie(group,
bsa.getHostName(), bi);
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ });
+
+ ZkBookieRackAffinityMapping.this.racksWithHost = racksWithHost;
+ return racks;
}
};
@@ -123,27 +148,27 @@ public void setConf(Configuration conf) {
}
private String getRack(String bookieAddress) {
- String rack = NetworkTopology.DEFAULT_RACK;
try {
- if (bookieMappingCache != null) {
- Map<String, Map<String, BookieInfo>> allGroupsBookieMapping =
bookieMappingCache
- .get(BOOKIE_INFO_ROOT_PATH)
- .orElseThrow(() -> new
KeeperException.NoNodeException(BOOKIE_INFO_ROOT_PATH));
- for (Map<String, BookieInfo> bookieMapping :
allGroupsBookieMapping.values()) {
- BookieInfo bookieInfo = bookieMapping.get(bookieAddress);
- if (bookieInfo != null) {
- rack = bookieInfo.getRack();
- if (!rack.startsWith("/")) {
- rack = "/" + rack;
- }
- break;
- }
- }
+ // Trigger load of z-node in case it didn't exist
+ Optional<BookiesRackConfiguration> racks =
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
+ if (!racks.isPresent()) {
+ return NetworkTopology.DEFAULT_RACK;
}
} catch (Exception e) {
- LOG.warn("Error getting bookie info from zk, using default rack
node {}: {}", rack, e.getMessage());
+ throw new RuntimeException(e);
+ }
+
+
+ Optional<BookieInfo> bi = racksWithHost.getBookie(bookieAddress);
+ if (bi.isPresent()) {
+ String rack = bi.get().getRack();
+ if (!rack.startsWith("/")) {
+ rack = "/" + rack;
+ }
+ return rack;
+ } else {
+ return NetworkTopology.DEFAULT_RACK;
}
- return rack;
}
@Override
@@ -157,7 +182,7 @@ public void reloadCachedMappings() {
}
@Override
- public void onUpdate(String path, Map<String, Map<String, BookieInfo>>
data, Stat stat) {
+ public void onUpdate(String path, BookiesRackConfiguration data, Stat
stat) {
if (rackawarePolicy != null) {
LOG.info("Bookie rack info updated to {}. Notifying rackaware
policy.", data.toString());
List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 10ba67f5de..877378f7b7 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.Configuration;
+import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.zookeeper.KeeperException;
diff --git
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
index 78fed62508..258e818ae4 100644
---
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
+++
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
@@ -30,6 +30,7 @@
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
@@ -110,8 +111,7 @@ public void testNoBookieInfo() throws Exception {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new
ZooKeeperCache(localZkc) {
});
mapping.setConf(bkClientConf);
- List<String> racks = mapping
- .resolve(Lists.newArrayList(BOOKIE1.toString(),
BOOKIE2.toString(), BOOKIE3.toString()));
+ List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1",
"127.0.0.2", "127.0.0.3"));
assertEquals(racks.get(0), NetworkTopology.DEFAULT_RACK);
assertEquals(racks.get(1), NetworkTopology.DEFAULT_RACK);
assertEquals(racks.get(2), NetworkTopology.DEFAULT_RACK);
@@ -119,13 +119,8 @@ public void testNoBookieInfo() throws Exception {
Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new
HashMap<>();
Map<BookieSocketAddress, BookieInfo> mainBookieGroup = new HashMap<>();
- BookieInfo bookieInfo0 = new BookieInfo();
- bookieInfo0.setRack("/rack0");
- mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
- BookieInfo bookieInfo1 = new BookieInfo();
- bookieInfo1.setRack("/rack1");
- mainBookieGroup.put(BOOKIE2, bookieInfo1);
+ mainBookieGroup.put(BOOKIE1, new BookieInfo("/rack0", null));
+ mainBookieGroup.put(BOOKIE2, new BookieInfo("/rack1", null));
bookieMapping.put("group1", mainBookieGroup);
@@ -134,7 +129,7 @@ public void testNoBookieInfo() throws Exception {
Thread.sleep(100);
- racks = mapping.resolve(Lists.newArrayList(BOOKIE1.toString(),
BOOKIE2.toString(), BOOKIE3.toString()));
+ racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2",
"127.0.0.3"));
assertEquals(racks.get(0), "/rack0");
assertEquals(racks.get(1), "/rack1");
assertEquals(racks.get(2), NetworkTopology.DEFAULT_RACK);
@@ -147,13 +142,8 @@ public void testBookieInfoChange() throws Exception {
Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new
HashMap<>();
Map<BookieSocketAddress, BookieInfo> mainBookieGroup = new HashMap<>();
- BookieInfo bookieInfo0 = new BookieInfo();
- bookieInfo0.setRack("rack0");
- mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
- BookieInfo bookieInfo1 = new BookieInfo();
- bookieInfo1.setRack("rack1");
- mainBookieGroup.put(BOOKIE2, bookieInfo1);
+ mainBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+ mainBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
bookieMapping.put("group1", mainBookieGroup);
@@ -173,9 +163,7 @@ public void testBookieInfoChange() throws Exception {
// add info for BOOKIE3 and check if the mapping picks up the change
Map<BookieSocketAddress, BookieInfo> secondaryBookieGroup = new
HashMap<>();
- BookieInfo bookieInfo2 = new BookieInfo();
- bookieInfo2.setRack("rack0");
- secondaryBookieGroup.put(BOOKIE3, bookieInfo2);
+ secondaryBookieGroup.put(BOOKIE3, new BookieInfo("rack0", null));
bookieMapping.put("group2", secondaryBookieGroup);
localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
@@ -184,16 +172,16 @@ public void testBookieInfoChange() throws Exception {
// wait for the zk to notify and update the mappings
Thread.sleep(100);
- racks = mapping.resolve(Lists.newArrayList(BOOKIE1.toString(),
BOOKIE2.toString(), BOOKIE3.toString()));
+ racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2",
"127.0.0.3"));
assertEquals(racks.get(0), "/rack0");
assertEquals(racks.get(1), "/rack1");
assertEquals(racks.get(2), "/rack0");
- localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
+ localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
"{}".getBytes(), -1);
Thread.sleep(100);
- racks = mapping.resolve(Lists.newArrayList(BOOKIE1.toString(),
BOOKIE2.toString(), BOOKIE3.toString()));
+ racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2",
"127.0.0.3"));
assertEquals(racks.get(0), NetworkTopology.DEFAULT_RACK);
assertEquals(racks.get(1), NetworkTopology.DEFAULT_RACK);
assertEquals(racks.get(2), NetworkTopology.DEFAULT_RACK);
diff --git
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index 4a437055c5..eef8e34da3 100644
---
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -39,6 +39,7 @@
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
@@ -96,18 +97,11 @@ public void testBasic() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
- BookieInfo bookieInfo0 = new BookieInfo();
- bookieInfo0.setRack("rack0");
- mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
- BookieInfo bookieInfo1 = new BookieInfo();
- bookieInfo1.setRack("rack1");
- mainBookieGroup.put(BOOKIE2, bookieInfo1);
+ mainBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+ mainBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
- BookieInfo bookieInfo2 = new BookieInfo();
- bookieInfo2.setRack("rack0");
- secondaryBookieGroup.put(BOOKIE3, bookieInfo2);
+ secondaryBookieGroup.put(BOOKIE3, new BookieInfo("rack0", null));
bookieMapping.put("group1", mainBookieGroup);
bookieMapping.put("group2", secondaryBookieGroup);
@@ -146,9 +140,7 @@ public void testBasic() throws Exception {
assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
- BookieInfo bookieInfo3 = new BookieInfo();
- bookieInfo3.setRack("rack0");
- secondaryBookieGroup.put(BOOKIE4, bookieInfo3);
+ secondaryBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null));
bookieMapping.put("group2", secondaryBookieGroup);
localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
@@ -229,21 +221,10 @@ public void testBookieInfoChange() throws Exception {
Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
- BookieInfo bookieInfo0 = new BookieInfo();
- bookieInfo0.setRack("rack0");
- mainBookieGroup.put(BOOKIE1, bookieInfo0);
-
- BookieInfo bookieInfo1 = new BookieInfo();
- bookieInfo1.setRack("rack1");
- mainBookieGroup.put(BOOKIE2, bookieInfo1);
-
- BookieInfo bookieInfo2 = new BookieInfo();
- bookieInfo2.setRack("rack0");
- secondaryBookieGroup.put(BOOKIE3, bookieInfo2);
-
- BookieInfo bookieInfo3 = new BookieInfo();
- bookieInfo3.setRack("rack2");
- secondaryBookieGroup.put(BOOKIE4, bookieInfo3);
+ mainBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
+ mainBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
+ secondaryBookieGroup.put(BOOKIE3, new BookieInfo("rack0", null));
+ secondaryBookieGroup.put(BOOKIE4, new BookieInfo("rack2", null));
bookieMapping.put("group1", mainBookieGroup);
bookieMapping.put("group2", secondaryBookieGroup);
@@ -272,7 +253,7 @@ public void testBookieInfoChange() throws Exception {
// ok
}
- mainBookieGroup.put(BOOKIE3, bookieInfo2);
+ mainBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
secondaryBookieGroup.remove(BOOKIE3);
bookieMapping.put("group1", mainBookieGroup);
bookieMapping.put("group2", secondaryBookieGroup);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services