This is an automated email from the ASF dual-hosted git repository.
reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 29eb420 BOOKKEEPER-1028 and BOOKKEEPER-1029
29eb420 is described below
commit 29eb420f1cca07c1a3a7748ed7fbb8a5bc54f2fa
Author: cguttapalem <[email protected]>
AuthorDate: Thu Jul 6 20:11:25 2017 -0700
BOOKKEEPER-1028 and BOOKKEEPER-1029
BOOKKEEPER-1028: inc/excl opts listunderreplicated
- Introduce options to include and exclude a BookieId
for listunderreplicatedLedgers
- Limitation: since the replicas list won't be updated in the
underreplicated ledger zNode, the reported information may be stale
---------------------------------------------------------
BOOKKEEPER-1029: BookieDecommision Workflow
- LostBookieRecoveryDelay config param is stored in ZK
- If LostBookieRecoveryDelay is reset to the same value, an audit is
force-triggered immediately
- Added logic to trigger immediately or schedule pending audittask
depending on the changed value in ZK
- Added a good number of test cases validating force-triggering/rescheduling of the audit task
- added bookieshell command to get/set LostBookieRecoveryDelay from ZK
- added bookieshell command to triggeraudit by resetting
LostBookieRecoveryDelay
- added decommissionbookie bkshell command, which validates the
complete replication of ledgers stored in the bookie
Author: cguttapalem <[email protected]>
Author: Charan Reddy Guttapalem <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #127 from reddycharan/listunderreplicatedpredicate and squashes
the following commits:
34bacf3c [cguttapalem] BOOKKEEPER-1029: BookieDecommision Workflow
eb43ec49 [Charan Reddy Guttapalem] BOOKKEEPER-1029: BookieDecommision
Workflow
fcb399df [cguttapalem] BOOKKEEPER-1028: inc/excl opts listunderreplicated
---
bookkeeper-server/conf/log4j.shell.properties | 1 +
.../org/apache/bookkeeper/bookie/BookieShell.java | 176 +++++++++++-
.../apache/bookkeeper/client/BookKeeperAdmin.java | 309 +++++++++++++++++++++
.../apache/bookkeeper/client/LedgerChecker.java | 5 +
.../apache/bookkeeper/client/LedgerMetadata.java | 2 +-
.../meta/LedgerUnderreplicationManager.java | 52 +++-
.../meta/ZkLedgerUnderreplicationManager.java | 122 +++++++-
.../org/apache/bookkeeper/replication/Auditor.java | 144 ++++++++--
.../replication/ReplicationException.java | 2 +-
.../bookkeeper/util/BookKeeperConstants.java | 1 +
.../bookkeeper/client/BookKeeperAdminTest.java | 230 +++++++++++++++
.../replication/AuditorLedgerCheckerTest.java | 267 +++++++++++++++++-
12 files changed, 1262 insertions(+), 49 deletions(-)
diff --git a/bookkeeper-server/conf/log4j.shell.properties
b/bookkeeper-server/conf/log4j.shell.properties
index dcdc77c..58d6ea6 100644
--- a/bookkeeper-server/conf/log4j.shell.properties
+++ b/bookkeeper-server/conf/log4j.shell.properties
@@ -39,3 +39,4 @@ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ABSOLUTE}
%-5p %m%n
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.bookkeeper=ERROR
log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
+log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 3fc9d25..42d4de4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -44,15 +44,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
@@ -94,7 +95,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
-
/**
* Bookie Shell is to provide utilities for users to administer a bookkeeper
cluster.
*/
@@ -126,6 +126,9 @@ public class BookieShell implements Tool {
static final String CMD_UPDATELEDGER = "updateledgers";
static final String CMD_DELETELEDGER = "deleteledger";
static final String CMD_BOOKIEINFO = "bookieinfo";
+ static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
+ static final String CMD_LOSTBOOKIERECOVERYDELAY =
"lostbookierecoverydelay";
+ static final String CMD_TRIGGERAUDIT = "triggeraudit";
static final String CMD_HELP = "help";
final ServerConfiguration bkConf = new ServerConfiguration();
@@ -498,6 +501,8 @@ public class BookieShell implements Tool {
public ListUnderreplicatedCmd() {
super(CMD_LISTUNDERREPLICATED);
+ opts.addOption("missingreplica", true, "Bookie Id of missing
replica");
+ opts.addOption("excludingmissingreplica", true, "Bookie Id of
missing replica to ignore");
}
@Override
@@ -507,16 +512,30 @@ public class BookieShell implements Tool {
@Override
String getDescription() {
- return "List ledgers marked as underreplicated";
+ return "List ledgers marked as underreplicated, with optional
options to specify missingreplica (BookieId) and to exclude missingreplica";
}
@Override
String getUsage() {
- return "listunderreplicated";
+ return "listunderreplicated [[-missingreplica <bookieaddress>]
[-excludingmissingreplica <bookieaddress>]]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
+
+ final String includingBookieId =
cmdLine.getOptionValue("missingreplica");
+ final String excludingBookieId =
cmdLine.getOptionValue("excludingmissingreplica");
+
+ Predicate<List<String>> predicate = null;
+ if (!StringUtils.isBlank(includingBookieId) &&
!StringUtils.isBlank(excludingBookieId)) {
+ predicate = replicasList ->
(replicasList.contains(includingBookieId)
+ && !replicasList.contains(excludingBookieId));
+ } else if (!StringUtils.isBlank(includingBookieId)) {
+ predicate = replicasList ->
replicasList.contains(includingBookieId);
+ } else if (!StringUtils.isBlank(excludingBookieId)) {
+ predicate = replicasList ->
!replicasList.contains(excludingBookieId);
+ }
+
ZooKeeper zk = null;
try {
zk = ZooKeeperClient.newBuilder()
@@ -525,7 +544,7 @@ public class BookieShell implements Tool {
.build();
LedgerManagerFactory mFactory =
LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
LedgerUnderreplicationManager underreplicationManager =
mFactory.newLedgerUnderreplicationManager();
- Iterator<Long> iter =
underreplicationManager.listLedgersToRereplicate();
+ Iterator<Long> iter =
underreplicationManager.listLedgersToRereplicate(predicate);
while (iter.hasNext()) {
System.out.println(iter.next());
}
@@ -1327,6 +1346,65 @@ public class BookieShell implements Tool {
}
/**
+ * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in
Zookeeper
+ */
+ class LostBookieRecoveryDelayCmd extends MyCommand {
+ Options opts = new Options();
+
+ public LostBookieRecoveryDelayCmd() {
+ super(CMD_LOSTBOOKIERECOVERYDELAY);
+ opts.addOption("g", "get", false, "Get LostBookieRecoveryDelay
value (in seconds)");
+ opts.addOption("s", "set", true, "Set LostBookieRecoveryDelay
value (in seconds)");
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Setter and Getter for LostBookieRecoveryDelay value (in
seconds) in Zookeeper";
+ }
+
+ @Override
+ String getUsage() {
+ return "lostbookierecoverydelay [-get|-set <value>]";
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ boolean getter = cmdLine.hasOption("g");
+ boolean setter = cmdLine.hasOption("s");
+
+ if ((!getter && !setter) || (getter && setter)) {
+ LOG.error("One and only one of -get and -set must be
specified");
+ printUsage();
+ return 1;
+ }
+ ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+ BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+ try {
+ if (getter) {
+ int lostBookieRecoveryDelay =
admin.getLostBookieRecoveryDelay();
+ LOG.info("LostBookieRecoveryDelay value in ZK: {}",
String.valueOf(lostBookieRecoveryDelay));
+ } else {
+ int lostBookieRecoveryDelay =
Integer.parseInt(cmdLine.getOptionValue("set"));
+ admin.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+ LOG.info("Successfully set LostBookieRecoveryDelay value
in ZK: {}",
+ String.valueOf(lostBookieRecoveryDelay));
+ }
+ } finally {
+ if (admin != null) {
+ admin.close();
+ }
+ }
+ return 0;
+ }
+ }
+
+
+ /**
* Print which node has the auditor lock
*/
class WhoIsAuditorCmd extends MyCommand {
@@ -1825,6 +1903,91 @@ public class BookieShell implements Tool {
}
/**
+ * Command to trigger AuditTask by resetting lostBookieRecoveryDelay to
its current value
+ */
+ class TriggerAuditCmd extends MyCommand {
+ Options opts = new Options();
+
+ TriggerAuditCmd() {
+ super(CMD_TRIGGERAUDIT);
+ }
+
+ @Override
+ String getDescription() {
+ return "Force trigger the Audit by resetting the
lostBookieRecoveryDelay";
+ }
+
+ @Override
+ String getUsage() {
+ return CMD_TRIGGERAUDIT;
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ public int runCmd(CommandLine cmdLine) throws Exception {
+ ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+ BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+ try {
+ admin.triggerAudit();
+ } finally {
+ if (admin != null) {
+ admin.close();
+ }
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * Command to trigger AuditTask by resetting lostBookieRecoveryDelay and
then make sure the
+ * ledgers stored in the bookie are properly replicated.
+ */
+ class DecommissionBookieCmd extends MyCommand {
+ Options lOpts = new Options();
+
+ DecommissionBookieCmd() {
+ super(CMD_DECOMMISSIONBOOKIE);
+ }
+
+ @Override
+ String getDescription() {
+ return "Force trigger the Audittask and make sure all the ledgers
stored in the decommissioning bookie are replicated";
+ }
+
+ @Override
+ String getUsage() {
+ return CMD_DECOMMISSIONBOOKIE;
+ }
+
+ @Override
+ Options getOptions() {
+ return lOpts;
+ }
+
+ @Override
+ public int runCmd(CommandLine cmdLine) throws Exception {
+ ClientConfiguration adminConf = new ClientConfiguration(bkConf);
+ BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+ try {
+ BookieSocketAddress thisBookieAddress =
Bookie.getBookieAddress(bkConf);
+ admin.decommissionBookie(thisBookieAddress);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Received exception in DecommissionBookieCmd ", e);
+ return -1;
+ } finally {
+ if (admin != null) {
+ admin.close();
+ }
+ }
+ }
+ }
+
+ /**
* A facility for reporting update ledger progress.
*/
public interface UpdateLedgerNotifier {
@@ -1855,7 +2018,10 @@ public class BookieShell implements Tool {
commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
+ commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
commands.put(CMD_HELP, new HelpCmd());
+ commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new
LostBookieRecoveryDelayCmd());
+ commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 16d39ce..8cf8833 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -35,19 +35,35 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
+import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
import org.apache.bookkeeper.client.BookKeeper.SyncOpenCallback;
import
org.apache.bookkeeper.client.LedgerFragmentReplicator.SingleFragmentCallback;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.bookkeeper.replication.BookieLedgerIndexer;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
+import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -64,6 +80,8 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.AbstractFuture;
+
/**
* Admin client for BookKeeper clusters
*/
@@ -89,6 +107,14 @@ public class BookKeeperAdmin implements AutoCloseable {
*/
private Random rand = new Random();
+ private LedgerManagerFactory mFactory;
+
+ /*
+ * underreplicationManager is not initialized as part of constructor use
its
+ * getter (getUnderreplicationManager) so that it can be lazy-initialized
+ */
+ private LedgerUnderreplicationManager underreplicationManager;
+
/**
* Constructor that takes in a ZooKeeper servers connect string so we know
* how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -144,6 +170,7 @@ public class BookKeeperAdmin implements AutoCloseable {
bkc = new BookKeeper(conf, zk);
ownsBK = true;
this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE);
+ this.mFactory = bkc.ledgerManagerFactory;
}
/**
@@ -162,6 +189,7 @@ public class BookKeeperAdmin implements AutoCloseable {
ownsZK = false;
this.bookiesPath = bkc.getConf().getZkAvailableBookiesPath();
this.lfr = new LedgerFragmentReplicator(bkc, statsLogger);
+ this.mFactory = bkc.ledgerManagerFactory;
}
public BookKeeperAdmin(final BookKeeper bkc) {
@@ -1064,4 +1092,285 @@ public class BookKeeperAdmin implements AutoCloseable {
public LedgerMetadata getLedgerMetadata(LedgerHandle lh) {
return lh.getLedgerMetadata();
}
+
+ private LedgerUnderreplicationManager getUnderreplicationManager()
+ throws CompatibilityException, KeeperException,
InterruptedException {
+ if (underreplicationManager == null) {
+ underreplicationManager =
mFactory.newLedgerUnderreplicationManager();
+ }
+ return underreplicationManager;
+ }
+
+ /**
+ * Setter for LostBookieRecoveryDelay value (in seconds) in Zookeeper
+ *
+ * @param lostBookieRecoveryDelay
+ * lostBookieRecoveryDelay value (in seconds)
to set
+ * @throws CompatibilityException
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws UnavailableException
+ */
+ public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay)
+ throws CompatibilityException, KeeperException,
InterruptedException, UnavailableException {
+ LedgerUnderreplicationManager urlManager =
getUnderreplicationManager();
+ urlManager.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+ }
+
+ /**
+ * returns the current LostBookieRecoveryDelay value (in seconds) in
Zookeeper
+ *
+ * @return
+ * current lostBookieRecoveryDelay value (in seconds)
+ * @throws CompatibilityException
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws UnavailableException
+ */
+ public int getLostBookieRecoveryDelay()
+ throws CompatibilityException, KeeperException,
InterruptedException, UnavailableException {
+ LedgerUnderreplicationManager urlManager =
getUnderreplicationManager();
+ return urlManager.getLostBookieRecoveryDelay();
+ }
+
+ /**
+ * trigger AuditTask by resetting lostBookieRecoveryDelay to its current
+ * value. If Autorecovery is not enabled or if there is no Auditor then
this
+ * method will throw UnavailableException.
+ *
+ * @throws CompatibilityException
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws UnavailableException
+ * @throws IOException
+ */
+ public void triggerAudit()
+ throws CompatibilityException, KeeperException,
InterruptedException, UnavailableException, IOException {
+ LedgerUnderreplicationManager urlManager =
getUnderreplicationManager();
+ if (!urlManager.isLedgerReplicationEnabled()) {
+ LOG.error("Autorecovery is disabled. So giving up!");
+ throw new UnavailableException("Autorecovery is disabled. So
giving up!");
+ }
+
+ BookieSocketAddress auditorId = AuditorElector.getCurrentAuditor(new
ServerConfiguration(bkc.conf), zk);
+ if (auditorId == null) {
+ LOG.error("No auditor elected, though Autorecovery is enabled. So
giving up.");
+ throw new UnavailableException("No auditor elected, though
Autorecovery is enabled. So giving up.");
+ }
+
+ int previousLostBookieRecoveryDelayValue =
urlManager.getLostBookieRecoveryDelay();
+ LOG.info("Resetting LostBookieRecoveryDelay value: {}, to kickstart
audit task",
+ previousLostBookieRecoveryDelayValue);
+
urlManager.setLostBookieRecoveryDelay(previousLostBookieRecoveryDelayValue);
+ }
+
+ /**
+ * Triggers AuditTask by resetting lostBookieRecoveryDelay and then make
+ * sure the ledgers stored in the given decommissioning bookie are properly
+ * replicated and they are not underreplicated because of the given bookie.
+ * This method waits untill there are no underreplicatedledgers because of
this
+ * bookie. If the given Bookie is not shutdown yet, then it will throw
+ * BKIllegalOpException.
+ *
+ * @param bookieAddress
+ * address of the decommissioning bookie
+ * @throws CompatibilityException
+ * @throws UnavailableException
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws BKAuditException
+ * @throws TimeoutException
+ * @throws BKException
+ */
+ public void decommissionBookie(BookieSocketAddress bookieAddress)
+ throws CompatibilityException, UnavailableException,
KeeperException, InterruptedException, IOException,
+ BKAuditException, TimeoutException, BKException {
+ if (getAvailableBookies().contains(bookieAddress) ||
getReadOnlyBookies().contains(bookieAddress)) {
+ LOG.error("Bookie: {} is not shutdown yet", bookieAddress);
+ throw BKException.create(BKException.Code.IllegalOpException);
+ }
+
+ triggerAudit();
+
+ /*
+ * Sleep for 30 secs, so that Auditor gets chance to trigger its
+ * force audittask and let the underreplicationmanager process
+ * to do its replication process
+ */
+ Thread.sleep(30 * 1000);
+
+ /*
+ * get the collection of the ledgers which are stored in this
+ * bookie, by making a call to
+ * bookieLedgerIndexer.getBookieToLedgerIndex.
+ */
+
+ BookieLedgerIndexer bookieLedgerIndexer = new
BookieLedgerIndexer(bkc.ledgerManager);
+ Map<String, Set<Long>> bookieToLedgersMap =
bookieLedgerIndexer.getBookieToLedgerIndex();
+ Set<Long> ledgersStoredInThisBookie =
bookieToLedgersMap.get(bookieAddress.toString());
+ if ((ledgersStoredInThisBookie != null) &&
(!ledgersStoredInThisBookie.isEmpty())) {
+ /*
+ * wait untill all the ledgers are replicated to other
+ * bookies by making sure that these ledgers metadata don't
+ * contain this bookie as part of their ensemble.
+ */
+ waitForLedgersToBeReplicated(ledgersStoredInThisBookie,
bookieAddress, bkc.ledgerManager);
+ }
+
+ // for double-checking, check if any ledgers are listed as
underreplicated because of this bookie
+ Predicate<List<String>> predicate = replicasList ->
replicasList.contains(bookieAddress.toString());
+ Iterator<Long> urLedgerIterator =
underreplicationManager.listLedgersToRereplicate(predicate);
+ if (urLedgerIterator.hasNext()) {
+ //if there are any then wait and make sure those ledgers are
replicated properly
+ LOG.info("Still in some underreplicated ledgers metadata, this
bookie is part of its ensemble. "
+ + "Have to make sure that those ledger fragments are
rereplicated");
+ List<Long> urLedgers = new ArrayList<>();
+ urLedgerIterator.forEachRemaining(urLedgers::add);
+ waitForLedgersToBeReplicated(urLedgers, bookieAddress,
bkc.ledgerManager);
+ }
+ }
+
+ private void waitForLedgersToBeReplicated(Collection<Long> ledgers,
BookieSocketAddress thisBookieAddress,
+ LedgerManager ledgerManager) throws InterruptedException,
TimeoutException {
+ int maxSleepTimeInBetweenChecks = 10 * 60 * 1000; // 10 minutes
+ int sleepTimePerLedger = 10 * 1000; // 10 secs
+ Predicate<Long> validateBookieIsNotPartOfEnsemble = ledgerId ->
!areEntriesOfLedgerStoredInTheBookie(ledgerId,
+ thisBookieAddress, ledgerManager);
+ while (!ledgers.isEmpty()) {
+ LOG.info("Count of Ledgers which need to be rereplicated: {}",
ledgers.size());
+ int sleepTimeForThisCheck = ledgers.size() * sleepTimePerLedger >
maxSleepTimeInBetweenChecks
+ ? maxSleepTimeInBetweenChecks : ledgers.size() *
sleepTimePerLedger;
+ Thread.sleep(sleepTimeForThisCheck);
+ LOG.debug("Making sure following ledgers replication to be
completed: {}", ledgers);
+ ledgers.removeIf(validateBookieIsNotPartOfEnsemble);
+ }
+ }
+
+ private boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId,
BookieSocketAddress bookieAddress,
+ LedgerManager ledgerManager) {
+ ReadMetadataCallback cb = new ReadMetadataCallback(ledgerId);
+ ledgerManager.readLedgerMetadata(ledgerId, cb);
+ try {
+ LedgerMetadata ledgerMetadata = cb.get();
+ Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments =
ledgerMetadata.getEnsembles().values();
+ Iterator<ArrayList<BookieSocketAddress>>
ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+ ArrayList<BookieSocketAddress> ensemble;
+ int segmentNo = 0;
+ while (ensemblesOfSegmentsIterator.hasNext()) {
+ ensemble = ensemblesOfSegmentsIterator.next();
+ if (ensemble.contains(bookieAddress)) {
+ if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata,
bookieAddress, segmentNo++)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() != null
+ &&
e.getCause().getClass().equals(BKException.BKNoSuchLedgerExistsException.class))
{
+ LOG.debug("Ledger: {} has been deleted", ledgerId);
+ return false;
+ } else {
+ LOG.error("Got exception while trying to read LedgerMeatadata
of " + ledgerId, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata
ledgerMetadata,
+ BookieSocketAddress bookieAddress, int segmentNo) {
+ boolean isLedgerClosed = ledgerMetadata.isClosed();
+ int ensembleSize = ledgerMetadata.getEnsembleSize();
+ int writeQuorumSize = ledgerMetadata.getWriteQuorumSize();
+
+ List<Entry<Long, ArrayList<BookieSocketAddress>>> segments = new
LinkedList<Entry<Long, ArrayList<BookieSocketAddress>>>(
+ ledgerMetadata.getEnsembles().entrySet());
+
+ boolean lastSegment = (segmentNo == (segments.size() - 1));
+
+ /*
+ * Checking the last segment of the ledger can be complicated in
+ * some cases. In the case that the ledger is closed, we can just
+ * check the fragments of the segment as normal, except in the case
+ * that no entry was ever written, to the ledger, in which case we
+ * check no fragments.
+ *
+ * Following the same approach as in LedgerChecker.checkLedger
+ */
+ if (lastSegment && isLedgerClosed && (ledgerMetadata.getLastEntryId()
< segments.get(segmentNo).getKey())) {
+ return false;
+ }
+
+ /*
+ * if ensembleSize is equal to writeQuorumSize, then ofcourse all
+ * the entries of this segment are supposed to be stored in this
+ * bookie. If this is last segment of the ledger and if the ledger
+ * is not closed (this is a corner case), then we have to return
+ * true. For more info. Check BOOKKEEPER-237 and BOOKKEEPER-325.
+ */
+ if ((lastSegment && !isLedgerClosed) || (ensembleSize ==
writeQuorumSize)) {
+ return true;
+ }
+
+ /*
+ * the following check is required because ensembleSize can be
+ * greater than writeQuorumSize and in this case if there are only
+ * couple of entries then based on RoundRobinDistributionSchedule
+ * there might not be any entry copy in this bookie though this
+ * bookie is part of the ensemble of this segment. If no entry is
+ * stored in this bookie then we should return false, because
+ * ReplicationWorker wont take care of fixing the ledgerMetadata of
+ * this segment in this case.
+ *
+ * if ensembleSize > writeQuorumSize, then in LedgerFragment.java
+ * firstEntryID may not be equal to firstStoredEntryId lastEntryId
+ * may not be equalto lastStoredEntryId. firstStoredEntryId and
+ * lastStoredEntryId will be LedgerHandle.INVALID_ENTRY_ID, if no
+ * entry of this segment stored in this bookie. In this case
+ * LedgerChecker.verifyLedgerFragment will not consider it as
+ * unavailable/bad fragment though this bookie is part of the
+ * ensemble of the segment and it is down.
+ */
+ DistributionSchedule distributionSchedule = new
RoundRobinDistributionSchedule(
+ ledgerMetadata.getWriteQuorumSize(),
ledgerMetadata.getAckQuorumSize(),
+ ledgerMetadata.getEnsembleSize());
+ ArrayList<BookieSocketAddress> currentSegmentEnsemble =
segments.get(segmentNo).getValue();
+ int thisBookieIndexInCurrentEnsemble =
currentSegmentEnsemble.indexOf(bookieAddress);
+ long firstEntryId = segments.get(segmentNo).getKey();
+ long lastEntryId = lastSegment ? ledgerMetadata.getLastEntryId() :
segments.get(segmentNo + 1).getKey() - 1;
+ long firstStoredEntryId = LedgerHandle.INVALID_ENTRY_ID;
+ long firstEntryIter = firstEntryId;
+ // following the same approach followed in
LedgerFragment.getFirstStoredEntryId()
+ for (int i = 0; i < ensembleSize && firstEntryIter <= lastEntryId;
i++) {
+ if (distributionSchedule.hasEntry(firstEntryIter,
thisBookieIndexInCurrentEnsemble)) {
+ firstStoredEntryId = firstEntryIter;
+ break;
+ } else {
+ firstEntryIter++;
+ }
+ }
+ return firstStoredEntryId != LedgerHandle.INVALID_ENTRY_ID;
+ }
+
+ static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
+ implements GenericCallback<LedgerMetadata> {
+ final long ledgerId;
+
+ ReadMetadataCallback(long ledgerId) {
+ this.ledgerId = ledgerId;
+ }
+
+ long getLedgerId() {
+ return ledgerId;
+ }
+
+ public void operationComplete(int rc, LedgerMetadata result) {
+ if (rc != 0) {
+ setException(BKException.create(rc));
+ } else {
+ set(result);
+ }
+ }
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 4266c90..7f58131 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -88,6 +88,11 @@ public class LedgerChecker {
long firstStored = fragment.getFirstStoredEntryId();
long lastStored = fragment.getLastStoredEntryId();
+ // because of this if block, even if the bookie of the fragment is
+ // down, it considers Fragment is available/not-bad if firstStored
+ // and lastStored are LedgerHandle.INVALID_ENTRY_ID.
+ // So same logic is used in
BookieShell.DecommissionBookieCmd.areEntriesOfSegmentStoredInTheBookie
+ // if any change is made here, then the changes should be in
BookieShell also
if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
throw new InvalidFragmentException();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 2641de9..92fc63e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -241,7 +241,7 @@ public class LedgerMetadata {
state = LedgerMetadataFormat.State.CLOSED;
}
- void addEnsemble(long startEntryId, ArrayList<BookieSocketAddress>
ensemble) {
+ public void addEnsemble(long startEntryId, ArrayList<BookieSocketAddress>
ensemble) {
assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
ensembles.put(startEntryId, ensemble);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index d006895..014c1a8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -21,6 +21,8 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException;
import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
/**
* Interface for marking ledgers which need to be rereplicated
@@ -42,11 +44,18 @@ public interface LedgerUnderreplicationManager {
/**
* Get a list of all the ledgers which have been
- * marked for rereplication.
- *
+ * marked for rereplication, filtered by the predicate on the missing
replicas list.
+ *
+ * Missing replicas list of an underreplicated ledger is the list of the
bookies which are part of
+ * the ensemble of this ledger and are currently unavailable/down.
+ *
+ * If filtering is not needed then it is suggested to pass null for
predicate,
+ * otherwise it will read the content of the ZNode to decide on filtering.
+ *
+ * @param predicate filter to use while listing under replicated ledgers.
'null' if filtering is not required
* @return an iterator which returns ledger ids
*/
- Iterator<Long> listLedgersToRereplicate();
+ Iterator<Long> listLedgersToRereplicate(Predicate<List<String>> predicate);
/**
* Acquire a underreplicated ledger for rereplication. The ledger
@@ -116,4 +125,41 @@ public interface LedgerUnderreplicationManager {
*/
void notifyLedgerReplicationEnabled(GenericCallback<Void> cb)
throws ReplicationException.UnavailableException;
+
+ /**
+ * Creates the zNode for lostBookieRecoveryDelay with the specified value
and returns true.
+ * If the node is already existing, then it returns false
+ *
+ * @param lostBookieRecoveryDelay
+ * @return
+ * true if it succeeds in creating zNode for lostBookieRecoveryDelay,
false if it is already existing
+ * @throws ReplicationException.UnavailableException
+ */
+ boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay)
+ throws ReplicationException.UnavailableException;
+
+ /**
+ * Setter for the lostBookieRecoveryDelay znode
+ *
+ * @param lostBookieRecoveryDelay
+ * @throws ReplicationException.UnavailableException
+ */
+ void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws
ReplicationException.UnavailableException;
+
+ /**
+ * Getter for the lostBookieRecoveryDelay
+ *
+ * @return the int value of lostBookieRecoveryDelay
+ * @throws ReplicationException.UnavailableException
+ */
+ int getLostBookieRecoveryDelay() throws
ReplicationException.UnavailableException;
+
+ /**
+ * Receive notification asynchronously when the lostBookieRecoveryDelay
value is Changed
+ *
+ * @param cb
+ * @throws ReplicationException.UnavailableException
+ */
+ void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb)
+ throws ReplicationException.UnavailableException;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index e9d48f3..e56ee30 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -49,7 +50,9 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +60,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.protobuf.TextFormat;
-import org.apache.zookeeper.data.ACL;
/**
* ZooKeeper implementation of underreplication manager.
@@ -101,6 +103,7 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
private final String urLockPath;
private final String layoutZNode;
private final AbstractConfiguration conf;
+ private final String lostBookieRecoveryDelayZnode;
private final ZooKeeper zkc;
private final SubTreeCache subTreeCache;
@@ -112,7 +115,8 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
urLedgerPath = basePath
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
urLockPath = basePath + '/' +
BookKeeperConstants.UNDER_REPLICATION_LOCK;
-
+ lostBookieRecoveryDelayZnode = basePath + '/' +
BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
+
idExtractionPattern = Pattern.compile("urL(\\d+)$");
this.zkc = zkc;
this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
@@ -341,8 +345,21 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
}
}
+ /**
+ * Get a list of all the ledgers which have been
+ * marked for rereplication, filtered by the predicate on the replicas
list.
+ *
+ * Replicas list of an underreplicated ledger is the list of the bookies
which are part of
+ * the ensemble of this ledger and are currently unavailable/down.
+ *
+ * If filtering is not needed then it is suggested to pass null for
predicate,
+ * otherwise it will read the content of the ZNode to decide on filtering.
+ *
+ * @param predicate filter to use while listing under replicated ledgers.
'null' if filtering is not required.
+ * @return an iterator which returns ledger ids
+ */
@Override
- public Iterator<Long> listLedgersToRereplicate() {
+ public Iterator<Long> listLedgersToRereplicate(final
Predicate<List<String>> predicate) {
final Queue<String> queue = new LinkedList<String>();
queue.add(urLedgerPath);
@@ -363,12 +380,20 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
while (queue.size() > 0 && curBatch.size() == 0) {
String parent = queue.remove();
try {
- for (String c : zkc.getChildren(parent,false)) {
- String child = parent + "/" + c;
- if (c.startsWith("urL")) {
- curBatch.add(getLedgerId(child));
- } else {
- queue.add(child);
+ for (String c : zkc.getChildren(parent, false)) {
+ try {
+ String child = parent + "/" + c;
+ if (c.startsWith("urL")) {
+ long ledgerId = getLedgerId(child);
+ if ((predicate == null)
+ ||
predicate.test(getLedgerUnreplicationInfo(ledgerId).getReplicaList())) {
+ curBatch.add(ledgerId);
+ }
+ } else {
+ queue.add(child);
+ }
+ } catch (KeeperException.NoNodeException nne) {
+ // ignore: the znode no longer exists, so skip this child
}
}
} catch (InterruptedException ie) {
@@ -694,4 +719,83 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath),
ledgerId), -1);
}
}
+
+ @Override
+ public boolean initializeLostBookieRecoveryDelay(int
lostBookieRecoveryDelay) throws UnavailableException {
+ LOG.debug("initializeLostBookieRecoveryDelay()");
+ try {
+ zkc.create(lostBookieRecoveryDelayZnode,
Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException ke) {
+ LOG.info(
+ "lostBookieRecoveryDelay Znode is already present, so
using existing lostBookieRecoveryDelay Znode value");
+ return false;
+ } catch (KeeperException ke) {
+ LOG.error("Error while initializing LostBookieRecoveryDelay", ke);
+ throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted
while contacting zookeeper", ie);
+ }
+ return true;
+ }
+
+ @Override
+ public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws
UnavailableException {
+ LOG.debug("setLostBookieRecoveryDelay()");
+ try {
+ if (zkc.exists(lostBookieRecoveryDelayZnode, false) != null) {
+ zkc.setData(lostBookieRecoveryDelayZnode,
Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
+ -1);
+ } else {
+ zkc.create(lostBookieRecoveryDelayZnode,
Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (KeeperException ke) {
+ LOG.error("Error while setting LostBookieRecoveryDelay ", ke);
+ throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted
while contacting zookeeper", ie);
+ }
+ }
+
+ @Override
+ public int getLostBookieRecoveryDelay() throws UnavailableException {
+ LOG.debug("getLostBookieRecoveryDelay()");
+ try {
+ byte[] data = zkc.getData(lostBookieRecoveryDelayZnode, false,
null);
+ return Integer.parseInt(new String(data, UTF_8));
+ } catch (KeeperException ke) {
+ LOG.error("Error while getting LostBookieRecoveryDelay ", ke);
+ throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted
while contacting zookeeper", ie);
+ }
+ }
+
+ @Override
+ public void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb)
throws UnavailableException {
+ LOG.debug("notifyLostBookieRecoveryDelayChanged()");
+ Watcher w = new Watcher() {
+ public void process(WatchedEvent e) {
+ if (e.getType() == Watcher.Event.EventType.NodeDataChanged) {
+ cb.operationComplete(0, null);
+ }
+ }
+ };
+ try {
+ if (null == zkc.exists(lostBookieRecoveryDelayZnode, w)) {
+ cb.operationComplete(0, null);
+ return;
+ }
+ } catch (KeeperException ke) {
+ LOG.error("Error while checking the state of
lostBookieRecoveryDelay", ke);
+ throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted
while contacting zookeeper", ie);
+ }
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 3df941f..054e09d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,8 +20,19 @@
*/
package org.apache.bookkeeper.replication;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -41,35 +52,21 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
-import org.apache.bookkeeper.replication.ReplicationStats;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.SettableFuture;
-
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
/**
* Auditor is a single entity in the entire Bookie cluster and will be watching
@@ -101,7 +98,8 @@ public class Auditor implements BookiesListener {
private final Counter numDelayedBookieAuditsCancelled;
private volatile Future<?> auditTask;
private Set<String> bookiesToBeAudited = Sets.newHashSet();
-
+ private volatile int lostBookieRecoveryDelayBeforeChange;
+
public Auditor(final String bookieIdentifier, ServerConfiguration conf,
ZooKeeper zkc, StatsLogger statsLogger) throws
UnavailableException {
this.conf = conf;
@@ -149,6 +147,15 @@ public class Auditor implements BookiesListener {
LOG.info("AuthProvider used by the Auditor is
"+clientConfiguration.getClientAuthProviderFactoryClass());
this.bkc = new BookKeeper(clientConfiguration, zkc);
this.admin = new BookKeeperAdmin(bkc, statsLogger);
+ if (this.ledgerUnderreplicationManager
+
.initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) {
+ LOG.info("Initializing lostBookieRecoveryDelay zNode to the
conif value: {}",
+ conf.getLostBookieRecoveryDelay());
+ } else {
+ LOG.info(
+ "Valid lostBookieRecoveryDelay zNode is available, so
not creating lostBookieRecoveryDelay zNode as part of Auditor initialization ");
+ }
+ lostBookieRecoveryDelayBeforeChange =
this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
} catch (CompatibilityException ce) {
throw new UnavailableException(
"CompatibilityException while initializing Auditor", ce);
@@ -191,7 +198,8 @@ public class Auditor implements BookiesListener {
public void run() {
try {
waitIfLedgerReplicationDisabled();
-
+ int lostBookieRecoveryDelay =
Auditor.this.ledgerUnderreplicationManager
+ .getLostBookieRecoveryDelay();
List<String> availableBookies = getAvailableBookies();
// casting to String, as knownBookies and
availableBookies
@@ -220,7 +228,7 @@ public class Auditor implements BookiesListener {
}
knownBookies.removeAll(bookiesToBeAudited);
- if (conf.getLostBookieRecoveryDelay() == 0) {
+ if (lostBookieRecoveryDelay == 0) {
startAudit(false);
bookiesToBeAudited.clear();
return;
@@ -246,9 +254,9 @@ public class Auditor implements BookiesListener {
auditTask = null;
bookiesToBeAudited.clear();
}
- }, conf.getLostBookieRecoveryDelay(),
TimeUnit.SECONDS);
+ }, lostBookieRecoveryDelay, TimeUnit.SECONDS);
numBookieAuditsDelayed.inc();
- LOG.info("Delaying bookie audit by " +
conf.getLostBookieRecoveryDelay()
+ LOG.info("Delaying bookie audit by " +
lostBookieRecoveryDelay
+ "secs for " +
bookiesToBeAudited.toString());
}
} catch (BKException bke) {
@@ -263,6 +271,64 @@ public class Auditor implements BookiesListener {
});
}
+ synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
+ if (executor.isShutdown()) {
+ SettableFuture<Void> f = SettableFuture.<Void> create();
+ f.setException(new BKAuditException("Auditor shutting down"));
+ return f;
+ }
+ return executor.submit(new Runnable() {
+ int lostBookieRecoveryDelay = -1;
+ public void run() {
+ try {
+ waitIfLedgerReplicationDisabled();
+ lostBookieRecoveryDelay =
Auditor.this.ledgerUnderreplicationManager
+ .getLostBookieRecoveryDelay();
+ // if there is a pending auditTask, cancel it so that
it can be rescheduled
+ // after the new lostBookieRecoveryDelay period
+ if (auditTask != null) {
+ LOG.info("lostBookieRecoveryDelay period has been
changed so canceling the pending AuditTask");
+ auditTask.cancel(false);
+ numDelayedBookieAuditsCancelled.inc();
+ }
+
+ // if lostBookieRecoveryDelay is 0 or is reset to its previous value
then consider it as
+ // a signal to trigger the Audit immediately.
+ if ((lostBookieRecoveryDelay == 0)
+ || (lostBookieRecoveryDelay ==
lostBookieRecoveryDelayBeforeChange)) {
+ LOG.info(
+ "lostBookieRecoveryDelay has been set to 0 or
reset to its previos value, so starting AuditTask. "
+ + "Current lostBookieRecoveryDelay: {},
previous lostBookieRecoveryDelay: {}",
+ lostBookieRecoveryDelay,
lostBookieRecoveryDelayBeforeChange);
+ startAudit(false);
+ auditTask = null;
+ bookiesToBeAudited.clear();
+ } else if (auditTask != null) {
+ LOG.info("lostBookieRecoveryDelay has been set to {},
so rescheduling AuditTask accordingly",
+ lostBookieRecoveryDelay);
+ auditTask = executor.schedule(new Runnable() {
+ public void run() {
+ startAudit(false);
+ auditTask = null;
+ bookiesToBeAudited.clear();
+ }
+ }, lostBookieRecoveryDelay, TimeUnit.SECONDS);
+ numBookieAuditsDelayed.inc();
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while for LedgersReplication to be
enabled ", ie);
+ } catch (UnavailableException ue) {
+ LOG.error("Exception while reading from ZK", ue);
+ } finally{
+ if (lostBookieRecoveryDelay != -1) {
+ lostBookieRecoveryDelayBeforeChange =
lostBookieRecoveryDelay;
+ }
+ }
+ }
+ });
+ }
+
public void start() {
LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier);
// on startup watching available bookie and based on the
@@ -317,6 +383,14 @@ public class Auditor implements BookiesListener {
LOG.error("Couldn't get bookie list, exiting", bke);
submitShutdownTask();
}
+
+ try {
+ this.ledgerUnderreplicationManager
+ .notifyLostBookieRecoveryDelayChanged(new
LostBookieRecoveryDelayChangedCb());
+ } catch (UnavailableException ue) {
+ LOG.error("Exception while registering for
LostBookieRecoveryDelay change notification", ue);
+ submitShutdownTask();
+ }
long bookieCheckInterval =
conf.getAuditorPeriodicBookieCheckInterval();
if (bookieCheckInterval == 0) {
@@ -330,6 +404,19 @@ public class Auditor implements BookiesListener {
}
}
+ private class LostBookieRecoveryDelayChangedCb implements
GenericCallback<Void> {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ try {
+ Auditor.this.ledgerUnderreplicationManager
+
.notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this);
+ } catch (UnavailableException ae) {
+ LOG.error("Exception while registering for a
LostBookieRecoveryDelay notification", ae);
+ }
+ Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
+ }
+ }
+
private void waitIfLedgerReplicationDisabled() throws UnavailableException,
InterruptedException {
ReplicationEnableCb cb = new ReplicationEnableCb();
@@ -673,4 +760,11 @@ public class Auditor implements BookiesListener {
}
};
+ int getLostBookieRecoveryDelayBeforeChange() {
+ return lostBookieRecoveryDelayBeforeChange;
+ }
+
+ Future<?> getAuditTask() {
+ return auditTask;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
index 32a03e2..2ff7b94 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
@@ -64,7 +64,7 @@ public abstract class ReplicationException extends Exception {
/**
* Exception while auditing bookie-ledgers
*/
- static class BKAuditException extends ReplicationException {
+ public static class BKAuditException extends ReplicationException {
private static final long serialVersionUID = 95551905L;
BKAuditException(String message, Throwable cause) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index bae7715..362e1e6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -40,6 +40,7 @@ public class BookKeeperConstants {
public static final String UNDER_REPLICATION_NODE = "underreplication";
public static final String UNDER_REPLICATION_LOCK = "locks";
public static final String DISABLE_NODE = "disable";
+ public static final String LOSTBOOKIERECOVERYDELAY_NODE =
"lostBookieRecoveryDelay";
public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers";
public static final String LAYOUT_ZNODE = "LAYOUT";
public static final String INSTANCEID = "INSTANCEID";
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
new file mode 100644
index 0000000..b197353
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -0,0 +1,230 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BKException.BKIllegalOpException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BookKeeperAdminTest.class);
+ private DigestType digestType = DigestType.CRC32;
+ private static final String PASSWORD = "testPasswd";
+ private static final int numOfBookies = 6;
+ private final int lostBookieRecoveryDelayInitValue = 1800;
+
+ public BookKeeperAdminTest() {
+ super(numOfBookies);
+ baseConf.setAutoRecoveryDaemonEnabled(true);
+ baseConf.setLostBookieRecoveryDelay(lostBookieRecoveryDelayInitValue);
+ baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30000));
+ }
+
+ @Test(timeout = 60000)
+ public void testLostBookieRecoveryDelayValue() throws Exception {
+ BookKeeperAdmin bkAdmin = new
BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+ assertEquals("LostBookieRecoveryDelay",
lostBookieRecoveryDelayInitValue, bkAdmin.getLostBookieRecoveryDelay());
+ int newLostBookieRecoveryDelayValue = 2400;
+ bkAdmin.setLostBookieRecoveryDelay(newLostBookieRecoveryDelayValue);
+ assertEquals("LostBookieRecoveryDelay",
newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+ assertEquals("LostBookieRecoveryDelay",
newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+ newLostBookieRecoveryDelayValue = 3000;
+ bkAdmin.setLostBookieRecoveryDelay(newLostBookieRecoveryDelayValue);
+ assertEquals("LostBookieRecoveryDelay",
newLostBookieRecoveryDelayValue, bkAdmin.getLostBookieRecoveryDelay());
+ bkAdmin.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testTriggerAudit() throws Exception {
+ ZkLedgerUnderreplicationManager urLedgerMgr = new
ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+ BookKeeperAdmin bkAdmin = new
BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+ int lostBookieRecoveryDelayValue =
bkAdmin.getLostBookieRecoveryDelay();
+ urLedgerMgr.disableLedgerReplication();
+ try {
+ bkAdmin.triggerAudit();
+ Assert.fail("Trigger Audit should have failed because
LedgerReplication is disabled");
+ } catch (UnavailableException une) {
+ // expected
+ }
+ assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayValue,
bkAdmin.getLostBookieRecoveryDelay());
+ urLedgerMgr.enableLedgerReplication();
+ bkAdmin.triggerAudit();
+ assertEquals("LostBookieRecoveryDelay", lostBookieRecoveryDelayValue,
bkAdmin.getLostBookieRecoveryDelay());
+ long ledgerId = 1L;
+ LedgerHandle ledgerHandle = bkc.createLedgerAdv(ledgerId, numBookies,
numBookies, numBookies, digestType,
+ PASSWORD.getBytes(), null);
+ ledgerHandle.addEntry(0, "data".getBytes());
+ ledgerHandle.close();
+
+ killBookie(1);
+ /*
+ * since lostBookieRecoveryDelay is set, when a bookie dies, the Audit
+ * process will not start immediately. But when triggerAudit is called
+ * it will force the audit process.
+ */
+ bkAdmin.triggerAudit();
+ Thread.sleep(500);
+ Iterator<Long> ledgersToRereplicate =
urLedgerMgr.listLedgersToRereplicate(null);
+ assertTrue("There are supposed to be underreplicatedledgers",
ledgersToRereplicate.hasNext());
+ assertEquals("Underreplicated ledgerId", ledgerId,
ledgersToRereplicate.next().longValue());
+ bkAdmin.close();
+ }
+
+ @Test(timeout = 480000)
+ public void testDecommissionBookie() throws Exception {
+ ZkLedgerUnderreplicationManager urLedgerMgr = new
ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+ BookKeeperAdmin bkAdmin = new
BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+
+ int numOfLedgers = 2 * numOfBookies;
+ int numOfEntries = 2 * numOfBookies;
+ for (int i = 0; i < numOfLedgers; i++) {
+ LedgerHandle lh = bkc.createLedger(3, 2, digestType,
PASSWORD.getBytes());
+ for (int j = 0; j < numOfEntries; j++) {
+ lh.addEntry("entry".getBytes());
+ }
+ lh.close();
+ }
+ /*
+ * create ledgers having empty segments (segment with no entries)
+ */
+ for (int i = 0; i < numOfLedgers; i++) {
+ LedgerHandle emptylh = bkc.createLedger(3, 2, digestType,
PASSWORD.getBytes());
+ emptylh.close();
+ }
+
+ try {
+ /*
+ * if we try to call decommissionBookie for a bookie which is not
+ * shutdown, then it should throw BKIllegalOpException
+ */
+ bkAdmin.decommissionBookie(bs.get(0).getLocalAddress());
+ fail("Expected BKIllegalOpException because that bookie is not
shutdown yet");
+ } catch (BKIllegalOpException bkioexc) {
+ // expected BKIllegalOpException
+ }
+
+ ServerConfiguration killedBookieConf = killBookie(1);
+ /*
+ * this decommissionBookie call should make sure that there are no
+ * underreplicated ledgers because of this bookie
+ */
+ bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
+ bkAdmin.triggerAudit();
+ Thread.sleep(500);
+ Iterator<Long> ledgersToRereplicate =
urLedgerMgr.listLedgersToRereplicate(null);
+ if (ledgersToRereplicate.hasNext()) {
+ while (ledgersToRereplicate.hasNext()) {
+ Long ledgerId = ledgersToRereplicate.next();
+ LOG.error("Ledger: {} is underreplicated which is not
expected", ledgerId);
+ }
+ fail("There are not supposed to be any underreplicatedledgers");
+ }
+
+ killedBookieConf = killBookie(0);
+ bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
+ bkAdmin.triggerAudit();
+ Thread.sleep(500);
+ ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
+ if (ledgersToRereplicate.hasNext()) {
+ while (ledgersToRereplicate.hasNext()) {
+ Long ledgerId = ledgersToRereplicate.next();
+ LOG.error("Ledger: {} is underreplicated which is not
expected", ledgerId);
+ }
+ fail("There are not supposed to be any underreplicatedledgers");
+ }
+ bkAdmin.close();
+ }
+
+ @Test(timeout = 240000)
+ public void
testDecommissionForLedgersWithMultipleSegmentsAndNotWriteClosed() throws
Exception {
+ ZkLedgerUnderreplicationManager urLedgerMgr = new
ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+ BookKeeperAdmin bkAdmin = new
BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+ int numOfEntries = 2 * numOfBookies;
+
+ LedgerHandle lh1 = bkc.createLedgerAdv(1L, numBookies, 3, 3,
digestType, PASSWORD.getBytes(), null);
+ LedgerHandle lh2 = bkc.createLedgerAdv(2L, numBookies, 3, 3,
digestType, PASSWORD.getBytes(), null);
+ LedgerHandle lh3 = bkc.createLedgerAdv(3L, numBookies, 3, 3,
digestType, PASSWORD.getBytes(), null);
+ LedgerHandle lh4 = bkc.createLedgerAdv(4L, numBookies, 3, 3,
digestType, PASSWORD.getBytes(), null);
+ for (int j = 0; j < numOfEntries; j++) {
+ lh1.addEntry(j, "data".getBytes());
+ lh2.addEntry(j, "data".getBytes());
+ lh3.addEntry(j, "data".getBytes());
+ lh4.addEntry(j, "data".getBytes());
+ }
+
+ startNewBookie();
+
+ assertEquals("Number of Available Bookies", numOfBookies + 1,
bkAdmin.getAvailableBookies().size());
+
+ ServerConfiguration killedBookieConf = killBookie(0);
+
+ /*
+ * since one of the bookies is killed, an ensemble change happens when the next
+ * write is made. So a new segment will be created for those 2 ledgers (lh1 and lh2).
+ */
+ for (int j = numOfEntries; j < 2 * numOfEntries; j++) {
+ lh1.addEntry(j, "data".getBytes());
+ lh2.addEntry(j, "data".getBytes());
+ }
+
+ /*
+ * Here lh1 and lh2 have multiple segments and are writeclosed. But
lh3 and lh4 are
+ * not writeclosed and contain only one segment.
+ */
+ lh1.close();
+ lh2.close();
+
+ /*
+ * If the last segment of the ledger is underreplicated and if the
+ * ledger is not closed then it will remain underreplicated for
+ * openLedgerRereplicationGracePeriod (by default 30 secs). For more
+ * info, check BOOKKEEPER-237 and BOOKKEEPER-325. But later
+ * ReplicationWorker will fence the ledger.
+ */
+ bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
+ bkAdmin.triggerAudit();
+ Thread.sleep(500);
+ Iterator<Long> ledgersToRereplicate =
urLedgerMgr.listLedgersToRereplicate(null);
+ if (ledgersToRereplicate.hasNext()) {
+ while (ledgersToRereplicate.hasNext()) {
+ Long ledgerId = ledgersToRereplicate.next();
+ LOG.error("Ledger: {} is underreplicated which is not
expected", ledgerId);
+ }
+ fail("There are not supposed to be any underreplicatedledgers");
+ }
+ bkAdmin.close();
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index 103f1ce..f2a7316 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -20,6 +20,11 @@
*/
package org.apache.bookkeeper.replication;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -33,6 +38,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,13 +46,19 @@ import
org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -56,8 +68,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
/**
* Tests publishing of under replicated ledgers by the Auditor bookie node when
* corresponding bookies identifes as not running
@@ -337,7 +347,7 @@ public class AuditorLedgerCheckerTest extends
MultiLedgerManagerTestCase {
.size());
// wait for 5 seconds before starting the recovery work when a bookie
fails
- baseConf.setLostBookieRecoveryDelay(5);
+ urLedgerMgr.setLostBookieRecoveryDelay(5);
// shutdown a non auditor bookie; choosing non-auditor to avoid
another election
String shutdownBookie = shutDownNonAuditorBookie();
@@ -395,6 +405,247 @@ public class AuditorLedgerCheckerTest extends
MultiLedgerManagerTestCase {
_testDelayedAuditOfLostBookies();
}
+    /**
+     * Verifies that a delayed audit of a lost bookie is rescheduled to run
+     * immediately when lostBookieRecoveryDelay is changed to 0 while the
+     * audit is still pending.
+     */
+    @Test(timeout=60000)
+    public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws Exception {
+        // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        LOG.debug("Created ledger : {}", ledgerId);
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
+
+        // wait for 50 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(50);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        String shutdownBookie = shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(4, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+
+        // set lostBookieRecoveryDelay to 0, so that it triggers AuditTask immediately
+        urLedgerMgr.setLostBookieRecoveryDelay(0);
+
+        // wait for 1 second for the ledger to get reported as under replicated
+        assertTrue("audit of lost bookie wasn't triggered immediately after setting the delay to 0",
+                underReplicaLatch.await(1, TimeUnit.SECONDS));
+
+        assertTrue("Ledger is not marked as underreplicated:" + ledgerId,
+                urLedgerList.contains(ledgerId));
+        Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
+        String data = urLedgerData.get(ledgerId);
+        // fail with a readable message instead of a NullPointerException below
+        Assert.assertNotNull("No underreplication data found for ledger " + ledgerId, data);
+        assertTrue("Bookie " + shutdownBookie
+                + " is not listed in the ledger as missing replica :" + data,
+                data.contains(shutdownBookie));
+    }
+
+    /**
+     * Verifies that a delayed audit of a lost bookie is pushed back when
+     * lostBookieRecoveryDelay is changed to a different non-zero value while
+     * the audit is still pending: no ledger may be reported within the first
+     * 2 seconds after the change, and the ledger must be reported within the
+     * following 3 seconds.
+     */
+    @Test(timeout=60000)
+    public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Exception {
+        // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        LOG.debug("Created ledger : {}", ledgerId);
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
+
+        // wait for 3 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(3);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        String shutdownBookie = shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+
+        // set lostBookieRecoveryDelay to 4, so the pending AuditTask is rescheduled
+        urLedgerMgr.setLostBookieRecoveryDelay(4);
+
+        // since we changed the BookieRecoveryDelay period to 4, the audittask shouldn't have been executed
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+
+        // wait for 3 seconds (since we already waited for 2 secs) for the ledger to get reported as under replicated
+        assertTrue("rescheduled audit of lost bookie didn't run within the new delay",
+                underReplicaLatch.await(3, TimeUnit.SECONDS));
+        assertTrue("Ledger is not marked as underreplicated:" + ledgerId,
+                urLedgerList.contains(ledgerId));
+        Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
+        String data = urLedgerData.get(ledgerId);
+        // fail with a readable message instead of a NullPointerException below
+        Assert.assertNotNull("No underreplication data found for ledger " + ledgerId, data);
+        assertTrue("Bookie " + shutdownBookie
+                + " is not listed in the ledger as missing replica :" + data,
+                data.contains(shutdownBookie));
+    }
+
+    /**
+     * Verifies that re-setting lostBookieRecoveryDelay to its current value
+     * triggers an audit even when the Auditor has no pending audit task.
+     *
+     * <p>Ledger metadata is created directly through a LedgerManager with
+     * ensembles made of bookie addresses that are not part of this test
+     * cluster; the test then expects every such ledger to be reported as
+     * underreplicated once the delay is re-set.
+     */
+    @Test(timeout=60000)
+    public void testTriggerAuditorWithNoPendingAuditTask() throws Exception {
+        // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+        int lostBookieRecoveryDelayConfValue = baseConf.getLostBookieRecoveryDelay();
+        Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
+        Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
+        int lostBookieRecoveryDelayBeforeChange = auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange();
+        Assert.assertNull("auditTask is supposed to be null", auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay",
+                lostBookieRecoveryDelayConfValue, lostBookieRecoveryDelayBeforeChange);
+
+        // there is no easy way to validate if the Auditor has executed Audit process (Auditor.startAudit),
+        // without shutting down a Bookie. To test if by resetting LostBookieRecoveryDelay it does Auditing
+        // even when there is no pending AuditTask, following approach is needed.
+
+        // Here we are creating few ledgers ledgermetadata with non-existing bookies as its ensemble.
+        // When Auditor does audit it recognizes these ledgers as underreplicated and mark them as
+        // under-replicated, since these bookies are not available.
+        int numofledgers = 5;
+        Random rand = new Random();
+        // one LedgerManager is enough for all the ledgers; close it once the metadata is written
+        try (LedgerManager ledgerManager = LedgerManagerFactory.newLedgerManagerFactory(baseClientConf, zkc)
+                .newLedgerManager()) {
+            for (int i = 0; i < numofledgers; i++) {
+                LedgerMetadata metadata = new LedgerMetadata(3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null);
+                ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+                ensemble.add(new BookieSocketAddress("99.99.99.99:9999"));
+                ensemble.add(new BookieSocketAddress("11.11.11.11:1111"));
+                ensemble.add(new BookieSocketAddress("88.88.88.88:8888"));
+                metadata.addEnsemble(0, ensemble);
+                final MutableInt ledgerCreateRC = new MutableInt(-1);
+                final CountDownLatch latch = new CountDownLatch(1);
+                // mask the sign bit rather than Math.abs(): Math.abs(Long.MIN_VALUE) is still negative
+                long ledgerId = (rand.nextLong() & Long.MAX_VALUE) % 100000000;
+                ledgerManager.createLedgerMetadata(ledgerId, metadata,
+                        new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+                            @Override
+                            public void operationComplete(int rc, Void result) {
+                                ledgerCreateRC.setValue(rc);
+                                latch.countDown();
+                            }
+                        });
+                Assert.assertTrue("Ledger creation should complete within 2 secs",
+                        latch.await(2000, TimeUnit.MILLISECONDS));
+                Assert.assertEquals("LedgerCreate should succeed and return OK rc value", BKException.Code.OK,
+                        ledgerCreateRC.getValue());
+                ledgerList.add(ledgerId);
+            }
+        }
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
+        // re-setting the same value is what is expected to force-trigger the audit
+        urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelayBeforeChange);
+        assertTrue("Audit should be triggered and created ledgers should be marked as underreplicated",
+                underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("All the ledgers should be marked as underreplicated", ledgerList.size(), urLedgerList.size());
+
+        auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertNull("auditTask is supposed to be null", auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay",
+                lostBookieRecoveryDelayBeforeChange, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+    }
+
+    /**
+     * Checks that writing the current lostBookieRecoveryDelay value again,
+     * while a delayed audit task is pending, makes the audit run right away
+     * and leaves the Auditor without a pending audit task.
+     */
+    @Test(timeout=60000)
+    public void testTriggerAuditorWithPendingAuditTask() throws Exception {
+        // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+
+        final Auditor auditor = getAuditorBookiesAuditor();
+        final LedgerHandle ledgerHandle = createAndAddEntriesToLedger();
+        final Long ledgerId = ledgerHandle.getId();
+        LOG.debug("Created ledger : " + ledgerId);
+        ledgerList.add(ledgerId);
+        ledgerHandle.close();
+
+        final CountDownLatch urLatch = registerUrLedgerWatcher(ledgerList.size());
+
+        // recovery work is to start 5 seconds after a bookie failure
+        final int delaySecs = 5;
+        urLedgerMgr.setLostBookieRecoveryDelay(delaySecs);
+
+        // take down a bookie that is not the auditor, so no new election happens
+        shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        boolean reportedEarly = urLatch.await(2, TimeUnit.SECONDS);
+        assertFalse("audit of lost bookie isn't delayed", reportedEarly);
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+
+        // the delayed audit must be pending at this point
+        Assert.assertNotNull("auditTask is not supposed to be null", auditor.getAuditTask());
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
+                delaySecs, auditor.getLostBookieRecoveryDelayBeforeChange());
+
+        // writing the same value (5) again is expected to trigger the Auditor immediately
+        urLedgerMgr.setLostBookieRecoveryDelay(delaySecs);
+        boolean reportedAfterTrigger = urLatch.await(2, TimeUnit.SECONDS);
+        assertTrue("audit of lost bookie shouldn't be delayed", reportedAfterTrigger);
+        assertEquals("all under replicated ledgers should be identified", ledgerList.size(),
+                urLedgerList.size());
+
+        // short pause before inspecting the Auditor's state
+        Thread.sleep(100);
+        Assert.assertNull("auditTask is supposed to be null", auditor.getAuditTask());
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value",
+                delaySecs, auditor.getLostBookieRecoveryDelayBeforeChange());
+    }
+
+    /**
+     * Verifies that setting lostBookieRecoveryDelay to 0 while a delayed
+     * audit task is pending triggers the audit immediately, clears the
+     * pending audit task and makes the Auditor report 0 as its
+     * lostBookieRecoveryDelayBeforeChange.
+     */
+    @Test(timeout=60000)
+    public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws Exception {
+        // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+
+        Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        LOG.debug("Created ledger : {}", ledgerId);
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList.size());
+
+        int lostBookieRecoveryDelay = 5;
+        // wait for 5 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        shutDownNonAuditorBookie();
+
+        LOG.debug("Waiting for ledgers to be marked as under replicated");
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+
+        Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertNotNull("auditTask is not supposed to be null", auditTask);
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
+                lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+
+        // set lostBookieRecoveryDelay to 0, so that Auditor is triggered immediately
+        urLedgerMgr.setLostBookieRecoveryDelay(0);
+        assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(1, TimeUnit.SECONDS));
+        assertEquals("all under replicated ledgers should be identified", ledgerList.size(),
+                urLedgerList.size());
+
+        // short pause before inspecting the Auditor's state
+        Thread.sleep(100);
+        auditTask = auditorBookiesAuditor.getAuditTask();
+        Assert.assertNull("auditTask is supposed to be null", auditTask);
+        // the expected value is the new delay (0), not the earlier one (5)
+        Assert.assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to the newly set value (0)",
+                0, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+    }
+
/**
* Test audit of bookies is delayed when one bookie is down. But when
* another one goes down, the audit is started immediately.
@@ -414,7 +665,7 @@ public class AuditorLedgerCheckerTest extends
MultiLedgerManagerTestCase {
CountDownLatch underReplicaLatch =
registerUrLedgerWatcher(ledgerList.size());
// wait for 10 seconds before starting the recovery work when a bookie
fails
- baseConf.setLostBookieRecoveryDelay(10);
+ urLedgerMgr.setLostBookieRecoveryDelay(10);
// shutdown a non auditor bookie to avoid an election
String shutdownBookie1 = shutDownNonAuditorBookie();
@@ -467,7 +718,7 @@ public class AuditorLedgerCheckerTest extends
MultiLedgerManagerTestCase {
CountDownLatch underReplicaLatch =
registerUrLedgerWatcher(ledgerList.size());
// wait for 5 seconds before starting the recovery work when a bookie
fails
- baseConf.setLostBookieRecoveryDelay(5);
+ urLedgerMgr.setLostBookieRecoveryDelay(5);
// shutdown a non auditor bookie to avoid an election
int idx1 = getShutDownNonAuditorBookieIdx("");
@@ -647,6 +898,12 @@ public class AuditorLedgerCheckerTest extends
MultiLedgerManagerTestCase {
return auditors.get(0);
}
+    /**
+     * Looks up the Auditor held by the AuditorElector that is registered
+     * under the local address of the bookie returned by getAuditorBookie().
+     */
+    private Auditor getAuditorBookiesAuditor() throws Exception {
+        final String auditorAddr = getAuditorBookie().getLocalAddress().toString();
+        return auditorElectors.get(auditorAddr).auditor;
+    }
+
private String shutDownNonAuditorBookie() throws Exception {
// shutdown bookie which is not an auditor
int indexOf = bs.indexOf(getAuditorBookie());
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].