codelipenghui commented on a change in pull request #8961:
URL: https://github.com/apache/pulsar/pull/8961#discussion_r544093892
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3239,6 +3250,21 @@ protected void asyncCreateLedger(BookKeeper bookKeeper,
ManagedLedgerConfig conf
Map<String, byte[]> finalMetadata = new HashMap<>();
finalMetadata.putAll(ledgerMetadata);
finalMetadata.putAll(metadata);
+ if (config.getBookKeeperEnsemblePlacementPolicyClassName() != null) {
+ EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig = new
EnsemblePlacementPolicyConfig(
+ config.getBookKeeperEnsemblePlacementPolicyClassName(),
+ config.getBookKeeperEnsemblePlacementPolicyProperties()
+ );
+ try {
+
finalMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
+ ensemblePlacementPolicyConfig.encode());
Review comment:
it's better to move the placement policy ledger metadata to the
LedgerMetadataUtils.java. Currently, all of the different types of ledger
metadata is generated from the LedgerMetadataUtils.java
##########
File path:
pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
##########
@@ -74,23 +78,26 @@ public RackawareEnsemblePlacementPolicyImpl
initialize(ClientConfiguration conf,
if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
String isolationGroupsString =
castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS));
if (!isolationGroupsString.isEmpty()) {
- for (String isolationGroup : isolationGroupsString.split(","))
{
- primaryIsolationGroups.add(isolationGroup);
- }
+ fillIsolationGroup(isolationGroupsString, "");
bookieMappingCache = getAndSetZkCache(conf);
}
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
String secondaryIsolationGroupsString =
castToString(conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS));
- if (!secondaryIsolationGroupsString.isEmpty()) {
- for (String isolationGroup :
secondaryIsolationGroupsString.split(",")) {
- secondaryIsolationGroups.add(isolationGroup);
- }
- }
+ fillIsolationGroup("", secondaryIsolationGroupsString);
}
return super.initialize(conf, optionalDnsResolver, timer,
featureProvider, statsLogger, bookieAddressResolver);
}
+ private void fillIsolationGroup(String isolationBookieGroups, String
secondaryIsolationBookieGroups) {
Review comment:
```suggestion
private void fillIsolationGroup(Collection<String> isolationGroup,
String isolationGroupString) {
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]