This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 43335fb80f4 [fix][broker] Fix PulsarRegistrationClient and 
ZkRegistrationClient not aware rack info problem. (#18672)
43335fb80f4 is described below

commit 43335fb80f407471fa4b4278d92b6ea8e4ab5c62
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Feb 1 10:00:59 2023 +0800

    [fix][broker] Fix PulsarRegistrationClient and ZkRegistrationClient not 
aware rack info problem. (#18672)
---
 .../rackawareness/BookieRackAffinityMapping.java   |  26 +++++
 .../BookieRackAffinityMappingTest.java             | 127 +++++++++++++++++++++
 .../bookkeeper/PulsarRegistrationClient.java       |  18 ++-
 3 files changed, 161 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 6602dfcd049..e9e350800b4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.bookie.rackawareness;
 
 import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,8 +29,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
 import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.meta.exceptions.Code;
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
@@ -119,6 +122,7 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
             racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
                     .orElseGet(BookiesRackConfiguration::new);
             updateRacksWithHost(racksWithHost);
+            watchAvailableBookies();
             for (Map<String, BookieInfo> bookieMapping : 
racksWithHost.values()) {
                 for (String address : bookieMapping.keySet()) {
                     bookieAddressListLastTime.add(BookieId.parse(address));
@@ -133,6 +137,28 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
         }
     }
 
+    private void watchAvailableBookies() {
+        BookieAddressResolver bookieAddressResolver = 
getBookieAddressResolver();
+        if (bookieAddressResolver instanceof DefaultBookieAddressResolver) {
+            try {
+                Field field = 
DefaultBookieAddressResolver.class.getDeclaredField("registrationClient");
+                field.setAccessible(true);
+                RegistrationClient registrationClient = (RegistrationClient) 
field.get(bookieAddressResolver);
+                registrationClient.watchWritableBookies(versioned -> {
+                    try {
+                        racksWithHost = 
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
+                                .orElseGet(BookiesRackConfiguration::new);
+                        updateRacksWithHost(racksWithHost);
+                    } catch (InterruptedException | ExecutionException e) {
+                        LOG.error("Failed to update rack info. ", e);
+                    }
+                });
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                LOG.error("Failed watch available bookies.", e);
+            }
+        }
+    }
+
     private synchronized void updateRacksWithHost(BookiesRackConfiguration 
racks) {
         // In config z-node, the bookies are added in the `ip:port` notation, 
while BK will ask
         // for just the IP/hostname when trying to get the rack for a bookie.
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index bbc0c9257fd..d7be7dabd0d 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -18,21 +18,45 @@
  */
 package org.apache.pulsar.bookie.rackawareness;
 
+import static 
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.HashedWheelTimer;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
+import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
+import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -198,4 +222,107 @@ public class BookieRackAffinityMappingTest {
             assertNull(r.get(2));
         });
     }
+
+    @Test
+    public void testWithPulsarRegistrationClient() throws Exception {
+        String data = "{\"group1\": {\"" + BOOKIE1
+                + "\": {\"rack\": \"/rack0\", \"hostname\": 
\"bookie1.example.com\"}, \"" + BOOKIE2
+                + "\": {\"rack\": \"/rack1\", \"hostname\": 
\"bookie2.example.com\"}}}";
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
data.getBytes(), Optional.empty()).join();
+
+        // Case1: ZKCache is given
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        Field field = 
BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+        field.setAccessible(true);
+
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, 
store);
+
+        PulsarRegistrationClient pulsarRegistrationClient = new 
PulsarRegistrationClient(store, "/ledgers");
+        DefaultBookieAddressResolver defaultBookieAddressResolver = new 
DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+        mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+        mapping.setConf(bkClientConf);
+        List<String> racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), 
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 0);
+
+        HashedWheelTimer timer = new HashedWheelTimer(
+                new 
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                bkClientConf.getTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
+                bkClientConf.getTimeoutTimerNumTicks());
+
+        RackawareEnsemblePlacementPolicy repp = new 
RackawareEnsemblePlacementPolicy();
+        Class<?> clazz1 = 
Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
+        Field field1 = clazz1.getDeclaredField("knownBookies");
+        field1.setAccessible(true);
+        Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>) 
field1.get(repp);
+        repp.initialize(bkClientConf, Optional.of(mapping), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, 
defaultBookieAddressResolver);
+
+        Class<?> clazz2 = 
Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
+        Constructor<?> constructor =
+                clazz2.getDeclaredConstructor(ClientConfiguration.class, 
EnsemblePlacementPolicy.class,
+                        RegistrationClient.class, BookieAddressResolver.class, 
StatsLogger.class);
+        constructor.setAccessible(true);
+        Object o = constructor.newInstance(bkClientConf, repp, 
pulsarRegistrationClient, defaultBookieAddressResolver,
+                NullStatsLogger.INSTANCE);
+        Method method = clazz2.getDeclaredMethod("initialBlockingBookieRead");
+        method.setAccessible(true);
+        method.invoke(o);
+
+        Set<BookieId> bookieIds = new HashSet<>();
+        bookieIds.add(BOOKIE1.toBookieId());
+
+        Field field2 = 
BookieServiceInfoSerde.class.getDeclaredField("INSTANCE");
+        field2.setAccessible(true);
+        BookieServiceInfoSerde serviceInfoSerde = (BookieServiceInfoSerde) 
field2.get(null);
+
+        BookieServiceInfo bookieServiceInfo = 
BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE1.toString());
+        store.put("/ledgers/available/" + BOOKIE1, 
serviceInfoSerde.serialize("", bookieServiceInfo),
+                Optional.of(-1L)).get();
+
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> 
((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1);
+        racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), 
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 1);
+        assertEquals(racks.get(0), "/rack0");
+        assertEquals(knownBookies.size(), 1);
+        
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), 
"/rack0");
+
+        bookieServiceInfo = 
BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE2.toString());
+        store.put("/ledgers/available/" + BOOKIE2, 
serviceInfoSerde.serialize("", bookieServiceInfo),
+                Optional.of(-1L)).get();
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> 
((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);
+
+        racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), 
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 2);
+        assertEquals(racks.get(0), "/rack0");
+        assertEquals(racks.get(1), "/rack1");
+        assertEquals(knownBookies.size(), 2);
+        
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), 
"/rack0");
+        
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), 
"/rack1");
+
+        bookieServiceInfo = 
BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE3.toString());
+        store.put("/ledgers/available/" + BOOKIE3, 
serviceInfoSerde.serialize("", bookieServiceInfo),
+                Optional.of(-1L)).get();
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> 
((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);
+
+        racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), 
BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 2);
+        assertEquals(racks.get(0), "/rack0");
+        assertEquals(racks.get(1), "/rack1");
+        assertEquals(knownBookies.size(), 3);
+        
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), 
"/rack0");
+        
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), 
"/rack1");
+        
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), 
"/default-rack");
+
+        timer.stop();
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index b10c10469e7..a32625926e7 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -25,11 +25,11 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
@@ -59,8 +59,8 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
 
     private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> 
