This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 29829a4 Add auditor get ledger throttle to avoid auto recovery zk
session expire (#2802)
29829a4 is described below
commit 29829a4cc3b47928ca8c894474f046bd9deb088d
Author: Hang Chen <[email protected]>
AuthorDate: Tue Oct 5 17:07:29 2021 +0800
Add auditor get ledger throttle to avoid auto recovery zk session expire
(#2802)
---
.../bookkeeper/conf/ServerConfiguration.java | 39 +++++++++++++++
.../bookkeeper/meta/AbstractZkLedgerManager.java | 8 ++--
.../org/apache/bookkeeper/replication/Auditor.java | 56 +++++++++++++++++++++-
.../replication/AuditorPeriodicCheckTest.java | 54 +++++++++++++++++++++
conf/bk_server.conf | 7 +++
5 files changed, 159 insertions(+), 5 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index dbc94bd..1466655 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -204,6 +204,10 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
protected static final String UNDERREPLICATED_LEDGER_RECOVERY_GRACE_PERIOD
=
"underreplicatedLedgerRecoveryGracePeriod";
protected static final String AUDITOR_REPLICAS_CHECK_INTERVAL =
"auditorReplicasCheckInterval";
+ protected static final String
AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS =
+ "auditorMaxNumberOfConcurrentOpenLedgerOperations";
+ protected static final String
AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC =
+ "auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec";
// Worker Thread parameters.
protected static final String NUM_ADD_WORKER_THREADS =
"numAddWorkerThreads";
@@ -2525,6 +2529,41 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
}
/**
+     * Get the maximum number of concurrent open-ledger operations the auditor issues while
+     * checking all ledgers in auto recovery. Used to throttle ledger reads from ZooKeeper.
+     *
+     * <p>Values less than or equal to 0 make the {@code Auditor} constructor throw
+     * {@code UnavailableException}.
+     *
+     * @return the maximum number of concurrent open-ledger operations; 500 by default
+     */
+    public int getAuditorMaxNumberOfConcurrentOpenLedgerOperations() {
+        return getInt(AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS, 500);
+    }
+
+    /**
+     * Set the maximum number of concurrent open-ledger operations the auditor issues while
+     * checking all ledgers in auto recovery. Used to throttle ledger reads from ZooKeeper.
+     *
+     * @param semaphore the number of permits of the throttling semaphore; must be greater
+     *                  than 0, otherwise the {@code Auditor} constructor throws
+     *                  {@code UnavailableException}
+     */
+    public void setAuditorMaxNumberOfConcurrentOpenLedgerOperations(int semaphore) {
+        setProperty(AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS, semaphore);
+    }
+
+    /**
+     * Get the timeout for acquiring a permit before each open-ledger operation the auditor
+     * issues while checking all ledgers.
+     *
+     * <p>If no permit is acquired within this timeout, the auditor logs a warning and skips
+     * that ledger. Negative values make the {@code Auditor} constructor throw
+     * {@code UnavailableException}.
+     *
+     * @return the timeout in milliseconds; 120000 by default
+     */
+    public int getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() {
+        return getInt(AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC, 120000);
+    }
+
+    /**
+     * Set the timeout for acquiring a permit before each open-ledger operation the auditor
+     * issues while checking all ledgers. A ledger whose permit cannot be acquired in time is
+     * skipped by the auditor.
+     *
+     * @param timeoutMs the timeout in milliseconds; must be greater than or equal to 0,
+     *                  otherwise the {@code Auditor} constructor throws
+     *                  {@code UnavailableException}
+     */
+    public void setAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec(int timeoutMs) {
+        setProperty(AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC, timeoutMs);
+    }
+
+
+ /**
* Set what percentage of a ledger (fragment)'s entries will be verified.
* 0 - only the first and last entry of each ledger fragment would be
verified
* 100 - the entire ledger fragment would be verified
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 0e16d69..cda9370 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -568,9 +568,11 @@ public abstract class AbstractZkLedgerManager implements
LedgerManager, Watcher
MultiCallback mcb = new MultiCallback(zkActiveLedgers.size(),
finalCb, ctx,
successRc, failureRc);
// start loop over all ledgers
- for (Long ledger : zkActiveLedgers) {
- processor.process(ledger, mcb);
- }
+ scheduler.submit(() -> {
+ for (Long ledger : zkActiveLedgers) {
+ processor.process(ledger, mcb);
+ }
+ });
}
});
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 064c7fc..3220b3f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -165,6 +165,8 @@ public class Auditor implements AutoCloseable {
private final AtomicInteger
numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
private final long underreplicatedLedgerRecoveryGracePeriod;
private final int zkOpTimeoutMs;
+ private final Semaphore openLedgerNoRecoverySemaphore;
+ private final int openLedgerNoRecoverySemaphoreWaitTimeoutMSec;
private final StatsLogger statsLogger;
@StatsDoc(
@@ -348,6 +350,20 @@ public class Auditor implements AutoCloseable {
this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new
AtomicInteger(0);
this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new
AtomicInteger(0);
+ if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
+ LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should
be greater than 0");
+ throw new
UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should
be greater than 0");
+ }
+ this.openLedgerNoRecoverySemaphore = new
Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
+
+ if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec()
< 0) {
+ LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec
should be greater than or equal to 0");
+ throw new
UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec "
+ + "should be greater than or equal to 0");
+ }
+ this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
+ conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
+
numUnderReplicatedLedger =
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
underReplicatedLedgerTotalSize =
this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
uRLPublishTimeForLostBookies = this.statsLogger
@@ -1205,12 +1221,32 @@ public class Auditor implements AutoCloseable {
}
/**
+     * Create the BookKeeper client used by {@code checkAllLedgers()}.
+     *
+     * <p>Package-private so tests can replace the returned client (e.g. via a Mockito spy).
+     *
+     * @param conf the server configuration the client is built from
+     * @return the client returned by {@code createBookKeeperClient(conf)}
+     * @throws IOException propagated from {@code createBookKeeperClient(conf)}
+     * @throws InterruptedException propagated from {@code createBookKeeperClient(conf)}
+     */
+    BookKeeper getBookKeeper(ServerConfiguration conf) throws IOException, InterruptedException {
+        return createBookKeeperClient(conf);
+    }
+
+    /**
+     * Create the {@link BookKeeperAdmin} used by {@code checkAllLedgers()}.
+     *
+     * <p>Package-private so tests can replace the returned admin (e.g. via a Mockito spy).
+     *
+     * @param bookKeeper the client the admin is built on
+     * @return a new admin wrapping {@code bookKeeper} and using this auditor's stats logger
+     */
+    BookKeeperAdmin getBookKeeperAdmin(final BookKeeper bookKeeper) {
+        return new BookKeeperAdmin(bookKeeper, statsLogger);
+    }
+
+ /**
* List all the ledgers and check them individually. This should not
* be run very often.
*/
void checkAllLedgers() throws BKException, IOException,
InterruptedException, KeeperException {
- final BookKeeper localClient = createBookKeeperClient(conf);
- final BookKeeperAdmin localAdmin = new BookKeeperAdmin(localClient,
statsLogger);
+ final BookKeeper localClient = getBookKeeper(conf);
+ final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient);
try {
final LedgerChecker checker = new LedgerChecker(localClient);
@@ -1230,7 +1266,23 @@ public class Auditor implements AutoCloseable {
return;
}
+ try {
+ if
(!openLedgerNoRecoverySemaphore.tryAcquire(openLedgerNoRecoverySemaphoreWaitTimeoutMSec,
+ TimeUnit.MILLISECONDS)) {
+ LOG.warn("Failed to acquire semaphore for {} ms,
ledgerId: {}",
+ openLedgerNoRecoverySemaphoreWaitTimeoutMSec,
ledgerId);
+ FutureUtils.complete(processFuture, null);
+ return;
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Unable to acquire open ledger operation
semaphore ", e);
+ Thread.currentThread().interrupt();
+ FutureUtils.complete(processFuture, null);
+ return;
+ }
+
localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx)
-> {
+ openLedgerNoRecoverySemaphore.release();
if (Code.OK == rc) {
checker.checkLedger(lh,
// the ledger handle will be closed after
checkLedger is done.
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index f47486c..c8c604d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -25,6 +25,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
@@ -51,6 +58,7 @@ import org.apache.bookkeeper.bookie.BookieAccessor;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
+import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -76,6 +84,7 @@ import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -366,6 +375,51 @@ public class AuditorPeriodicCheckTest extends
BookKeeperClusterTestCase {
}
@Test
+ public void testGetLedgerFromZookeeperThrottled() throws Exception {
+ final int numberLedgers = 30;
+
+ // write ledgers into bookkeeper cluster
+ try {
+ for (AuditorElector e : auditorElectors.values()) {
+ e.shutdown();
+ }
+
+ for (int i = 0; i < numberLedgers; ++i) {
+ LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32,
"passwd".getBytes());
+ for (int j = 0; j < 5; j++) {
+ lh.addEntry("testdata".getBytes());
+ }
+ lh.close();
+ }
+ } catch (InterruptedException | BKException e) {
+ LOG.error("Failed to shutdown auditor elector or write data to
ledgers ", e);
+ fail();
+ }
+
+ // create auditor and call `checkAllLedgers`
+ ServerConfiguration configuration = confByIndex(0);
+ configuration.setAuditorMaxNumberOfConcurrentOpenLedgerOperations(10);
+
+ Auditor auditor1 = new
Auditor(BookieImpl.getBookieId(configuration).toString(),
+ configuration, NullStatsLogger.INSTANCE);
+ Auditor auditor = Mockito.spy(auditor1);
+
+ BookKeeper bookKeeper =
Mockito.spy(auditor.getBookKeeper(configuration));
+ BookKeeperAdmin admin =
Mockito.spy(auditor.getBookKeeperAdmin(bookKeeper));
+ when(auditor.getBookKeeper(configuration)).thenReturn(bookKeeper);
+ when(auditor.getBookKeeperAdmin(bookKeeper)).thenReturn(admin);
+
+ try {
+ auditor.checkAllLedgers();
+ verify(admin,
times(numberLedgers)).asyncOpenLedgerNoRecovery(anyLong(),
+ any(AsyncCallback.OpenCallback.class), eq(null));
+ } catch (Exception e) {
+ LOG.error("Caught exception while checking all ledgers ", e);
+ fail();
+ }
+ }
+
+ @Test
public void testInitialDelayOfCheckAllLedgers() throws Exception {
for (AuditorElector e : auditorElectors.values()) {
e.shutdown();
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 53e4135..3941fe1 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -875,6 +875,13 @@ zkEnableSecurity=false
# the provided digest type provided at `digestType` and the provided passwd
provided at `passwd`.
# enableDigestTypeAutodetection=true
+# Maximum number of concurrent open-ledger operations (a semaphore limit) the auditor issues
+# while checking all ledgers. Used to throttle the requests the ZooKeeper client sends to the
+# ZooKeeper server. Default value is 500.
+# auditorMaxNumberOfConcurrentOpenLedgerOperations=500
+
+# Timeout, in milliseconds, for acquiring a permit for a concurrent open-ledger operation.
+# If no permit is acquired within this time, the ledger is skipped in that check run.
+# Default value is 120000 ms.
+# auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec=120000
+
#############################################################################
## Placement settings
#############################################################################