This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95abf3afd0c Fix the local policy IsolationGroups overriding the
defaultIsolationGroups, which made it impossible to roll back to using
defaultIsolationGroups (#16273)
95abf3afd0c is described below
commit 95abf3afd0cb3fdb368f9f9fae6dcff0b95983ce
Author: lixinyang <[email protected]>
AuthorDate: Tue Jul 5 10:33:25 2022 +0800
Fix the local policy IsolationGroups overriding the defaultIsolationGroups,
which made it impossible to roll back to using defaultIsolationGroups (#16273)
Co-authored-by: nicklixinyang <[email protected]>
---
.../IsolatedBookieEnsemblePlacementPolicy.java | 46 +++++------------
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 60 +++++++++++++++++++++-
2 files changed, 72 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index b1c08801cae..b8c288535bc 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -121,32 +121,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
- if
(customMetadata.containsKey(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG))
{
- try {
- EnsemblePlacementPolicyConfig policy =
EnsemblePlacementPolicyConfig
-
.decode(customMetadata.get(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG));
- Map<String, Object> policyProperties = policy.getProperties();
- String isolationBookieGroups =
- (String) policyProperties.get(ISOLATION_BOOKIE_GROUPS);
- String secondaryIsolationBookieGroups =
- (String)
policyProperties.get(SECONDARY_ISOLATION_BOOKIE_GROUPS);
- Set<String> primaryIsolationGroups = new HashSet<>();
- Set<String> secondaryIsolationGroups = new HashSet<>();
- if (isolationBookieGroups != null) {
-
primaryIsolationGroups.addAll(Arrays.asList(isolationBookieGroups.split(",")));
- }
- if (secondaryIsolationBookieGroups != null) {
-
secondaryIsolationGroups.addAll(Arrays.asList(secondaryIsolationBookieGroups.split(",")));
- }
- defaultIsolationGroups =
ImmutablePair.of(primaryIsolationGroups, secondaryIsolationGroups);
- } catch
(EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
- log.error("Failed to decode EnsemblePlacementPolicyConfig from
customeMetadata when choosing ensemble, "
- + "Will use defaultIsolationGroups instead");
- }
- }
-
- Set<BookieId> blacklistedBookies =
getBlacklistedBookiesWithIsolationGroups(
- ensembleSize, defaultIsolationGroups);
+ Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize,
customMetadata);
if (excludeBookies == null) {
excludeBookies = new HashSet<BookieId>();
}
@@ -159,10 +134,20 @@ public class IsolatedBookieEnsemblePlacementPolicy
extends RackawareEnsemblePlac
Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble,
BookieId bookieToReplace, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
+ Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize,
customMetadata);
+ if (excludeBookies == null) {
+ excludeBookies = new HashSet<BookieId>();
+ }
+ excludeBookies.addAll(blacklistedBookies);
+ return super.replaceBookie(ensembleSize, writeQuorumSize,
ackQuorumSize, customMetadata, currentEnsemble,
+ bookieToReplace, excludeBookies);
+ }
+
+ private Set<BookieId> getBlacklistedBookies(int ensembleSize, Map<String,
byte[]> customMetadata){
// parse the ensemble placement policy from the custom metadata, if it
is present, we will apply it to
// the isolation groups for filtering the bookies.
Optional<EnsemblePlacementPolicyConfig> ensemblePlacementPolicyConfig =
- getEnsemblePlacementPolicyConfig(customMetadata);
+ getEnsemblePlacementPolicyConfig(customMetadata);
Set<BookieId> blacklistedBookies;
if (ensemblePlacementPolicyConfig.isPresent()) {
EnsemblePlacementPolicyConfig config =
ensemblePlacementPolicyConfig.get();
@@ -171,12 +156,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
} else {
blacklistedBookies =
getBlacklistedBookiesWithIsolationGroups(ensembleSize, defaultIsolationGroups);
}
- if (excludeBookies == null) {
- excludeBookies = new HashSet<BookieId>();
- }
- excludeBookies.addAll(blacklistedBookies);
- return super.replaceBookie(ensembleSize, writeQuorumSize,
ackQuorumSize, customMetadata, currentEnsemble,
- bookieToReplace, excludeBookies);
+ return blacklistedBookies;
}
private static Optional<EnsemblePlacementPolicyConfig>
getEnsemblePlacementPolicyConfig(
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index d0f9b410904..3ae5fd2b0ea 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -327,7 +327,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
placementPolicyProperties2
);
Map<String, byte[]> customMetadata2 = new HashMap<>();
-
customMetadata2.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
policyConfig.encode());
+
customMetadata2.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
policyConfig2.encode());
BookieId replaceBookie2 = isolationPolicy.replaceBookie(3, 3, 3,
customMetadata2,
Arrays.asList(bookie1Id,bookie2Id,bookie3Id), bookie3Id,
null).getResult();
assertEquals(replaceBookie2, bookie4Id);
@@ -526,4 +526,62 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
Arrays.asList(bookie1Id, bookie3Id), bookie3Id, null).getResult();
assertEquals(bookieId, bookie2Id);
}
+
+ @Test
+ public void testDefaultIsolationPolicyNotCovered() throws Exception {
+ Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+ final String defaultIsolatedGroup = "Group1";
+ final String defaultSecondaryIsolatedGroup = "Group2";
+ final String customIsolatedGroup = "Group2";
+
+ Map<String, BookieInfo> Group1 = new HashMap<>();
+ Group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
+ Group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
+
+ Map<String, BookieInfo> Group2 = new HashMap<>();
+ Group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
+ Group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
+
+ Set<BookieId> BookieIdGroup1 = new HashSet<>();
+ BookieIdGroup1.add(new BookieSocketAddress(BOOKIE1).toBookieId());
+ BookieIdGroup1.add(new BookieSocketAddress(BOOKIE2).toBookieId());
+ Set<BookieId> BookieIdGroup2 = new HashSet<>();
+ BookieIdGroup2.add(new BookieSocketAddress(BOOKIE3).toBookieId());
+ BookieIdGroup2.add(new BookieSocketAddress(BOOKIE4).toBookieId());
+
+ bookieMapping.put(defaultIsolatedGroup, Group1);
+ bookieMapping.put(defaultSecondaryIsolatedGroup, Group2);
+
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
+
+ IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new
IsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
defaultIsolatedGroup);
+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
defaultSecondaryIsolatedGroup);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL,
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ Map<String, Object> placementPolicyProperties = new HashMap<>();
+ placementPolicyProperties.put(
+ IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
customIsolatedGroup);
+ placementPolicyProperties.put(
+
IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+
+ EnsemblePlacementPolicyConfig policyConfig = new
EnsemblePlacementPolicyConfig(
+ IsolatedBookieEnsemblePlacementPolicy.class,
+ placementPolicyProperties
+ );
+ Map<String, byte[]> customMetadata = new HashMap<>();
+
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
policyConfig.encode());
+
+ List<BookieId> customBookieList = isolationPolicy
+ .newEnsemble(2, 2, 2, customMetadata, new
HashSet<>()).getResult();
+ assertEquals(BookieIdGroup2.containsAll(customBookieList),true);
+ List<BookieId> defaultBookieList = isolationPolicy
+ .newEnsemble(2, 2, 2, Collections.emptyMap(), new
HashSet<>()).getResult();
+ assertEquals(BookieIdGroup1.containsAll(defaultBookieList),true);
+ }
}