Interesting story here: testing with dfs.support.append=true may be difficult, if not impossible, in unit tests because of this code in HDFS:
//
// We found the lease for this file. And surprisingly the original
// holder is trying to recreate this file. This should never occur.
//
if (lease != null) {
  Lease leaseFile = leaseManager.getLeaseByPath(src);
  if (leaseFile != null && leaseFile.equals(lease)) {
    throw new AlreadyBeingCreatedException(
        "failed to create file " + src + " for " + holder +
        " on client " + clientMachine +
        " because current leaseholder is trying to recreate file.");
  }
}
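
In a unit test, the writer and the process doing lease recovery are the same DFS client, i.e. the same lease holder, so the check above fires every time. A minimal sketch of the collision (illustrative only; the path and cluster setup are not from the patch below):

  // Both the create and the append come from the same FileSystem
  // instance, i.e. the same lease holder.
  Configuration conf = new Configuration();
  conf.setBoolean("dfs.support.append", true);
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, (String[])null);
  FileSystem fs = cluster.getFileSystem();
  Path p = new Path("/hlog.test");        // hypothetical path
  FSDataOutputStream out = fs.create(p);
  out.writeBytes("an edit");
  out.sync();                             // readers can see up to here
  // Simulate a crashed writer: never call out.close().
  fs.append(p);   // throws AlreadyBeingCreatedException -- same holder
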
I ran into this with the following patch on trunk:
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index d2b01fe..8c81aed 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;

 import java.io.DataInput;
@@ -1023,7 +1025,8 @@ public class HLog implements HConstants, Syncable {
    * @param rootDir qualified root directory of the HBase instance
    * @param srcDir Directory of log files to split: e.g.
    *               <code>${ROOTDIR}/log_HOST_PORT</code>
-   * @param oldLogDir
+   * @param oldLogDir the system-wide archival directory for non-split HLogs after either
+   * (a) recovery of a RS or (b) pruning of HLogs in a RS due to flushes.
    * @param fs FileSystem
    * @param conf HBaseConfiguration
    * @throws IOException
@@ -1121,6 +1124,7 @@ public class HLog implements HConstants, Syncable {
     int concurrentLogReads =
       conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
     // Is append supported?
+    boolean append = isAppend(conf);
     try {
       int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
         concurrentLogReads)).intValue();
@@ -1139,6 +1143,7 @@ public class HLog implements HConstants, Syncable {
           LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
             ": " + logfiles[i].getPath() + ", length=" +
             logfiles[i].getLen());
         }
+        recoverLog(fs, logfiles[i].getPath(), append);
         Reader in = null;
         int count = 0;
         try {
@@ -1290,6 +1295,65 @@ public class HLog implements HConstants, Syncable {
     return splits;
   }

+  /*
+   * @param conf
+   * @return True if append enabled and we have the syncFs in our path.
+   */
+  static boolean isAppend(final Configuration conf) {
+    boolean append = conf.getBoolean("dfs.support.append", false);
+    if (append) {
+      try {
+        SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+        append = true;
+      } catch (SecurityException ignored) {
+      } catch (NoSuchMethodException e) {
+        append = false;
+      }
+    }
+    return append;
+  }
+
+  /*
+   * Recover log.
+   * Try and open log in append mode.
+   * Doing this, we get a hold of the file that crashed writer
+   * was writing to. Once we have it, close it. This will
+   * allow subsequent reader to see up to last sync.
+   * @param fs
+   * @param p
+   * @param append
+   */
+  public static void recoverLog(final FileSystem fs, final Path p,
+      final boolean append) {
+    if (!append) {
+      return;
+    }
+
+    // lease recovery not needed for local file system case.
+    // currently, local file system doesn't implement append either.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+
+    // Trying recovery
+    boolean recovered = false;
+    while (!recovered) {
+      try {
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    LOG.info("Past out lease recovery");
+  }
+
   /**
    * Utility class that lets us keep track of the edit with it's key
    * Only used when splitting logs
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
index 05dd38d..7f6e869 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
@@ -66,6 +66,7 @@ public class TestStoreReconstruction {
   public void setUp() throws Exception {
     conf = TEST_UTIL.getConfiguration();
+    conf.set("dfs.support.append", "true");
     cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
     // Set the hbase.rootdir to be the home directory in mini dfs.
     TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR,
@@ -99,7 +100,7 @@ public class TestStoreReconstruction {
     HRegionInfo info = new HRegionInfo(htd, null, null, false);
     Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME);
     HLog log = new HLog(cluster.getFileSystem(),
-      this.dir, oldLogDir, conf, null);
+      new Path(this.dir, "rs"), oldLogDir, conf, null);
     HRegion region = new HRegion(dir, log,
       cluster.getFileSystem(),conf, info, null);
     List<KeyValue> result = new ArrayList<KeyValue>();
@@ -134,11 +135,11 @@ public class TestStoreReconstruction {
     log.sync();

     // TODO dont close the file here.
-    log.close();
+    //log.close();
     List<Path> splits =
       HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)),
-        this.dir, oldLogDir, cluster.getFileSystem(), conf);
+        new Path(this.dir, "rs"), oldLogDir, cluster.getFileSystem(), conf);

     // Split should generate only 1 file since there's only 1 region
     assertEquals(1, splits.size());
diff --git a/pom.xml b/pom.xml
index 9e2ed37..a53f484 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,7 +158,7 @@
     <compileSource>1.6</compileSource>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <hadoop.version>0.20.2</hadoop.version>
+    <hadoop.version>0.20.2+228+200a</hadoop.version>
     <log4j.version>1.2.15</log4j.version>
     <jetty.version>6.1.14</jetty.version>
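
For reference, this is roughly the call sequence the patch adds to the split path (a sketch, not the patch itself; the log path is made up):

  // Per log file, before opening a reader:
  boolean append = HLog.isAppend(conf);  // true only if dfs.support.append is
                                         // set and SequenceFile.Writer has a
                                         // syncFs() method on the classpath
  // Blocks, retrying fs.append(p) once a second, until the namenode recovers
  // the dead writer's lease; a subsequent reader then sees up to the last sync.
  HLog.recoverLog(fs, new Path("/hbase/.logs/rs/hlog.dat.0"), append);  // hypothetical path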