wenbingshen opened a new issue, #3817:
URL: https://github.com/apache/bookkeeper/issues/3817
***Describe the bug***
While we were on vacation, a bookie node flapped (briefly dropped out of the
cluster) and then rejoined, coming back online within the lostBookieRecoveryDelay
window. However, the Auditor's periodic replicas check task happened to run during
that lostBookieRecoveryDelay period and performed checks and recovery anyway. This
causes unnecessary replica recovery and heavy read/write load on ZooKeeper and the
bookie cluster. The following tests reproduce the problem:
```java
/**
 * Reproduces issue #3817 for the periodic check-all-ledgers task: a bookie is
 * killed while lostBookieRecoveryDelay defers its audit, and the test observes
 * what the check-all-ledgers task reports during that delay window.
 */
@Test
public void testDelayBookieAuditOfCheckAllLedgers() throws Exception {
    // Stop the harness-managed auditors so only the TestAuditor below is active.
    for (AuditorElector e : auditorElectors.values()) {
        e.shutdown();
    }

    // Write numLedgers closed ledgers (createLedger(3, 3, ...)), two entries each.
    final int numLedgers = 10;
    for (int i = 0; i < numLedgers; i++) {
        LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32,
                "passwd".getBytes());
        for (int j = 0; j < 2; j++) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
    }

    LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
    LedgerUnderreplicationManager urm =
            mFactory.newLedgerUnderreplicationManager();
    ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));

    TestStatsProvider statsProvider = new TestStatsProvider();
    TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
    Counter numBookieAuditsDelayed =
            statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
    TestOpStatsLogger underReplicatedLedgerTotalSizeStatsLogger =
            (TestOpStatsLogger) statsLogger.getOpStatsLogger(
                    ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);

    // Schedule the check-all-ledgers task every interval unit; the placement
    // policy check gets interval 0 (presumably disabled — confirm) and the
    // periodic bookie check is pushed out to Long.MAX_VALUE.
    servConf.setAuditorPeriodicCheckInterval(1);
    servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
    servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
    // Defer the audit of a lost bookie for as long as possible.
    urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);

    // The wrapped periodic tasks stay idle until canRun is set.
    AtomicBoolean canRun = new AtomicBoolean(false);
    final TestAuditor auditor = new TestAuditor(
            BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
            false, statsLogger, canRun);
    final CountDownLatch latch = auditor.getLatch();
    try {
        auditor.start();
        killBookie(addressByIndex(0));

        // Wait until the auditor has recorded exactly one delayed bookie audit.
        Awaitility.await().untilAsserted(() -> assertEquals(1, (long)
                numBookieAuditsDelayed.get()));
        final Future<?> auditTask = auditor.auditTask;
        assertTrue(auditTask != null && !auditTask.isDone());

        // Release the gated tasks and wait for one of them to complete.
        canRun.set(true);
        assertTrue(latch.await(10, TimeUnit.SECONDS));

        // The delayed bookie audit must still be the same pending task.
        // Null-check first: && evaluates left to right.
        assertTrue(auditor.auditTask != null
                && auditor.auditTask.equals(auditTask)
                && !auditor.auditTask.isDone());

        // This asserts the behaviour reported in the issue: numLedgers samples are
        // recorded during the delay window. Per the reporter, the correct value is 0.
        assertEquals("UNDER_REPLICATED_LEDGERS_TOTAL_SIZE",
                numLedgers,
                underReplicatedLedgerTotalSizeStatsLogger.getSuccessCount());
    } finally {
        // Always release the auditor, even when an assertion above fails.
        auditor.close();
    }
}
/**
 * Reproduces issue #3817 for the periodic placement-policy check: a bookie is
 * killed while lostBookieRecoveryDelay defers its audit, and the test observes
 * whether the placement-policy check runs during that delay window.
 */
@Test
public void testDelayBookieAuditOfPlacementPolicy() throws Exception {
    // Stop the harness-managed auditors so only the TestAuditor below is active.
    for (AuditorElector e : auditorElectors.values()) {
        e.shutdown();
    }

    // Write numLedgers closed ledgers (createLedger(3, 3, ...)), two entries each.
    final int numLedgers = 10;
    for (int i = 0; i < numLedgers; i++) {
        LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32,
                "passwd".getBytes());
        for (int j = 0; j < 2; j++) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
    }

    LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
    LedgerUnderreplicationManager urm =
            mFactory.newLedgerUnderreplicationManager();
    ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));

    TestStatsProvider statsProvider = new TestStatsProvider();
    TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
    Counter numBookieAuditsDelayed =
            statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
    TestOpStatsLogger placementPolicyCheckTime = (TestOpStatsLogger)
            statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);

    // Schedule the placement-policy check every interval unit; check-all-ledgers
    // gets interval 0 (presumably disabled — confirm) and the periodic bookie
    // check is pushed out to Long.MAX_VALUE.
    servConf.setAuditorPeriodicCheckInterval(0);
    servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1);
    servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
    // Defer the audit of a lost bookie for as long as possible.
    urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);

    // The wrapped periodic tasks stay idle until canRun is set.
    AtomicBoolean canRun = new AtomicBoolean(false);
    final TestAuditor auditor = new TestAuditor(
            BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
            false, statsLogger, canRun);
    final CountDownLatch latch = auditor.getLatch();
    try {
        auditor.start();
        killBookie(addressByIndex(0));

        // Wait until the auditor has recorded exactly one delayed bookie audit.
        Awaitility.await().untilAsserted(() -> assertEquals(1, (long)
                numBookieAuditsDelayed.get()));
        final Future<?> auditTask = auditor.auditTask;
        assertTrue(auditTask != null && !auditTask.isDone());

        // The gated tasks have not been released, so no check time is recorded yet.
        assertEquals("PLACEMENT_POLICY_CHECK_TIME", 0,
                placementPolicyCheckTime.getSuccessCount());

        // Release the gated tasks and wait for one of them to complete.
        canRun.set(true);
        assertTrue(latch.await(10, TimeUnit.SECONDS));

        // The delayed bookie audit must still be the same pending task.
        // Null-check first: && evaluates left to right.
        assertTrue(auditor.auditTask != null
                && auditor.auditTask.equals(auditTask)
                && !auditor.auditTask.isDone());

        // NOTE(review): this pins the current behaviour (the check runs inside the
        // lostBookieRecoveryDelay window). The issue describes that as undesirable;
        // confirm the expected value once the fix lands.
        assertTrue("PLACEMENT_POLICY_CHECK_TIME",
                placementPolicyCheckTime.getSuccessCount() > 0);
    } finally {
        // Always release the auditor, even when an assertion above fails.
        auditor.close();
    }
}
/**
 * Reproduces issue #3817 for the periodic replicas check: a bookie is killed
 * while lostBookieRecoveryDelay defers its audit, and the test observes whether
 * the replicas check runs during that delay window.
 */
@Test
public void testDelayBookieAuditOfReplicasCheck() throws Exception {
    // Stop the harness-managed auditors so only the TestAuditor below is active.
    for (AuditorElector e : auditorElectors.values()) {
        e.shutdown();
    }

    // Write numLedgers closed ledgers (createLedger(3, 3, ...)), two entries each.
    final int numLedgers = 10;
    for (int i = 0; i < numLedgers; i++) {
        LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32,
                "passwd".getBytes());
        for (int j = 0; j < 2; j++) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
    }

    LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
    LedgerUnderreplicationManager urm =
            mFactory.newLedgerUnderreplicationManager();
    ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));

    TestStatsProvider statsProvider = new TestStatsProvider();
    TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
    Counter numBookieAuditsDelayed =
            statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
    TestOpStatsLogger replicasCheckTime = (TestOpStatsLogger)
            statsLogger.getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);

    // Schedule the replicas check every interval unit; check-all-ledgers and the
    // placement-policy check get interval 0 (presumably disabled — confirm) and
    // the periodic bookie check is pushed out to Long.MAX_VALUE.
    servConf.setAuditorPeriodicCheckInterval(0);
    servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
    servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
    servConf.setAuditorPeriodicReplicasCheckInterval(1);
    // Defer the audit of a lost bookie for as long as possible.
    urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);

    // The wrapped periodic tasks stay idle until canRun is set.
    AtomicBoolean canRun = new AtomicBoolean(false);
    final TestAuditor auditor = new TestAuditor(
            BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
            false, statsLogger, canRun);
    final CountDownLatch latch = auditor.getLatch();
    try {
        auditor.start();
        killBookie(addressByIndex(0));

        // Wait until the auditor has recorded exactly one delayed bookie audit.
        Awaitility.await().untilAsserted(() -> assertEquals(1, (long)
                numBookieAuditsDelayed.get()));
        final Future<?> auditTask = auditor.auditTask;
        assertTrue(auditTask != null && !auditTask.isDone());

        // The gated tasks have not been released, so no check time is recorded yet.
        assertEquals("REPLICAS_CHECK_TIME", 0,
                replicasCheckTime.getSuccessCount());

        // Release the gated tasks and wait for one of them to complete.
        canRun.set(true);
        assertTrue(latch.await(10, TimeUnit.SECONDS));

        // The delayed bookie audit must still be the same pending task.
        // Null-check first: && evaluates left to right.
        assertTrue(auditor.auditTask != null
                && auditor.auditTask.equals(auditTask)
                && !auditor.auditTask.isDone());

        // NOTE(review): this pins the current behaviour (the replicas check runs
        // inside the lostBookieRecoveryDelay window), which the issue describes as
        // undesirable; confirm the expected value once the fix lands.
        assertTrue("REPLICAS_CHECK_TIME",
                replicasCheckTime.getSuccessCount() > 0);
    } finally {
        // Always release the auditor, even when an assertion above fails.
        auditor.close();
    }
}
/**
 * Auditor test double whose three periodic tasks (check-all-ledgers,
 * placement-policy check, replicas check) are replaced by gated wrappers.
 * A wrapper does nothing until its {@link AtomicBoolean} gate is true. Once
 * open, it runs the original task and then counts down the latch currently
 * held in {@link #latchRef}.
 */
static class TestAuditor extends Auditor {

    /** Holds the latch that wrapped tasks count down; replaceable via {@link #setLatch}. */
    final AtomicReference<CountDownLatch> latchRef =
            new AtomicReference<>(new CountDownLatch(1));

    public TestAuditor(String bookieIdentifier, ServerConfiguration conf,
            BookKeeper bkc, boolean ownBkc, StatsLogger statsLogger,
            AtomicBoolean exceptedRun) throws UnavailableException {
        super(bookieIdentifier, conf, bkc, ownBkc, statsLogger);
        gatePeriodicTasks(exceptedRun);
    }

    public TestAuditor(String bookieIdentifier, ServerConfiguration conf,
            BookKeeper bkc, boolean ownBkc, BookKeeperAdmin bkadmin,
            boolean ownadmin, StatsLogger statsLogger,
            AtomicBoolean exceptedRun) throws UnavailableException {
        super(bookieIdentifier, conf, bkc, ownBkc, bkadmin, ownadmin, statsLogger);
        gatePeriodicTasks(exceptedRun);
    }

    public TestAuditor(final String bookieIdentifier, ServerConfiguration conf,
            StatsLogger statsLogger, AtomicBoolean exceptedRun)
            throws UnavailableException {
        super(bookieIdentifier, conf, statsLogger);
        gatePeriodicTasks(exceptedRun);
    }

    /** Replaces each periodic task created by the Auditor with a gated wrapper. */
    private void gatePeriodicTasks(AtomicBoolean gate) {
        super.auditorCheckAllLedgersTask = wrap(super.auditorCheckAllLedgersTask, gate);
        super.auditorPlacementPolicyCheckTask = wrap(super.auditorPlacementPolicyCheckTask, gate);
        super.auditorReplicasCheckTask = wrap(super.auditorReplicasCheckTask, gate);
    }

    /** Builds a gated wrapper around one task, sharing this auditor's latch holder. */
    private AuditorTask wrap(AuditorTask original, AtomicBoolean gate) {
        return new AuditorTestWrapperTask(original, latchRef, gate);
    }

    /** Returns the latch that wrapped tasks currently count down. */
    CountDownLatch getLatch() {
        return latchRef.get();
    }

    /** Installs a new latch for subsequent wrapped-task runs to count down. */
    void setLatch(CountDownLatch latch) {
        latchRef.set(latch);
    }

    /**
     * Delegating task that runs its inner task only when the gate is open and
     * signals the shared latch afterwards. Shutdown is always forwarded.
     */
    private static class AuditorTestWrapperTask extends AuditorTask {
        private final AuditorTask delegate;
        private final AtomicReference<CountDownLatch> latchHolder;
        private final AtomicBoolean runGate;

        AuditorTestWrapperTask(AuditorTask delegate,
                AtomicReference<CountDownLatch> latchHolder,
                AtomicBoolean runGate) {
            // The wrapper itself uses none of AuditorTask's collaborators.
            super(null, null, null, null, null, null, null);
            this.delegate = delegate;
            this.latchHolder = latchHolder;
            this.runGate = runGate;
        }

        @Override
        protected void runTask() {
            // Stay idle while no gate was supplied or the gate is still closed.
            if (runGate == null || !runGate.get()) {
                return;
            }
            delegate.runTask();
            latchHolder.get().countDown();
        }

        @Override
        public void shutdown() {
            delegate.shutdown();
        }
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]