This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 002671931b6 [branch-2.10] [fix] [broker] Fix isolated group not work
problem. (#21098)
002671931b6 is described below
commit 002671931b66048a3e828ce1f2810819d7ef6c1b
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Sep 6 12:44:00 2023 +0800
[branch-2.10] [fix] [broker] Fix isolated group not work problem. (#21098)
---
.../IsolatedBookieEnsemblePlacementPolicy.java | 31 +++---
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 106 +++++++++++++++++++++
2 files changed, 124 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index ae288fcf7f2..64bc59057d0 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -19,16 +19,15 @@
package org.apache.pulsar.bookie.rackawareness;
import static
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -60,6 +59,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
+ private volatile BookiesRackConfiguration cachedRackConfiguration = null;
public IsolatedBookieEnsemblePlacementPolicy() {
super();
@@ -87,7 +87,12 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
// Only add the bookieMappingCache if we have defined an
isolation group
bookieMappingCache =
store.getMetadataCache(BookiesRackConfiguration.class);
-
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt
-> opt.ifPresent(
+ bookiesRackConfiguration ->
cachedRackConfiguration = bookiesRackConfiguration))
+ .exceptionally(e -> {
+ log.warn("Failed to load bookies rack
configuration while initialize the PlacementPolicy.");
+ return null;
+ });
}
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
@@ -107,7 +112,6 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
- Map<String, List<String>> isolationGroup = new HashMap<>();
Set<BookieId> blacklistedBookies =
getBlacklistedBookiesWithIsolationGroups(
ensembleSize, defaultIsolationGroups);
if (excludeBookies == null) {
@@ -182,22 +186,23 @@ public class IsolatedBookieEnsemblePlacementPolicy
extends RackawareEnsemblePlac
return pair;
}
- private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int
ensembleSize,
+ @VisibleForTesting
+ Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Set<BookieId> blacklistedBookies = new HashSet<>();
try {
if (bookieMappingCache != null) {
- CompletableFuture<Optional<BookiesRackConfiguration>> future =
-
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
+ .thenAccept(opt -> cachedRackConfiguration =
opt.orElse(null)).exceptionally(e -> {
+ log.warn("Failed to update the newest bookies rack
config.");
+ return null;
+ });
- Optional<BookiesRackConfiguration> optRes = (future.isDone()
&& !future.isCompletedExceptionally())
- ? future.join() : Optional.empty();
-
- if (!optRes.isPresent()) {
+ BookiesRackConfiguration allGroupsBookieMapping =
cachedRackConfiguration;
+ if (allGroupsBookieMapping == null) {
+ log.debug("The bookies rack config is not available at
now.");
return blacklistedBookies;
}
-
- BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
Set<String> allBookies = allGroupsBookieMapping.keySet();
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 85feeaecfdd..e8a1568354d 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -18,11 +18,14 @@
*/
package org.apache.pulsar.bookie.rackawareness;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -34,18 +37,23 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -87,6 +95,104 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
timer.stop();
}
+ @Test
+ public void testMetadataStoreCases() throws Exception {
+ Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
+ mainBookieGroup.put(BOOKIE1,
BookieInfo.builder().rack("rack0").build());
+ mainBookieGroup.put(BOOKIE2,
BookieInfo.builder().rack("rack1").build());
+
+ Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
+ secondaryBookieGroup.put(BOOKIE3,
BookieInfo.builder().rack("rack0").build());
+
+ store = mock(MetadataStoreExtended.class);
+ MetadataCacheImpl cache = mock(MetadataCacheImpl.class);
+
when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache);
+ CompletableFuture<Optional<BookiesRackConfiguration>> initialFuture =
new CompletableFuture<>();
+ //The initialFuture only has group1.
+ BookiesRackConfiguration rackConfiguration1 = new
BookiesRackConfiguration();
+ rackConfiguration1.put("group1", mainBookieGroup);
+ initialFuture.complete(Optional.of(rackConfiguration1));
+
+ long waitTime = 2000;
+ CompletableFuture<Optional<BookiesRackConfiguration>>
waitingCompleteFuture = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ //The waitingCompleteFuture has group1 and group2.
+ BookiesRackConfiguration rackConfiguration2 = new
BookiesRackConfiguration();
+ rackConfiguration2.put("group1", mainBookieGroup);
+ rackConfiguration2.put("group2", secondaryBookieGroup);
+ waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
+ }).start();
+
+ long longWaitTime = 4000;
+ CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture =
new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(longWaitTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ //The emptyFuture means that the zk node /bookies already be
removed.
+ emptyFuture.complete(Optional.empty());
+ }).start();
+
+ //Return different future means that cache expire.
+ when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
+ .thenReturn(initialFuture).thenReturn(initialFuture)
+
.thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture)
+ .thenReturn(emptyFuture).thenReturn(emptyFuture);
+
+ IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new
IsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
isolationGroups);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
+ groups.setLeft(Sets.newHashSet("group1"));
+ groups.setRight(new HashSet<>());
+
+ //initialFuture, the future is waiting done.
+ Set<BookieId> blacklist =
+ isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2,
groups);
+ assertTrue(blacklist.isEmpty());
+
+ //waitingCompleteFuture, the future is waiting done.
+ blacklist =
+ isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2,
groups);
+ assertTrue(blacklist.isEmpty());
+
+ Thread.sleep(waitTime);
+
+ //waitingCompleteFuture, the future is already done.
+ blacklist =
+ isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2,
groups);
+ assertFalse(blacklist.isEmpty());
+ assertEquals(blacklist.size(), 1);
+ BookieId excludeBookie = blacklist.iterator().next();
+ assertEquals(excludeBookie.toString(), BOOKIE3);
+
+ //emptyFuture, the future is waiting done.
+ blacklist =
+ isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2,
groups);
+ assertFalse(blacklist.isEmpty());
+ assertEquals(blacklist.size(), 1);
+ excludeBookie = blacklist.iterator().next();
+ assertEquals(excludeBookie.toString(), BOOKIE3);
+
+ Thread.sleep(longWaitTime - waitTime);
+
+ //emptyFuture, the future is already done.
+ blacklist =
+ isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2,
groups);
+ assertTrue(blacklist.isEmpty());
+ }
+
@Test
public void testBasic() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();