hangc0276 commented on a change in pull request #12025:
URL: https://github.com/apache/pulsar/pull/12025#discussion_r789541003



##########
File path: 
pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
##########
@@ -90,11 +92,10 @@ public RackawareEnsemblePlacementPolicyImpl 
initialize(ClientConfiguration conf,
                 for (String isolationGroup : isolationGroupsString.split(",")) 
{
                     primaryIsolationGroups.add(isolationGroup);
                 }
-
-                // Only add the bookieMappingCache if we have defined an 
isolation group
-                bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
-                
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
             }
+            // Only add the bookieMappingCache if we have defined an isolation 
group
+            bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
+            
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();

Review comment:
       Do we need this line? it may throw CompletionException

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1480,22 +1485,77 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                         );
                     }
 
-
                     ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
                     
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
                     
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
                     
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        managedLedgerConfig
-                                .setBookKeeperEnsemblePlacementPolicyClassName(
-                                        
IsolatedBookieEnsemblePlacementPolicy.class);
-                        Map<String, Object> properties = Maps.newHashMap();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+                    if (serviceConfig.isStrictBookieAffinityEnabled()) {

Review comment:
       If user configured `strictBookieAffinityEnabled = true`, but not set 
bookie rack info for the cluster. After all brokers startup, the user set the 
bookie rack info by pulsar-admin, whether the namespace isolation still work?

##########
File path: site2/docs/reference-configuration.md
##########
@@ -364,6 +364,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated 
backlog size is greater
 | additionalServletDirectory | Location of broker additional servlet NAR 
directory | ./brokerAdditionalServlet |
 | brokerEntryMetadataInterceptors | Set broker entry metadata interceptors.<br 
/><br />Multiple interceptors should be separated by commas. <br /><br 
/>Available 
values:<li>org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor</li><li>org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor</li>
 <br /><br />Example<br 
/>brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor,
 org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor|N/A |
 | enableExposingBrokerEntryMetadataToClient|Whether to expose broker entry 
metadata to client or not.<br /><br />Available 
values:<li>true</li><li>false</li><br />Example<br 
/>enableExposingBrokerEntryMetadataToClient=true  | false |
+| strictBookieAffinityEnabled | Enable or disable the strict bookie isolation 
strategy. If enabled, <br /> - `bookie-ensemble` first tries to choose bookies 
that belong to a namespace's affinity group. If the number of bookies is not 
enough, then the rest bookies are chosen. <br /> - If namespace has no affinity 
group, `bookie-ensemble` only chooses bookies that belong to no region. If the 
number of bookies is not enough, `BKNotEnoughBookiesException` is thrown.| 
false |

Review comment:
       >If enabled `bookie-ensemble` first tries to choose bookies that belong 
to a namespace's affinity group. If the number of bookies is not enough, then 
the rest bookies are chosen.
   
   then the rest non-region configured bookies will be chosen, right?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1479,22 +1483,79 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                         );
                     }
 
