This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 002671931b6 [branch-2.10] [fix] [broker] Fix isolated group not work 
problem. (#21098)
002671931b6 is described below

commit 002671931b66048a3e828ce1f2810819d7ef6c1b
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Sep 6 12:44:00 2023 +0800

    [branch-2.10] [fix] [broker] Fix isolated group not work problem. (#21098)
---
 .../IsolatedBookieEnsemblePlacementPolicy.java     |  31 +++---
 .../IsolatedBookieEnsemblePlacementPolicyTest.java | 106 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index ae288fcf7f2..64bc59057d0 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -19,16 +19,15 @@
 package org.apache.pulsar.bookie.rackawareness;
 
 import static 
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -60,6 +59,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
 
     private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
 
+    private volatile BookiesRackConfiguration cachedRackConfiguration = null;
 
     public IsolatedBookieEnsemblePlacementPolicy() {
         super();
@@ -87,7 +87,12 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
 
                 // Only add the bookieMappingCache if we have defined an 
isolation group
                 bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
-                
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+                
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt
 -> opt.ifPresent(
+                                bookiesRackConfiguration -> 
cachedRackConfiguration = bookiesRackConfiguration))
+                        .exceptionally(e -> {
+                            log.warn("Failed to load bookies rack 
configuration while initialize the PlacementPolicy.");
+                            return null;
+                        });
             }
         }
         if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
@@ -107,7 +112,6 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
     public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
             throws BKNotEnoughBookiesException {
-        Map<String, List<String>> isolationGroup = new HashMap<>();
         Set<BookieId> blacklistedBookies = 
getBlacklistedBookiesWithIsolationGroups(
             ensembleSize, defaultIsolationGroups);
         if (excludeBookies == null) {
@@ -182,22 +186,23 @@ public class IsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePlac
         return pair;
     }
 
-    private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int 
ensembleSize,
+    @VisibleForTesting
+    Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
         Pair<Set<String>, Set<String>> isolationGroups) {
         Set<BookieId> blacklistedBookies = new HashSet<>();
         try {
             if (bookieMappingCache != null) {
-                CompletableFuture<Optional<BookiesRackConfiguration>> future =
-                        
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+                
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
+                        .thenAccept(opt -> cachedRackConfiguration = 
opt.orElse(null)).exceptionally(e -> {
+                            log.warn("Failed to update the newest bookies rack 
config.");
+                            return null;
+                        });
 
-                Optional<BookiesRackConfiguration> optRes = (future.isDone() 
&& !future.isCompletedExceptionally())
-                        ? future.join() : Optional.empty();
-
-                if (!optRes.isPresent()) {
+                BookiesRackConfiguration allGroupsBookieMapping = 
cachedRackConfiguration;
+                if (allGroupsBookieMapping == null) {
+                    log.debug("The bookies rack config is not available at 
now.");
                     return blacklistedBookies;
                 }
-
-                BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
                 Set<String> allBookies = allGroupsBookieMapping.keySet();
                 int totalAvailableBookiesInPrimaryGroup = 0;
                 Set<String> primaryIsolationGroup = Collections.emptySet();
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 85feeaecfdd..e8a1568354d 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.bookie.rackawareness;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -34,18 +37,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -87,6 +95,104 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         timer.stop();
     }
 
+    @Test
+    public void testMetadataStoreCases() throws Exception {
+        Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
+        mainBookieGroup.put(BOOKIE1, 
BookieInfo.builder().rack("rack0").build());
+        mainBookieGroup.put(BOOKIE2, 
BookieInfo.builder().rack("rack1").build());
+
+        Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
+        secondaryBookieGroup.put(BOOKIE3, 
BookieInfo.builder().rack("rack0").build());
+
+        store = mock(MetadataStoreExtended.class);
+        MetadataCacheImpl cache = mock(MetadataCacheImpl.class);
+        
when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache);
+        CompletableFuture<Optional<BookiesRackConfiguration>> initialFuture = 
new CompletableFuture<>();
+        //The initialFuture only has group1.
+        BookiesRackConfiguration rackConfiguration1 = new 
BookiesRackConfiguration();
+        rackConfiguration1.put("group1", mainBookieGroup);
+        initialFuture.complete(Optional.of(rackConfiguration1));
+
+        long waitTime = 2000;
+        CompletableFuture<Optional<BookiesRackConfiguration>> 
waitingCompleteFuture = new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(waitTime);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            //The waitingCompleteFuture has group1 and group2.
+            BookiesRackConfiguration rackConfiguration2 = new 
BookiesRackConfiguration();
+            rackConfiguration2.put("group1", mainBookieGroup);
+            rackConfiguration2.put("group2", secondaryBookieGroup);
+            waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
+        }).start();
+
+        long longWaitTime = 4000;
+        CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture = 
new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(longWaitTime);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            //The emptyFuture means that the zk node /bookies has already been 
removed.
+            emptyFuture.complete(Optional.empty());
+        }).start();
+
+        //Returning a different future on later calls simulates the cache entry expiring.
+        when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
+                .thenReturn(initialFuture).thenReturn(initialFuture)
+                
.thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture)
+                .thenReturn(emptyFuture).thenReturn(emptyFuture);
+
+        IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new 
IsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, 
store);
+        
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 isolationGroups);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
+        groups.setLeft(Sets.newHashSet("group1"));
+        groups.setRight(new HashSet<>());
+
+        //initialFuture, the future is already done (it only contains group1).
+        Set<BookieId> blacklist =
+                isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, 
groups);
+        assertTrue(blacklist.isEmpty());
+
+        //waitingCompleteFuture, the future is not done yet.
+        blacklist =
+                isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, 
groups);
+        assertTrue(blacklist.isEmpty());
+
+        Thread.sleep(waitTime);
+
+        //waitingCompleteFuture, the future is already done.
+        blacklist =
+                isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, 
groups);
+        assertFalse(blacklist.isEmpty());
+        assertEquals(blacklist.size(), 1);
+        BookieId excludeBookie = blacklist.iterator().next();
+        assertEquals(excludeBookie.toString(), BOOKIE3);
+
+        //emptyFuture, the future is not done yet.
+        blacklist =
+                isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, 
groups);
+        assertFalse(blacklist.isEmpty());
+        assertEquals(blacklist.size(), 1);
+        excludeBookie = blacklist.iterator().next();
+        assertEquals(excludeBookie.toString(), BOOKIE3);
+
+        Thread.sleep(longWaitTime - waitTime);
+
+        //emptyFuture, the future is already done.
+        blacklist =
+                isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, 
groups);
+        assertTrue(blacklist.isEmpty());
+    }
+
     @Test
     public void testBasic() throws Exception {
         Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();

Reply via email to