This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new abd7bfae3b2 [fix] [broker] Fix isolated group not work problem. 
(#21096)
abd7bfae3b2 is described below

commit abd7bfae3b23186a5617aee0d5f575fffa606e38
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Sep 5 08:12:49 2023 +0800

    [fix] [broker] Fix isolated group not work problem. (#21096)
    
    ### Modifications
    When upgraded the pulsar version from 2.9.2 to 2.10.3, and the isolated 
group feature not work anymore.
    Finally, we found the problem. In IsolatedBookieEnsemblePlacementPolicy, 
when it gets the bookie rack from the metadata store cache, uses 
future.isDone() to avoid sync operation. If the future is incomplete, return 
empty blacklists.
    The cache may expire due to the caffeine cache `getExpireAfterWriteMillis` 
config, if the cache expires, the future may be incomplete. (#21095 will 
correct the behavior)
    
    In 2.9.2, it uses the sync to get data from the metadata store, we should 
also keep the behavior.
---
 .../IsolatedBookieEnsemblePlacementPolicy.java     |  31 +++---
 .../IsolatedBookieEnsemblePlacementPolicyTest.java | 115 +++++++++++++++++++++
 2 files changed, 134 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 164677ca9c6..02ddea94874 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.bookie.rackawareness;
 
 import static 
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import java.util.Collections;
@@ -27,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -61,6 +61,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
 
     private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
 
+    private volatile BookiesRackConfiguration cachedRackConfiguration = null;
 
     public IsolatedBookieEnsemblePlacementPolicy() {
         super();
@@ -86,7 +87,12 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
             }
             // Only add the bookieMappingCache if we have defined an isolation 
group
             bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
-            
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+            
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt
 -> opt.ifPresent(
+                            bookiesRackConfiguration -> 
cachedRackConfiguration = bookiesRackConfiguration))
+                    .exceptionally(e -> {
+                        log.warn("Failed to load bookies rack configuration 
while initialize the PlacementPolicy.");
+                        return null;
+                    });
         }
         if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
             String secondaryIsolationGroupsString = ConfigurationStringUtil
@@ -179,25 +185,26 @@ public class IsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePlac
         return pair;
     }
 
