This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 1da014eb5f1f81e0be03536ff7b2dab293518e8f Author: Hang Chen <[email protected]> AuthorDate: Tue Oct 5 17:07:29 2021 +0800 Add auditor get ledger throttle to avoid auto recovery zk session expire (#2802) (cherry picked from commit 29829a4cc3b47928ca8c894474f046bd9deb088d) --- .../bookkeeper/conf/ServerConfiguration.java | 39 +++++++++++++++ .../bookkeeper/meta/AbstractZkLedgerManager.java | 8 ++-- .../org/apache/bookkeeper/replication/Auditor.java | 56 +++++++++++++++++++++- .../replication/AuditorPeriodicCheckTest.java | 54 +++++++++++++++++++++ conf/bk_server.conf | 7 +++ 5 files changed, 159 insertions(+), 5 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 00ef028..1a50d02 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -202,6 +202,10 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati protected static final String UNDERREPLICATED_LEDGER_RECOVERY_GRACE_PERIOD = "underreplicatedLedgerRecoveryGracePeriod"; protected static final String AUDITOR_REPLICAS_CHECK_INTERVAL = "auditorReplicasCheckInterval"; + protected static final String AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS = + "auditorMaxNumberOfConcurrentOpenLedgerOperations"; + protected static final String AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC = + "auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec"; // Worker Thread parameters. protected static final String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads"; @@ -2503,6 +2507,41 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati } /** + * Get the semaphore limit value of getting ledger from zookeeper in auto recovery. + * + * @return The semaphore value. By default it is 500. + */ + public int getAuditorMaxNumberOfConcurrentOpenLedgerOperations() { + return getInt(AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS, 500); + } + + /** + * Set the semaphore limit value for getting ledger from zookeeper in auto recovery. + * @param semaphore + */ + public void setAuditorMaxNumberOfConcurrentOpenLedgerOperations(int semaphore) { + setProperty(AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS, semaphore); + } + + /** + * Get the acquire concurrent open ledger operations timeout. + * + * @return The timeout values. By default it is 120000ms + */ + public int getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() { + return getInt(AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC, 120000); + } + + /** + * Set the acquire concurrent open ledger operations timeout. + * @param timeoutMs + */ + public void setAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec(int timeoutMs) { + setProperty(AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC, timeoutMs); + } + + + /** * Set what percentage of a ledger (fragment)'s entries will be verified. * 0 - only the first and last entry of each ledger fragment would be verified * 100 - the entire ledger fragment would be verified diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java index 0e16d69..cda9370 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java @@ -568,9 +568,11 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher MultiCallback mcb = new MultiCallback(zkActiveLedgers.size(), finalCb, ctx, successRc, failureRc); // start loop over all ledgers - for (Long ledger : zkActiveLedgers) { - processor.process(ledger, mcb); - } + scheduler.submit(() -> { + for (Long ledger : zkActiveLedgers) { + processor.process(ledger, mcb); + } + }); } }); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index e4cf394..2c61de3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -165,6 +165,8 @@ public class Auditor implements AutoCloseable { private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry; private final long underreplicatedLedgerRecoveryGracePeriod; private final int zkOpTimeoutMs; + private final Semaphore openLedgerNoRecoverySemaphore; + private final int openLedgerNoRecoverySemaphoreWaitTimeoutMSec; private final StatsLogger statsLogger; @StatsDoc( @@ -348,6 +350,20 @@ public class Auditor implements AutoCloseable { this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new AtomicInteger(0); this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0); + if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) { + LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0"); + throw new UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0"); + } + this.openLedgerNoRecoverySemaphore = new Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations()); + + if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() < 0) { + LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec should be greater than or equal to 0"); + throw new UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec " + + "should be greater than or equal to 0"); + } + this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec = + conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec(); + numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS); underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE); uRLPublishTimeForLostBookies = this.statsLogger @@ -1205,12 +1221,32 @@ public class Auditor implements AutoCloseable { } /** + * Get BookKeeper client according to configuration. + * @param conf + * @return + * @throws IOException + * @throws InterruptedException + */ + BookKeeper getBookKeeper(ServerConfiguration conf) throws IOException, InterruptedException { + return createBookKeeperClient(conf); + } + + /** + * Get BookKeeper admin according to bookKeeper client. + * @param bookKeeper + * @return + */ + BookKeeperAdmin getBookKeeperAdmin(final BookKeeper bookKeeper) { + return new BookKeeperAdmin(bookKeeper, statsLogger); + } + + /** * List all the ledgers and check them individually. This should not * be run very often. */ void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException { - final BookKeeper localClient = createBookKeeperClient(conf); - final BookKeeperAdmin localAdmin = new BookKeeperAdmin(localClient, statsLogger); + final BookKeeper localClient = getBookKeeper(conf); + final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient); try { final LedgerChecker checker = new LedgerChecker(localClient); @@ -1230,7 +1266,23 @@ public class Auditor implements AutoCloseable { return; } + try { + if (!openLedgerNoRecoverySemaphore.tryAcquire(openLedgerNoRecoverySemaphoreWaitTimeoutMSec, + TimeUnit.MILLISECONDS)) { + LOG.warn("Failed to acquire semaphore for {} ms, ledgerId: {}", + openLedgerNoRecoverySemaphoreWaitTimeoutMSec, ledgerId); + FutureUtils.complete(processFuture, null); + return; + } + } catch (InterruptedException e) { + LOG.error("Unable to acquire open ledger operation semaphore ", e); + Thread.currentThread().interrupt(); + FutureUtils.complete(processFuture, null); + return; + } + localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> { + openLedgerNoRecoverySemaphore.release(); if (Code.OK == rc) { checker.checkLedger(lh, // the ledger handle will be closed after checkLedger is done. diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index 8ea6636..d60451d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -25,6 +25,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; @@ -50,6 +57,7 @@ import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieAccessor; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.IndexPersistenceMgr; +import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -75,6 +83,7 @@ import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -366,6 +375,51 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase { } @Test + public void testGetLedgerFromZookeeperThrottled() throws Exception { + final int numberLedgers = 30; + + // write ledgers into bookkeeper cluster + try { + for (AuditorElector e : auditorElectors.values()) { + e.shutdown(); + } + + for (int i = 0; i < numberLedgers; ++i) { + LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); + for (int j = 0; j < 5; j++) { + lh.addEntry("testdata".getBytes()); + } + lh.close(); + } + } catch (InterruptedException | BKException e) { + LOG.error("Failed to shutdown auditor elector or write data to ledgers ", e); + fail(); + } + + // create auditor and call `checkAllLedgers` + ServerConfiguration configuration = confByIndex(0); + configuration.setAuditorMaxNumberOfConcurrentOpenLedgerOperations(10); + + Auditor auditor1 = new Auditor(BookieImpl.getBookieId(configuration).toString(), + configuration, NullStatsLogger.INSTANCE); + Auditor auditor = Mockito.spy(auditor1); + + BookKeeper bookKeeper = Mockito.spy(auditor.getBookKeeper(configuration)); + BookKeeperAdmin admin = Mockito.spy(auditor.getBookKeeperAdmin(bookKeeper)); + when(auditor.getBookKeeper(configuration)).thenReturn(bookKeeper); + when(auditor.getBookKeeperAdmin(bookKeeper)).thenReturn(admin); + + try { + auditor.checkAllLedgers(); + verify(admin, times(numberLedgers)).asyncOpenLedgerNoRecovery(anyLong(), + any(AsyncCallback.OpenCallback.class), eq(null)); + } catch (Exception e) { + LOG.error("Caught exception while checking all ledgers ", e); + fail(); + } + } + + @Test public void testInitialDelayOfCheckAllLedgers() throws Exception { for (AuditorElector e : auditorElectors.values()) { e.shutdown(); diff --git a/conf/bk_server.conf b/conf/bk_server.conf index 21712d5..639bda2 100755 --- a/conf/bk_server.conf +++ b/conf/bk_server.conf @@ -869,6 +869,13 @@ zkEnableSecurity=false # the provided digest type provided at `digestType` and the provided passwd provided at `passwd`. # enableDigestTypeAutodetection=true +# Semaphore limit of getting ledger from zookeeper. Used to throttle the zookeeper client request operation +# sending to Zookeeper server. Default value is 500 +# auditorMaxNumberOfConcurrentOpenLedgerOperations=500 + +# Wait timeout of acquiring semaphore of concurrent open ledger operations. Default value is 120000ms. +# auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec=120000 + ############################################################################# ## Placement settings #############################################################################
