horizonzy commented on code in PR #4133:
URL: https://github.com/apache/bookkeeper/pull/4133#discussion_r1429473242
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java:
##########
@@ -644,12 +645,75 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
@Override
public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
int writeQuorumSize, int ackQuorumSize) {
- /**
- * TODO: have to implement actual logic for this method for
- * RegionAwareEnsemblePlacementPolicy. For now return true value.
- *
- * - https://github.com/apache/bookkeeper/issues/1898
- */
+ if (CollectionUtils.isEmpty(ensembleList)) {
+ return PlacementPolicyAdherence.FAIL;
+ }
+
+ int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+ int ensembleSize = ensembleList.size();
+ Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
+ BookieId bookie;
+ for (int i = 0; i < ensembleList.size(); i++) {
+ regionsInQuorum.clear();
+ for (int j = 0; j < writeQuorumSize; j++) {
+ bookie = ensembleList.get((i + j) % ensembleSize);
+ if (knownBookies.containsKey(bookie)) {
+ String region = getLocalRegion(knownBookies.get(bookie));
+ if (regionsInQuorum.containsKey(region)) {
+ regionsInQuorum.get(region).add(bookie);
+ } else {
+ Set<BookieId> bookieSet = new HashSet<>();
+ bookieSet.add(bookie);
+ regionsInQuorum.put(region, bookieSet);
+ }
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("bookie {} is not in the list of knownBookies", bookie);
+ }
+ }
+
+ if (regionsInQuorum.isEmpty()) {
+ return PlacementPolicyAdherence.FAIL;
+ }
+
+ if (regionsInQuorum.size() < 2) {
Review Comment:
Question: why is `regionsInQuorum.size() < 2` treated as a special case here?
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java:
##########
@@ -644,12 +645,75 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
@Override
public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
int writeQuorumSize, int ackQuorumSize) {
- /**
- * TODO: have to implement actual logic for this method for
- * RegionAwareEnsemblePlacementPolicy. For now return true value.
- *
- * - https://github.com/apache/bookkeeper/issues/1898
- */
+ if (CollectionUtils.isEmpty(ensembleList)) {
+ return PlacementPolicyAdherence.FAIL;
+ }
+
+ int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+ int ensembleSize = ensembleList.size();
+ Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
+ BookieId bookie;
+ for (int i = 0; i < ensembleList.size(); i++) {
+ regionsInQuorum.clear();
+ for (int j = 0; j < writeQuorumSize; j++) {
+ bookie = ensembleList.get((i + j) % ensembleSize);
+ if (knownBookies.containsKey(bookie)) {
Review Comment:
This method is not locked, so the result may differ between line 661 and
line 662: line 661 may find the bookie in `knownBookies`, but by line 662 the
bookie may have been removed. It would be better to call
`knownBookies.get(bookie)` directly and check the result for null.
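
To illustrate the suggestion, here is a minimal sketch, not the actual patch. It assumes a simplified `knownBookies` map from a bookie ID string to a region name; the class `SingleLookupSketch` and the method `regionOf` are hypothetical names used only for this example. A single `get` followed by a null check avoids the window between `containsKey` and `get`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the lookup discussed above.
final class SingleLookupSketch {

    // Check-then-act: another thread may remove the entry between the two
    // calls, so get() can return null even though containsKey() was true.
    static String regionCheckThenGet(Map<String, String> knownBookies, String bookie) {
        if (knownBookies.containsKey(bookie)) {
            return knownBookies.get(bookie);
        }
        return null;
    }

    // Single lookup: the map is read once and the result is null-checked.
    static Optional<String> regionOf(Map<String, String> knownBookies, String bookie) {
        String region = knownBookies.get(bookie);
        return Optional.ofNullable(region);
    }

    public static void main(String[] args) {
        Map<String, String> knownBookies = new HashMap<>();
        knownBookies.put("bookie-1", "region-a");
        System.out.println(regionOf(knownBookies, "bookie-1").isPresent());
        System.out.println(regionOf(knownBookies, "bookie-2").isPresent());
    }
}
```

This only removes the check-then-act gap in the caller. Whether the map itself tolerates concurrent modification is a separate question that the sketch does not address.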
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java:
##########
@@ -644,12 +645,75 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
@Override
public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
int writeQuorumSize, int ackQuorumSize) {
- /**
- * TODO: have to implement actual logic for this method for
- * RegionAwareEnsemblePlacementPolicy. For now return true value.
- *
- * - https://github.com/apache/bookkeeper/issues/1898
- */
+ if (CollectionUtils.isEmpty(ensembleList)) {
+ return PlacementPolicyAdherence.FAIL;
+ }
+
+ int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+ int ensembleSize = ensembleList.size();
+ Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
+ BookieId bookie;
+ for (int i = 0; i < ensembleList.size(); i++) {
+ regionsInQuorum.clear();
+ for (int j = 0; j < writeQuorumSize; j++) {
+ bookie = ensembleList.get((i + j) % ensembleSize);
+ if (knownBookies.containsKey(bookie)) {
+ String region = getLocalRegion(knownBookies.get(bookie));
+ if (regionsInQuorum.containsKey(region)) {
+ regionsInQuorum.get(region).add(bookie);
+ } else {
+ Set<BookieId> bookieSet = new HashSet<>();
+ bookieSet.add(bookie);
+ regionsInQuorum.put(region, bookieSet);
+ }
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("bookie {} is not in the list of knownBookies", bookie);
+ }
+ }
+
+ if (regionsInQuorum.isEmpty()) {
+ return PlacementPolicyAdherence.FAIL;
+ }
+
+ if (regionsInQuorum.size() < 2) {
+ // fall back to use the ensemblePlacementPolicy in specific region
+ String region = regionsInQuorum.keySet().iterator().next();
+ Set<BookieId> bookieIds = regionsInQuorum.get(region);
+
+ TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
+ PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = policyWithinRegion
+ .isEnsembleAdheringToPlacementPolicy(new ArrayList<>(bookieIds), bookieIds.size(), 1);
+ if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("For ensemble {}, write set starting at {} are all from one region, "
+ + "fall back to RackawareEnsemblePlacementPolicy and fail.", ensembleList, i);
+ }
+ return PlacementPolicyAdherence.FAIL;
+ }
+ continue;
+ }
+
+ if (effectiveMinRegionsForDurability > 0 && regionsInQuorum.size() < effectiveMinRegionsForDurability) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("For ensemble {}, write set starting at {} are from {} regions, "
+ + "less than effectiveMinRegionsForDurability: {}.",
+ ensembleList, i, regionsInQuorum.size(), effectiveMinRegionsForDurability);
+ }
+ return PlacementPolicyAdherence.FAIL;
+ }
+
+ if (regionsInQuorum.size() < writeQuorumSize) {
Review Comment:
This check can be moved before line 679.