horizonzy commented on code in PR #4133:
URL: https://github.com/apache/bookkeeper/pull/4133#discussion_r1429473242


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java:
##########
@@ -644,12 +645,75 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
     @Override
     public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
             int writeQuorumSize, int ackQuorumSize) {
-        /**
-         * TODO: have to implement actual logic for this method for
-         * RegionAwareEnsemblePlacementPolicy. For now return true value.
-         *
-         * - https://github.com/apache/bookkeeper/issues/1898
-         */
+        if (CollectionUtils.isEmpty(ensembleList)) {
+            return PlacementPolicyAdherence.FAIL;
+        }
+
+        int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+        int ensembleSize = ensembleList.size();
+        Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
+        BookieId bookie;
+        for (int i = 0; i < ensembleList.size(); i++) {
+            regionsInQuorum.clear();
+            for (int j = 0; j < writeQuorumSize; j++) {
+                bookie = ensembleList.get((i + j) % ensembleSize);
+                if (knownBookies.containsKey(bookie)) {
+                    String region = getLocalRegion(knownBookies.get(bookie));
+                    if (regionsInQuorum.containsKey(region)) {
+                        regionsInQuorum.get(region).add(bookie);
+                    } else {
+                        Set<BookieId> bookieSet = new HashSet<>();
+                        bookieSet.add(bookie);
+                        regionsInQuorum.put(region, bookieSet);
+                    }
+                } else if (LOG.isDebugEnabled()) {
+                    LOG.debug("bookie {} is not in the list of knownBookies", bookie);
+                }
+            }
+
+            if (regionsInQuorum.isEmpty()) {
+                return PlacementPolicyAdherence.FAIL;
+            }
+
+            if (regionsInQuorum.size() < 2) {

Review Comment:
   A question: why is `regionsInQuorum.size() < 2` checked here?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java:
##########
@@ -644,12 +645,75 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
     @Override
     public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
             int writeQuorumSize, int ackQuorumSize) {
-        /**
-         * TODO: have to implement actual logic for this method for
-         * RegionAwareEnsemblePlacementPolicy. For now return true value.
-         *
-         * - https://github.com/apache/bookkeeper/issues/1898
-         */
+        if (CollectionUtils.isEmpty(ensembleList)) {
+            return PlacementPolicyAdherence.FAIL;
+        }
+
+        int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+        int ensembleSize = ensembleList.size();
+        Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
+        BookieId bookie;
+        for (int i = 0; i < ensembleList.size(); i++) {
+            regionsInQuorum.clear();
+            for (int j = 0; j < writeQuorumSize; j++) {
+                bookie = ensembleList.get((i + j) % ensembleSize);
+                if (knownBookies.containsKey(bookie)) {

Review Comment:
   This method is not locked, so the contents of `knownBookies` may change
   between line_661 and line_662: line_661 may see the bookie, but by line_662
   it may have been removed. It would be better to call
   `knownBookies.get(bookie)` once and check the result for null.
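
A minimal sketch of the single-lookup pattern suggested above, not the PR's actual code. It assumes a simplified stand-in for `knownBookies` (a `ConcurrentHashMap` from a bookie name to a region name); the real field maps `BookieId` to a node type, and its concrete map type is not shown in this diff. The class and method names (`SingleLookupSketch`, `groupByRegion`) are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SingleLookupSketch {
    // Hypothetical stand-in for knownBookies: bookie name -> region name.
    // ConcurrentHashMap never stores null values, so a null result from get()
    // always means "not present right now".
    static final Map<String, String> knownBookies = new ConcurrentHashMap<>();

    // Groups bookies by region with exactly one map read per bookie, so there
    // is no window between a containsKey() check and a later get().
    static Map<String, Set<String>> groupByRegion(List<String> bookies) {
        Map<String, Set<String>> regionsInQuorum = new HashMap<>();
        for (String bookie : bookies) {
            String region = knownBookies.get(bookie); // single read
            if (region == null) {
                // Unknown bookie, or removed concurrently: skip it.
                continue;
            }
            // computeIfAbsent replaces the containsKey/put branching.
            regionsInQuorum.computeIfAbsent(region, r -> new HashSet<>()).add(bookie);
        }
        return regionsInQuorum;
    }

    public static void main(String[] args) {
        knownBookies.put("bookie-1", "region-a");
        knownBookies.put("bookie-2", "region-a");
        knownBookies.put("bookie-3", "region-b");
        // "bookie-4" is not registered and is skipped.
        System.out.println(groupByRegion(List.of("bookie-1", "bookie-2", "bookie-3", "bookie-4")));
    }
}
```

Reading the value once and branching on null keeps the presence check and the use on the same snapshot of the entry. It does not make the whole method atomic, so a lock would still be needed if the loop must see a consistent view across all bookies.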



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java:
##########
@@ -644,12 +645,75 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
     @Override
     public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
             int writeQuorumSize, int ackQuorumSize) {
-        /**
-         * TODO: have to implement actual logic for this method for
-         * RegionAwareEnsemblePlacementPolicy. For now return true value.
-         *
-         * - https://github.com/apache/bookkeeper/issues/1898
-         */
+        if (CollectionUtils.isEmpty(ensembleList)) {
+            return PlacementPolicyAdherence.FAIL;
+        }
+
+        int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+        int ensembleSize = ensembleList.size();
+        Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
+        BookieId bookie;
+        for (int i = 0; i < ensembleList.size(); i++) {
+            regionsInQuorum.clear();
+            for (int j = 0; j < writeQuorumSize; j++) {
+                bookie = ensembleList.get((i + j) % ensembleSize);
+                if (knownBookies.containsKey(bookie)) {
+                    String region = getLocalRegion(knownBookies.get(bookie));
+                    if (regionsInQuorum.containsKey(region)) {
+                        regionsInQuorum.get(region).add(bookie);
+                    } else {
+                        Set<BookieId> bookieSet = new HashSet<>();
+                        bookieSet.add(bookie);
+                        regionsInQuorum.put(region, bookieSet);
+                    }
+                } else if (LOG.isDebugEnabled()) {
+                    LOG.debug("bookie {} is not in the list of knownBookies", bookie);
+                }
+            }
+
+            if (regionsInQuorum.isEmpty()) {
+                return PlacementPolicyAdherence.FAIL;
+            }
+
+            if (regionsInQuorum.size() < 2) {
+                // fall back to use the ensemblePlacementPolicy in specific region
+                String region = regionsInQuorum.keySet().iterator().next();
+                Set<BookieId> bookieIds = regionsInQuorum.get(region);
+
+                TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
+                PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = policyWithinRegion
+                        .isEnsembleAdheringToPlacementPolicy(new ArrayList<>(bookieIds), bookieIds.size(), 1);
+                if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("For ensemble {}, write set starting at {} are all from one region, "
+                                + "fall back to RackawareEnsemblePlacementPolicy and fail.", ensembleList, i);
+                    }
+                    return PlacementPolicyAdherence.FAIL;
+                }
+                continue;
+            }
+
+            if (effectiveMinRegionsForDurability > 0 && regionsInQuorum.size() < effectiveMinRegionsForDurability) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("For ensemble {}, write set starting at {} are from {} regions, "
+                                    + "less than effectiveMinRegionsForDurability: {}.",
+                            ensembleList, i, regionsInQuorum.size(), effectiveMinRegionsForDurability);
+                }
+                return PlacementPolicyAdherence.FAIL;
+            }
+
+            if (regionsInQuorum.size() < writeQuorumSize) {

Review Comment:
   We can do this check before line_679 (the `regionsInQuorum.size() < 2`
   check).


