codelipenghui commented on code in PR #18672:
URL: https://github.com/apache/pulsar/pull/18672#discussion_r1050334648
##########
pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java:
##########
@@ -198,4 +222,206 @@ public void testBookieInfoChange() throws Exception {
assertNull(r.get(2));
});
}
+
+ @Test
+ public void testWithPulsarRegistrationClient() throws Exception {
+ String data = "{\"group1\": {\"" + BOOKIE1
+ + "\": {\"rack\": \"/rack0\", \"hostname\":
\"bookie1.example.com\"}, \"" + BOOKIE2
+ + "\": {\"rack\": \"/rack1\", \"hostname\":
\"bookie2.example.com\"}}}";
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();
+
+ // Case1: ZKCache is given
+ BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+ Field field =
BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+ field.setAccessible(true);
+
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
+ PulsarRegistrationClient pulsarRegistrationClient = new
PulsarRegistrationClient(store, "/ledgers");
+ DefaultBookieAddressResolver defaultBookieAddressResolver = new
DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+ mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+ mapping.setConf(bkClientConf);
+ List<String> racks = mapping
+ .resolve(Lists.newArrayList(BOOKIE1.getHostName(),
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+ .stream().filter(Objects::nonNull).toList();
+ assertEquals(racks.size(), 0);
+
+ HashedWheelTimer timer = new HashedWheelTimer(
Review Comment:
Stop the timer after the test.
##########
pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java:
##########
@@ -198,4 +222,206 @@ public void testBookieInfoChange() throws Exception {
assertNull(r.get(2));
});
}
+
+ @Test
+ public void testWithPulsarRegistrationClient() throws Exception {
+ String data = "{\"group1\": {\"" + BOOKIE1
+ + "\": {\"rack\": \"/rack0\", \"hostname\":
\"bookie1.example.com\"}, \"" + BOOKIE2
+ + "\": {\"rack\": \"/rack1\", \"hostname\":
\"bookie2.example.com\"}}}";
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();
+
+ // Case1: ZKCache is given
+ BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+ Field field =
BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+ field.setAccessible(true);
+
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
+ PulsarRegistrationClient pulsarRegistrationClient = new
PulsarRegistrationClient(store, "/ledgers");
+ DefaultBookieAddressResolver defaultBookieAddressResolver = new
DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+ mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+ mapping.setConf(bkClientConf);
+ List<String> racks = mapping
+ .resolve(Lists.newArrayList(BOOKIE1.getHostName(),
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+ .stream().filter(Objects::nonNull).toList();
+ assertEquals(racks.size(), 0);
+
+ HashedWheelTimer timer = new HashedWheelTimer(
+ new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+ bkClientConf.getTimeoutTimerTickDurationMs(),
TimeUnit.MILLISECONDS,
+ bkClientConf.getTimeoutTimerNumTicks());
+
+ RackawareEnsemblePlacementPolicy repp = new
RackawareEnsemblePlacementPolicy();
+ Class<?> clazz1 =
Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
+ Field field1 = clazz1.getDeclaredField("knownBookies");
+ field1.setAccessible(true);
+ Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>)
field1.get(repp);
+ repp.initialize(bkClientConf, Optional.of(mapping), timer,
+ DISABLE_ALL, NullStatsLogger.INSTANCE,
defaultBookieAddressResolver);
+
+ Class<?> clazz2 =
Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
+ Constructor<?> constructor =
+ clazz2.getDeclaredConstructor(ClientConfiguration.class,
EnsemblePlacementPolicy.class,
+ RegistrationClient.class, BookieAddressResolver.class,
StatsLogger.class);
+ constructor.setAccessible(true);
+ Object o = constructor.newInstance(bkClientConf, repp,
pulsarRegistrationClient, defaultBookieAddressResolver,
+ NullStatsLogger.INSTANCE);
+ Method method = clazz2.getDeclaredMethod("initialBlockingBookieRead");
+ method.setAccessible(true);
+ method.invoke(o);
+
+ Set<BookieId> bookieIds = new HashSet<>();
+ bookieIds.add(BOOKIE1.toBookieId());
+
+ Field field2 =
BookieServiceInfoSerde.class.getDeclaredField("INSTANCE");
+ field2.setAccessible(true);
+ BookieServiceInfoSerde serviceInfoSerde = (BookieServiceInfoSerde)
field2.get(null);
+
+ BookieServiceInfo bookieServiceInfo =
BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE1.toString());
+ store.put("/ledgers/available/" + BOOKIE1,
serviceInfoSerde.serialize("", bookieServiceInfo),
+ Optional.of(-1L)).get();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1);
+ racks = mapping
+ .resolve(Lists.newArrayList(BOOKIE1.getHostName(),
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+ .stream().filter(Objects::nonNull).toList();
+ assertEquals(racks.size(), 1);
+ assertEquals(racks.get(0), "/rack0");
+ assertEquals(knownBookies.size(), 1);
+
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(),
"/rack0");
+
+ bookieServiceInfo =
BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE2.toString());
+ store.put("/ledgers/available/" + BOOKIE2,
serviceInfoSerde.serialize("", bookieServiceInfo),
+ Optional.of(-1L)).get();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);
+
+ racks = mapping
+ .resolve(Lists.newArrayList(BOOKIE1.getHostName(),
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+ .stream().filter(Objects::nonNull).toList();
+ assertEquals(racks.size(), 2);
+ assertEquals(racks.get(0), "/rack0");
+ assertEquals(racks.get(1), "/rack1");
+ assertEquals(knownBookies.size(), 2);
+
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(),
"/rack0");
+
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(),
"/rack1");
+
+ bookieServiceInfo =
BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE3.toString());
+ store.put("/ledgers/available/" + BOOKIE3,
serviceInfoSerde.serialize("", bookieServiceInfo),
+ Optional.of(-1L)).get();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);
+
+ racks = mapping
+ .resolve(Lists.newArrayList(BOOKIE1.getHostName(),
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+ .stream().filter(Objects::nonNull).toList();
+ assertEquals(racks.size(), 2);
+ assertEquals(racks.get(0), "/rack0");
+ assertEquals(racks.get(1), "/rack1");
+ assertEquals(knownBookies.size(), 3);
+
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(),
"/rack0");
+
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(),
"/rack1");
+
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(),
"/default-rack");
+ }
+
+ @Test
+ public void testWithZkRegistrationClient() throws Exception {
+ String data = "{\"group1\": {\"" + BOOKIE1
+ + "\": {\"rack\": \"/rack0\", \"hostname\":
\"bookie1.example.com\"}, \"" + BOOKIE2
+ + "\": {\"rack\": \"/rack1\", \"hostname\":
\"bookie2.example.com\"}}}";
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();
+
+ // Case1: ZKCache is given
+ BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+ Field field =
BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+ field.setAccessible(true);
+
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
+ PulsarRegistrationClient pulsarRegistrationClient = new
PulsarRegistrationClient(store, "/ledgers");
+ DefaultBookieAddressResolver defaultBookieAddressResolver = new
DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+ mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+ mapping.setConf(bkClientConf);
+ List<String> racks = mapping
+ .resolve(Lists.newArrayList(BOOKIE1.getHostName(),
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+ .stream().filter(Objects::nonNull).toList();
+ assertEquals(racks.size(), 0);
+
+ HashedWheelTimer timer = new HashedWheelTimer(
Review Comment:
Stop the timer after the test.
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java:
##########
@@ -133,6 +137,28 @@ public synchronized void setConf(Configuration conf) {
}
}
+ private void watchAvailableBookies() {
+ BookieAddressResolver bookieAddressResolver =
getBookieAddressResolver();
+ if (bookieAddressResolver instanceof DefaultBookieAddressResolver) {
+ try {
+ Field field =
DefaultBookieAddressResolver.class.getDeclaredField("registrationClient");
+ field.setAccessible(true);
+ RegistrationClient registrationClient = (RegistrationClient)
field.get(bookieAddressResolver);
+ registrationClient.watchWritableBookies(versioned -> {
+ try {
+ racksWithHost =
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
Review Comment:
No, It will load data from metadata store while cache missed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]