This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aece67e35ec [fix] Remove blocking calls from BookieRackAffinityMapping
(#22846)
aece67e35ec is described below
commit aece67e35ecec4a9d90a951b78cfc89ca6395054
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 5 10:49:00 2024 -0700
[fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
---
.../rackawareness/BookieRackAffinityMapping.java | 44 +++++++++++++---------
.../IsolatedBookieEnsemblePlacementPolicy.java | 2 +-
2 files changed, 28 insertions(+), 18 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 983822f2294..4a5ff746f40 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
private BookiesRackConfiguration racksWithHost = new
BookiesRackConfiguration();
private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
- public static MetadataStore createMetadataStore(Configuration conf) throws
MetadataException {
+ static MetadataStore getMetadataStore(Configuration conf) throws
MetadataException {
MetadataStore store;
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty != null) {
@@ -116,12 +116,20 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
super.setConf(conf);
MetadataStore store;
try {
- store = createMetadataStore(conf);
- bookieMappingCache =
store.getMetadataCache(BookiesRackConfiguration.class);
- store.registerListener(this::handleUpdates);
- racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
- .orElseGet(BookiesRackConfiguration::new);
- for (Map<String, BookieInfo> bookieMapping :
racksWithHost.values()) {
+ store = getMetadataStore(conf);
+ } catch (MetadataException e) {
+ throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to
init BookieId list");
+ }
+
+ bookieMappingCache =
store.getMetadataCache(BookiesRackConfiguration.class);
+ store.registerListener(this::handleUpdates);
+
+ try {
+ var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+ .thenApply(optRes ->
optRes.orElseGet(BookiesRackConfiguration::new))
+ .get();
+
+ for (var bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
}
@@ -131,10 +139,12 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
}
}
updateRacksWithHost(racksWithHost);
- watchAvailableBookies();
- } catch (InterruptedException | ExecutionException | MetadataException
e) {
- throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to
init BookieId list");
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Failed to update rack info. ", e);
+ throw new RuntimeException(e);
}
+
+ watchAvailableBookies();
}
private void watchAvailableBookies() {
@@ -145,13 +155,13 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
field.setAccessible(true);
RegistrationClient registrationClient = (RegistrationClient)
field.get(bookieAddressResolver);
registrationClient.watchWritableBookies(versioned -> {
- try {
- racksWithHost =
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
- .orElseGet(BookiesRackConfiguration::new);
- updateRacksWithHost(racksWithHost);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to update rack info. ", e);
- }
+ bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+ .thenApply(optRes ->
optRes.orElseGet(BookiesRackConfiguration::new))
+ .thenAccept(this::updateRacksWithHost)
+ .exceptionally(ex -> {
+ LOG.error("Failed to update rack info. ", ex);
+ return null;
+ });
});
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error("Failed watch available bookies.", e);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 8839e6e2d26..62b7ffa1e29 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -73,7 +73,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
StatsLogger statsLogger, BookieAddressResolver
bookieAddressResolver) {
MetadataStore store;
try {
- store = BookieRackAffinityMapping.createMetadataStore(conf);
+ store = BookieRackAffinityMapping.getMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed
initialized");
}