This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 43335fb80f4 [fix][broker] Fix PulsarRegistrationClient and
ZkRegistrationClient not aware rack info problem. (#18672)
43335fb80f4 is described below
commit 43335fb80f407471fa4b4278d92b6ea8e4ab5c62
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Feb 1 10:00:59 2023 +0800
[fix][broker] Fix PulsarRegistrationClient and ZkRegistrationClient not
aware rack info problem. (#18672)
---
.../rackawareness/BookieRackAffinityMapping.java | 26 +++++
.../BookieRackAffinityMappingTest.java | 127 +++++++++++++++++++++
.../bookkeeper/PulsarRegistrationClient.java | 18 ++-
3 files changed, 161 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 6602dfcd049..e9e350800b4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.bookie.rackawareness;
import static
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME;
+import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,8 +29,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
@@ -119,6 +122,7 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
+ watchAvailableBookies();
for (Map<String, BookieInfo> bookieMapping :
racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
@@ -133,6 +137,28 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
}
}
+ private void watchAvailableBookies() {
+ BookieAddressResolver bookieAddressResolver =
getBookieAddressResolver();
+ if (bookieAddressResolver instanceof DefaultBookieAddressResolver) {
+ try {
+ Field field =
DefaultBookieAddressResolver.class.getDeclaredField("registrationClient");
+ field.setAccessible(true);
+ RegistrationClient registrationClient = (RegistrationClient)
field.get(bookieAddressResolver);
+ registrationClient.watchWritableBookies(versioned -> {
+ try {
+ racksWithHost =
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
+ .orElseGet(BookiesRackConfiguration::new);
+ updateRacksWithHost(racksWithHost);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to update rack info. ", e);
+ }
+ });
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ LOG.error("Failed watch available bookies.", e);
+ }
+ }
+ }
+
private synchronized void updateRacksWithHost(BookiesRackConfiguration
racks) {
// In config z-node, the bookies are added in the `ip:port` notation,
while BK will ask
// for just the IP/hostname when trying to get the rack for a bookie.
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index bbc0c9257fd..d7be7dabd0d 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -18,21 +18,45 @@
*/
package org.apache.pulsar.bookie.rackawareness;
+import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.HashedWheelTimer;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
+import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
+import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -198,4 +222,107 @@ public class BookieRackAffinityMappingTest {
assertNull(r.get(2));
});
}
+
+ /**
+  * Verifies that rack information reaches both {@link BookieRackAffinityMapping} and the
+  * placement policy's {@code knownBookies} map when bookies are registered one after another
+  * through a {@link PulsarRegistrationClient}.
+  *
+  * <p>BOOKIE1 and BOOKIE2 have a rack configured; BOOKIE3 has none and is expected to end up
+  * in {@code /default-rack}.
+  */
+ @Test
+ public void testWithPulsarRegistrationClient() throws Exception {
+     String data = "{\"group1\": {\"" + BOOKIE1
+             + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + BOOKIE2
+             + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}";
+     store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
+
+     // The metadata store is handed to the mapping through the client configuration.
+     BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+     Field field = BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+     field.setAccessible(true);
+
+     ClientConfiguration bkClientConf = new ClientConfiguration();
+     bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+
+     PulsarRegistrationClient pulsarRegistrationClient = new PulsarRegistrationClient(store, "/ledgers");
+     DefaultBookieAddressResolver defaultBookieAddressResolver =
+             new DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+     mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+     mapping.setConf(bkClientConf);
+     List<String> racks = mapping
+             .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+             .stream().filter(Objects::nonNull).toList();
+     assertEquals(racks.size(), 0);
+
+     HashedWheelTimer timer = new HashedWheelTimer(
+             new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+             bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+             bkClientConf.getTimeoutTimerNumTicks());
+     // Stop the timer even when an assertion fails so its worker thread does not leak.
+     try {
+         RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
+         Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
+         Field field1 = clazz1.getDeclaredField("knownBookies");
+         field1.setAccessible(true);
+         // The reflective read returns Object; the element types are asserted by the checks below.
+         @SuppressWarnings("unchecked")
+         Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>) field1.get(repp);
+         repp.initialize(bkClientConf, Optional.of(mapping), timer,
+                 DISABLE_ALL, NullStatsLogger.INSTANCE, defaultBookieAddressResolver);
+
+         Class<?> clazz2 = Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
+         Constructor<?> constructor =
+                 clazz2.getDeclaredConstructor(ClientConfiguration.class, EnsemblePlacementPolicy.class,
+                         RegistrationClient.class, BookieAddressResolver.class, StatsLogger.class);
+         constructor.setAccessible(true);
+         Object o = constructor.newInstance(bkClientConf, repp, pulsarRegistrationClient,
+                 defaultBookieAddressResolver, NullStatsLogger.INSTANCE);
+         Method method = clazz2.getDeclaredMethod("initialBlockingBookieRead");
+         method.setAccessible(true);
+         method.invoke(o);
+
+         Set<BookieId> bookieIds = new HashSet<>();
+         bookieIds.add(BOOKIE1.toBookieId());
+
+         Field field2 = BookieServiceInfoSerde.class.getDeclaredField("INSTANCE");
+         field2.setAccessible(true);
+         BookieServiceInfoSerde serviceInfoSerde = (BookieServiceInfoSerde) field2.get(null);
+
+         // Step 1: register BOOKIE1.
+         BookieServiceInfo bookieServiceInfo =
+                 BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE1.toString());
+         store.put("/ledgers/available/" + BOOKIE1, serviceInfoSerde.serialize("", bookieServiceInfo),
+                 Optional.of(-1L)).get();
+
+         Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+                 ((BookiesRackConfiguration) field.get(mapping)).get("group1").size() == 1);
+         // knownBookies is filled by a separate asynchronous listener, so wait for it as well.
+         Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> knownBookies.size() == 1);
+         racks = mapping
+                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                 .stream().filter(Objects::nonNull).toList();
+         assertEquals(racks.size(), 1);
+         assertEquals(racks.get(0), "/rack0");
+         assertEquals(knownBookies.size(), 1);
+         assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
+
+         // Step 2: register BOOKIE2.
+         bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE2.toString());
+         store.put("/ledgers/available/" + BOOKIE2, serviceInfoSerde.serialize("", bookieServiceInfo),
+                 Optional.of(-1L)).get();
+         Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+                 ((BookiesRackConfiguration) field.get(mapping)).get("group1").size() == 2);
+         Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> knownBookies.size() == 2);
+
+         racks = mapping
+                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                 .stream().filter(Objects::nonNull).toList();
+         assertEquals(racks.size(), 2);
+         assertEquals(racks.get(0), "/rack0");
+         assertEquals(racks.get(1), "/rack1");
+         assertEquals(knownBookies.size(), 2);
+         assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
+         assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
+
+         // Step 3: register BOOKIE3, which has no rack configured.
+         bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE3.toString());
+         store.put("/ledgers/available/" + BOOKIE3, serviceInfoSerde.serialize("", bookieServiceInfo),
+                 Optional.of(-1L)).get();
+         // The rack configuration still holds two entries here, so waiting on it would return
+         // immediately; wait for the placement policy to learn about the third bookie instead.
+         Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> knownBookies.size() == 3);
+
+         racks = mapping
+                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                 .stream().filter(Objects::nonNull).toList();
+         assertEquals(racks.size(), 2);
+         assertEquals(racks.get(0), "/rack0");
+         assertEquals(racks.get(1), "/rack1");
+         assertEquals(knownBookies.size(), 3);
+         assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
+         assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
+         assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");
+     } finally {
+         timer.stop();
+     }
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index b10c10469e7..a32625926e7 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -25,11 +25,11 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
@@ -59,8 +59,8 @@ public class PulsarRegistrationClient implements
RegistrationClient {
private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>>
bookieServiceInfoCache =
new ConcurrentHashMap();
- private final Map<RegistrationListener, Boolean> writableBookiesWatchers =
new ConcurrentHashMap<>();
- private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers =
new ConcurrentHashMap<>();
+ private final Set<RegistrationListener> writableBookiesWatchers = new
CopyOnWriteArraySet<>();
+ private final Set<RegistrationListener> readOnlyBookiesWatchers = new
CopyOnWriteArraySet<>();
private final MetadataCache<BookieServiceInfo>
bookieServiceInfoMetadataCache;
private final ScheduledExecutorService executor;
@@ -131,7 +131,7 @@ public class PulsarRegistrationClient implements
RegistrationClient {
@Override
public CompletableFuture<Void> watchWritableBookies(RegistrationListener
registrationListener) {
- writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
+ writableBookiesWatchers.add(registrationListener);
return getWritableBookies()
.thenAcceptAsync(registrationListener::onBookiesChanged,
executor);
}
@@ -143,7 +143,7 @@ public class PulsarRegistrationClient implements
RegistrationClient {
@Override
public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener
registrationListener) {
- readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
+ readOnlyBookiesWatchers.add(registrationListener);
return getReadOnlyBookies()
.thenAcceptAsync(registrationListener::onBookiesChanged,
executor);
}
@@ -175,14 +175,12 @@ public class PulsarRegistrationClient implements
RegistrationClient {
if (n.getType() == NotificationType.Created || n.getType() ==
NotificationType.Deleted) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies -> {
- readOnlyBookiesWatchers.keySet()
- .forEach(w -> executor.execute(() ->
w.onBookiesChanged(bookies)));
+ readOnlyBookiesWatchers.forEach(w -> executor.execute(()
-> w.onBookiesChanged(bookies)));
});
handleDeletedBookieNode(n);
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
- getWritableBookies().thenAccept(bookies ->
- writableBookiesWatchers.keySet()
- .forEach(w -> executor.execute(() ->
w.onBookiesChanged(bookies))));
+ getWritableBookies().thenAccept(bookies ->
+ writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
handleDeletedBookieNode(n);
}
} else if (n.getType() == NotificationType.Modified) {