hangc0276 commented on a change in pull request #12025:
URL: https://github.com/apache/pulsar/pull/12025#discussion_r789541003
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
##########
@@ -90,11 +92,10 @@ public RackawareEnsemblePlacementPolicyImpl
initialize(ClientConfiguration conf,
for (String isolationGroup : isolationGroupsString.split(","))
{
primaryIsolationGroups.add(isolationGroup);
}
-
- // Only add the bookieMappingCache if we have defined an
isolation group
- bookieMappingCache =
store.getMetadataCache(BookiesRackConfiguration.class);
-
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
}
+ // Only add the bookieMappingCache if we have defined an isolation
group
+ bookieMappingCache =
store.getMetadataCache(BookiesRackConfiguration.class);
+
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
Review comment:
Do we need this line? It may throw a `CompletionException`.
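To illustrate the concern, here is a minimal, self-contained sketch of how `join()` surfaces an asynchronous failure. The class name `JoinFailureSketch` and the `failingLookup` helper are hypothetical stand-ins for the metadata cache read, not code from the PR.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class JoinFailureSketch {

    // Hypothetical stand-in for a metadata read that fails asynchronously.
    static CompletableFuture<Optional<String>> failingLookup() {
        CompletableFuture<Optional<String>> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("metadata store unavailable"));
        return future;
    }

    // join() rethrows a failure wrapped in an unchecked CompletionException,
    // so a caller that does not catch it lets the exception propagate.
    static String describeJoin(CompletableFuture<Optional<String>> future) {
        try {
            return "value=" + future.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeJoin(failingLookup()));
        System.out.println(describeJoin(CompletableFuture.completedFuture(Optional.of("rack-1"))));
    }
}
```

Because `CompletionException` is unchecked, the compiler does not force `initialize` to handle it, which is why an unguarded `join()` is easy to miss.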
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1480,22 +1485,77 @@ public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
);
}
-
ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
-
IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+ if (serviceConfig.isStrictBookieAffinityEnabled()) {
Review comment:
Suppose the user sets `strictBookieAffinityEnabled = true` but has not yet
configured bookie rack info for the cluster. If the rack info is then set with
pulsar-admin after all brokers have started, does namespace isolation still
work?
##########
File path: site2/docs/reference-configuration.md
##########
@@ -364,6 +364,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated
backlog size is greater
| additionalServletDirectory | Location of broker additional servlet NAR
directory | ./brokerAdditionalServlet |
| brokerEntryMetadataInterceptors | Set broker entry metadata interceptors.<br
/><br />Multiple interceptors should be separated by commas. <br /><br
/>Available
values:<li>org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor</li><li>org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor</li>
<br /><br />Example<br
/>brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor,
org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor|N/A |
| enableExposingBrokerEntryMetadataToClient|Whether to expose broker entry
metadata to client or not.<br /><br />Available
values:<li>true</li><li>false</li><br />Example<br
/>enableExposingBrokerEntryMetadataToClient=true | false |
+| strictBookieAffinityEnabled | Enable or disable the strict bookie isolation
strategy. If enabled, <br /> - `bookie-ensemble` first tries to choose bookies
that belong to a namespace's affinity group. If the number of bookies is not
enough, then the rest bookies are chosen. <br /> - If namespace has no affinity
group, `bookie-ensemble` only chooses bookies that belong to no region. If the
number of bookies is not enough, `BKNotEnoughBookiesException` is thrown.|
false |
Review comment:
> If enabled `bookie-ensemble` first tries to choose bookies that belong
> to a namespace's affinity group. If the number of bookies is not enough, then
> the rest bookies are chosen.

In that case, the remaining bookies are chosen from those with no region
configured, right?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1479,22 +1483,79 @@ public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
);
}
-
ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
-
IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+ if (serviceConfig.isStrictBookieAffinityEnabled()) {
+ try {
+ Optional<GetResult> racksData =
+ this.pulsar.getLocalMetadataStore().
+
get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
Review comment:
We should check whether the future returned by
`this.pulsar.getLocalMetadataStore().get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)`
completed exceptionally instead of calling `join` directly. For example:
```
CompletableFuture<Optional<BookiesRackConfiguration>> future =
        bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
Optional<BookiesRackConfiguration> optRes =
        (future.isDone() && !future.isCompletedExceptionally())
                ? future.join() : Optional.empty();
```
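As a runnable sketch of the same idea, the check can be wrapped in a small helper. A possible variation maps a failure to an empty result with `exceptionally`, which keeps the call asynchronous. The class and method names below are hypothetical and only use `java.util.concurrent` types.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class FutureCheckSketch {

    // The check suggested above: never blocks and never throws.
    // A future that is still pending is treated the same as "no data".
    static <T> Optional<T> readIfReady(CompletableFuture<Optional<T>> future) {
        return (future.isDone() && !future.isCompletedExceptionally())
                ? future.join()
                : Optional.empty();
    }

    // Variation: turn a failure into an empty result. The returned future
    // can still be awaited or chained by the caller.
    static <T> CompletableFuture<Optional<T>> orEmptyOnFailure(CompletableFuture<Optional<T>> future) {
        return future.exceptionally(ex -> Optional.empty());
    }

    public static void main(String[] args) {
        CompletableFuture<Optional<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("read failed"));
        System.out.println(readIfReady(failed).isPresent());
        System.out.println(orEmptyOnFailure(failed).join().isPresent());
    }
}
```

One trade-off to keep in mind: `readIfReady` returns empty for a read that simply has not finished yet, so a caller cannot tell "no rack info" apart from "not loaded yet".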
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1480,22 +1485,77 @@ public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
);
}
-
ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
-
IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+ if (serviceConfig.isStrictBookieAffinityEnabled()) {
+ try {
+ Optional<GetResult> racksData =
+ this.pulsar.getLocalMetadataStore().
+
get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+ if (racksData.isPresent()) {
+ BookiesRackConfiguration
allGroupsBookieMapping =
+ ObjectMapperFactory.getThreadLocal().
+
readValue(racksData.get().getValue(), BookiesRackConfiguration.class);
+ if (allGroupsBookieMapping.size() > 0) {
+ managedLedgerConfig
+
.setBookKeeperEnsemblePlacementPolicyClassName(
+
ZkIsolatedBookieEnsemblePlacementPolicy.class);
+ if (localPolicies.isPresent()
+ &&
localPolicies.get().bookieAffinityGroup != null) {
+ Map<String, Object> properties =
Maps.newHashMap();
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup
+
.getBookkeeperAffinityGroupPrimary());
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup
+
.getBookkeeperAffinityGroupSecondary());
+ managedLedgerConfig.
+
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else if
(namespace.getTenant().equals("pulsar")) {
+ Map<String, Object> properties =
Maps.newHashMap();
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+ .ISOLATION_BOOKIE_GROUPS, "*");
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+ managedLedgerConfig.
+
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else {
+ Map<String, Object> properties =
Maps.newHashMap();
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+ .ISOLATION_BOOKIE_GROUPS, "");
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+ managedLedgerConfig.
+
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error getting bookie isolation info
from zk");
+ throw new IllegalStateException(e);
Review comment:
If we throw `IllegalStateException` here, will it bring the broker down? For
example, if the user configures the rack info in the wrong format, the JSON
parsing fails. Do we need to handle that case, or should we just let the broker
shut down?
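One option, sketched below under stated assumptions, is to log the malformed data and skip the isolation setup rather than rethrow. `parseRackInfo` is a hypothetical stand-in for the JSON parsing step (it accepts `bookie=group` pairs separated by `;`), and `loadOrSkip` is an illustrative name, not an API in Pulsar.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

final class RackInfoFallbackSketch {
    private static final Logger LOG = Logger.getLogger(RackInfoFallbackSketch.class.getName());

    // Hypothetical parser standing in for the real JSON deserialization.
    static Map<String, String> parseRackInfo(String raw) throws IOException {
        Map<String, String> mapping = new HashMap<>();
        for (String pair : raw.split(";")) {
            String[] kv = pair.split("=");
            if (kv.length != 2) {
                throw new IOException("malformed rack entry: " + pair);
            }
            mapping.put(kv[0].trim(), kv[1].trim());
        }
        return mapping;
    }

    // Returns empty on malformed input so the caller can keep the default
    // placement policy instead of failing the whole operation.
    static Optional<Map<String, String>> loadOrSkip(String raw) {
        try {
            return Optional.of(parseRackInfo(raw));
        } catch (IOException e) {
            LOG.warning("Ignoring malformed bookie rack info: " + e.getMessage());
            return Optional.empty();
        }
    }
}
```

Whether silently falling back is acceptable depends on how strict the isolation guarantee should be. Failing fast may be the safer choice if writing to the wrong bookies is worse than an unavailable topic.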
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1480,22 +1485,77 @@ public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
);
}
-
ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
-
IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+
+ if (serviceConfig.isStrictBookieAffinityEnabled()) {
+ try {
+ Optional<GetResult> racksData =
+ this.pulsar.getLocalMetadataStore().
+
get(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
+ if (racksData.isPresent()) {
+ BookiesRackConfiguration
allGroupsBookieMapping =
+ ObjectMapperFactory.getThreadLocal().
+
readValue(racksData.get().getValue(), BookiesRackConfiguration.class);
+ if (allGroupsBookieMapping.size() > 0) {
+ managedLedgerConfig
+
.setBookKeeperEnsemblePlacementPolicyClassName(
+
ZkIsolatedBookieEnsemblePlacementPolicy.class);
+ if (localPolicies.isPresent()
+ &&
localPolicies.get().bookieAffinityGroup != null) {
+ Map<String, Object> properties =
Maps.newHashMap();
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup
+
.getBookkeeperAffinityGroupPrimary());
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup
+
.getBookkeeperAffinityGroupSecondary());
+ managedLedgerConfig.
+
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else if
(namespace.getTenant().equals("pulsar")) {
+ Map<String, Object> properties =
Maps.newHashMap();
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+ .ISOLATION_BOOKIE_GROUPS, "*");
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+ managedLedgerConfig.
+
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else {
+ Map<String, Object> properties =
Maps.newHashMap();
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+ .ISOLATION_BOOKIE_GROUPS, "");
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy
+
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+ managedLedgerConfig.
+
setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ }
+ }
+ } catch (Exception e) {
Review comment:
Could we catch a more specific exception subclass instead of `Exception`?
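As a rough sketch of what narrower handling could look like: the two calls in the `try` block fail in different ways. `join()` throws `CompletionException`, and Jackson's `readValue` declares `IOException`. The `parse` helper below is a hypothetical stand-in for `readValue`, and the class name is illustrative only.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class SpecificCatchSketch {

    // Hypothetical parser: rejects input that is not shaped like a JSON object.
    static String parse(byte[] data) throws IOException {
        String text = new String(data, StandardCharsets.UTF_8).trim();
        if (!text.startsWith("{") || !text.endsWith("}")) {
            throw new IOException("not a JSON object: " + text);
        }
        return text;
    }

    // Separate catch clauses let store failures and parse failures be
    // reported (or recovered from) differently.
    static String load(CompletableFuture<byte[]> read) {
        try {
            return "ok: " + parse(read.join());
        } catch (CompletionException e) {
            return "store failure: " + e.getCause().getMessage();
        } catch (IOException e) {
            return "parse failure: " + e.getMessage();
        }
    }
}
```

Note that `join()` can also throw `CancellationException` if the future was cancelled. This sketch leaves that case uncaught.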