Repository: hadoop
Updated Branches:
  refs/heads/branch-2 35fecb530 -> 2cbac36fd


HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh)

(cherry picked from commit f0412de1c1d42b3c2a92531f81d97a24df920523)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2cbac36f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2cbac36f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2cbac36f

Branch: refs/heads/branch-2
Commit: 2cbac36fd3eb1160baf53f643223f96d53d111df
Parents: 35fecb5
Author: Uma Maheswara Rao G <umamah...@apache.org>
Authored: Tue Feb 17 21:28:49 2015 +0530
Committer: Uma Maheswara Rao G <umamah...@apache.org>
Committed: Tue Feb 17 21:31:43 2015 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../bkjournal/BookKeeperJournalManager.java     |  15 +-
 .../bkjournal/TestBookKeeperJournalManager.java | 153 ++++++++++++++++++-
 3 files changed, 163 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2cbac36f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 58561d9..b95eded 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -346,6 +346,8 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7797. Add audit log for setQuota operation (Rakesh R via umamahesh)
 
+    HDFS-4266. BKJM: Separate write and ack quorum (Rakesh R via umamahesh)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2cbac36f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
index aecc464..16ffe52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -152,6 +152,13 @@ public class BookKeeperJournalManager implements JournalManager {
     = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
   public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
 
+  public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE 
+    = "dfs.namenode.bookkeeperjournal.ack.quorum-size";
+
+  public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC
+    = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
+  public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;
+
   private ZooKeeper zkc;
   private final Configuration conf;
   private final BookKeeper bkc;
@@ -162,6 +169,8 @@ public class BookKeeperJournalManager implements JournalManager {
   private final MaxTxId maxTxId;
   private final int ensembleSize;
   private final int quorumSize;
+  private final int ackQuorumSize;
+  private final int addEntryTimeout;
   private final String digestpw;
   private final int speculativeReadTimeout;
   private final int readEntryTimeout;
@@ -184,6 +193,9 @@ public class BookKeeperJournalManager implements JournalManager {
                                BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
     quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
                              BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
+    ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
+    addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
+                             BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT);
     speculativeReadTimeout = conf.getInt(
                              BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
                              BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
@@ -216,6 +228,7 @@ public class BookKeeperJournalManager implements JournalManager {
       ClientConfiguration clientConf = new ClientConfiguration();
       clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
       clientConf.setReadEntryTimeout(readEntryTimeout);
+      clientConf.setAddEntryTimeout(addEntryTimeout);
       bkc = new BookKeeper(clientConf, zkc);
     } catch (KeeperException e) {
       throw new IOException("Error initializing zk", e);
@@ -403,7 +416,7 @@ public class BookKeeperJournalManager implements JournalManager {
         // bookkeeper errored on last stream, clean up ledger
         currentLedger.close();
       }
-      currentLedger = bkc.createLedger(ensembleSize, quorumSize,
+      currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize,
                                        BookKeeper.DigestType.MAC,
                                        digestpw.getBytes(Charsets.UTF_8));
     } catch (BKException bke) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2cbac36f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
index 44e4bef..07fcd72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Callable;
@@ -67,6 +68,7 @@ public class TestBookKeeperJournalManager {
   private ZooKeeper zkc;
   private static BKJMUtil bkutil;
   static int numBookies = 3;
+  private BookieServer newBookie;
 
   @BeforeClass
   public static void setupBookkeeper() throws Exception {
@@ -87,6 +89,9 @@ public class TestBookKeeperJournalManager {
   @After
   public void teardown() throws Exception {
     zkc.close();
+    if (newBookie != null) {
+      newBookie.shutdown();
+    }
   }
 
   private NamespaceInfo newNSInfo() {
@@ -377,7 +382,8 @@ public class TestBookKeeperJournalManager {
    */
   @Test
   public void testAllBookieFailure() throws Exception {
-    BookieServer bookieToFail = bkutil.newBookie();
+    // bookie to fail
+    newBookie = bkutil.newBookie();
     BookieServer replacementBookie = null;
 
     try {
@@ -408,7 +414,7 @@ public class TestBookKeeperJournalManager {
       }
       out.setReadyToFlush();
       out.flush();
-      bookieToFail.shutdown();
+      newBookie.shutdown();
       assertEquals("New bookie didn't die",
                    numBookies, bkutil.checkBookiesUp(numBookies, 10));
 
@@ -449,7 +455,7 @@ public class TestBookKeeperJournalManager {
       if (replacementBookie != null) {
         replacementBookie.shutdown();
       }
-      bookieToFail.shutdown();
+      newBookie.shutdown();
 
       if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
         LOG.warn("Not all bookies from this test shut down, expect errors");
@@ -464,7 +470,7 @@ public class TestBookKeeperJournalManager {
    */
   @Test
   public void testOneBookieFailure() throws Exception {
-    BookieServer bookieToFail = bkutil.newBookie();
+    newBookie = bkutil.newBookie();
     BookieServer replacementBookie = null;
 
     try {
@@ -500,7 +506,7 @@ public class TestBookKeeperJournalManager {
       replacementBookie = bkutil.newBookie();
       assertEquals("replacement bookie didn't start",
                    ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
-      bookieToFail.shutdown();
+      newBookie.shutdown();
       assertEquals("New bookie didn't die",
                    ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
 
@@ -518,7 +524,7 @@ public class TestBookKeeperJournalManager {
       if (replacementBookie != null) {
         replacementBookie.shutdown();
       }
-      bookieToFail.shutdown();
+      newBookie.shutdown();
 
       if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
         LOG.warn("Not all bookies from this test shut down, expect errors");
@@ -822,6 +828,141 @@ public class TestBookKeeperJournalManager {
     assertTrue("No thread managed to complete formatting", numCompleted > 0);
   }
 
+  @Test(timeout = 120000)
+  public void testDefaultAckQuorum() throws Exception {
+    newBookie = bkutil.newBookie();
+    int ensembleSize = numBookies + 1;
+    int quorumSize = numBookies + 1;
+    // ack quorum size is deliberately not configured; ensemble and write quorum
+    // span all bookies, so a single unresponsive bookie should fail the journal
+    Configuration conf = new Configuration();
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+        ensembleSize);
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+        quorumSize);
+    // short add-entry timeout (2 secs) so the test does not wait long to fail
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
+        2);
+    NamespaceInfo nsi = newNSInfo();
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
+    bkjm.format(nsi);
+    CountDownLatch sleepLatch = new CountDownLatch(1);
+    sleepBookie(sleepLatch, newBookie);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    int numTransactions = 100;
+    for (long i = 1; i <= numTransactions; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    try {
+      out.close();
+      bkjm.finalizeLogSegment(1, numTransactions);
+
+      List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
+      bkjm.selectInputStreams(in, 1, true);
+      try {
+        assertEquals(numTransactions,
+            FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
+      } finally {
+        in.get(0).close();
+      }
+      fail("Should throw exception as not enough non-faulty bookies available!");
+    } catch (IOException ioe) {
+      // expected: the write could not be acknowledged while a bookie is suspended
+    }
+  }
+
+  /**
+   * Test the ack quorum feature supported by bookkeeper. Keep an ack-quorum
+   * sized set of bookies alive and suspend the remaining bookie. The client
+   * then waits only for acknowledgements from the ack quorum and, once they
+   * respond successfully, continues writing. Without a separate ack quorum
+   * the client would hang for a long time while adding entries.
+   */
+  @Test(timeout = 120000)
+  public void testAckQuorum() throws Exception {
+    // extra bookie that will be suspended to act as the slow bookie
+    newBookie = bkutil.newBookie();
+    // make quorum size and ensemble size the same to avoid interleaved writing
+    // of the ledger entries
+    int ensembleSize = numBookies + 1;
+    int quorumSize = numBookies + 1;
+    int ackSize = numBookies;
+    // ack quorum is one smaller than the write quorum, so the writes below are
+    // expected to succeed even though one bookie is suspended
+    Configuration conf = new Configuration();
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+        ensembleSize);
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+        quorumSize);
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ACK_QUORUM_SIZE,
+        ackSize);
+    // add-entry timeout of 3600 secs (60 minutes), far beyond the test timeout
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
+        3600);
+
+    NamespaceInfo nsi = newNSInfo();
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi);
+    bkjm.format(nsi);
+    CountDownLatch sleepLatch = new CountDownLatch(1);
+    sleepBookie(sleepLatch, newBookie);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    int numTransactions = 100;
+    for (long i = 1; i <= numTransactions; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, numTransactions);
+
+    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
+    bkjm.selectInputStreams(in, 1, true);
+    try {
+      assertEquals(numTransactions,
+          FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
+    } finally {
+      sleepLatch.countDown();
+      in.get(0).close();
+      bkjm.close();
+    }
+  }
+
+  /**
+   * Suspend a bookie on a background thread until the latch is counted down
+   * or 60 seconds elapse, whichever comes first, then resume the bookie.
+   * @param l
+   *          Latch to wait on
+   * @param bookie
+   *          bookie server
+   * @throws Exception
+   */
+  private void sleepBookie(final CountDownLatch l, final BookieServer bookie)
+      throws Exception {
+
+    Thread sleeper = new Thread() {
+      public void run() {
+        try {
+          bookie.suspendProcessing();
+          l.await(60, TimeUnit.SECONDS);
+          bookie.resumeProcessing();
+        } catch (Exception e) {
+          LOG.error("Error suspending bookie", e);
+        }
+      }
+    };
+    sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
+    sleeper.start();
+  }
+
+
   private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
       int startTxid, int endTxid) throws IOException, KeeperException,
       InterruptedException {

Reply via email to