horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939693630
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,236 @@ public boolean
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
}
return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
}
+
+ @Override
+ public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieId> excludeBookies,
+ List<BookieId> currentEnsemble) {
+ rwLock.readLock().lock();
+ try {
+ PlacementPolicyAdherence currentPlacementAdherence =
isEnsembleAdheringToPlacementPolicy(
+ currentEnsemble, writeQuorumSize, ackQuorumSize);
+ if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+ return PlacementResult.of(new ArrayList<>(currentEnsemble),
currentPlacementAdherence);
+ }
+ for (BookieId bookieId : currentEnsemble) {
+ if (!knownBookies.containsKey(bookieId)) {
+ excludeBookies.add(bookieId);
+ }
+ }
+ PlacementResult<List<BookieId>> placementResult =
PlacementResult.of(Collections.emptyList(),
+ PlacementPolicyAdherence.FAIL);
+ int minDiffer = Integer.MAX_VALUE;
+ for (int i = 0; i < currentEnsemble.size(); i++) {
+ PlacementResult<List<BookieId>> result =
doReplaceToAdherePlacementPolicy(ensembleSize,
+ writeQuorumSize, ackQuorumSize, excludeBookies,
currentEnsemble, i);
+ if (PlacementPolicyAdherence.FAIL ==
result.getAdheringToPolicy()) {
+ continue;
+ }
+ int differ = differBetweenBookies(currentEnsemble,
result.getResult());
+ if (differ < minDiffer) {
+ minDiffer = differ;
+ placementResult = result;
+ if (minDiffer == 1) {
+ break;
+ }
+ }
+ }
+ return placementResult;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ private int differBetweenBookies(List<BookieId> bookiesA, List<BookieId>
bookiesB) {
+ if (CollectionUtils.isEmpty(bookiesA) ||
CollectionUtils.isEmpty(bookiesB)) {
+ return Integer.MAX_VALUE;
+ }
+ if (bookiesA.size() != bookiesB.size()) {
+ return Integer.MAX_VALUE;
+ }
+ int differ = 0;
+ for (int i = 0; i < bookiesA.size(); i++) {
+ if (!bookiesA.get(i).equals(bookiesB.get(i))) {
+ differ++;
+ }
+ }
+ return differ;
+ }
+
+ private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieId> excludeBookies,
+ List<BookieId> currentEnsemble,
+ int startIndex) {
+ final List<BookieNode> provisionalEnsembleNodes =
currentEnsemble.stream()
+ .map(this::convertBookieToNode).collect(Collectors.toList());
+ final Set<Node> excludeNodes = convertBookiesToNodes(
+ addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+ int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+ final RRTopologyAwareCoverageEnsemble ensemble =
+ new RRTopologyAwareCoverageEnsemble(
+ ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ RACKNAME_DISTANCE_FROM_LEAVES,
+ null,
+ null,
+ minNumRacksPerWriteQuorumForThisEnsemble);
+ int numRacks = topology.getNumOfRacks();
+ // only one rack or less than
minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+ if (numRacks < 2 || numRacks <
minNumRacksPerWriteQuorumForThisEnsemble) {
+ LOG.warn("Skip ensemble relocation because the cluster has only {}
rack.", numRacks);
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+ BookieNode prevNode = null;
+ final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+ // use same bookie at first to reduce ledger replication
+ if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode,
ensemble)
+ && ensemble.addNode(firstNode)) {
+ excludeNodes.add(firstNode);
+ prevNode = firstNode;
+ }
+ for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
+ int index = (startIndex + i) % ensembleSize;
+ final String curRack;
+ if (null == prevNode) {
+ if ((null == localNode) ||
defaultRack.equals(localNode.getNetworkLocation())) {
+ curRack = NodeBase.ROOT;
+ } else {
+ curRack = localNode.getNetworkLocation();
+ }
+ } else {
+ curRack = "~" + prevNode.getNetworkLocation();
+ }
+ try {
+ prevNode = replaceToAdherePlacementPolicyInternal(
+ curRack, excludeNodes, ensemble, ensemble,
+ provisionalEnsembleNodes, index, ensembleSize,
minNumRacksPerWriteQuorumForThisEnsemble);
+ // got a good candidate
+ if (ensemble.addNode(prevNode)) {
+ // add the candidate to exclude set
+ excludeNodes.add(prevNode);
+ } else {
+ throw new BKNotEnoughBookiesException();
+ }
+ // replace to newer node
+ provisionalEnsembleNodes.set(index, prevNode);
+ } catch (BKNotEnoughBookiesException e) {
+ LOG.warn("Skip ensemble relocation because the cluster has not
enough bookies.");
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+ }
+ List<BookieId> bookieList = ensemble.toList();
+ if (ensembleSize != bookieList.size()) {
+ LOG.warn("Not enough {} bookies are available to form an ensemble
: {}.",
+ ensembleSize, bookieList);
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+ PlacementPolicyAdherence placementPolicyAdherence =
isEnsembleAdheringToPlacementPolicy(bookieList,
+ writeQuorumSize, ackQuorumSize);
+ if (PlacementPolicyAdherence.FAIL == placementPolicyAdherence) {
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+ return PlacementResult.of(revertBookieListByIndex(bookieList,
startIndex), placementPolicyAdherence);
+ }
+
+ private List<BookieId> revertBookieListByIndex(List<BookieId> bookies, int
startIndex) {
+ BookieId[] bookieIds = new BookieId[bookies.size()];
+ for (int i = 0; i < bookies.size(); i++) {
+ if (startIndex == bookies.size()) {
+ startIndex = 0;
+ }
+ bookieIds[startIndex++] = bookies.get(i);
+ }
+ return Lists.newArrayList(bookieIds);
+ }
+
+ private BookieNode replaceToAdherePlacementPolicyInternal(
+ String netPath, Set<Node> excludeBookies, Predicate<BookieNode>
predicate,
+ Ensemble<BookieNode> ensemble, List<BookieNode>
provisionalEnsembleNodes, int ensembleIndex,
+ int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble)
throws BKNotEnoughBookiesException {
+ final BookieNode currentNode =
provisionalEnsembleNodes.get(ensembleIndex);
+ // if the current bookie could be applied to the ensemble, apply it to
minify the number of bookies replaced
+ if (!excludeBookies.contains(currentNode) &&
predicate.apply(currentNode, ensemble)) {
+ return currentNode;
+ }
+ final List<Pair<String, List<BookieNode>>> conditionList = new
ArrayList<>();
+ final Set<String> preExcludeRacks = new HashSet<>();
+ final Set<String> postExcludeRacks = new HashSet<>();
+ for (int i = 0; i < minNumRacksPerWriteQuorumForThisEnsemble - 1; i++)
{
+
preExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex -
i - 1), ensembleSize))
+ .getNetworkLocation());
+
postExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex
+ i + 1), ensembleSize))
+ .getNetworkLocation());
+ }
+ // adhere minNumRacksPerWriteQuorum by preExcludeRacks
+ // avoid additional replace from write quorum candidates by
preExcludeRacks and postExcludeRacks
+ // avoid to use first candidate bookies for election by
provisionalEnsembleNodes
+ conditionList.add(Pair.of(
+ "~" + String.join(",",
+ Stream.concat(preExcludeRacks.stream(),
postExcludeRacks.stream()).collect(Collectors.toSet())),
+ provisionalEnsembleNodes
+ ));
+ // avoid to use same rack between previous index by netPath
+ // avoid to use first candidate bookies for election by
provisionalEnsembleNodes
+ conditionList.add(Pair.of(netPath, provisionalEnsembleNodes));
+ // avoid to use same rack between previous index by netPath
+ conditionList.add(Pair.of(netPath, Collections.emptyList()));
+
+ for (Pair<String, List<BookieNode>> condition : conditionList) {
+ WeightedRandomSelection<BookieNode> wRSelection = null;
+
+ final List<Node> leaves = new
ArrayList<>(topology.getLeaves(condition.getLeft()));
+ if (!isWeighted) {
+ Collections.shuffle(leaves);
+ } else {
+ if (CollectionUtils.subtract(leaves, excludeBookies).size() <
1) {
+ throw new BKNotEnoughBookiesException();
+ }
+ wRSelection = prepareForWeightedSelection(leaves);
+ if (wRSelection == null) {
+ throw new BKNotEnoughBookiesException();
+ }
+ }
+
+ final Iterator<Node> it = leaves.iterator();
+ final Set<Node> bookiesSeenSoFar = new HashSet<>();
+ while (true) {
+ Node n;
+ if (isWeighted) {
+ if (bookiesSeenSoFar.size() == leaves.size()) {
+ // Don't loop infinitely.
+ break;
+ }
+ n = wRSelection.getNextRandom();
+ bookiesSeenSoFar.add(n);
+ } else {
+ if (it.hasNext()) {
+ n = it.next();
+ } else {
+ break;
+ }
+ }
+ if (excludeBookies.contains(n)) {
+ continue;
+ }
+ if (!(n instanceof BookieNode) ||
!predicate.apply((BookieNode) n, ensemble)) {
+ continue;
+ }
+ // additional excludeBookies
+ if (condition.getRight().contains(n)) {
+ continue;
+ }
+ BookieNode bn = (BookieNode) n;
+ return bn;
+ }
+ }
+ throw new BKNotEnoughBookiesException();
Review Comment:
This is already logged: `doReplaceToAdherePlacementPolicy` catches
`BKNotEnoughBookiesException` and logs it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]