Repository: hadoop Updated Branches: refs/heads/trunk f24a56787 -> f0412de1c
HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0412de1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0412de1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0412de1 Branch: refs/heads/trunk Commit: f0412de1c1d42b3c2a92531f81d97a24df920523 Parents: f24a567 Author: Uma Maheswara Rao G <umamah...@apache.org> Authored: Tue Feb 17 21:28:49 2015 +0530 Committer: Uma Maheswara Rao G <umamah...@apache.org> Committed: Tue Feb 17 21:28:49 2015 +0530 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../bkjournal/BookKeeperJournalManager.java | 15 +- .../bkjournal/TestBookKeeperJournalManager.java | 153 ++++++++++++++++++- 3 files changed, 163 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0412de1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fcf5994..f28e41e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -639,6 +639,8 @@ Release 2.7.0 - UNRELEASED HDFS-7797. Add audit log for setQuota operation (Rakesh R via umamahesh) + HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0412de1/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 51905c0..89fa84c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -152,6 +152,13 @@ public class BookKeeperJournalManager implements JournalManager { = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec"; public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5; + public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE + = "dfs.namenode.bookkeeperjournal.ack.quorum-size"; + + public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC + = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec"; + public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5; + private ZooKeeper zkc; private final Configuration conf; private final BookKeeper bkc; @@ -162,6 +169,8 @@ public class BookKeeperJournalManager implements JournalManager { private final MaxTxId maxTxId; private final int ensembleSize; private final int quorumSize; + private final int ackQuorumSize; + private final int addEntryTimeout; private final String digestpw; private final int speculativeReadTimeout; private final int readEntryTimeout; @@ -184,6 +193,9 @@ public class BookKeeperJournalManager implements JournalManager { BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT); quorumSize = 
conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE, BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT); + ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize); + addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC, + BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT); speculativeReadTimeout = conf.getInt( BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS, BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT); @@ -216,6 +228,7 @@ public class BookKeeperJournalManager implements JournalManager { ClientConfiguration clientConf = new ClientConfiguration(); clientConf.setSpeculativeReadTimeout(speculativeReadTimeout); clientConf.setReadEntryTimeout(readEntryTimeout); + clientConf.setAddEntryTimeout(addEntryTimeout); bkc = new BookKeeper(clientConf, zkc); } catch (KeeperException e) { throw new IOException("Error initializing zk", e); @@ -403,7 +416,7 @@ public class BookKeeperJournalManager implements JournalManager { // bookkeeper errored on last stream, clean up ledger currentLedger.close(); } - currentLedger = bkc.createLedger(ensembleSize, quorumSize, + currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize, BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8)); } catch (BKException bke) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0412de1/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index 44e4bef..07fcd72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Callable; @@ -67,6 +68,7 @@ public class TestBookKeeperJournalManager { private ZooKeeper zkc; private static BKJMUtil bkutil; static int numBookies = 3; + private BookieServer newBookie; @BeforeClass public static void setupBookkeeper() throws Exception { @@ -87,6 +89,9 @@ public class TestBookKeeperJournalManager { @After public void teardown() throws Exception { zkc.close(); + if (newBookie != null) { + newBookie.shutdown(); + } } private NamespaceInfo newNSInfo() { @@ -377,7 +382,8 @@ public class TestBookKeeperJournalManager { */ @Test public void testAllBookieFailure() throws Exception { - BookieServer bookieToFail = bkutil.newBookie(); + // bookie to fail + newBookie = bkutil.newBookie(); BookieServer replacementBookie = null; try { @@ -408,7 +414,7 @@ public class TestBookKeeperJournalManager { } out.setReadyToFlush(); out.flush(); - bookieToFail.shutdown(); + newBookie.shutdown(); assertEquals("New bookie didn't die", numBookies, bkutil.checkBookiesUp(numBookies, 10)); @@ -449,7 +455,7 @@ public class TestBookKeeperJournalManager { if (replacementBookie != null) { replacementBookie.shutdown(); } - bookieToFail.shutdown(); + newBookie.shutdown(); if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { LOG.warn("Not all bookies from this test shut down, expect errors"); @@ -464,7 +470,7 @@ public class TestBookKeeperJournalManager { */ @Test public void testOneBookieFailure() throws Exception { - BookieServer bookieToFail = bkutil.newBookie(); + newBookie = bkutil.newBookie(); BookieServer replacementBookie = null; try { @@ -500,7 +506,7 @@ public class 
TestBookKeeperJournalManager { replacementBookie = bkutil.newBookie(); assertEquals("replacement bookie didn't start", ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10)); - bookieToFail.shutdown(); + newBookie.shutdown(); assertEquals("New bookie didn't die", ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); @@ -518,7 +524,7 @@ public class TestBookKeeperJournalManager { if (replacementBookie != null) { replacementBookie.shutdown(); } - bookieToFail.shutdown(); + newBookie.shutdown(); if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { LOG.warn("Not all bookies from this test shut down, expect errors"); @@ -822,6 +828,141 @@ public class TestBookKeeperJournalManager { assertTrue("No thread managed to complete formatting", numCompleted > 0); } + @Test(timeout = 120000) + public void testDefaultAckQuorum() throws Exception { + newBookie = bkutil.newBookie(); + int ensembleSize = numBookies + 1; + int quorumSize = numBookies + 1; + // ensure that the journal manager has to use all bookies, + // so that a failure will fail the journal manager + Configuration conf = new Configuration(); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + quorumSize); + // sets 2 secs + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC, + 2); + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi); + bkjm.format(nsi); + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(sleepLatch, newBookie); + + EditLogOutputStream out = bkjm.startLogSegment(1, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + int numTransactions = 100; + for (long i = 1; i <= numTransactions; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + try { + out.close(); + 
bkjm.finalizeLogSegment(1, numTransactions); + + List<EditLogInputStream> in = new ArrayList<EditLogInputStream>(); + bkjm.selectInputStreams(in, 1, true); + try { + assertEquals(numTransactions, + FSEditLogTestUtil.countTransactionsInStream(in.get(0))); + } finally { + in.get(0).close(); + } + fail("Should throw exception as not enough non-faulty bookies available!"); + } catch (IOException ioe) { + // expected + } + } + + /** + * Test the ack quorum feature supported by BookKeeper. Keep the ack quorum + * bookies alive and suspend the remaining (slow) bookie. The client then waits + * only for acknowledgements from the ack quorum bookies and, once they + * succeed, continues writing. Without a separate ack quorum, the client would + * block on the suspended bookie until the add-entry timeout expires. + */ + @Test(timeout = 120000) + public void testAckQuorum() throws Exception { + // slow bookie + newBookie = bkutil.newBookie(); + // make quorum size and ensemble size the same to avoid interleaved writing + // of the ledger entries + int ensembleSize = numBookies + 1; + int quorumSize = numBookies + 1; + int ackSize = numBookies; + // ensure that the journal manager has to use all bookies, + // so that a failure will fail the journal manager + Configuration conf = new Configuration(); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + quorumSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, + ackSize); + // sets 3600 secs (60 minutes) + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC, + 3600); + + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi); + bkjm.format(nsi); + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(sleepLatch, newBookie); + + EditLogOutputStream out = bkjm.startLogSegment(1, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); +
int numTransactions = 100; + for (long i = 1; i <= numTransactions; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, numTransactions); + + List<EditLogInputStream> in = new ArrayList<EditLogInputStream>(); + bkjm.selectInputStreams(in, 1, true); + try { + assertEquals(numTransactions, + FSEditLogTestUtil.countTransactionsInStream(in.get(0))); + } finally { + sleepLatch.countDown(); + in.get(0).close(); + bkjm.close(); + } + } + + /** + * Suspend processing on the given bookie from a background thread, and + * resume it once the latch is counted down or 60 seconds elapse, whichever + * comes first. + * + * @param l + * Latch to wait on before resuming the bookie + * @param bookie + * bookie server to suspend + * @throws Exception + */ + private void sleepBookie(final CountDownLatch l, final BookieServer bookie) + throws Exception { + + Thread sleeper = new Thread() { + public void run() { + try { + bookie.suspendProcessing(); + l.await(60, TimeUnit.SECONDS); + bookie.resumeProcessing(); + } catch (Exception e) { + LOG.error("Error suspending bookie", e); + } + } + }; + sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId()); + sleeper.start(); + } + + private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm, int startTxid, int endTxid) throws IOException, KeeperException, InterruptedException {