horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928385603
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -502,10 +570,40 @@ private boolean
isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
*/
private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh,
Long ledgerVerificationPercentage)
throws InterruptedException {
+ //The data loss fragments is first to repair. If a fragment is
data_loss and not_adhering_placement
+ //at the same time, we only fix data_loss in this time. After fix
data_loss, the fragment is still
+ //not_adhering_placement, Auditor will mark this ledger again.
+ Set<LedgerFragment> underreplicatedFragments = new HashSet<>();
+
+ Set<LedgerFragment> dataLossFragments = getDataLossFragments(lh,
ledgerVerificationPercentage);
+ underreplicatedFragments.addAll(dataLossFragments);
+
+ Set<LedgerFragment> notAdheringFragments =
getNeedRepairedPlacementNotAdheringFragments(lh);
+
+ for (LedgerFragment notAdheringFragment : notAdheringFragments) {
+ if (!checkFragmentRepeat(underreplicatedFragments,
notAdheringFragment)) {
Review Comment:
Maybe not; we didn't override the LedgerFragment equals method, and we only
compare the first ledgerId and last ledgerId, so we use checkFragmentRepeat to
handle this case.
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -364,6 +364,18 @@ DistributionSchedule.WriteSet reorderReadLACSequence(
default void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
}
+ /**
+ * Replace some bookie to adhering placement policy. If the all kinds of
replacement
+ * didn't adhere placement policy, return empty map.
+ *
+ * @param ensemble
+ * @param writeQuorumSize
+ * @param ackQuorumSize
+ * @return Map: key means ensemble index, value means target replace
bookieId.
+ */
+ Map<Integer, BookieId>
replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int
writeQuorumSize,
Review Comment:
That's fine.
##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1417,6 +1417,176 @@ public boolean validate() {
}
}
+ @Test
+ public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception
{
+ repp.uninitalize();
+
+ int minNumRacksPerWriteQuorum = 3;
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+ // set enforceMinNumRacksPerWriteQuorum
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(),
timer, DISABLE_ALL,
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ int numOfRacks = 3;
+ int numOfBookiesPerRack = 3;
+ String[] rackLocationNames = new String[numOfRacks];
+ List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+ Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+ BookieId bookieAddress;
+
+ for (int i = 0; i < numOfRacks; i++) {
+ rackLocationNames[i] = "/default-region/r" + i;
+ for (int j = 0; j < numOfBookiesPerRack; j++) {
+ int index = i * numOfBookiesPerRack + j;
+ bookieAddress = new BookieSocketAddress("128.0.0." + index,
3181).toBookieId();
+ StaticDNSResolver.addNodeToRack("128.0.0." + index,
rackLocationNames[i]);
+ bookieSocketAddresses.add(bookieAddress);
+ bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+ }
+ }
+
+ repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses),
new HashSet<BookieId>());
+
+ int writeQuorum = 3;
+ int ackQuorum = 3;
+
+ //test three knows bookie
+ List<BookieId> knowsEnsemble = new ArrayList<>();
+ knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+ knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+ knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+ PlacementPolicyAdherence placementPolicyAdherence =
repp.isEnsembleAdheringToPlacementPolicy(
+ knowsEnsemble, writeQuorum, ackQuorum);
+ assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+ Map<Integer, BookieId> targetBookie =
+ repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble,
ackQuorum, writeQuorum,
+ Collections.emptyMap());
+ //should replace two bookie
+ assertEquals(targetBookie.size(), 2);
+
+ for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
+ knowsEnsemble.set(entry.getKey(), entry.getValue());
+ }
+
+ placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+ knowsEnsemble, writeQuorum, ackQuorum);
+ assertEquals(placementPolicyAdherence,
PlacementPolicyAdherence.MEETS_STRICT);
+
+ //test three unknowns bookie
+ List<BookieId> unknownEnsembles = new ArrayList<>();
+ unknownEnsembles.add(BookieId.parse("128.0.0.100:3181"));
+ unknownEnsembles.add(BookieId.parse("128.0.0.101:3181"));
+ unknownEnsembles.add(BookieId.parse("128.0.0.102:3181"));
+
+ placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+ unknownEnsembles, writeQuorum, ackQuorum);
+ assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+ //should replace three bookie
+ targetBookie =
repp.replaceNotAdheringPlacementPolicyBookie(unknownEnsembles, ackQuorum,
writeQuorum,
+ Collections.emptyMap());
+ assertEquals(targetBookie.size(), 3);
+
+ for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
+ unknownEnsembles.set(entry.getKey(), entry.getValue());
+ }
+
+ placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+ unknownEnsembles, writeQuorum, ackQuorum);
+ assertEquals(placementPolicyAdherence,
PlacementPolicyAdherence.MEETS_STRICT);
Review Comment:
What does `If all ensembles are missing` mean?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]