Author: nspiegelberg
Date: Tue Sep 27 01:07:26 2011
New Revision: 1176149
URL: http://svn.apache.org/viewvc?rev=1176149&view=rev
Log:
Added HBASE-2345 + recoverLog IOE fix to trunk. Note that dfs.support.append is
commented out in setup() because it was failing. Will investigate.
Modified:
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Modified:
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1176149&r1=1176148&r2=1176149&view=diff
==============================================================================
---
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
(original)
+++
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Tue Sep 27 01:07:26 2011
@@ -20,24 +20,36 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.TestHFile;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.io.SequenceFile;
/** JUnit test case for HLog */
public class TestHLog extends HBaseTestCase implements HConstants {
+ static final Log LOG = LogFactory.getLog(TestHLog.class);
+
private Path dir;
private Path oldLogDir;
private MiniDFSCluster cluster;
@@ -47,6 +59,7 @@ public class TestHLog extends HBaseTestC
// Make block sizes small.
this.conf.setInt("dfs.blocksize", 1024 * 1024);
this.conf.setInt("hbase.regionserver.flushlogentries", 1);
+ //this.conf.setBoolean("dfs.support.append", true);
cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
// Set the hbase.rootdir to be the home directory in mini dfs.
this.conf.set(HConstants.HBASE_DIR,
@@ -254,6 +267,105 @@ public class TestHLog extends HBaseTestC
}
}
+  // Exercises WAL recovery after a hard HDFS restart. Requires:
+  // 1. HDFS-200 (append support)
+  // 2. HDFS-988 (SafeMode should freeze file operations
+  //    [FSNamesystem.nextGenerationStampForBlock])
+  // 3. HDFS-142 (on restart, maintain pendingCreates)
+  public void testAppendClose() throws Exception {
+    this.conf.setBoolean("dfs.support.append", true);
+    byte [] tableName = Bytes.toBytes(getName());
+    HRegionInfo regioninfo = new HRegionInfo(new HTableDescriptor(tableName),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
+    Path subdir = new Path(this.dir, "hlogdir");
+    Path archdir = new Path(this.dir, "hlogdir_archive");
+    HLog wal = new HLog(this.fs, subdir, archdir, this.conf, null);
+    final int total = 20;
+
+    for (int i = 0; i < total; i++) { // write `total` single-KeyValue edits
+      WALEdit kvs = new WALEdit();
+      kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
+      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis());
+    }
+    // Now call sync to send the data to HDFS datanodes
+    wal.sync(true);
+    final Path walPath = wal.computeFilename(wal.getFilenum());
+
+    // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
+    try {
+      this.cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER); // freeze file ops (HDFS-988)
+      this.cluster.shutdown();
+      try {
+        // wal.writer.close() will throw an exception,
+        // but still call this since it closes the LogSyncer thread first
+        wal.close();
+      } catch (IOException e) {
+        LOG.info(e); // expected: the writer's pipeline died with the cluster
+      }
+      this.fs.close(); // closing FS last so DFSOutputStream can't call close
+      LOG.info("STOPPED first instance of the cluster");
+    } finally {
+      // Restart the cluster
+      this.cluster = new MiniDFSCluster(conf, 2, false, null);
+      this.cluster.waitActive();
+      this.fs = cluster.getFileSystem();
+      LOG.info("START second instance.");
+    }
+
+    // Shorten the lease period to 1 second (via reflection, because
+    // setLeasePeriod is not public) so the namenode triggers lease recovery
+    Method setLeasePeriod = this.cluster.getClass()
+      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
+    setLeasePeriod.setAccessible(true);
+    setLeasePeriod.invoke(cluster,
+      new Object[]{new Long(1000), new Long(1000)});
+    try {
+      Thread.sleep(1000); // let the shortened lease expire
+    } catch (InterruptedException e) {
+      LOG.info(e); // NOTE(review): should also restore the interrupt flag
+    }
+
+    // Now try recovering the log, like the HMaster would do
+    final FileSystem recoveredFs = this.fs;
+
+    class RecoverLogThread extends Thread { // run recoverLog off-thread so we can time it out
+      public Exception exception = null;
+      public void run() {
+          try {
+            HLog.recoverLog(recoveredFs, walPath, true);
+          } catch (IOException e) {
+            exception = e;
+          }
+      }
+    }
+
+    RecoverLogThread t = new RecoverLogThread();
+    t.start();
+    // Timeout after 60 sec. Without correct patches, would be an infinite loop
+    t.join(60 * 1000);
+    if(t.isAlive()) {
+      t.interrupt();
+      throw new Exception("Timed out waiting for HLog.recoverLog()");
+    }
+
+    if (t.exception != null)
+      throw t.exception;
+
+    // Make sure you can read all the content
+    SequenceFile.Reader reader
+      = new SequenceFile.Reader(this.fs, walPath, this.conf);
+    int count = 0;
+    HLogKey key = HLog.newKey(this.conf);
+    WALEdit val = new WALEdit();
+    while (reader.next(key, val)) {
+      count++;
+      assertTrue("Should be one KeyValue per WALEdit",
+                 val.getKeyValues().size() == 1);
+    }
+    assertEquals(total, count);
+    reader.close();
+  }
+
/**
* Tests that we can write out an edit, close, and then read it back in
again.
* @throws IOException