Interesting story here: testing with dfs.support.append=true may be difficult, or even impossible, in unit tests because of this code in the NameNode:
//
// We found the lease for this file. And surprisingly the original
// holder is trying to recreate this file. This should never occur.
//
if (lease != null) {
  Lease leaseFile = leaseManager.getLeaseByPath(src);
  if (leaseFile != null && leaseFile.equals(lease)) {
    throw new AlreadyBeingCreatedException(
      "failed to create file " + src + " for " + holder +
      " on client " + clientMachine +
      " because current leaseholder is trying to recreate file.");
  }
}

I ran into this with the following patch on trunk:

diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index d2b01fe..8c81aed 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 
 import java.io.DataInput;
@@ -1023,7 +1025,8 @@ public class HLog implements HConstants, Syncable {
    * @param rootDir qualified root directory of the HBase instance
    * @param srcDir Directory of log files to split: e.g.
    *               <code>${ROOTDIR}/log_HOST_PORT</code>
-   * @param oldLogDir
+   * @param oldLogDir the system-wide archival directory for non-split HLogs after either
+   * (a) recovery of a RS or (b) pruning of HLogs in a RS due to flushes.
    * @param fs FileSystem
    * @param conf HBaseConfiguration
    * @throws IOException
@@ -1121,6 +1124,7 @@ public class HLog implements HConstants, Syncable {
     int concurrentLogReads =
       conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
     // Is append supported?
+    boolean append = isAppend(conf);
     try {
       int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
         concurrentLogReads)).intValue();
@@ -1139,6 +1143,7 @@ public class HLog implements HConstants, Syncable {
         LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
           ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
       }
+      recoverLog(fs, logfiles[i].getPath(), append);
       Reader in = null;
       int count = 0;
       try {
@@ -1290,6 +1295,65 @@ public class HLog implements HConstants, Syncable {
     return splits;
   }
 
+  /*
+   * @param conf
+   * @return True if append enabled and we have the syncFs in our path.
+   */
+  static boolean isAppend(final Configuration conf) {
+    boolean append = conf.getBoolean("dfs.support.append", false);
+    if (append) {
+      try {
+        SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+        append = true;
+      } catch (SecurityException ignored) {
+      } catch (NoSuchMethodException e) {
+        append = false;
+      }
+    }
+    return append;
+  }
+
+  /*
+   * Recover log.
+   * Try and open log in append mode.
+   * Doing this, we get a hold of the file that crashed writer
+   * was writing to. Once we have it, close it. This will
+   * allow subsequent reader to see up to last sync.
+   * @param fs
+   * @param p
+   * @param append
+   */
+  public static void recoverLog(final FileSystem fs, final Path p,
+      final boolean append) {
+    if (!append) {
+      return;
+    }
+
+    // lease recovery not needed for local file system case.
+    // currently, local file system doesn't implement append either.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+
+    // Trying recovery
+    boolean recovered = false;
+    while (!recovered) {
+      try {
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    LOG.info("Past out lease recovery");
+  }
+
   /**
    * Utility class that lets us keep track of the edit with it's key
    * Only used when splitting logs
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
index 05dd38d..7f6e869 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
@@ -66,6 +66,7 @@ public class TestStoreReconstruction {
   public void setUp() throws Exception {
     conf = TEST_UTIL.getConfiguration();
+    conf.set("dfs.support.append", "true");
     cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
     // Set the hbase.rootdir to be the home directory in mini dfs.
     TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR,
@@ -99,7 +100,7 @@ public class TestStoreReconstruction {
     HRegionInfo info = new HRegionInfo(htd, null, null, false);
     Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME);
     HLog log = new HLog(cluster.getFileSystem(),
-      this.dir, oldLogDir, conf, null);
+      new Path(this.dir, "rs"), oldLogDir, conf, null);
     HRegion region = new HRegion(dir, log, cluster.getFileSystem(),conf, info, null);
     List<KeyValue> result = new ArrayList<KeyValue>();
@@ -134,11 +135,11 @@ public class TestStoreReconstruction {
     log.sync();
 
     // TODO dont close the file here.
-    log.close();
+    //log.close();
 
     List<Path> splits =
       HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)),
-      this.dir, oldLogDir, cluster.getFileSystem(), conf);
+      new Path(this.dir, "rs"), oldLogDir, cluster.getFileSystem(), conf);
     // Split should generate only 1 file since there's only 1 region
     assertEquals(1, splits.size());
diff --git a/pom.xml b/pom.xml
index 9e2ed37..a53f484 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,7 +158,7 @@
     <compileSource>1.6</compileSource>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <hadoop.version>0.20.2</hadoop.version>
+    <hadoop.version>0.20.2+228+200a</hadoop.version>
     <log4j.version>1.2.15</log4j.version>
     <jetty.version>6.1.14</jetty.version>
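
To make the conflict concrete, here is a minimal, hypothetical sketch (the class name, file path, and payload are mine, not from the patch) of what the test ends up doing, assuming an append-capable 0.20 Hadoop on the classpath: the MiniDFSCluster test writes the log and then calls append from the same JVM, so the append arrives at the NameNode from the very DFSClient that still holds the lease, and the startFile check quoted above throws AlreadyBeingCreatedException instead of kicking off lease recovery.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class LeaseConflictSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.support.append", true);
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, (String[]) null);
    try {
      FileSystem fs = cluster.getFileSystem();
      Path p = new Path("/hlog.sketch");

      // Simulate the "crashed" writer: create the file, sync some edits,
      // but never close it -- this client now holds the lease.
      FSDataOutputStream out = fs.create(p);
      out.write(new byte[] {1, 2, 3});
      out.sync();

      // Simulate recoverLog(): open the same file for append. Because the
      // request comes from the same lease holder, the NameNode throws
      // AlreadyBeingCreatedException rather than recovering the lease, so
      // the retry loop in recoverLog() would spin forever.
      fs.append(p);
    } finally {
      cluster.shutdown();
    }
  }
}

In a real region server crash the writer's DFSClient is gone, the lease eventually expires, and the append succeeds; it is only the single-JVM, single-client unit-test setup that trips the "current leaseholder is trying to recreate file" branch.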