-
                     ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
                     
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
                     
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
                     
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        managedLedgerConfig
-                                .setBookKeeperEnsemblePlacementPolicyClassName(
-                                        
IsolatedBookieEnsemblePlacementPolicy.class);
-                        Map<String, Object> properties = Maps.newHashMap();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+                    if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                        try {
+                            Optional<GetResult> racksData =
+                                    this.pulsar.getLocalMetadataStore().
+                                            
get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();

Review comment:
       We should check the 
`this.pulsar.getLocalMetadataStore().get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)`
 future exception instead of call `join`. For example:
   ```
   CompletableFuture<Optional<BookiesRackConfiguration>> future =
                           
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
   
   Optional<BookiesRackConfiguration> optRes = (future.isDone() && 
!future.isCompletedExceptionally())
                           ? future.join() : Optional.empty()
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1480,22 +1485,77 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                         );
                     }
 
-
                     ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
                     
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
                     
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
                     
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        managedLedgerConfig
-                                .setBookKeeperEnsemblePlacementPolicyClassName(
-                                        
IsolatedBookieEnsemblePlacementPolicy.class);
-                        Map<String, Object> properties = Maps.newHashMap();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+                    if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                        try {
+                            Optional<GetResult> racksData =
+                                    this.pulsar.getLocalMetadataStore().
+                                            
get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+                            if (racksData.isPresent()) {
+                                BookiesRackConfiguration 
allGroupsBookieMapping =
+                                        ObjectMapperFactory.getThreadLocal().
+                                                
readValue(racksData.get().getValue(), BookiesRackConfiguration.class);
+                                if (allGroupsBookieMapping.size() > 0) {
+                                    managedLedgerConfig
+                                            
.setBookKeeperEnsemblePlacementPolicyClassName(
+                                                    
ZkIsolatedBookieEnsemblePlacementPolicy.class);
+                                    if (localPolicies.isPresent()
+                                            && 
localPolicies.get().bookieAffinityGroup != null) {
+                                        Map<String, Object> properties = 
Maps.newHashMap();
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                        
.ISOLATION_BOOKIE_GROUPS,
+                                                
localPolicies.get().bookieAffinityGroup
+                                                        
.getBookkeeperAffinityGroupPrimary());
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                        
.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                                
localPolicies.get().bookieAffinityGroup
+                                                        
.getBookkeeperAffinityGroupSecondary());
+                                        managedLedgerConfig.
+                                                
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                                    } else if 
(namespace.getTenant().equals("pulsar")) {
+                                        Map<String, Object> properties = 
Maps.newHashMap();
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                .ISOLATION_BOOKIE_GROUPS, "*");
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+                                        managedLedgerConfig.
+                                                
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                                    } else {
+                                        Map<String, Object> properties = 
Maps.newHashMap();
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                .ISOLATION_BOOKIE_GROUPS, "");
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+                                        managedLedgerConfig.
+                                                
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                                    }
+                                }
+                            }
+                        } catch (Exception e) {
+                            log.error("Error getting bookie isolation info 
from zk");
+                            throw new IllegalStateException(e);

Review comment:
       If we throw `IllegalStateException` out, whether will make the broker 
break down? For example, user configure the rack info in wrong format, and the 
json parse failed. Do we need to cover the case or just make the broker 
shutdown?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1480,22 +1485,77 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                         );
                     }
 
-
                     ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
                     
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
                     
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
                     
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-                    if (localPolicies.isPresent() && 
localPolicies.get().bookieAffinityGroup != null) {
-                        managedLedgerConfig
-                                .setBookKeeperEnsemblePlacementPolicyClassName(
-                                        
IsolatedBookieEnsemblePlacementPolicy.class);
-                        Map<String, Object> properties = Maps.newHashMap();
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                        
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+                    if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                        try {
+                            Optional<GetResult> racksData =
+                                    this.pulsar.getLocalMetadataStore().
+                                            
get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+                            if (racksData.isPresent()) {
+                                BookiesRackConfiguration 
allGroupsBookieMapping =
+                                        ObjectMapperFactory.getThreadLocal().
+                                                
readValue(racksData.get().getValue(), BookiesRackConfiguration.class);
+                                if (allGroupsBookieMapping.size() > 0) {
+                                    managedLedgerConfig
+                                            
.setBookKeeperEnsemblePlacementPolicyClassName(
+                                                    
ZkIsolatedBookieEnsemblePlacementPolicy.class);
+                                    if (localPolicies.isPresent()
+                                            && 
localPolicies.get().bookieAffinityGroup != null) {
+                                        Map<String, Object> properties = 
Maps.newHashMap();
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                        
.ISOLATION_BOOKIE_GROUPS,
+                                                
localPolicies.get().bookieAffinityGroup
+                                                        
.getBookkeeperAffinityGroupPrimary());
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                        
.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                                
localPolicies.get().bookieAffinityGroup
+                                                        
.getBookkeeperAffinityGroupSecondary());
+                                        managedLedgerConfig.
+                                                
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                                    } else if 
(namespace.getTenant().equals("pulsar")) {
+                                        Map<String, Object> properties = 
Maps.newHashMap();
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                .ISOLATION_BOOKIE_GROUPS, "*");
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+                                        managedLedgerConfig.
+                                                
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                                    } else {
+                                        Map<String, Object> properties = 
Maps.newHashMap();
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                .ISOLATION_BOOKIE_GROUPS, "");
+                                        
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+                                                
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+                                        managedLedgerConfig.
+                                                
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                                    }
+                                }
+                            }
+                        } catch (Exception e) {

Review comment:
       Could we use sub exception instead of `Exception`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to