codelipenghui commented on a change in pull request #8961:
URL: https://github.com/apache/pulsar/pull/8961#discussion_r544093892



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3239,6 +3250,21 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
         Map<String, byte[]> finalMetadata = new HashMap<>();
         finalMetadata.putAll(ledgerMetadata);
         finalMetadata.putAll(metadata);
+        if (config.getBookKeeperEnsemblePlacementPolicyClassName() != null) {
+            EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig = new EnsemblePlacementPolicyConfig(
+                config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                config.getBookKeeperEnsemblePlacementPolicyProperties()
+            );
+            try {
+                finalMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
+                    ensemblePlacementPolicyConfig.encode());

Review comment:
       It would be better to move the generation of the placement policy ledger
metadata into LedgerMetadataUtils.java. Currently, all of the other types of
ledger metadata are generated in LedgerMetadataUtils.java.
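
A rough sketch of the kind of helper meant here, to make the suggestion concrete. The class name `LedgerMetadataSketch`, the method name, and the key value are all hypothetical stand-ins and are not taken from the PR; the real key is `EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG`, and the encoded bytes would come from `ensemblePlacementPolicyConfig.encode()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class LedgerMetadataSketch {
    // Hypothetical key value; stands in for
    // EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG.
    static final String PLACEMENT_POLICY_KEY = "placement-policy-config";

    // Hypothetical helper: returns the metadata entry for the placement policy,
    // or an empty map when no policy class is configured.
    static Map<String, byte[]> buildMetadataForPlacementPolicyConfig(
            String policyClassName, byte[] encodedConfig) {
        if (policyClassName == null) {
            return Collections.emptyMap();
        }
        Map<String, byte[]> metadata = new HashMap<>();
        metadata.put(PLACEMENT_POLICY_KEY, encodedConfig);
        return metadata;
    }

    public static void main(String[] args) {
        // Placeholder bytes; in the PR these would be the encoded policy config.
        byte[] encoded = "encoded-config".getBytes(StandardCharsets.UTF_8);
        Map<String, byte[]> finalMetadata = new HashMap<>();
        finalMetadata.putAll(
                buildMetadataForPlacementPolicyConfig("com.example.MyPolicy", encoded));
        System.out.println(finalMetadata.keySet());
    }
}
```

With something like this, `asyncCreateLedger` would only need a single `finalMetadata.putAll(...)` call, and the null check on the policy class name would live next to the other metadata builders.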

##########
File path: pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
##########
@@ -74,23 +78,26 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
         if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
             String isolationGroupsString = castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS));
             if (!isolationGroupsString.isEmpty()) {
-                for (String isolationGroup : isolationGroupsString.split(",")) {
-                    primaryIsolationGroups.add(isolationGroup);
-                }
+                fillIsolationGroup(isolationGroupsString, "");
                 bookieMappingCache = getAndSetZkCache(conf);
             }
         }
         if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
             String secondaryIsolationGroupsString = castToString(conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS));
-            if (!secondaryIsolationGroupsString.isEmpty()) {
-                for (String isolationGroup : secondaryIsolationGroupsString.split(",")) {
-                    secondaryIsolationGroups.add(isolationGroup);
-                }
-            }
+            fillIsolationGroup("", secondaryIsolationGroupsString);
         }
         return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger, bookieAddressResolver);
     }
 
+    private void fillIsolationGroup(String isolationBookieGroups, String secondaryIsolationBookieGroups) {

Review comment:
       ```suggestion
       private void fillIsolationGroup(Collection<String> isolationGroup, String isolationGroupString) {
       }
       ```
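
The suggestion above leaves the method body empty. A minimal sketch of how it might be filled in, reusing the split-and-add loop that the diff removes: the wrapper class `IsolationGroupSketch`, the `static` modifier, and the null check are assumptions added so the example runs on its own; they are not part of the PR.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

class IsolationGroupSketch {
    // Splits a comma-separated group string and adds each entry to the target
    // collection. Empty input is skipped because "".split(",") would otherwise
    // add a single empty group name. The null check is an extra guard (assumption).
    static void fillIsolationGroup(Collection<String> isolationGroup, String isolationGroupString) {
        if (isolationGroupString == null || isolationGroupString.isEmpty()) {
            return;
        }
        for (String group : isolationGroupString.split(",")) {
            isolationGroup.add(group);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the policy's primary and secondary group fields.
        Set<String> primaryIsolationGroups = new LinkedHashSet<>();
        Set<String> secondaryIsolationGroups = new LinkedHashSet<>();
        fillIsolationGroup(primaryIsolationGroups, "group-a,group-b");
        fillIsolationGroup(secondaryIsolationGroups, "");
        System.out.println(primaryIsolationGroups);
        System.out.println(secondaryIsolationGroups);
    }
}
```

Passing the target collection explicitly means one method serves both the primary and secondary groups, so the call sites no longer pass an empty string for the list they do not want to fill.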



