Author: nspiegelberg
Date: Tue Sep 27 01:07:26 2011
New Revision: 1176149

URL: http://svn.apache.org/viewvc?rev=1176149&view=rev
Log:
Added HBASE-2345 + recoverLog IOE fix to trunk. Note that dfs.support.append is
commented out in setup() because that was failing. Will investigate.

Modified:
    
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: 
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1176149&r1=1176148&r2=1176149&view=diff
==============================================================================
--- 
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
 (original)
+++ 
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
 Tue Sep 27 01:07:26 2011
@@ -20,24 +20,36 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.TestHFile;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.io.SequenceFile;
 
 /** JUnit test case for HLog */
 public class TestHLog extends HBaseTestCase implements HConstants {
+  static final Log LOG = LogFactory.getLog(TestHLog.class);
+
   private Path dir;
   private Path oldLogDir;
   private MiniDFSCluster cluster;
@@ -47,6 +59,7 @@ public class TestHLog extends HBaseTestC
     // Make block sizes small.
     this.conf.setInt("dfs.blocksize", 1024 * 1024);
     this.conf.setInt("hbase.regionserver.flushlogentries", 1);
+    //this.conf.setBoolean("dfs.support.append", true);
     cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
     // Set the hbase.rootdir to be the home directory in mini dfs.
     this.conf.set(HConstants.HBASE_DIR,
@@ -254,6 +267,105 @@ public class TestHLog extends HBaseTestC
     }
   }
 
+  // Writes edits to an HLog, shuts the mini DFS cluster down before the log is
+  // closed, restarts it, then checks HLog.recoverLog() returns and that every
+  // edit can be read back. For this test to pass, requires: HDFS-200 (append
+  // support), HDFS-988 (SafeMode should freeze file operations
+  // [FSNamesystem.nextGenerationStampForBlock]), HDFS-142 (keep pendingCreates).
+  public void testAppendClose() throws Exception {
+    this.conf.setBoolean("dfs.support.append", true); // turn on DFS append support for this test
+    byte [] tableName = Bytes.toBytes(getName());
+    HRegionInfo regioninfo = new HRegionInfo(new HTableDescriptor(tableName),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
+    Path subdir = new Path(this.dir, "hlogdir");
+    Path archdir = new Path(this.dir, "hlogdir_archive");
+    HLog wal = new HLog(this.fs, subdir, archdir, this.conf, null);
+    final int total = 20; // number of edits appended below and expected back at the end
+
+    for (int i = 0; i < total; i++) {
+      WALEdit kvs = new WALEdit(); // one KeyValue per edit; the loop index is the row
+      kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
+      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis());
+    }
+    // Sync now so the appended edits are sent to the HDFS datanodes
+    wal.sync(true);
+    final Path walPath = wal.computeFilename(wal.getFilenum()); // file handed to recoverLog() later
+
+    // Stop the cluster.  (restart is in finally since we're sharing MiniDFSCluster)
+    try {
+      this.cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      this.cluster.shutdown();
+      try {
+        // wal.writer.close() will throw an exception,
+        // but still call this since it closes the LogSyncer thread first
+        wal.close();
+      } catch (IOException e) {
+        LOG.info(e); // expected here (see comment above), so only logged
+      }
+      this.fs.close(); // closing FS last so DFSOutputStream can't call close
+      LOG.info("STOPPED first instance of the cluster");
+    } finally {
+      // Restart the cluster; runs even if one of the shutdown steps above threw
+      this.cluster = new MiniDFSCluster(conf, 2, false, null);
+      this.cluster.waitActive();
+      this.fs = cluster.getFileSystem(); // later steps use the new cluster's filesystem
+      LOG.info("START second instance.");
+    }
+
+    // set the lease period to be 1 second so that the
+    // namenode triggers lease recovery upon append request
+    Method setLeasePeriod = this.cluster.getClass()
+      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
+    setLeasePeriod.setAccessible(true); // presumably non-public, hence the reflection
+    setLeasePeriod.invoke(cluster,
+                          new Object[]{new Long(1000), new Long(1000)}); // NOTE(review): Long.valueOf is the preferred boxing
+    try {
+      Thread.sleep(1000); // presumably lets the 1 second lease period elapse
+    } catch (InterruptedException e) {
+      LOG.info(e); // NOTE(review): interrupt is logged and otherwise ignored
+    }
+
+    // Now try recovering the log, like the HMaster would do
+    final FileSystem recoveredFs = this.fs; // final copy so the local class below can use it
+
+    class RecoverLogThread extends Thread { // runs recoverLog off the test thread so it can be timed out
+      public Exception exception = null; // set when recoverLog throws an IOException
+      public void run() {
+          try {
+            HLog.recoverLog(recoveredFs, walPath, true);
+          } catch (IOException e) {
+            exception = e; // kept for the test thread to rethrow after join()
+          }
+      }
+    }
+
+    RecoverLogThread t = new RecoverLogThread();
+    t.start();
+    // Timeout after 60 sec. Without correct patches, would be an infinite loop
+    t.join(60 * 1000);
+    if(t.isAlive()) { // still running after the 60 second join, so treat as a hang
+      t.interrupt();
+      throw new Exception("Timed out waiting for HLog.recoverLog()"); // NOTE(review): raw Exception; a more specific type would read better
+    }
+
+    if (t.exception != null) // surface a failure captured on the recovery thread
+      throw t.exception;
+
+    // Make sure you can read all the content
+    SequenceFile.Reader reader
+      = new SequenceFile.Reader(this.fs, walPath, this.conf);
+    int count = 0; // entries read back from the recovered file
+    HLogKey key = HLog.newKey(this.conf);
+    WALEdit val = new WALEdit();
+    while (reader.next(key, val)) {
+      count++;
+      assertTrue("Should be one KeyValue per WALEdit",
+                 val.getKeyValues().size() == 1);
+    }
+    assertEquals(total, count); // every appended edit must be present in the recovered file
+    reader.close(); // NOTE(review): not reached if an assertion above fails; consider try/finally
+  }
+
   /**
    * Tests that we can write out an edit, close, and then read it back in 
again.
    * @throws IOException


Reply via email to