This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9cb41fd42e5e3f19ba950fde6bcfcbac267cf3c2 Author: Michael Marshall <[email protected]> AuthorDate: Fri Jul 29 03:24:56 2022 -0500 Fix rack awareness cache expiration race condition (#16825) (cherry picked from commit e451806a715661fca81876579e0f078aca36c9d9) --- .../rackawareness/BookieRackAffinityMapping.java | 64 +++++++--------------- .../BookieRackAffinityMappingTest.java | 12 ++-- 2 files changed, 26 insertions(+), 50 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index ec4b7da250e..c0c29637114 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -26,9 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RackChangeNotifier; @@ -66,8 +64,8 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null; private List<BookieId> bookieAddressListLastTime = new ArrayList<>(); - private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration(); - private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>(); + private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration(); + private Map<String, BookieInfo> bookieInfoMap = new HashMap<>(); public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException { MetadataStore store; @@ -110,15 +108,17 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } @Override - public void setConf(Configuration conf) { + public synchronized void setConf(Configuration conf) { super.setConf(conf); MetadataStore store; try { store = createMetadataStore(conf); bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join(); - for (Map<String, BookieInfo> bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() - .map(Map::values).orElse(Collections.emptyList())) { + store.registerListener(this::handleUpdates); + racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() + .orElseGet(BookiesRackConfiguration::new); + updateRacksWithHost(racksWithHost); + for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); } @@ -130,13 +130,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } catch (InterruptedException | ExecutionException | MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); } - store.registerListener(this::handleUpdates); - - // A previous version of this code tried to eagerly load the cache. However, this is invalid - // in later versions of bookkeeper as when setConf is called, the bookieAddressResolver is not yet set } - private void updateRacksWithHost(BookiesRackConfiguration racks) { + private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) { // In config z-node, the bookies are added in the `ip:port` notation, while BK will ask // for just the IP/hostname when trying to get the rack for a bookie. // To work around this issue, we insert in the map the bookie ip/hostname with same rack-info @@ -176,7 +172,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } @Override - public List<String> resolve(List<String> bookieAddressList) { + public synchronized List<String> resolve(List<String> bookieAddressList) { List<String> racks = new ArrayList<>(bookieAddressList.size()); for (String bookieAddress : bookieAddressList) { racks.add(getRack(bookieAddress)); @@ -185,32 +181,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } private String getRack(String bookieAddress) { - try { - // Trigger load of z-node in case it didn't exist - CompletableFuture<Optional<BookiesRackConfiguration>> future = - bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH); - - Optional<BookiesRackConfiguration> racks = (future.isDone() && !future.isCompletedExceptionally()) - ? future.join() : Optional.empty(); - updateRacksWithHost(racks.orElseGet(BookiesRackConfiguration::new)); - if (!racks.isPresent()) { - // since different placement policy will have different default rack, - // don't be smart here and just return null - return null; - } - } catch (Exception e) { - throw new RuntimeException(e); - } - BookieInfo bi = bookieInfoMap.get(bookieAddress); if (bi == null) { - Optional<BookieInfo> biOpt = racksWithHost.getBookie(bookieAddress); - if (biOpt.isPresent()) { - bi = biOpt.get(); - } else { - updateRacksWithHost(racksWithHost); - bi = bookieInfoMap.get(bookieAddress); - } + bi = racksWithHost.getBookie(bookieAddress).orElse(null); } if (bi != null @@ -243,10 +216,11 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping return; } - if (rackawarePolicy != null) { - bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) - .thenAccept(optVal -> { + bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) + .thenAccept(optVal -> { + synchronized (this) { LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal); + this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new)); List<BookieId> bookieAddressList = new ArrayList<>(); for (Map<String, BookieInfo> bookieMapping : optVal.map(Map::values).orElse( Collections.emptyList())) { @@ -261,9 +235,11 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping Set<BookieId> bookieIdSet = new HashSet<>(bookieAddressList); bookieIdSet.addAll(bookieAddressListLastTime); bookieAddressListLastTime = bookieAddressList; - rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet)); - }); - } + if (rackawarePolicy != null) { + rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet)); + } + } + }); } @Override diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index 485447868b1..4377916ace2 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -170,12 +170,12 @@ public class BookieRackAffinityMappingTest { bookieMapping.put("group2", secondaryBookieGroup); store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), Optional.empty()).join(); - - racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3")); - assertEquals(racks.get(0), "/rack0"); - assertEquals(racks.get(1), "/rack1"); - assertEquals(racks.get(2), "/rack0"); - + Awaitility.await().untilAsserted(() -> { + List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3")); + assertEquals(r.get(0), "/rack0"); + assertEquals(r.get(1), "/rack1"); + assertEquals(r.get(2), "/rack0"); + }); store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), Optional.empty()).join();
