Author: stack
Date: Tue Mar 26 04:49:42 2013
New Revision: 1460974
URL: http://svn.apache.org/r1460974
Log:
HBASE-8156 Support for Namenode HA for non-idempotent operations
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java?rev=1460974&r1=1460973&r2=1460974&view=diff
==============================================================================
---
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
(original)
+++
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
Tue Mar 26 04:49:42 2013
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver;
-import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.fs.HFileS
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
/**
* View to an on-disk Region.
@@ -74,6 +74,15 @@ public class HRegionFileSystem {
private final Configuration conf;
private final Path tableDir;
private final FileSystem fs;
+
+  private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10;
+  private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000;
+
+  /**
+   * NameNode connectivity hiccups (for example during an HA failover) can make non-idempotent
+   * operations fail at the client level, so this class retries mkdirs, rename and delete itself.
+   * These fields hold the retry limit ("hdfs.client.retries.number") and the base sleep between
+   * attempts ("hdfs.client.sleep.before.retries"), both read from the Configuration in the
+   * constructor.
+   */
+  private final int hdfsClientRetriesNumber;
+  private final int baseSleepBeforeRetries;
/**
* Create a view to the on-disk region
@@ -82,13 +91,17 @@ public class HRegionFileSystem {
* @param tableDir {@link Path} to where the table is being stored
* @param regionInfo {@link HRegionInfo} for region
*/
- HRegionFileSystem(final Configuration conf, final FileSystem fs,
- final Path tableDir, final HRegionInfo regionInfo) {
+ HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path
tableDir,
+ final HRegionInfo regionInfo) {
this.fs = fs;
this.conf = conf;
this.tableDir = tableDir;
this.regionInfo = regionInfo;
- }
+ this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
+ DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); // retry limit used by createDir, rename and deleteDir
+ this.baseSleepBeforeRetries =
conf.getInt("hdfs.client.sleep.before.retries",
+ DEFAULT_BASE_SLEEP_BEFORE_RETRIES); // base sleep between those retries, multiplied by the attempt number
+ }
/** @return the underlying {@link FileSystem} */
public FileSystem getFileSystem() {
@@ -122,7 +135,7 @@ public class HRegionFileSystem {
* Clean up any temp detritus that may have been left around from previous
operation attempts.
*/
void cleanupTempDir() throws IOException {
- FSUtils.deleteDirectory(fs, getTempDir());
+ deleteDir(getTempDir()); // deleteDir retries on IOException; its boolean result is not checked here
}
//
===========================================================================
@@ -145,9 +158,8 @@ public class HRegionFileSystem {
*/
Path createStoreDir(final String familyName) throws IOException {
Path storeDir = getStoreDir(familyName);
- if (!fs.exists(storeDir) && !fs.mkdirs(storeDir)) {
- throw new IOException("Failed create of: " + storeDir);
- }
+    // createDir retries transient FileSystem failures but expects its caller to have checked
+    // for existence, hence the exists() guard here.
+    if (!fs.exists(storeDir) && !createDir(storeDir)) {
+      throw new IOException("Failed creating " + storeDir);
+    }
return storeDir;
}
@@ -240,11 +252,10 @@ public class HRegionFileSystem {
// delete the family folder
Path familyDir = getStoreDir(familyName);
- if (!fs.delete(familyDir, true)) {
- throw new IOException("Could not delete family " + familyName +
- " from FileSystem for region " + regionInfo.getRegionNameAsString() +
- "(" + regionInfo.getEncodedName() + ")");
- }
+ if(fs.exists(familyDir) && !deleteDir(familyDir))
+ throw new IOException("Could not delete family " + familyName
+ + " from FileSystem for region " +
regionInfo.getRegionNameAsString() + "("
+ + regionInfo.getEncodedName() + ")");
}
/**
@@ -312,7 +323,9 @@ public class HRegionFileSystem {
private Path commitStoreFile(final String familyName, final Path buildPath,
final long seqNum, final boolean generateNewName) throws IOException {
Path storeDir = getStoreDir(familyName);
- fs.mkdirs(storeDir);
+ if(!fs.exists(storeDir) && !createDir(storeDir))
+ throw new IOException("Failed creating " + storeDir);
+
String name = buildPath.getName();
if (generateNewName) {
name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum +
"_");
@@ -322,12 +335,14 @@ public class HRegionFileSystem {
throw new FileNotFoundException(buildPath.toString());
}
LOG.debug("Committing store file " + buildPath + " as " + dstPath);
- if (!fs.rename(buildPath, dstPath)) {
+ // buildPath exists, therefore not doing an exists() check.
+ if (!rename(buildPath, dstPath)) {
throw new IOException("Failed rename of " + buildPath + " to " +
dstPath);
}
return dstPath;
}
+
/**
* Moves multiple store files to the relative region's family store
directory.
* @param storeFiles list of store files divided by family
@@ -414,7 +429,7 @@ public class HRegionFileSystem {
* Clean up any split detritus that may have been left around from previous
split attempts.
*/
void cleanupSplitsDir() throws IOException {
- FSUtils.deleteDirectory(fs, getSplitsDir());
+ deleteDir(getSplitsDir()); // deleteDir retries on IOException; its boolean result is not checked here
}
/**
@@ -437,7 +452,7 @@ public class HRegionFileSystem {
if (daughters != null) {
for (FileStatus daughter: daughters) {
Path daughterDir = new Path(getTableDir(),
daughter.getPath().getName());
- if (fs.exists(daughterDir) && !fs.delete(daughterDir, true)) {
+ if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
throw new IOException("Failed delete of " + daughterDir);
}
}
@@ -453,7 +468,7 @@ public class HRegionFileSystem {
*/
void cleanupDaughterRegion(final HRegionInfo regionInfo) throws IOException {
Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
- if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
+ if (this.fs.exists(regionDir) && !deleteDir(regionDir)) { // existence is checked here because deleteDir expects its caller to do so
throw new IOException("Failed delete of " + regionDir);
}
}
@@ -467,7 +482,7 @@ public class HRegionFileSystem {
Path commitDaughterRegion(final HRegionInfo regionInfo) throws IOException {
Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
Path daughterTmpDir = this.getSplitsDir(regionInfo);
- if (fs.exists(daughterTmpDir) && !fs.rename(daughterTmpDir, regionDir)) {
+ if (fs.exists(daughterTmpDir) && !rename(daughterTmpDir, regionDir)) {
throw new IOException("Unable to rename " + daughterTmpDir + " to " +
regionDir);
}
return regionDir;
@@ -480,12 +495,13 @@ public class HRegionFileSystem {
Path splitdir = getSplitsDir();
if (fs.exists(splitdir)) {
LOG.info("The " + splitdir + " directory exists. Hence deleting it to
recreate it");
- if (!fs.delete(splitdir, true)) {
+ if (!deleteDir(splitdir)) {
throw new IOException("Failed deletion of " + splitdir
+ " before creating them again.");
}
}
- if (!fs.mkdirs(splitdir)) {
+ // splitDir doesn't exists now. No need to do an exists() call for it.
+ if (!createDir(splitdir)) {
throw new IOException("Failed create of " + splitdir);
}
}
@@ -534,7 +550,7 @@ public class HRegionFileSystem {
* Clean up any merge detritus that may have been left around from previous
merge attempts.
*/
void cleanupMergesDir() throws IOException {
- FSUtils.deleteDirectory(fs, getMergesDir());
+ deleteDir(getMergesDir()); // deleteDir retries on IOException; its boolean result is not checked here
}
/**
@@ -740,7 +756,7 @@ public class HRegionFileSystem {
writeRegionInfoFileContent(conf, fs, tmpPath, regionInfoContent);
// Move the created file to the original path
- if (!fs.rename(tmpPath, regionInfoFile)) {
+ if (fs.exists(tmpPath) && !rename(tmpPath, regionInfoFile)) {
throw new IOException("Unable to rename " + tmpPath + " to " +
regionInfoFile);
}
} else {
@@ -768,7 +784,7 @@ public class HRegionFileSystem {
}
// Create the region directory
- if (!fs.mkdirs(regionFs.getRegionDir())) {
+ if (!createDirOnFileSystem(fs, conf, regionDir)) {
LOG.warn("Unable to create the region directory: " + regionDir);
throw new IOException("Unable to create region directory: " + regionDir);
}
@@ -842,4 +858,122 @@ public class HRegionFileSystem {
LOG.warn("Failed delete of " + regionDir);
}
}
+
+  /**
+   * Creates a directory, retrying when the underlying {@link FileSystem} throws an IOException.
+   * Assumes the caller has already checked whether the directory exists.
+   * <p>
+   * A mkdirs call may take effect even though the client sees an exception, so after a failed
+   * attempt this checks whether the directory is now present and, if so, reports success.
+   * At most hdfs.client.retries.number + 1 attempts are made.
+   * @param dir the directory to create
+   * @return the result of {@link FileSystem#mkdirs(Path)}, or true if the directory is present
+   *         after a failed attempt
+   * @throws IOException if every attempt fails; the last mkdirs failure is attached as the cause
+   */
+  boolean createDir(Path dir) throws IOException {
+    int i = 0;
+    IOException lastIOE = null;
+    do {
+      try {
+        return fs.mkdirs(dir);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        try {
+          if (fs.exists(dir)) {
+            return true; // directory is present; an attempt may have succeeded despite the error
+          }
+        } catch (IOException existsIoe) {
+          // exists() also goes to the NameNode and can fail for the same transient reason.
+          // Keep retrying instead of letting it escape the retry loop.
+          LOG.debug("exists() failed for " + dir + " after a failed mkdirs", existsIoe);
+        }
+        sleepBeforeRetry("Create Directory", i + 1);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in createDir " + dir, lastIOE);
+  }
+
+  /**
+   * Renames a file or directory, retrying when the underlying {@link FileSystem} throws an
+   * IOException. Assumes the caller has already checked that the source exists.
+   * <p>
+   * After a failed attempt, a source that is gone together with a destination that is present is
+   * taken to mean the move took effect, and success is reported. At most
+   * hdfs.client.retries.number + 1 attempts are made.
+   * @param srcpath the path to rename
+   * @param dstPath the new path
+   * @return the result of {@link FileSystem#rename(Path, Path)}, or true if a failed attempt
+   *         nonetheless completed the move
+   * @throws IOException if every attempt fails; the last rename failure is attached as the cause
+   */
+  boolean rename(Path srcpath, Path dstPath) throws IOException {
+    IOException lastIOE = null;
+    int i = 0;
+    do {
+      try {
+        return fs.rename(srcpath, dstPath);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        try {
+          if (!fs.exists(srcpath) && fs.exists(dstPath)) {
+            return true; // successful move
+          }
+        } catch (IOException existsIoe) {
+          // exists() also goes to the NameNode and can fail for the same transient reason.
+          // Keep retrying instead of letting it escape the retry loop.
+          LOG.debug("exists() failed for " + srcpath + " or " + dstPath + " after a failed rename",
+              existsIoe);
+        }
+        // the move did not (visibly) happen; retry after some time.
+        sleepBeforeRetry("Rename Directory", i + 1);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in rename " + srcpath + " to " + dstPath, lastIOE);
+  }
+
+  /**
+   * Recursively deletes a directory, retrying when the underlying {@link FileSystem} throws an
+   * IOException. Assumes the caller has already checked for the directory's existence.
+   * <p>
+   * After a failed attempt, a directory that no longer exists is reported as deleted. At most
+   * hdfs.client.retries.number + 1 attempts are made.
+   * @param dir the directory to delete
+   * @return the result of {@link FileSystem#delete(Path, boolean)}, or true if the directory is
+   *         gone after a failed attempt
+   * @throws IOException if every attempt fails; the last delete failure is attached as the cause
+   */
+  boolean deleteDir(Path dir) throws IOException {
+    IOException lastIOE = null;
+    int i = 0;
+    do {
+      try {
+        return fs.delete(dir, true);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        try {
+          if (!fs.exists(dir)) {
+            return true;
+          }
+        } catch (IOException existsIoe) {
+          // exists() also goes to the NameNode and can fail for the same transient reason.
+          // Keep retrying instead of letting it escape the retry loop.
+          LOG.debug("exists() failed for " + dir + " after a failed delete", existsIoe);
+        }
+        // dir may still be there, retry deleting after some time.
+        sleepBeforeRetry("Delete Directory", i + 1);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in DeleteDir " + dir, lastIOE);
+  }
+
+ /**
+ * sleeping logic; handles the interrupt exception.
+ */
+ private void sleepBeforeRetry(String msg, int sleepMultiplier) {
+ sleepBeforeRetry(msg, sleepMultiplier, baseSleepBeforeRetries,
hdfsClientRetriesNumber);
+ }
+
+  /**
+   * Static counterpart of createDir that takes the FileSystem and Configuration explicitly.
+   * The retry settings (hdfs.client.retries.number, hdfs.client.sleep.before.retries) are read
+   * from {@code conf} on every call. Assumes the caller has already checked for the directory's
+   * existence.
+   * @param fs the filesystem on which to create the directory
+   * @param conf configuration supplying the retry settings
+   * @param dir the directory to create
+   * @return the result of fs.mkdirs(dir), or true if the directory is present after a failed
+   *         attempt
+   * @throws IOException if every attempt fails; the last mkdirs failure is attached as the cause
+   */
+  private static boolean createDirOnFileSystem(FileSystem fs, Configuration conf, Path dir)
+      throws IOException {
+    int hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
+        DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
+    int baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
+        DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
+    int i = 0;
+    IOException lastIOE = null;
+    do {
+      try {
+        return fs.mkdirs(dir);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        try {
+          if (fs.exists(dir)) {
+            return true; // directory is present; an attempt may have succeeded despite the error
+          }
+        } catch (IOException existsIoe) {
+          // exists() also goes to the NameNode and can fail for the same transient reason.
+          // Keep retrying instead of letting it escape the retry loop.
+          LOG.debug("exists() failed for " + dir + " after a failed mkdirs", existsIoe);
+        }
+        sleepBeforeRetry("Create Directory", i + 1, baseSleepBeforeRetries,
+            hdfsClientRetriesNumber);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in createDir " + dir, lastIOE);
+  }
+
+  /**
+   * Sleeps for {@code baseSleepBeforeRetries * sleepMultiplier} via Threads.sleep, unless
+   * {@code sleepMultiplier} is past {@code hdfsClientRetriesNumber}. In that case it only logs
+   * that retries are exhausted. Static so that static callers can pass values they have already
+   * read from the Configuration.
+   * @param msg operation name used in the debug log
+   * @param sleepMultiplier attempt number
+   * @param baseSleepBeforeRetries base sleep passed (multiplied) to Threads.sleep
+   * @param hdfsClientRetriesNumber retry limit
+   */
+  private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSleepBeforeRetries,
+      int hdfsClientRetriesNumber) {
+    boolean exhausted = sleepMultiplier > hdfsClientRetriesNumber;
+    if (exhausted) {
+      LOG.debug(msg + ", retries exhausted");
+    } else {
+      LOG.debug(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier);
+      Threads.sleep(baseSleepBeforeRetries * sleepMultiplier);
+    }
+  }
}
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1460974&r1=1460973&r2=1460974&view=diff
==============================================================================
---
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
(original)
+++
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
Tue Mar 26 04:49:42 2013
@@ -165,19 +165,6 @@ public abstract class FSUtils {
return fs.exists(dir) && fs.delete(dir, true);
}
- /**
- * Check if directory exists. If it does not, create it.
- * @param fs filesystem object
- * @param dir path to check
- * @return Path
- * @throws IOException e
- */
- public Path checkdir(final FileSystem fs, final Path dir) throws IOException
{
- if (!fs.exists(dir)) {
- fs.mkdirs(dir);
- }
- return dir;
- }
/**
* Create the specified file on the filesystem. By default, this will:
Modified:
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java?rev=1460974&r1=1460973&r2=1460974&view=diff
==============================================================================
---
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
(original)
+++
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
Tue Mar 26 04:49:42 2013
@@ -24,15 +24,25 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.net.URI;
import java.util.Collection;
+import javax.management.RuntimeErrorException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
import org.junit.Test;
import org.junit.AfterClass;
@@ -43,6 +53,7 @@ import junit.framework.TestCase;
@Category(SmallTests.class)
public class TestHRegionFileSystem {
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final Log LOG =
LogFactory.getLog(TestHRegionFileSystem.class);
@Test
public void testOnDiskRegionCreation() throws IOException {
@@ -74,6 +85,130 @@ public class TestHRegionFileSystem {
}
@Test
+ public void testNonIdempotentOpsWithRetries() throws IOException {
+ Path rootDir =
TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Configuration conf = TEST_UTIL.getConfiguration();
+
+ // Create a Region
+ HRegionInfo hri = new HRegionInfo(Bytes.toBytes("TestTable"));
+ HRegionFileSystem regionFs =
HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
+ assertTrue(fs.exists(regionFs.getRegionDir()));
+
+ regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(),
+ null, null);
+ // HRegionFileSystem.createRegionOnFileSystem(conf, new
MockFileSystemForCreate(), rootDir,
+ // hri);
+ boolean result = regionFs.createDir(new Path("/foo/bar"));
+ assertTrue("Couldn't create the directory", result);
+
+
+ regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+ result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
+ assertTrue("Couldn't rename the directory", result);
+
+ regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+ result = regionFs.deleteDir(new Path("/foo/bar"));
+ assertTrue("Couldn't delete the directory", result);
+ fs.delete(rootDir, true);
+ }
+
+  /**
+   * Variant of MockFileSystem whose exists() always returns false. A caller therefore cannot
+   * treat a failed mkdirs as done by finding the directory already present.
+   */
+  static class MockFileSystemForCreate extends MockFileSystem {
+    @Override
+    public boolean exists(final Path path) {
+      return false;
+    }
+  }
+
+  /**
+   * A mock FileSystem for exercising client-side retries. create, both delete variants, mkdirs
+   * and rename share one call counter. Each of them throws an IOException for the first
+   * {@link #successRetryCount} calls and afterwards returns the expected result (true, or null
+   * for create). exists always returns true and getFileStatus returns an empty FileStatus. The
+   * remaining operations always throw.
+   */
+  static class MockFileSystem extends FileSystem {
+    int retryCount;
+    static final int successRetryCount = 3;
+
+    public MockFileSystem() {
+      this.retryCount = 0;
+    }
+
+    /** Counts this call and throws while the shared counter is below successRetryCount. */
+    private void failBeforeThreshold() throws IOException {
+      if (retryCount++ < successRetryCount) {
+        throw new IOException("Something bad happen");
+      }
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+        throws IOException {
+      throw new IOException("");
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+        int bufferSize, short replication, long blockSize, Progressable progress)
+        throws IOException {
+      LOG.debug("Create, " + retryCount);
+      failBeforeThreshold();
+      return null;
+    }
+
+    @Override
+    public boolean delete(Path f) throws IOException {
+      failBeforeThreshold();
+      return true;
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      failBeforeThreshold();
+      return true;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus();
+    }
+
+    @Override
+    public boolean exists(Path path) {
+      return true;
+    }
+
+    @Override
+    public URI getUri() {
+      throw new RuntimeException("Something bad happen");
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      throw new RuntimeException("Something bad happen");
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+      throw new IOException("Something bad happen");
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      LOG.debug("mkdirs, " + retryCount);
+      failBeforeThreshold();
+      return true;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      throw new IOException("Something bad happen");
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      LOG.debug("rename, " + retryCount);
+      failBeforeThreshold();
+      return true;
+    }
+
+    @Override
+    public void setWorkingDirectory(Path newDir) {
+      throw new RuntimeException("Something bad happen");
+    }
+  }
+
+ @Test
public void testTempAndCommit() throws IOException {
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testTempAndCommit");
FileSystem fs = TEST_UTIL.getTestFileSystem();