This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 29829a4  Add auditor get ledger throttle to avoid auto recovery zk 
session expire (#2802)
29829a4 is described below

commit 29829a4cc3b47928ca8c894474f046bd9deb088d
Author: Hang Chen <[email protected]>
AuthorDate: Tue Oct 5 17:07:29 2021 +0800

    Add auditor get ledger throttle to avoid auto recovery zk session expire 
(#2802)
---
 .../bookkeeper/conf/ServerConfiguration.java       | 39 +++++++++++++++
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  8 ++--
 .../org/apache/bookkeeper/replication/Auditor.java | 56 +++++++++++++++++++++-
 .../replication/AuditorPeriodicCheckTest.java      | 54 +++++++++++++++++++++
 conf/bk_server.conf                                |  7 +++
 5 files changed, 159 insertions(+), 5 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index dbc94bd..1466655 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -204,6 +204,10 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     protected static final String UNDERREPLICATED_LEDGER_RECOVERY_GRACE_PERIOD 
=
             "underreplicatedLedgerRecoveryGracePeriod";
     protected static final String AUDITOR_REPLICAS_CHECK_INTERVAL = 
"auditorReplicasCheckInterval";
+    protected static final String 
AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS =
+        "auditorMaxNumberOfConcurrentOpenLedgerOperations";
+    protected static final String 
AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC =
+        "auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec";
 
     // Worker Thread parameters.
     protected static final String NUM_ADD_WORKER_THREADS = 
"numAddWorkerThreads";
@@ -2525,6 +2529,41 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Get the semaphore limit value for getting ledger from zookeeper in auto 
recovery.
+     *
+     * @return The semaphore value. By default it is 500.
+     */
+    public int getAuditorMaxNumberOfConcurrentOpenLedgerOperations() {
+        return getInt(AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS, 
500);
+    }
+
+    /**
+     * Set the semaphore limit value for getting ledger from zookeeper in auto 
recovery.
+     * @param semaphore the maximum number of concurrent open ledger operations
+     */
+    public void setAuditorMaxNumberOfConcurrentOpenLedgerOperations(int 
semaphore) {
+        setProperty(AUDITOR_MAX_NUMBER_OF_CONCURRENT_OPEN_LEDGER_OPERATIONS, 
semaphore);
+    }
+
+    /**
+     * Get the acquire concurrent open ledger operations timeout.
+     *
+     * @return The timeout value in milliseconds. By default it is 120000ms.
+     */
+    public int getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() {
+        return 
getInt(AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC, 120000);
+    }
+
+    /**
+     * Set the acquire concurrent open ledger operations timeout.
+     * @param timeoutMs the timeout, in milliseconds, for acquiring the semaphore
+     */
+    public void setAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec(int 
timeoutMs) {
+        
setProperty(AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC, 
timeoutMs);
+    }
+
+
+    /**
      * Set what percentage of a ledger (fragment)'s entries will be verified.
      * 0 - only the first and last entry of each ledger fragment would be 
verified
      * 100 - the entire ledger fragment would be verified
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 0e16d69..cda9370 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -568,9 +568,11 @@ public abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher
                 MultiCallback mcb = new MultiCallback(zkActiveLedgers.size(), 
finalCb, ctx,
                                                       successRc, failureRc);
                 // start loop over all ledgers
-                for (Long ledger : zkActiveLedgers) {
-                    processor.process(ledger, mcb);
-                }
+                scheduler.submit(() -> {
+                    for (Long ledger : zkActiveLedgers) {
+                        processor.process(ledger, mcb);
+                    }
+                });
             }
         });
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 064c7fc..3220b3f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -165,6 +165,8 @@ public class Auditor implements AutoCloseable {
     private final AtomicInteger 
numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
     private final long underreplicatedLedgerRecoveryGracePeriod;
     private final int zkOpTimeoutMs;
+    private final Semaphore openLedgerNoRecoverySemaphore;
+    private final int openLedgerNoRecoverySemaphoreWaitTimeoutMSec;
 
     private final StatsLogger statsLogger;
     @StatsDoc(
@@ -348,6 +350,20 @@ public class Auditor implements AutoCloseable {
         this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new 
AtomicInteger(0);
         this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new 
AtomicInteger(0);
 
+        if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
+            LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should 
be greater than 0");
+            throw new 
UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should 
be greater than 0");
+        }
+        this.openLedgerNoRecoverySemaphore = new 
Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
+
+        if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() 
< 0) {
+            LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec 
should be greater than or equal to 0");
+            throw new 
UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec "
+                + "should be greater than or equal to 0");
+        }
+        this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
+            conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
+
         numUnderReplicatedLedger = 
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
         underReplicatedLedgerTotalSize = 
this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
         uRLPublishTimeForLostBookies = this.statsLogger
@@ -1205,12 +1221,32 @@ public class Auditor implements AutoCloseable {
     }
 
     /**
+     * Get BookKeeper client according to configuration.
+     * @param conf
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    BookKeeper getBookKeeper(ServerConfiguration conf) throws IOException, 
InterruptedException {
+        return createBookKeeperClient(conf);
+    }
+
+    /**
+     * Get BookKeeper admin according to bookKeeper client.
+     * @param bookKeeper
+     * @return
+     */
+    BookKeeperAdmin getBookKeeperAdmin(final BookKeeper bookKeeper) {
+        return new BookKeeperAdmin(bookKeeper, statsLogger);
+    }
+
+    /**
      * List all the ledgers and check them individually. This should not
      * be run very often.
      */
     void checkAllLedgers() throws BKException, IOException, 
InterruptedException, KeeperException {
-        final BookKeeper localClient = createBookKeeperClient(conf);
-        final BookKeeperAdmin localAdmin = new BookKeeperAdmin(localClient, 
statsLogger);
+        final BookKeeper localClient = getBookKeeper(conf);
+        final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient);
 
         try {
             final LedgerChecker checker = new LedgerChecker(localClient);
@@ -1230,7 +1266,23 @@ public class Auditor implements AutoCloseable {
                     return;
                 }
 
+                try {
+                    if 
(!openLedgerNoRecoverySemaphore.tryAcquire(openLedgerNoRecoverySemaphoreWaitTimeoutMSec,
+                        TimeUnit.MILLISECONDS)) {
+                        LOG.warn("Failed to acquire semaphore for {} ms, 
ledgerId: {}",
+                            openLedgerNoRecoverySemaphoreWaitTimeoutMSec, 
ledgerId);
+                        FutureUtils.complete(processFuture, null);
+                        return;
+                    }
+                } catch (InterruptedException e) {
+                    LOG.error("Unable to acquire open ledger operation 
semaphore ", e);
+                    Thread.currentThread().interrupt();
+                    FutureUtils.complete(processFuture, null);
+                    return;
+                }
+
                 localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) 
-> {
+                    openLedgerNoRecoverySemaphore.release();
                     if (Code.OK == rc) {
                         checker.checkLedger(lh,
                                 // the ledger handle will be closed after 
checkLedger is done.
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index f47486c..c8c604d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -25,6 +25,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import io.netty.buffer.ByteBuf;
 
@@ -51,6 +58,7 @@ import org.apache.bookkeeper.bookie.BookieAccessor;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
+import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -76,6 +84,7 @@ import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -366,6 +375,51 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testGetLedgerFromZookeeperThrottled() throws Exception {
+        final int numberLedgers = 30;
+
+        // write ledgers into bookkeeper cluster
+        try {
+            for (AuditorElector e : auditorElectors.values()) {
+                e.shutdown();
+            }
+
+            for (int i = 0; i < numberLedgers; ++i) {
+                LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, 
"passwd".getBytes());
+                for (int j = 0; j < 5; j++) {
+                    lh.addEntry("testdata".getBytes());
+                }
+                lh.close();
+            }
+        } catch (InterruptedException | BKException e) {
+            LOG.error("Failed to shutdown auditor elector or write data to 
ledgers ", e);
+            fail();
+        }
+
+        // create auditor and call `checkAllLedgers`
+        ServerConfiguration configuration = confByIndex(0);
+        configuration.setAuditorMaxNumberOfConcurrentOpenLedgerOperations(10);
+
+        Auditor auditor1 = new 
Auditor(BookieImpl.getBookieId(configuration).toString(),
+            configuration, NullStatsLogger.INSTANCE);
+        Auditor auditor = Mockito.spy(auditor1);
+
+        BookKeeper bookKeeper = 
Mockito.spy(auditor.getBookKeeper(configuration));
+        BookKeeperAdmin admin = 
Mockito.spy(auditor.getBookKeeperAdmin(bookKeeper));
+        when(auditor.getBookKeeper(configuration)).thenReturn(bookKeeper);
+        when(auditor.getBookKeeperAdmin(bookKeeper)).thenReturn(admin);
+
+        try {
+            auditor.checkAllLedgers();
+            verify(admin, 
times(numberLedgers)).asyncOpenLedgerNoRecovery(anyLong(),
+                any(AsyncCallback.OpenCallback.class), eq(null));
+        } catch (Exception e) {
+            LOG.error("Caught exception while checking all ledgers ", e);
+            fail();
+        }
+    }
+
+    @Test
     public void testInitialDelayOfCheckAllLedgers() throws Exception {
         for (AuditorElector e : auditorElectors.values()) {
             e.shutdown();
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 53e4135..3941fe1 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -875,6 +875,13 @@ zkEnableSecurity=false
 # the provided digest type provided at `digestType` and the provided passwd 
provided at `passwd`.
 # enableDigestTypeAutodetection=true
 
+# Semaphore limit for getting ledgers from zookeeper. Used to throttle the 
requests that the zookeeper client
+# sends to the Zookeeper server. Default value is 500
+# auditorMaxNumberOfConcurrentOpenLedgerOperations=500
+
+# Timeout for acquiring the semaphore of concurrent open ledger operations. 
Default value is 120000ms.
+# auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec=120000
+
 #############################################################################
 ## Placement settings
 #############################################################################

Reply via email to