wenbingshen opened a new issue, #3817:
URL: https://github.com/apache/bookkeeper/issues/3817

   ***Describe the bug***
   
   While we were on vacation, a bookie node flapped: it dropped out of the 
cluster and rejoined it, coming back online within the lostBookieRecoveryDelay 
period. However, the Auditor replicas check task happened to run its checks and 
recovery during that same lostBookieRecoveryDelay period. This causes unnecessary 
replica recovery and heavy read and write load on ZooKeeper and the bookie cluster.
   
   ```java
       /**
        * Reproducer for the periodic check-all-ledgers task.
        *
        * <p>Creates a few closed ledgers, sets the lost-bookie recovery delay to
        * {@code Integer.MAX_VALUE}, kills one bookie and waits until the bookie audit has been
        * recorded as delayed. Only then are the wrapped periodic tasks allowed to run, and the
        * number of under-replicated ledgers they published is inspected.
        */
       @Test
       public void testDelayBookieAuditOfCheckAllLedgers() throws Exception {
           // Stop the electors started by the test base so that only the auditor below is active.
           for (AuditorElector e : auditorElectors.values()) {
               e.shutdown();
           }

           final int numLedgers = 10;
           for (int i = 0; i < numLedgers; i++) {
               LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
               for (int j = 0; j < 2; j++) {
                   lh.addEntry("testdata".getBytes());
               }
               lh.close();
           }

           LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
           LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();

           ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));

           TestStatsProvider statsProvider = new TestStatsProvider();
           TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
           Counter numBookieAuditsDelayed =
                   statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
           TestOpStatsLogger underReplicatedLedgerTotalSizeStatsLogger = (TestOpStatsLogger) statsLogger
                   .getOpStatsLogger(ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);

           // Only the check-all-ledgers interval is non-zero here; the placement policy check
           // interval is 0 (presumably "disabled" - confirm against ServerConfiguration).
           servConf.setAuditorPeriodicCheckInterval(1);
           servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
           servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);

           // Keep the lost-bookie audit pending for the whole duration of the test.
           urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);

           AtomicBoolean canRun = new AtomicBoolean(false);

           final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(),
                   servConf, bkc, false, statsLogger, canRun);
           try {
               final CountDownLatch latch = auditor.getLatch();

               auditor.start();

               killBookie(addressByIndex(0));

               Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
               final Future<?> auditTask = auditor.auditTask;
               assertTrue(auditTask != null && !auditTask.isDone());

               // Open the gate: from now on the wrapped periodic tasks really execute.
               canRun.set(true);

               assertTrue(latch.await(10, TimeUnit.SECONDS));
               // Read the field once and null-check it before dereferencing it.
               final Future<?> currentAuditTask = auditor.auditTask;
               assertTrue(currentAuditTask != null && currentAuditTask.equals(auditTask)
                       && !currentAuditTask.isDone());
               // This assertion documents the bug: the observed (wrong) count is numLedgers,
               // whereas the correct count while the bookie audit is still delayed is 0.
               assertEquals("UNDER_REPLICATED_LEDGERS_TOTAL_SIZE",
                       numLedgers,
                       underReplicatedLedgerTotalSizeStatsLogger.getSuccessCount());
           } finally {
               // Always release the auditor, even when one of the assertions above fails.
               auditor.close();
           }
       }
   
       /**
        * Same scenario as the check-all-ledgers reproducer, but for the periodic placement
        * policy check.
        *
        * <p>Creates a few closed ledgers, sets the lost-bookie recovery delay to
        * {@code Integer.MAX_VALUE}, kills one bookie and waits until the bookie audit has been
        * recorded as delayed. It asserts that no placement policy check has succeeded before the
        * wrapped tasks are allowed to run, and that at least one has succeeded afterwards while
        * the delayed audit task is still the same, unfinished task.
        */
       @Test
       public void testDelayBookieAuditOfPlacementPolicy() throws Exception {
           // Stop the electors started by the test base so that only the auditor below is active.
           for (AuditorElector e : auditorElectors.values()) {
               e.shutdown();
           }

           final int numLedgers = 10;
           for (int i = 0; i < numLedgers; i++) {
               LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
               for (int j = 0; j < 2; j++) {
                   lh.addEntry("testdata".getBytes());
               }
               lh.close();
           }

           LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
           LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();

           ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));

           TestStatsProvider statsProvider = new TestStatsProvider();
           TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
           Counter numBookieAuditsDelayed =
                   statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
           TestOpStatsLogger placementPolicyCheckTime = (TestOpStatsLogger) statsLogger
                   .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);

           // Only the placement policy check interval is non-zero here; the check-all-ledgers
           // interval is 0 (presumably "disabled" - confirm against ServerConfiguration).
           servConf.setAuditorPeriodicCheckInterval(0);
           servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1);
           servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);

           // Keep the lost-bookie audit pending for the whole duration of the test.
           urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);

           AtomicBoolean canRun = new AtomicBoolean(false);

           final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(),
                   servConf, bkc, false, statsLogger, canRun);
           try {
               final CountDownLatch latch = auditor.getLatch();

               auditor.start();

               killBookie(addressByIndex(0));

               Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
               final Future<?> auditTask = auditor.auditTask;
               assertTrue(auditTask != null && !auditTask.isDone());
               // The gate is still closed, so no placement policy check may have completed yet.
               assertEquals("PLACEMENT_POLICY_CHECK_TIME", 0, placementPolicyCheckTime.getSuccessCount());

               // Open the gate: from now on the wrapped periodic tasks really execute.
               canRun.set(true);

               assertTrue(latch.await(10, TimeUnit.SECONDS));
               // Read the field once and null-check it before dereferencing it.
               final Future<?> currentAuditTask = auditor.auditTask;
               assertTrue(currentAuditTask != null && currentAuditTask.equals(auditTask)
                       && !currentAuditTask.isDone());
               assertTrue("PLACEMENT_POLICY_CHECK_TIME", placementPolicyCheckTime.getSuccessCount() > 0);
           } finally {
               // Always release the auditor, even when one of the assertions above fails.
               auditor.close();
           }
       }
   
       /**
        * Same scenario as the check-all-ledgers reproducer, but for the periodic replicas check.
        *
        * <p>Creates a few closed ledgers, sets the lost-bookie recovery delay to
        * {@code Integer.MAX_VALUE}, kills one bookie and waits until the bookie audit has been
        * recorded as delayed. It asserts that no replicas check has succeeded before the wrapped
        * tasks are allowed to run, and that at least one has succeeded afterwards while the
        * delayed audit task is still the same, unfinished task.
        */
       @Test
       public void testDelayBookieAuditOfReplicasCheck() throws Exception {
           // Stop the electors started by the test base so that only the auditor below is active.
           for (AuditorElector e : auditorElectors.values()) {
               e.shutdown();
           }

           final int numLedgers = 10;
           for (int i = 0; i < numLedgers; i++) {
               LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
               for (int j = 0; j < 2; j++) {
                   lh.addEntry("testdata".getBytes());
               }
               lh.close();
           }

           LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
           LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();

           ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));

           TestStatsProvider statsProvider = new TestStatsProvider();
           TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
           Counter numBookieAuditsDelayed =
                   statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
           TestOpStatsLogger replicasCheckTime = (TestOpStatsLogger) statsLogger
                   .getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);

           // Only the replicas check interval is non-zero here; the other periodic check
           // intervals are 0 (presumably "disabled" - confirm against ServerConfiguration).
           servConf.setAuditorPeriodicCheckInterval(0);
           servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
           servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
           servConf.setAuditorPeriodicReplicasCheckInterval(1);

           // Keep the lost-bookie audit pending for the whole duration of the test.
           urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);

           AtomicBoolean canRun = new AtomicBoolean(false);

           final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(),
                   servConf, bkc, false, statsLogger, canRun);
           try {
               final CountDownLatch latch = auditor.getLatch();

               auditor.start();

               killBookie(addressByIndex(0));

               Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
               final Future<?> auditTask = auditor.auditTask;
               assertTrue(auditTask != null && !auditTask.isDone());
               // The gate is still closed, so no replicas check may have completed yet.
               assertEquals("REPLICAS_CHECK_TIME", 0, replicasCheckTime.getSuccessCount());

               // Open the gate: from now on the wrapped periodic tasks really execute.
               canRun.set(true);

               assertTrue(latch.await(10, TimeUnit.SECONDS));
               // Read the field once and null-check it before dereferencing it.
               final Future<?> currentAuditTask = auditor.auditTask;
               assertTrue(currentAuditTask != null && currentAuditTask.equals(auditTask)
                       && !currentAuditTask.isDone());
               assertTrue("REPLICAS_CHECK_TIME", replicasCheckTime.getSuccessCount() > 0);
           } finally {
               // Always release the auditor, even when one of the assertions above fails.
               auditor.close();
           }
       }
   
       /**
        * Auditor used by the tests. Its check-all-ledgers, placement-policy-check and
        * replicas-check tasks are replaced by wrappers that do nothing until the flag handed to
        * the constructor is {@code true}. Every wrapped run that actually executes counts down
        * the latch currently held in {@link #latchRef}.
        */
       static class TestAuditor extends Auditor {

           /** Holder of the latch that gated task runs count down; replaceable via setLatch. */
           final AtomicReference<CountDownLatch> latchRef =
                   new AtomicReference<>(new CountDownLatch(1));

           public TestAuditor(String bookieIdentifier,
                              ServerConfiguration conf,
                              BookKeeper bkc,
                              boolean ownBkc,
                              StatsLogger statsLogger,
                              AtomicBoolean exceptedRun) throws UnavailableException {
               super(bookieIdentifier, conf, bkc, ownBkc, statsLogger);
               renewAuditorTestWrapperTask(exceptedRun);
           }

           public TestAuditor(String bookieIdentifier,
                              ServerConfiguration conf,
                              BookKeeper bkc,
                              boolean ownBkc,
                              BookKeeperAdmin bkadmin,
                              boolean ownadmin,
                              StatsLogger statsLogger,
                              AtomicBoolean exceptedRun) throws UnavailableException {
               super(bookieIdentifier, conf, bkc, ownBkc, bkadmin, ownadmin, statsLogger);
               renewAuditorTestWrapperTask(exceptedRun);
           }

           public TestAuditor(final String bookieIdentifier,
                              ServerConfiguration conf,
                              StatsLogger statsLogger,
                              AtomicBoolean exceptedRun) throws UnavailableException {
               super(bookieIdentifier, conf, statsLogger);
               renewAuditorTestWrapperTask(exceptedRun);
           }

           /** Swaps each of the three periodic check tasks for a gated wrapper around it. */
           private void renewAuditorTestWrapperTask(AtomicBoolean exceptedRun) {
               super.auditorCheckAllLedgersTask = gated(super.auditorCheckAllLedgersTask, exceptedRun);
               super.auditorPlacementPolicyCheckTask =
                       gated(super.auditorPlacementPolicyCheckTask, exceptedRun);
               super.auditorReplicasCheckTask = gated(super.auditorReplicasCheckTask, exceptedRun);
           }

           /** Builds a wrapper around {@code task} sharing this auditor's latch holder. */
           private AuditorTestWrapperTask gated(AuditorTask task, AtomicBoolean exceptedRun) {
               return new AuditorTestWrapperTask(task, latchRef, exceptedRun);
           }

           CountDownLatch getLatch() {
               return latchRef.get();
           }

           void setLatch(CountDownLatch latch) {
               latchRef.set(latch);
           }

           /**
            * Task decorator: forwards {@code runTask} to the wrapped task only while the gate
            * flag is set, then counts down the current latch. {@code shutdown} is always
            * forwarded.
            */
           private static class AuditorTestWrapperTask extends AuditorTask {
               private final AuditorTask delegate;
               private final AtomicReference<CountDownLatch> latchHolder;
               private final AtomicBoolean runAllowed;

               AuditorTestWrapperTask(AuditorTask innerTask,
                                      AtomicReference<CountDownLatch> latchRef,
                                      AtomicBoolean exceptedRun) {
                   // The wrapper only delegates, so the base class is built with null arguments.
                   super(null, null, null, null, null,
                           null, null);
                   this.delegate = innerTask;
                   this.latchHolder = latchRef;
                   this.runAllowed = exceptedRun;
               }

               @Override
               protected void runTask() {
                   // Gate closed (or no flag supplied): skip this run entirely.
                   if (runAllowed == null || !runAllowed.get()) {
                       return;
                   }
                   delegate.runTask();
                   latchHolder.get().countDown();
               }

               @Override
               public void shutdown() {
                   delegate.shutdown();
               }
           }
       }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to