This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4574ba0233 Fix RegionAwareEnsemblePlacementPolicy update rack info
problem. (#3666)
4574ba0233 is described below
commit 4574ba02333308ad648c9b78963a381ad83ea564
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Dec 7 15:50:09 2022 +0800
Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666)
---
.../client/RegionAwareEnsemblePlacementPolicy.java | 85 ++++++++++--
.../TestRegionAwareEnsemblePlacementPolicy.java | 154 +++++++++++++++++++++
2 files changed, 225 insertions(+), 14 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 3f8a17f59f..43969b8fde 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NetworkTopologyImpl;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.proto.BookieAddressResolver;
@@ -83,30 +85,34 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
address2Region = new ConcurrentHashMap<BookieId, String>();
}
+ protected String getLocalRegion(BookieNode node) {
+ if (null == node || null == node.getAddr()) {
+ return UNKNOWN_REGION;
+ }
+ return getRegion(node.getAddr());
+ }
+
protected String getRegion(BookieId addr) {
String region = address2Region.get(addr);
if (null == region) {
- String networkLocation = resolveNetworkLocation(addr);
- if
(NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
- region = UNKNOWN_REGION;
- } else {
- String[] parts =
networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
- if (parts.length <= 1) {
- region = UNKNOWN_REGION;
- } else {
- region = parts[1];
- }
- }
+ region = parseBookieRegion(addr);
address2Region.putIfAbsent(addr, region);
}
return region;
}
- protected String getLocalRegion(BookieNode node) {
- if (null == node || null == node.getAddr()) {
+ protected String parseBookieRegion(BookieId addr) {
+ String networkLocation = resolveNetworkLocation(addr);
+ if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
return UNKNOWN_REGION;
+ } else {
+ String[] parts =
networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
+ if (parts.length <= 1) {
+ return UNKNOWN_REGION;
+ } else {
+ return parts[1];
+ }
}
- return getRegion(node.getAddr());
}
@Override
@@ -161,6 +167,57 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
}
}
+ @Override
+ public void onBookieRackChange(List<BookieId> bookieAddressList) {
+ rwLock.writeLock().lock();
+ try {
+ bookieAddressList.forEach(bookieAddress -> {
+ try {
+ BookieNode node = knownBookies.get(bookieAddress);
+ if (node != null) {
+ // refresh the rack info if its a known bookie
+ BookieNode newNode = createBookieNode(bookieAddress);
+ if
(!newNode.getNetworkLocation().equals(node.getNetworkLocation())) {
+ topology.remove(node);
+ topology.add(newNode);
+ knownBookies.put(bookieAddress, newNode);
+ historyBookies.put(bookieAddress, newNode);
+ }
+ //Handle per region placement policy.
+ String oldRegion = getRegion(bookieAddress);
+ String newRegion =
parseBookieRegion(newNode.getAddr());
+ if (oldRegion.equals(newRegion)) {
+ TopologyAwareEnsemblePlacementPolicy
regionPlacement = perRegionPlacement.get(oldRegion);
+
regionPlacement.onBookieRackChange(Collections.singletonList(bookieAddress));
+ } else {
+ address2Region.put(bookieAddress, newRegion);
+ TopologyAwareEnsemblePlacementPolicy
oldRegionPlacement = perRegionPlacement.get(oldRegion);
+
oldRegionPlacement.handleBookiesThatLeft(Collections.singleton(bookieAddress));
+ TopologyAwareEnsemblePlacementPolicy
newRegionPlacement = perRegionPlacement.get(
+ newRegion);
+ if (newRegionPlacement == null) {
+ newRegionPlacement = new
RackawareEnsemblePlacementPolicy()
+ .initialize(dnsResolver, timer,
this.reorderReadsRandom,
+ this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests,
+ this.isWeighted,
this.maxWeightMultiple,
+
this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
+
this.ignoreLocalNodeInPlacementPolicy, statsLogger,
+ bookieAddressResolver)
+
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+ perRegionPlacement.put(newRegion,
newRegionPlacement);
+ }
+
newRegionPlacement.handleBookiesThatJoined(Collections.singleton(bookieAddress));
+ }
+ }
+ } catch (IllegalArgumentException |
NetworkTopologyImpl.InvalidTopologyException e) {
+ LOG.error("Failed to update bookie rack info: {} ",
bookieAddress, e);
+ }
+ });
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
@Override
public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration
conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index fc4e02cae9..7fe5b83db4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import static
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE
import static
org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.net.InetAddress;
@@ -1528,6 +1529,7 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
assertEquals(ensemble.get(reoderSet.get(7)), addr4.toBookieId());
}
+ @Test
public void testNewEnsembleSetWithFiveRegions() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
@@ -1573,6 +1575,7 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
}
}
+ @Test
public void testRegionsWithDiskWeight() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
@@ -1607,4 +1610,155 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
assertEquals(3, ensemble.size());
}
+
+ @Test
+ public void testNotifyRackChangeWithOldRegion() throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
"/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/region2/rack-1");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/region2/rack-1");
+
+ // Update cluster
+ Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
+ addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1",
repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1",
repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(2, repp.perRegionPlacement.size());
+ TopologyAwareEnsemblePlacementPolicy region1Placement =
repp.perRegionPlacement.get("region1");
+ assertEquals(2, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1",
region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1",
region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+
+ TopologyAwareEnsemblePlacementPolicy region2Placement =
repp.perRegionPlacement.get("region2");
+ assertEquals(2, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1",
region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));
+
+ // Update the rack.
+ // change addr2 rack info. /region1/rack-1 -> /region1/rack-2.
+ // change addr4 rack info. /region2/rack-1 -> /region1/rack-2
+ List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+ List<String> rackList = new ArrayList<>();
+ bookieAddressList.add(addr2);
+ rackList.add("/region1/rack-2");
+ bookieAddressList.add(addr4);
+ rackList.add("/region1/rack-2");
+ StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1",
repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2",
repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2",
repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(2, repp.perRegionPlacement.size());
+ region1Placement = repp.perRegionPlacement.get("region1");
+ assertEquals(3, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1",
region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2",
region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2",
region1Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ region2Placement = repp.perRegionPlacement.get("region2");
+ assertEquals(1, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1",
region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr4.toBookieId()));
+ }
+
+ @Test
+ public void testNotifyRackChangeWithNewRegion() throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
"/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/region2/rack-1");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/region2/rack-1");
+
+ // Update cluster
+ Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
+ addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1",
repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1",
repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(2, repp.perRegionPlacement.size());
+ TopologyAwareEnsemblePlacementPolicy region1Placement =
repp.perRegionPlacement.get("region1");
+ assertEquals(2, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1",
region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1",
region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+
+ TopologyAwareEnsemblePlacementPolicy region2Placement =
repp.perRegionPlacement.get("region2");
+ assertEquals(2, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1",
region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));
+
+ // Update the rack.
+ // change addr2 rack info. /region1/rack-1 -> /region3/rack-1.
+ // change addr4 rack info. /region2/rack-1 -> /region3/rack-1
+ List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+ List<String> rackList = new ArrayList<>();
+ bookieAddressList.add(addr2);
+ rackList.add("/region3/rack-1");
+ bookieAddressList.add(addr4);
+ rackList.add("/region3/rack-1");
+ StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1",
repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region3/rack-1",
repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1",
repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region3/rack-1",
repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(3, repp.perRegionPlacement.size());
+ region1Placement = repp.perRegionPlacement.get("region1");
+ assertEquals(1, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1",
region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+
+ region2Placement = repp.perRegionPlacement.get("region2");
+ assertEquals(1, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1",
region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+
+ TopologyAwareEnsemblePlacementPolicy region3Placement =
repp.perRegionPlacement.get("region3");
+ assertEquals(2, region3Placement.knownBookies.keySet().size());
+ assertEquals("/region3/rack-1",
region3Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region3/rack-1",
region3Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region3", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
+ }
}