Repository: bookkeeper Updated Branches: refs/heads/master fe52b500c -> a77042db1
BOOKKEEPER-898: listen to read only bookie changes also in auditor Author: Siddharth Boobna <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #19 from sboobna/BOOKKEEPER-898 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/a77042db Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/a77042db Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/a77042db Branch: refs/heads/master Commit: a77042db1b024b5cb2bdd2a5f49ea18a43bc036b Parents: fe52b50 Author: Siddharth Boobna <[email protected]> Authored: Mon Mar 7 22:18:12 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Mon Mar 7 22:18:12 2016 -0800 ---------------------------------------------------------------------- .../bookkeeper/client/BookKeeperAdmin.java | 42 +++++++++++++------- .../apache/bookkeeper/client/BookieWatcher.java | 24 +++++++++++ .../apache/bookkeeper/replication/Auditor.java | 17 ++++++-- .../replication/AuditorLedgerCheckerTest.java | 42 ++++++++++++++++++++ 4 files changed, 106 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a77042db/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 38b21e2..ff339db 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -20,6 +20,21 @@ */ package org.apache.bookkeeper.client; +import static com.google.common.base.Charsets.UTF_8; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.UUID; + import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback; import org.apache.bookkeeper.client.BookKeeper.SyncOpenCallback; @@ -42,21 +57,6 @@ import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Random; -import java.util.UUID; - -import static com.google.common.base.Charsets.UTF_8; - /** * Admin client for BookKeeper clusters */ @@ -213,6 +213,18 @@ public class BookKeeperAdmin { } /** + * Notify when the available list of read only bookies changes. + * This is a one-shot notification. To receive subsequent notifications + * the listener must be registered again. + * + * @param listener the listener to notify + */ + public void notifyReadOnlyBookiesChanged(final BookiesListener listener) + throws BKException { + bkc.bookieWatcher.notifyReadOnlyBookiesChanged(listener); + } + + /** * Open a ledger as an administrator. This means that no digest password * checks are done. Otherwise, the call is identical to BookKeeper#asyncOpenLedger * http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a77042db/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java index cadec5d..6f8e20d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java @@ -125,6 +125,10 @@ class BookieWatcher implements Watcher, ChildrenCallback { } } + void notifyReadOnlyBookiesChanged(final BookiesListener listener) throws BKException { + readOnlyBookieWatcher.notifyBookiesChanged(listener); + } + public Collection<BookieSocketAddress> getBookies() throws BKException { try { List<String> children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false); @@ -357,6 +361,26 @@ class BookieWatcher implements Watcher, ChildrenCallback { } } + void notifyBookiesChanged(final BookiesListener listener) throws BKException { + try { + bk.getZkHandle().getChildren(this.readOnlyBookieRegPath, new Watcher() { + public void process(WatchedEvent event) { + // listen children changed event from ZooKeeper + if (event.getType() == EventType.NodeChildrenChanged) { + listener.availableBookiesChanged(); + } + } + }); + } catch (KeeperException ke) { + logger.error("Error registering watcher with zookeeper", ke); + throw new BKException.ZKException(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.error("Interrupted registering watcher with zookeeper", ie); + throw new BKException.BKInterruptedException(); + } + } + // Read children and register watcher for readonly bookies path void readROBookies(ChildrenCallback callback) { bk.getZkHandle().getChildren(this.readOnlyBookieRegPath, this, callback, null); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a77042db/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 924e620..2e9e048 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -237,6 +237,7 @@ public class Auditor implements BookiesListener { LOG.info("Periodic checking disabled"); } try { + notifyBookieChanges(); knownBookies = getAvailableBookies(); } catch (BKException bke) { LOG.error("Couldn't get bookie list, exiting", bke); @@ -265,10 +266,7 @@ public class Auditor implements BookiesListener { } private List<String> getAvailableBookies() throws BKException { - // Get the available bookies, also watch for further changes - // Watching on only available bookies is sufficient, as changes in readonly bookies also changes in available - // bookies - admin.notifyBookiesChanged(this); + // Get the available bookies Collection<BookieSocketAddress> availableBkAddresses = admin.getAvailableBookies(); Collection<BookieSocketAddress> readOnlyBkAddresses = admin.getReadOnlyBookies(); availableBkAddresses.addAll(readOnlyBkAddresses); @@ -280,6 +278,11 @@ public class Auditor implements BookiesListener { return availableBookies; } + private void notifyBookieChanges() throws BKException { + admin.notifyBookiesChanged(this); + admin.notifyReadOnlyBookiesChanged(this); + } + @SuppressWarnings("unchecked") private void auditBookies() throws BKAuditException, KeeperException, @@ -501,6 +504,12 @@ public class Auditor implements BookiesListener { @Override public void availableBookiesChanged() { + // since a watch is triggered, we need to watch again on the bookies + try { + notifyBookieChanges(); + } catch (BKException bke) { + LOG.error("Exception while registering for a bookie change notification", bke); + } submitAuditTask(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a77042db/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java index 692ddce..8b0c344 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java @@ -280,6 +280,48 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase { } /** + * Test Auditor should consider Readonly bookie fail and publish ur ledgers for readonly bookies. + */ + @Test(timeout = 20000) + public void testReadOnlyBookieShutdown() throws Exception { + LedgerHandle lh = createAndAddEntriesToLedger(); + long ledgerId = lh.getId(); + ledgerList.add(ledgerId); + LOG.debug("Created following ledgers : " + ledgerList); + + int count = ledgerList.size(); + final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(count); + + int bkIndex = bs.size() - 1; + ServerConfiguration bookieConf = bsConfs.get(bkIndex); + BookieServer bk = bs.get(bkIndex); + bookieConf.setReadOnlyModeEnabled(true); + bk.getBookie().doTransitionToReadOnlyMode(); + + // grace period for publishing the bk-ledger + LOG.debug("Waiting for Auditor to finish ledger check."); + assertFalse("latch should not have completed", underReplicaLatch.await(5, TimeUnit.SECONDS)); + + String shutdownBookie = shutdownBookie(bkIndex); + + // grace period for publishing the bk-ledger + LOG.debug("Waiting for ledgers to be marked as under replicated"); + underReplicaLatch.await(5, TimeUnit.SECONDS); + Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList); + assertEquals("Missed identifying under replicated ledgers", 1, urLedgerList.size()); + + /* + * Sample data format present in the under replicated ledger path + * + * {4=replica: "10.18.89.153:5002"} + */ + assertTrue("Ledger is not marked as underreplicated:" + ledgerId, urLedgerList.contains(ledgerId)); + String data = urLedgerData.get(ledgerId); + assertTrue("Bookie " + shutdownBookie + "is not listed in the ledger as missing replica :" + data, + data.contains(shutdownBookie)); + } + + /** * Wait for ledger to be underreplicated, and to be missing all replicas specified */ private boolean waitForLedgerMissingReplicas(Long ledgerId, long secondsToWait, String... replicas)