bookieServiceInfoCache =
                                                                                
     new ConcurrentHashMap();
-    private final Map<RegistrationListener, Boolean> writableBookiesWatchers = 
new ConcurrentHashMap<>();
-    private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = 
new ConcurrentHashMap<>();
+    private final Set<RegistrationListener> writableBookiesWatchers = new 
CopyOnWriteArraySet<>();
+    private final Set<RegistrationListener> readOnlyBookiesWatchers = new 
CopyOnWriteArraySet<>();
     private final MetadataCache<BookieServiceInfo> 
bookieServiceInfoMetadataCache;
     private final ScheduledExecutorService executor;
 
@@ -131,7 +131,7 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
 
     @Override
     public CompletableFuture<Void> watchWritableBookies(RegistrationListener 
registrationListener) {
-        writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
+        writableBookiesWatchers.add(registrationListener);
         return getWritableBookies()
                 .thenAcceptAsync(registrationListener::onBookiesChanged, 
executor);
     }
@@ -143,7 +143,7 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
 
     @Override
     public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener 
registrationListener) {
-        readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
+        readOnlyBookiesWatchers.add(registrationListener);
         return getReadOnlyBookies()
                 .thenAcceptAsync(registrationListener::onBookiesChanged, 
executor);
     }
@@ -175,14 +175,12 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
         if (n.getType() == NotificationType.Created || n.getType() == 
NotificationType.Deleted) {
             if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
                 getReadOnlyBookies().thenAccept(bookies -> {
-                    readOnlyBookiesWatchers.keySet()
-                            .forEach(w -> executor.execute(() -> 
w.onBookiesChanged(bookies)));
+                    readOnlyBookiesWatchers.forEach(w -> executor.execute(() 
-> w.onBookiesChanged(bookies)));
                 });
                 handleDeletedBookieNode(n);
             } else if (n.getPath().startsWith(bookieRegistrationPath)) {
-                getWritableBookies().thenAccept(bookies ->
-                        writableBookiesWatchers.keySet()
-                                .forEach(w -> executor.execute(() -> 
w.onBookiesChanged(bookies))));
+                  getWritableBookies().thenAccept(bookies ->
+                        writableBookiesWatchers.forEach(w -> 
executor.execute(() -> w.onBookiesChanged(bookies))));
                 handleDeletedBookieNode(n);
             }
         } else if (n.getType() == NotificationType.Modified) {

Reply via email to