sijie closed pull request #1683: Enhance Auditor
URL: https://github.com/apache/bookkeeper/pull/1683
This is a PR merged from a forked repository.
GitHub does not show the original diff of a foreign (forked) pull request
once it has been merged, so the diff is reproduced below for the sake of
provenance:
diff --git a/.travis.yml b/.travis.yml
index 56b6805467..c645809bf6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,8 +32,9 @@ matrix:
osx_image: xcode9.2
- os: linux
env: CUSTOM_JDK="oraclejdk8"
- - os: linux
- env: CUSTOM_JDK="oraclejdk9"
+ # disabled oraclejdk9 since it has unknown failures on
https://github.com/apache/bookkeeper/pull/1683
+ #- os: linux
+ # env: CUSTOM_JDK="oraclejdk9"
- os: linux
env: CUSTOM_JDK="oraclejdk10"
diff --git a/bookkeeper-proto/src/main/proto/DataFormats.proto
b/bookkeeper-proto/src/main/proto/DataFormats.proto
index 92eaa5fd39..79d9b2f671 100644
--- a/bookkeeper-proto/src/main/proto/DataFormats.proto
+++ b/bookkeeper-proto/src/main/proto/DataFormats.proto
@@ -95,3 +95,10 @@ message LockDataFormat {
message AuditorVoteFormat {
optional string bookieId = 1;
}
+
+/**
+ * Information about the last checkAllLedgers execution.
+ *
+ * checkAllLedgersCTime is the time of the last completed checkAllLedgers
+ * run; the Auditor records System.currentTimeMillis() (milliseconds) once
+ * a run has finished.
+ */
+message CheckAllLedgersFormat {
+ optional int64 checkAllLedgersCTime = 1;
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index b5447beee2..304f1848b0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -168,6 +168,22 @@ boolean initializeLostBookieRecoveryDelay(int
lostBookieRecoveryDelay)
*/
int getLostBookieRecoveryDelay() throws
ReplicationException.UnavailableException;
+ /**
+ * Records the time at which checkAllLedgers was last executed.
+ *
+ * @param checkAllLedgersCTime
+ *            time of the last checkAllLedgers run; the Auditor passes
+ *            System.currentTimeMillis() after a run has completed
+ * @throws ReplicationException.UnavailableException
+ *             if the value cannot be stored; the ZooKeeper-backed
+ *             implementation throws it when ZooKeeper reports an error
+ *             or the calling thread is interrupted
+ */
+ void setCheckAllLedgersCTime(long checkAllLedgersCTime) throws
ReplicationException.UnavailableException;
+
+ /**
+ * Returns the time at which checkAllLedgers was last executed, as
+ * previously recorded through setCheckAllLedgersCTime.
+ *
+ * @return the recorded checkAllLedgersCTime; the ZooKeeper-backed
+ *         implementation returns -1 when nothing has been recorded yet
+ * @throws ReplicationException.UnavailableException
+ *             if the value cannot be read; the ZooKeeper-backed
+ *             implementation throws it when ZooKeeper reports an error,
+ *             the calling thread is interrupted, or the stored data
+ *             cannot be parsed
+ */
+ long getCheckAllLedgersCTime() throws
ReplicationException.UnavailableException;
+
/**
* Receive notification asynchronously when the lostBookieRecoveryDelay
value is Changed.
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 32723706fd..c4e9b3045a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
@@ -46,6 +47,7 @@
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat;
import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
@@ -115,6 +117,7 @@ int getLedgerZNodeVersion() {
private final String layoutZNode;
private final AbstractConfiguration conf;
private final String lostBookieRecoveryDelayZnode;
+ private final String checkAllLedgersCtimeZnode;
private final ZooKeeper zkc;
private final SubTreeCache subTreeCache;
@@ -127,7 +130,7 @@ public
ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
urLockPath = basePath + '/' +
BookKeeperConstants.UNDER_REPLICATION_LOCK;
lostBookieRecoveryDelayZnode = basePath + '/' +
BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
-
+ checkAllLedgersCtimeZnode = basePath + '/' +
BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
idExtractionPattern = Pattern.compile("urL(\\d+)$");
this.zkc = zkc;
this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
@@ -884,4 +887,50 @@ public String
getReplicationWorkerIdRereplicatingLedger(long ledgerId)
}
return replicationWorkerId;
}
+
+ @Override
+ public void setCheckAllLedgersCTime(long checkAllLedgersCTime) throws
UnavailableException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("setCheckAllLedgersCTime");
+ }
+ try {
+ List<ACL> zkAcls = ZkUtils.getACLs(conf);
+ CheckAllLedgersFormat.Builder builder =
CheckAllLedgersFormat.newBuilder();
+ builder.setCheckAllLedgersCTime(checkAllLedgersCTime);
+ byte[] checkAllLedgersFormatByteArray =
builder.build().toByteArray();
+ if (zkc.exists(checkAllLedgersCtimeZnode, false) != null) {
+ zkc.setData(checkAllLedgersCtimeZnode,
checkAllLedgersFormatByteArray, -1);
+ } else {
+ zkc.create(checkAllLedgersCtimeZnode,
checkAllLedgersFormatByteArray, zkAcls, CreateMode.PERSISTENT);
+ }
+ } catch (KeeperException ke) {
+ throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted
while contacting zookeeper", ie);
+ }
+ }
+
+ @Override
+ public long getCheckAllLedgersCTime() throws UnavailableException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("setCheckAllLedgersCTime");
+ }
+ try {
+ byte[] data = zkc.getData(checkAllLedgersCtimeZnode, false, null);
+ CheckAllLedgersFormat checkAllLedgersFormat =
CheckAllLedgersFormat.parseFrom(data);
+ return checkAllLedgersFormat.hasCheckAllLedgersCTime() ?
checkAllLedgersFormat.getCheckAllLedgersCTime()
+ : -1;
+ } catch (KeeperException.NoNodeException ne) {
+ LOG.warn("checkAllLedgersCtimeZnode is not yet available");
+ return -1;
+ } catch (KeeperException ke) {
+ throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted
while contacting zookeeper", ie);
+ } catch (InvalidProtocolBufferException ipbe) {
+ throw new ReplicationException.UnavailableException("Error while
parsing ZK protobuf binary data", ipbe);
+ }
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index c22889c87d..1fcd0f8887 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -93,6 +93,7 @@
private final OpStatsLogger uRLPublishTimeForLostBookies;
private final OpStatsLogger bookieToLedgersMapCreationTime;
private final OpStatsLogger checkAllLedgersTime;
+ private final OpStatsLogger auditBookiesTime;
private final Counter numLedgersChecked;
private final OpStatsLogger numFragmentsPerLedger;
private final OpStatsLogger numBookiesPerLedger;
@@ -154,6 +155,7 @@ public Auditor(final String bookieIdentifier,
bookieToLedgersMapCreationTime = this.statsLogger
.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
checkAllLedgersTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
+ auditBookiesTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
numLedgersChecked =
this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
numFragmentsPerLedger =
statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
numBookiesPerLedger =
statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
@@ -214,16 +216,19 @@ private void initialize(ServerConfiguration conf,
BookKeeper bkc)
private void submitShutdownTask() {
synchronized (this) {
+ LOG.info("Executing submitShutdownTask");
if (executor.isShutdown()) {
+ LOG.info("executor is already shutdown");
return;
}
executor.submit(new Runnable() {
- public void run() {
- synchronized (Auditor.this) {
- executor.shutdown();
- }
+ public void run() {
+ synchronized (Auditor.this) {
+ LOG.info("Shutting down Auditor's Executor");
+ executor.shutdown();
}
- });
+ }
+ });
}
}
@@ -381,46 +386,11 @@ public void start() {
return;
}
- long interval = conf.getAuditorPeriodicCheckInterval();
-
- if (interval > 0) {
- LOG.info("Auditor periodic ledger checking enabled"
- + " 'auditorPeriodicCheckInterval' {} seconds",
interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- try {
- if
(!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
- LOG.info("Ledger replication disabled,
skipping");
- return;
- }
-
- Stopwatch stopwatch =
Stopwatch.createStarted();
- checkAllLedgers();
-
checkAllLedgersTime.registerSuccessfulEvent(stopwatch.stop()
-
.elapsed(TimeUnit.MILLISECONDS),
- TimeUnit.MILLISECONDS);
- } catch (KeeperException ke) {
- LOG.error("Exception while running periodic
check", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.error("Interrupted while running periodic
check", ie);
- } catch (BKException bke) {
- LOG.error("Exception running periodic check",
bke);
- } catch (IOException ioe) {
- LOG.error("I/O exception running periodic
check", ioe);
- } catch (ReplicationException.UnavailableException
ue) {
- LOG.error("Underreplication manager
unavailable running periodic check", ue);
- }
- }
- }, interval, interval, TimeUnit.SECONDS);
- } else {
- LOG.info("Periodic checking disabled");
- }
try {
watchBookieChanges();
knownBookies = getAvailableBookies();
} catch (BKException bke) {
- LOG.error("Couldn't get bookie list, exiting", bke);
+ LOG.error("Couldn't get bookie list, so exiting", bke);
submitShutdownTask();
}
@@ -428,7 +398,8 @@ public void run() {
this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(new
LostBookieRecoveryDelayChangedCb());
} catch (UnavailableException ue) {
- LOG.error("Exception while registering for
LostBookieRecoveryDelay change notification", ue);
+ LOG.error("Exception while registering for
LostBookieRecoveryDelay change notification, so exiting",
+ ue);
submitShutdownTask();
}
@@ -441,6 +412,71 @@ public void run() {
+ " 'auditorPeriodicBookieCheckInterval' {} seconds",
bookieCheckInterval);
executor.scheduleAtFixedRate(bookieCheck, 0,
bookieCheckInterval, TimeUnit.SECONDS);
}
+
+ long interval = conf.getAuditorPeriodicCheckInterval();
+
+ if (interval > 0) {
+ LOG.info("Auditor periodic ledger checking enabled" + "
'auditorPeriodicCheckInterval' {} seconds",
+ interval);
+
+ long checkAllLedgersLastExecutedCTime;
+ long durationSinceLastExecutionInSecs;
+ long initialDelay;
+ try {
+ checkAllLedgersLastExecutedCTime =
ledgerUnderreplicationManager.getCheckAllLedgersCTime();
+ } catch (UnavailableException ue) {
+ LOG.error("Got UnavailableException while trying to get
checkAllLedgersCTime", ue);
+ checkAllLedgersLastExecutedCTime = -1;
+ }
+ if (checkAllLedgersLastExecutedCTime == -1) {
+ durationSinceLastExecutionInSecs = -1;
+ initialDelay = 0;
+ } else {
+ durationSinceLastExecutionInSecs =
(System.currentTimeMillis() - checkAllLedgersLastExecutedCTime)
+ / 1000;
+ if (durationSinceLastExecutionInSecs < 0) {
+ // this can happen if there is no strict time ordering
+ durationSinceLastExecutionInSecs = 0;
+ }
+ initialDelay = durationSinceLastExecutionInSecs > interval
? 0
+ : (interval - durationSinceLastExecutionInSecs);
+ }
+ LOG.info(
+ "checkAllLedgers scheduling info.
checkAllLedgersLastExecutedCTime: {} "
+ + "durationSinceLastExecutionInSecs: {}
initialDelay: {} interval: {}",
+ checkAllLedgersLastExecutedCTime,
durationSinceLastExecutionInSecs, initialDelay, interval);
+
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ try {
+ if
(!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+ LOG.info("Ledger replication disabled,
skipping checkAllLedgers");
+ return;
+ }
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ LOG.info("Starting checkAllLedgers");
+ checkAllLedgers();
+ long checkAllLedgersDuration =
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+ LOG.info("Completed checkAllLedgers in {}
milliSeconds", checkAllLedgersDuration);
+
checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration,
TimeUnit.MILLISECONDS);
+ } catch (KeeperException ke) {
+ LOG.error("Exception while running periodic
check", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while running periodic
check", ie);
+ } catch (BKException bke) {
+ LOG.error("Exception running periodic check", bke);
+ } catch (IOException ioe) {
+ LOG.error("I/O exception running periodic check",
ioe);
+ } catch (ReplicationException.UnavailableException ue)
{
+ LOG.error("Underreplication manager unavailable
running periodic check", ue);
+ }
+ }
+ }, initialDelay, interval, TimeUnit.SECONDS);
+ } else {
+ LOG.info("Periodic checking disabled");
+ }
}
}
@@ -519,7 +555,7 @@ private void auditBookies()
+ "Will retry after a period");
return;
}
-
+ LOG.info("Starting auditBookies");
Stopwatch stopwatch = Stopwatch.createStarted();
// put exit cases here
Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
@@ -551,10 +587,12 @@ private void auditBookies()
} catch (ReplicationException e) {
throw new BKAuditException(e.getMessage(), e.getCause());
}
-
uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
+
uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
-
+ LOG.info("Completed auditBookies");
+
auditBookiesTime.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
}
private Map<String, Set<Long>> generateBookie2LedgersIndex()
@@ -691,6 +729,11 @@ void checkAllLedgers() throws BKException, IOException,
InterruptedException, Ke
}
}, null, BKException.Code.OK, BKException.Code.ReadException);
FutureUtils.result(processFuture, BKException.HANDLER);
+ try {
+
ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis());
+ } catch (UnavailableException ue) {
+ LOG.error("Got exception while trying to set
checkAllLedgersCTime", ue);
+ }
} finally {
admin.close();
client.close();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index b1afa816d0..eac30eca89 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -33,6 +33,7 @@
String URL_PUBLISH_TIME_FOR_LOST_BOOKIE =
"URL_PUBLISH_TIME_FOR_LOST_BOOKIE";
String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME =
"BOOKIE_TO_LEDGERS_MAP_CREATION_TIME";
String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME";
+ String AUDIT_BOOKIES_TIME = "AUDIT_BOOKIES_TIME";
String NUM_FRAGMENTS_PER_LEDGER = "NUM_FRAGMENTS_PER_LEDGER";
String NUM_BOOKIES_PER_LEDGER = "NUM_BOOKIES_PER_LEDGER";
String NUM_LEDGERS_CHECKED = "NUM_LEDGERS_CHECKED";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index bd6801f940..8cfe9422fb 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -42,6 +42,7 @@
public static final String UNDER_REPLICATION_LOCK = "locks";
public static final String DISABLE_NODE = "disable";
public static final String LOSTBOOKIERECOVERYDELAY_NODE =
"lostBookieRecoveryDelay";
+ public static final String CHECK_ALL_LEDGERS_CTIME =
"checkallledgersctime";
public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers";
public static final String LAYOUT_ZNODE = "LAYOUT";
public static final String INSTANCEID = "INSTANCEID";
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index a09720b201..342c443add 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -129,6 +129,7 @@ public void setUp() throws Exception {
+ "/underreplication/auditorelection";
urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+ urLedgerMgr.setCheckAllLedgersCTime(System.currentTimeMillis());
startAuditorElectors();
rng = new Random(System.currentTimeMillis()); // Initialize the Random
urLedgerList = new HashSet<Long>();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 84788a3532..52580c56b1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -20,17 +20,20 @@
*/
package org.apache.bookkeeper.replication;
+import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URI;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,14 +41,18 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieAccessor;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerHandleAdapter;
@@ -56,8 +63,14 @@
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
+import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -207,7 +220,7 @@ public void testIndexCorruption() throws Exception {
out.close();
long underReplicatedLedger = -1;
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 15; i++) {
underReplicatedLedger =
underReplicationManager.pollLedgerToRereplicate();
if (underReplicatedLedger != -1) {
break;
@@ -351,6 +364,120 @@ public void run() {
}
}
+    /**
+     * Checks the initial delay of the periodic checkAllLedgers task for three
+     * cases with a 1000 second interval: no previous run recorded (-1), a
+     * previous run 999 seconds ago, and a previous run 1001 seconds ago.
+     */
+    @Test
+    public void testInitialDelayOfCheckAllLedgers() throws Exception {
+        // presumably so that only the TestAuditor started below runs checkAllLedgers
+        for (AuditorElector e : auditorElectors.values()) {
+            e.shutdown();
+        }
+
+        // create a few closed ledgers; the ledger ids are not needed afterwards,
+        // so they are no longer collected into an unused list
+        final int numLedgers = 10;
+        for (int i = 0; i < numLedgers; i++) {
+            LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+            for (int j = 0; j < 2; j++) {
+                lh.addEntry("testdata".getBytes());
+            }
+            lh.close();
+        }
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        validateInitialDelayOfCheckAllLedgers(urm, -1, 1000, servConf, bkc);
+        validateInitialDelayOfCheckAllLedgers(urm, 999, 1000, servConf, bkc);
+        validateInitialDelayOfCheckAllLedgers(urm, 1001, 1000, servConf, bkc);
+    }
+
+    /**
+     * Stores a checkAllLedgersCTime that lies timeSinceLastExecutedInSecs in
+     * the past (or -1 when that argument is -1), starts a TestAuditor with the
+     * given auditorPeriodicCheckInterval and verifies that the first
+     * checkAllLedgers run finishes after the expected initial delay and before
+     * that delay plus bufferTimeInMsecs.
+     *
+     * @param urm manager used to write and read back checkAllLedgersCTime
+     * @param timeSinceLastExecutedInSecs seconds since the simulated last run,
+     *            or -1 to simulate that checkAllLedgers never ran
+     * @param auditorPeriodicCheckInterval interval, in seconds, set on servConf
+     * @param servConf configuration handed to the TestAuditor
+     * @param bkc client handed to the TestAuditor
+     */
+    void validateInitialDelayOfCheckAllLedgers(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs,
+            long auditorPeriodicCheckInterval, ServerConfiguration servConf, BookKeeper bkc)
+            throws UnavailableException, UnknownHostException, InterruptedException {
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        TestOpStatsLogger checkAllLedgersStatsLogger = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
+        servConf.setAuditorPeriodicCheckInterval(auditorPeriodicCheckInterval);
+        final TestAuditor auditor = new TestAuditor(Bookie.getBookieAddress(servConf).toString(), servConf, bkc, false,
+                statsLogger);
+        // close the auditor even when an assertion fails, so a failed case
+        // does not leave a running auditor behind for the next one
+        try {
+            CountDownLatch latch = auditor.getLatch();
+            assertEquals("CHECK_ALL_LEDGERS_TIME SuccessCount", 0, checkAllLedgersStatsLogger.getSuccessCount());
+            long curTimeBeforeStart = System.currentTimeMillis();
+            long checkAllLedgersCTime = -1;
+            long initialDelayInMsecs = -1;
+            long nextExpectedCheckAllLedgersExecutionTime = -1;
+            long bufferTimeInMsecs = 12000L;
+            if (timeSinceLastExecutedInSecs == -1) {
+                /*
+                 * if we are setting checkAllLedgersCTime to -1, it means that
+                 * checkAllLedgers hasn't run before. So initialDelay for
+                 * checkAllLedgers should be 0.
+                 */
+                checkAllLedgersCTime = -1;
+                initialDelayInMsecs = 0;
+            } else {
+                checkAllLedgersCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L;
+                initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicCheckInterval ? 0
+                        : (auditorPeriodicCheckInterval - timeSinceLastExecutedInSecs) * 1000L;
+            }
+            /*
+             * the next checkAllLedgers should happen at least after
+             * nextExpectedCheckAllLedgersExecutionTime.
+             */
+            nextExpectedCheckAllLedgersExecutionTime = curTimeBeforeStart + initialDelayInMsecs;
+
+            urm.setCheckAllLedgersCTime(checkAllLedgersCTime);
+            auditor.start();
+            /*
+             * since auditorPeriodicCheckInterval values are high (in the order
+             * of 100s of seconds), it is ok for bufferTimeInMsecs to be in the
+             * order of 10 secs (it is 12000 ms here).
+             */
+            assertTrue("checkAllLedgers should have executed with initialDelay " + initialDelayInMsecs,
+                    latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS));
+            // the latch is released before the success event is registered,
+            // so poll briefly for the stat to show up
+            for (int i = 0; i < 10; i++) {
+                Thread.sleep(100);
+                if (checkAllLedgersStatsLogger.getSuccessCount() >= 1) {
+                    break;
+                }
+            }
+            assertEquals("CHECK_ALL_LEDGERS_TIME SuccessCount", 1, checkAllLedgersStatsLogger.getSuccessCount());
+            long currentCheckAllLedgersCTime = urm.getCheckAllLedgersCTime();
+            assertTrue(
+                    "currentCheckAllLedgersCTime: " + currentCheckAllLedgersCTime
+                            + " should be greater than nextExpectedCheckAllLedgersExecutionTime: "
+                            + nextExpectedCheckAllLedgersExecutionTime,
+                    currentCheckAllLedgersCTime > nextExpectedCheckAllLedgersExecutionTime);
+            assertTrue(
+                    "currentCheckAllLedgersCTime: " + currentCheckAllLedgersCTime
+                            + " should be lesser than nextExpectedCheckAllLedgersExecutionTime+bufferTimeInMsecs: "
+                            + (nextExpectedCheckAllLedgersExecutionTime + bufferTimeInMsecs),
+                    currentCheckAllLedgersCTime < (nextExpectedCheckAllLedgersExecutionTime + bufferTimeInMsecs));
+        } finally {
+            auditor.close();
+        }
+    }
+
+    /**
+     * Auditor whose checkAllLedgers() counts down a latch once the real check
+     * has returned, so that a test can wait for a run to complete.
+     */
+    class TestAuditor extends Auditor {
+
+        // latch released by the next successful checkAllLedgers() call
+        final AtomicReference<CountDownLatch> latchRef = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+
+        public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc,
+                StatsLogger statsLogger) throws UnavailableException {
+            super(bookieIdentifier, conf, bkc, ownBkc, statsLogger);
+        }
+
+        /**
+         * Runs the real check and then counts down the current latch. The
+         * latch is not touched when the real check throws.
+         */
+        @Override
+        void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
+            super.checkAllLedgers();
+            latchRef.get().countDown();
+        }
+
+        /** Returns the latch that the next completed check will count down. */
+        CountDownLatch getLatch() {
+            return latchRef.get();
+        }
+
+        /** Replaces the latch, e.g. to wait for a further check run. */
+        void setLatch(CountDownLatch latch) {
+            latchRef.set(latch);
+        }
+    }
+
private BookieSocketAddress
replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception {
int bookieIdx = -1;
Long entryId =
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().firstKey();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
index db0e308eb4..2628578669 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
@@ -43,6 +43,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
@@ -764,6 +765,21 @@ public void run() {
assertEquals("All hierarchies should be cleaned up", 0,
children.size());
}
+    /**
+     * Verifies that a checkAllLedgersCTime written through one
+     * LedgerUnderreplicationManager is visible through another, and that -1
+     * is returned before anything has been written.
+     */
+    @Test
+    public void testCheckAllLedgersCTime() throws Exception {
+        @Cleanup
+        LedgerUnderreplicationManager underReplicaMgr1 = lmf1.newLedgerUnderreplicationManager();
+        @Cleanup
+        LedgerUnderreplicationManager underReplicaMgr2 = lmf2.newLedgerUnderreplicationManager();
+        // nothing has been recorded yet
+        assertEquals(-1, underReplicaMgr1.getCheckAllLedgersCTime());
+        long curTime = System.currentTimeMillis();
+        underReplicaMgr2.setCheckAllLedgersCTime(curTime);
+        assertEquals(curTime, underReplicaMgr1.getCheckAllLedgersCTime());
+        // the clock may not have advanced since the first write; force a
+        // strictly larger value so this assertion really proves an overwrite
+        curTime = Math.max(System.currentTimeMillis(), curTime + 1);
+        underReplicaMgr2.setCheckAllLedgersCTime(curTime);
+        assertEquals(curTime, underReplicaMgr1.getCheckAllLedgersCTime());
+    }
+
private void verifyMarkLedgerUnderreplicated(Collection<String>
missingReplica)
throws KeeperException, InterruptedException, ReplicationException
{
Long ledgerA = 0xfeadeefdacL;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services