-    private Set<BookieId> getExcludedBookiesWithIsolationGroups(int 
ensembleSize,
+    @VisibleForTesting
+    Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
         Pair<Set<String>, Set<String>> isolationGroups) {
         Set<BookieId> excludedBookies = new HashSet<>();
-        if (isolationGroups != null && 
isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP))  {
+        if (isolationGroups != null && 
isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
             return excludedBookies;
         }
         try {
             if (bookieMappingCache != null) {
-                CompletableFuture<Optional<BookiesRackConfiguration>> future =
-                        
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+                
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
+                        .thenAccept(opt -> cachedRackConfiguration = 
opt.orElse(null)).exceptionally(e -> {
+                            log.warn("Failed to update the newest bookies rack 
config.");
+                            return null;
+                        });
 
-                Optional<BookiesRackConfiguration> optRes = (future.isDone() 
&& !future.isCompletedExceptionally())
-                        ? future.join() : Optional.empty();
-
-                if (optRes.isEmpty()) {
+                BookiesRackConfiguration allGroupsBookieMapping = 
cachedRackConfiguration;
+                if (allGroupsBookieMapping == null) {
+                    log.debug("The bookies rack config is not available at 
now.");
                     return excludedBookies;
                 }
-
-                BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
                 Set<String> allBookies = allGroupsBookieMapping.keySet();
                 int totalAvailableBookiesInPrimaryGroup = 0;
                 Set<String> primaryIsolationGroup = Collections.emptySet();
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index f535ced08f7..beb00197e4e 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.bookie.rackawareness;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -34,18 +37,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -114,6 +122,113 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         assertFalse(ensemble.contains(new 
BookieSocketAddress(BOOKIE4).toBookieId()));
     }
 
+    @Test
+    public void testMetadataStoreCases() throws Exception {
+        Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
+        mainBookieGroup.put(BOOKIE1, 
BookieInfo.builder().rack("rack0").build());
+        mainBookieGroup.put(BOOKIE2, 
BookieInfo.builder().rack("rack1").build());
+        mainBookieGroup.put(BOOKIE3, 
BookieInfo.builder().rack("rack1").build());
+        mainBookieGroup.put(BOOKIE4, 
BookieInfo.builder().rack("rack0").build());
+
+        Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
+
+        store = mock(MetadataStoreExtended.class);
+        MetadataCacheImpl cache = mock(MetadataCacheImpl.class);
+        
when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache);
+        CompletableFuture<Optional<BookiesRackConfiguration>> initialFuture = 
new CompletableFuture<>();
+        //The initialFuture only has group1.
+        BookiesRackConfiguration rackConfiguration1 = new 
BookiesRackConfiguration();
+        rackConfiguration1.put("group1", mainBookieGroup);
+        rackConfiguration1.put("group2", secondaryBookieGroup);
+        initialFuture.complete(Optional.of(rackConfiguration1));
+
+        long waitTime = 2000;
+        CompletableFuture<Optional<BookiesRackConfiguration>> 
waitingCompleteFuture = new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(waitTime);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            //The waitingCompleteFuture has group1 and group2.
+            BookiesRackConfiguration rackConfiguration2 = new 
BookiesRackConfiguration();
+            Map<String, BookieInfo> mainBookieGroup2 = new HashMap<>();
+            mainBookieGroup2.put(BOOKIE1, 
BookieInfo.builder().rack("rack0").build());
+            mainBookieGroup2.put(BOOKIE2, 
BookieInfo.builder().rack("rack1").build());
+            mainBookieGroup2.put(BOOKIE4, 
BookieInfo.builder().rack("rack0").build());
+
+            Map<String, BookieInfo> secondaryBookieGroup2 = new HashMap<>();
+            secondaryBookieGroup2.put(BOOKIE3, 
BookieInfo.builder().rack("rack0").build());
+            rackConfiguration2.put("group1", mainBookieGroup2);
+            rackConfiguration2.put("group2", secondaryBookieGroup2);
+            waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
+        }).start();
+
+        long longWaitTime = 4000;
+        CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture = 
new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(longWaitTime);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            //The emptyFuture means that the zk node /bookies already be 
removed.
+            emptyFuture.complete(Optional.empty());
+        }).start();
+
+        //Return different future means that cache expire.
+        when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
+                .thenReturn(initialFuture).thenReturn(initialFuture)
+                
.thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture)
+                .thenReturn(emptyFuture).thenReturn(emptyFuture);
+
+        IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new 
IsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, 
store);
+        
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 isolationGroups);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
+        groups.setLeft(Sets.newHashSet("group1"));
+        groups.setRight(new HashSet<>());
+
+        //initialFuture, the future is waiting done.
+        Set<BookieId> blacklist =
+                isolationPolicy.getExcludedBookiesWithIsolationGroups(2, 
groups);
+        assertTrue(blacklist.isEmpty());
+
+        //waitingCompleteFuture, the future is waiting done.
+        blacklist =
+                isolationPolicy.getExcludedBookiesWithIsolationGroups(2, 
groups);
+        assertTrue(blacklist.isEmpty());
+
+        Thread.sleep(waitTime);
+
+        //waitingCompleteFuture, the future is already done.
+        blacklist =
+                isolationPolicy.getExcludedBookiesWithIsolationGroups(2, 
groups);
+        assertFalse(blacklist.isEmpty());
+        assertEquals(blacklist.size(), 1);
+        BookieId excludeBookie = blacklist.iterator().next();
+        assertEquals(excludeBookie.toString(), BOOKIE3);
+
+        //emptyFuture, the future is waiting done.
+        blacklist =
+                isolationPolicy.getExcludedBookiesWithIsolationGroups(2, 
groups);
+        assertFalse(blacklist.isEmpty());
+        assertEquals(blacklist.size(), 1);
+        excludeBookie = blacklist.iterator().next();
+        assertEquals(excludeBookie.toString(), BOOKIE3);
+
+        Thread.sleep(longWaitTime - waitTime);
+
+        //emptyFuture, the future is already done.
+        blacklist =
+                isolationPolicy.getExcludedBookiesWithIsolationGroups(2, 
groups);
+        assertTrue(blacklist.isEmpty());
+    }
+
     @Test
     public void testBasic() throws Exception {
         Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();

Reply via email to