This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new abd7bfae3b2 [fix] [broker] Fix isolated group not work problem.
(#21096)
abd7bfae3b2 is described below
commit abd7bfae3b23186a5617aee0d5f575fffa606e38
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Sep 5 08:12:49 2023 +0800
[fix] [broker] Fix isolated group not work problem. (#21096)
### Modifications
After upgrading Pulsar from 2.9.2 to 2.10.3, the isolated bookie group
feature stopped working.
We eventually found the cause. When IsolatedBookieEnsemblePlacementPolicy
reads the bookie rack configuration from the metadata store cache, it
checks future.isDone() to avoid a synchronous call. If the future is not
yet complete, it returns an empty blacklist.
Entries in the Caffeine cache expire according to the
`getExpireAfterWriteMillis` config; once an entry has expired, the returned
future may be incomplete. (#21095 will correct this behavior.)
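In 2.9.2, the configuration was fetched from the metadata store
synchronously, so it was always available; we should preserve that
behavior. This change does so by keeping the last successfully loaded rack
configuration in a field and using it whenever the cache lookup has not
completed yet.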
---
.../IsolatedBookieEnsemblePlacementPolicy.java | 31 +++---
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 115 +++++++++++++++++++++
2 files changed, 134 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 164677ca9c6..02ddea94874 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.bookie.rackawareness;
import static
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.util.Collections;
@@ -27,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -61,6 +61,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
+ private volatile BookiesRackConfiguration cachedRackConfiguration = null;
public IsolatedBookieEnsemblePlacementPolicy() {
super();
@@ -86,7 +87,12 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
}
// Only add the bookieMappingCache if we have defined an isolation
group
bookieMappingCache =
store.getMetadataCache(BookiesRackConfiguration.class);
-
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt
-> opt.ifPresent(
+ bookiesRackConfiguration ->
cachedRackConfiguration = bookiesRackConfiguration))
+ .exceptionally(e -> {
+ log.warn("Failed to load bookies rack configuration
while initialize the PlacementPolicy.");
+ return null;
+ });
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
String secondaryIsolationGroupsString = ConfigurationStringUtil
@@ -179,25 +185,26 @@ public class IsolatedBookieEnsemblePlacementPolicy
extends RackawareEnsemblePlac
return pair;
}
- private Set<BookieId> getExcludedBookiesWithIsolationGroups(int
ensembleSize,
+ @VisibleForTesting
+ Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Set<BookieId> excludedBookies = new HashSet<>();
- if (isolationGroups != null &&
isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
+ if (isolationGroups != null &&
isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
return excludedBookies;
}
try {
if (bookieMappingCache != null) {
- CompletableFuture<Optional<BookiesRackConfiguration>> future =
-
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
+ .thenAccept(opt -> cachedRackConfiguration =
opt.orElse(null)).exceptionally(e -> {
+ log.warn("Failed to update the newest bookies rack
config.");
+ return null;
+ });
- Optional<BookiesRackConfiguration> optRes = (future.isDone()
&& !future.isCompletedExceptionally())
- ? future.join() : Optional.empty();
-
- if (optRes.isEmpty()) {
+ BookiesRackConfiguration allGroupsBookieMapping =
cachedRackConfiguration;
+ if (allGroupsBookieMapping == null) {
+ log.debug("The bookies rack config is not available at
now.");
return excludedBookies;
}
-
- BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
Set<String> allBookies = allGroupsBookieMapping.keySet();
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index f535ced08f7..beb00197e4e 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -18,11 +18,14 @@
*/
package org.apache.pulsar.bookie.rackawareness;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -34,18 +37,23 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -114,6 +122,113 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
assertFalse(ensemble.contains(new
BookieSocketAddress(BOOKIE4).toBookieId()));
}
+ @Test
+ public void testMetadataStoreCases() throws Exception {
+ Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
+ mainBookieGroup.put(BOOKIE1,
BookieInfo.builder().rack("rack0").build());
+ mainBookieGroup.put(BOOKIE2,
BookieInfo.builder().rack("rack1").build());
+ mainBookieGroup.put(BOOKIE3,
BookieInfo.builder().rack("rack1").build());
+ mainBookieGroup.put(BOOKIE4,
BookieInfo.builder().rack("rack0").build());
+
+ Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
+
+ store = mock(MetadataStoreExtended.class);
+ MetadataCacheImpl cache = mock(MetadataCacheImpl.class);
+
when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache);
+ CompletableFuture<Optional<BookiesRackConfiguration>> initialFuture =
new CompletableFuture<>();
+ //The initialFuture only has group1.
+ BookiesRackConfiguration rackConfiguration1 = new
BookiesRackConfiguration();
+ rackConfiguration1.put("group1", mainBookieGroup);
+ rackConfiguration1.put("group2", secondaryBookieGroup);
+ initialFuture.complete(Optional.of(rackConfiguration1));
+
+ long waitTime = 2000;
+ CompletableFuture<Optional<BookiesRackConfiguration>>
waitingCompleteFuture = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ //The waitingCompleteFuture has group1 and group2.
+ BookiesRackConfiguration rackConfiguration2 = new
BookiesRackConfiguration();
+ Map<String, BookieInfo> mainBookieGroup2 = new HashMap<>();
+ mainBookieGroup2.put(BOOKIE1,
BookieInfo.builder().rack("rack0").build());
+ mainBookieGroup2.put(BOOKIE2,
BookieInfo.builder().rack("rack1").build());
+ mainBookieGroup2.put(BOOKIE4,
BookieInfo.builder().rack("rack0").build());
+
+ Map<String, BookieInfo> secondaryBookieGroup2 = new HashMap<>();
+ secondaryBookieGroup2.put(BOOKIE3,
BookieInfo.builder().rack("rack0").build());
+ rackConfiguration2.put("group1", mainBookieGroup2);
+ rackConfiguration2.put("group2", secondaryBookieGroup2);
+ waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
+ }).start();
+
+ long longWaitTime = 4000;
+ CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture =
new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(longWaitTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ //The emptyFuture means that the zk node /bookies already be
removed.
+ emptyFuture.complete(Optional.empty());
+ }).start();
+
+ //Return different future means that cache expire.
+ when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
+ .thenReturn(initialFuture).thenReturn(initialFuture)
+
.thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture)
+ .thenReturn(emptyFuture).thenReturn(emptyFuture);
+
+ IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new
IsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
isolationGroups);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
+ groups.setLeft(Sets.newHashSet("group1"));
+ groups.setRight(new HashSet<>());
+
+ //initialFuture, the future is waiting done.
+ Set<BookieId> blacklist =
+ isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
+ assertTrue(blacklist.isEmpty());
+
+ //waitingCompleteFuture, the future is waiting done.
+ blacklist =
+ isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
+ assertTrue(blacklist.isEmpty());
+
+ Thread.sleep(waitTime);
+
+ //waitingCompleteFuture, the future is already done.
+ blacklist =
+ isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
+ assertFalse(blacklist.isEmpty());
+ assertEquals(blacklist.size(), 1);
+ BookieId excludeBookie = blacklist.iterator().next();
+ assertEquals(excludeBookie.toString(), BOOKIE3);
+
+ //emptyFuture, the future is waiting done.
+ blacklist =
+ isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
+ assertFalse(blacklist.isEmpty());
+ assertEquals(blacklist.size(), 1);
+ excludeBookie = blacklist.iterator().next();
+ assertEquals(excludeBookie.toString(), BOOKIE3);
+
+ Thread.sleep(longWaitTime - waitTime);
+
+ //emptyFuture, the future is already done.
+ blacklist =
+ isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
+ assertTrue(blacklist.isEmpty());
+ }
+
@Test
public void testBasic() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();