Repository: hbase
Updated Branches:
  refs/heads/0.94 c0d9ac9a0 -> b65849bcd


HBASE-11405 Multiple invocations of hbck in parallel disables balancer permanently (Sean Busbey and bharath v)
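
Background: hbck turns the load balancer off while it repairs the cluster and restores the previous setting when it finishes. When two hbck runs overlap, the later one can record "off" as the prior state and write that back on exit, leaving the balancer disabled for good. The fix below serializes hbck instances by taking an exclusive lock file on HDFS before any repair work starts.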


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b65849bc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b65849bc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b65849bc

Branch: refs/heads/0.94
Commit: b65849bcde31a0c0c4c8eda0cc3678d441c9cd1c
Parents: c0d9ac9
Author: Ted Yu <[email protected]>
Authored: Thu Sep 25 16:46:20 2014 +0000
Committer: Ted Yu <[email protected]>
Committed: Thu Sep 25 16:46:20 2014 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/HBaseFsck.java | 86 +++++++++++++++++++-
 .../hadoop/hbase/HBaseTestingUtility.java       | 59 +++++++++++++-
 .../apache/hadoop/hbase/util/TestHBaseFsck.java | 50 ++++++++++++
 3 files changed, 191 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
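
The core of the fix is an exclusive-create lock file: Hadoop's FileSystem.create(path, false) refuses to overwrite an existing file, so whichever hbck process creates the lock first wins. Below is a minimal, self-contained sketch of that pattern, separate from the patch itself; the class and method names are invented for illustration.

  import java.io.IOException;
  import java.net.InetAddress;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class HbckLockSketch {
    // Returns an open stream if we won the lock, or null if another
    // process already created the file.
    static FSDataOutputStream tryLock(Configuration conf, Path lockPath)
        throws IOException {
      FileSystem fs = FileSystem.get(conf);
      try {
        // overwrite == false: create() fails when lockPath already exists
        FSDataOutputStream out = fs.create(lockPath, false);
        out.writeBytes(InetAddress.getLocalHost().toString());
        out.flush();
        return out;
      } catch (IOException alreadyThere) {
        // The real patch is stricter: it unwraps RemoteException and
        // returns null only for AlreadyBeingCreatedException, so other
        // I/O failures still propagate to the caller.
        return null;
      }
    }
  }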


http://git-wip-us.apache.org/repos/asf/hbase/blob/b65849bc/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index e028b56..469c98e 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.net.InetAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,6 +44,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -50,10 +52,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -97,7 +101,9 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
@@ -163,6 +169,8 @@ public class HBaseFsck extends Configured implements Tool {
   private static final int DEFAULT_OVERLAPS_TO_SIDELINE = 2;
   private static final int DEFAULT_MAX_MERGE = 5;
   private static final String TO_BE_LOADED = "to_be_loaded";
+  private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
+
 
   /**********************
    * Internal resources
@@ -177,6 +185,12 @@ public class HBaseFsck extends Configured implements Tool {
   private long startMillis = System.currentTimeMillis();
   private HFileCorruptionChecker hfcc;
   private int retcode = 0;
+  private Path HBCK_LOCK_PATH;
+  private FSDataOutputStream hbckOutFd;
+  // This lock is to prevent cleanup of balancer resources twice between
+  // the ShutdownHook and the main code. We clean up only if connect()
+  // was successful.
+  private final AtomicBoolean hbckLockCleanup = new AtomicBoolean(false);
 
   /***********
    * Options
@@ -277,10 +291,78 @@ public class HBaseFsck extends Configured implements Tool {
   }
 
   /**
+   * This method maintains a lock using a file. If the creation fails, we return null.
+   *
+   * @return FSDataOutputStream object corresponding to the newly opened lock file
+   * @throws IOException
+   */
+  private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
+    try {
+      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
+      FsPermission defaultPerms = FSUtils.getFilePermissions(fs, getConf(),
+          HConstants.DATA_FILE_UMASK_KEY);
+      Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
+      fs.mkdirs(tmpDir);
+      HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
+      final FSDataOutputStream out = FSUtils.create(fs, HBCK_LOCK_PATH, defaultPerms, false);
+      out.writeBytes(InetAddress.getLocalHost().toString());
+      out.flush();
+      return out;
+    } catch (IOException exception) {
+      RemoteException e = null;
+      if (exception instanceof RemoteException) {
+        e = (RemoteException)exception;
+      } else if (exception.getCause() instanceof RemoteException) {
+        e = (RemoteException)(exception.getCause());
+      }
+      if (null != e && AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
+        return null;
+      }
+      throw exception;
+    }
+  }
+
+  private void unlockHbck() {
+    if (hbckLockCleanup.compareAndSet(true, false)) {
+      IOUtils.closeStream(hbckOutFd);
+      try {
+        FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()), HBCK_LOCK_PATH, true);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to delete " + HBCK_LOCK_PATH);
+        LOG.debug(ioe);
+      }
+    }
+  }
+
+  /**
    * To repair region consistency, one must call connect() in order to repair
    * online state.
    */
   public void connect() throws IOException {
+
+    // Check if another instance of hbck is running
+    hbckOutFd = checkAndMarkRunningHbck();
+    if (hbckOutFd == null) {
+      setRetCode(-1);
+      LOG.error("Another instance of hbck is running, exiting this 
instance.[If you are sure" +
+                     " no other instance is running, delete the lock file " +
+                     HBCK_LOCK_PATH + " and rerun the tool]");
+      throw new IOException("Duplicate hbck - Abort");
+    }
+
+    // Make sure to cleanup the lock
+    hbckLockCleanup.set(true);
+
+    // Add a shutdown hook to this thread, in case the user tries to
+    // kill hbck with a ctrl-c; we want to clean up the lock so that
+    // it is available for further calls
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      public void run() {
+        unlockHbck();
+      }
+    });
+    LOG.debug("Launching hbck");
+
     admin = new HBaseAdmin(getConf());
     meta = new HTable(getConf(), HConstants.META_TABLE_NAME);
     status = admin.getMaster().getClusterStatus();
@@ -462,6 +544,9 @@ public class HBaseFsck extends Configured implements Tool {
 
     offlineReferenceFileRepair();
 
+    // Remove the hbck lock
+    unlockHbck();
+
     // Print table summary
     printTableSummary(tablesInfo);
     return errors.summarize();
@@ -3691,7 +3776,6 @@ public class HBaseFsck extends Configured implements Tool {
     URI defaultFs = hbasedir.getFileSystem(conf).getUri();
     conf.set("fs.defaultFS", defaultFs.toString());     // for hadoop 0.21+
     conf.set("fs.default.name", defaultFs.toString());  // for hadoop 0.20
-
     int ret = ToolRunner.run(new HBaseFsck(conf), args);
     System.exit(ret);
   }
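
A detail worth calling out in the change above: unlockHbck() can be reached from two places, the normal end of the repair path and the ctrl-c shutdown hook, so the patch guards it with an AtomicBoolean. A standalone sketch of that once-only cleanup guard (names here are invented):

  import java.util.concurrent.atomic.AtomicBoolean;

  public class CleanupOnceSketch {
    private final AtomicBoolean armed = new AtomicBoolean(false);

    void start() {
      armed.set(true); // arm only once the lock is actually held
      // the hook covers ctrl-c/SIGTERM; normal exit calls cleanup() directly
      Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
          cleanup();
        }
      });
    }

    void cleanup() {
      // compareAndSet flips true -> false for exactly one caller, so the
      // lock file is released at most once whichever path gets here first
      if (armed.compareAndSet(true, false)) {
        System.out.println("deleting lock file"); // stand-in for FSUtils.delete(...)
      }
    }
  }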

http://git-wip-us.apache.org/repos/asf/hbase/blob/b65849bc/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index c2bcd80..fc54826 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -25,7 +25,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.UnknownHostException;
@@ -88,6 +90,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
@@ -199,12 +202,18 @@ public class HBaseTestingUtility {
 
     // a hbase checksum verification failure will cause unit tests to fail
     ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
-    setHDFSClientRetryProperty();
+    setHDFSClientRetry(1);
   }
 
-  private void setHDFSClientRetryProperty() {
-    this.conf.setInt("hdfs.client.retries.number", 1);
+  /**
+   * Controls how many attempts we will make in the face of failures in HDFS.
+   */
+  public void setHDFSClientRetry(final int retries) {
+    this.conf.setInt("hdfs.client.retries.number", retries);
     HBaseFileSystem.setRetryCounts(conf);
+    if (0 == retries) {
+      makeDFSClientNonRetrying();
+    }
   }
 
   /**
@@ -1924,6 +1933,50 @@ public class HBaseTestingUtility {
     }
   }
 
+  void makeDFSClientNonRetrying() {
+    if (null == this.dfsCluster) {
+      LOG.debug("dfsCluster has not started, can't make client non-retrying.");
+      return;
+    }
+    try {
+      final FileSystem filesystem = this.dfsCluster.getFileSystem();
+      if (!(filesystem instanceof DistributedFileSystem)) {
+        LOG.debug("dfsCluster is not backed by a DistributedFileSystem, can't 
make client non-retrying.");
+        return;
+      }
+      // rely on FileSystem.CACHE to alter how we talk via DFSClient
+      final DistributedFileSystem fs = (DistributedFileSystem)filesystem;
+      // retrieve the backing DFSClient instance
+      final Field dfsField = fs.getClass().getDeclaredField("dfs");
+      dfsField.setAccessible(true);
+      final Class<?> dfsClazz = dfsField.getType();
+      final DFSClient dfs = DFSClient.class.cast(dfsField.get(fs));
+
+      // expose the method for creating direct RPC connections.
+      final Method createRPCNamenode = dfsClazz.getDeclaredMethod("createRPCNamenode", InetSocketAddress.class, Configuration.class, UserGroupInformation.class);
+      createRPCNamenode.setAccessible(true);
+
+      // grab the DFSClient instance's backing connection information
+      final Field nnField = dfsClazz.getDeclaredField("nnAddress");
+      nnField.setAccessible(true);
+      final InetSocketAddress nnAddress = InetSocketAddress.class.cast(nnField.get(dfs));
+      final Field confField = dfsClazz.getDeclaredField("conf");
+      confField.setAccessible(true);
+      final Configuration conf = Configuration.class.cast(confField.get(dfs));
+      final Field ugiField = dfsClazz.getDeclaredField("ugi");
+      ugiField.setAccessible(true);
+      final UserGroupInformation ugi = UserGroupInformation.class.cast(ugiField.get(dfs));
+
+      // replace the proxy for the namenode rpc with a direct instance
+      final Field namenodeField = dfsClazz.getDeclaredField("namenode");
+      namenodeField.setAccessible(true);
+      namenodeField.set(dfs, createRPCNamenode.invoke(null, nnAddress, conf, ugi));
+      LOG.debug("Set DFSClient namenode to bare RPC");
+    } catch (Exception exception) {
+      LOG.info("Could not alter DFSClient to be non-retrying.", exception);
+    }
+  }
+
   /**
    * Wait until all regions for a table in .META. have a non-empty
    * info:server, up to 60 seconds.  This means all regions have been deployed,
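
The reflection gymnastics above exist because 0.94-era HDFS exposes no public knob for turning off DFSClient retries, so the test utility reaches into private fields and swaps the retrying namenode proxy for a bare RPC one; if any field or method name differs across Hadoop versions, the catch block just logs and leaves retries enabled. A minimal, self-contained sketch of the field-swap technique itself (the Holder class and its field are invented for illustration):

  import java.lang.reflect.Field;

  public class FieldSwapSketch {
    static class Holder {
      private String backend = "retrying-proxy";
    }

    public static void main(String[] args) throws Exception {
      Holder h = new Holder();
      Field f = Holder.class.getDeclaredField("backend");
      f.setAccessible(true);        // bypass the private modifier
      System.out.println(f.get(h)); // prints: retrying-proxy
      f.set(h, "bare-rpc");         // swap in the replacement instance
      System.out.println(f.get(h)); // prints: bare-rpc
    }
  }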

http://git-wip-us.apache.org/repos/asf/hbase/blob/b65849bc/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index 4ec70cf..3e95a5e 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -35,8 +35,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -116,6 +120,7 @@ public class TestHBaseFsck {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setBoolean("hbase.master.distributed.log.splitting", false);
     TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.setHDFSClientRetry(0);
   }
 
   @AfterClass
@@ -539,6 +544,51 @@ public class TestHBaseFsck {
   }
 
   /**
+   * This test makes sure that two parallel instances of hbck cannot run at the same time.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParallelHbck() throws Exception {
+    final ExecutorService service;
+    final Future<HBaseFsck> hbck1,hbck2;
+
+    class RunHbck implements Callable<HBaseFsck>{
+      boolean fail = true;
+      public HBaseFsck call(){
+        try{
+          return doFsck(conf, false);
+        } catch(Exception e){
+          if (e.getMessage().contains("Duplicate hbck")) {
+            fail = false;
+          } else {
+            LOG.fatal("hbck failed.", e);
+          }
+        }
+        // If we reach here, then an exception was caught
+        if (fail) fail();
+        return null;
+      }
+    }
+    service = Executors.newFixedThreadPool(2);
+    hbck1 = service.submit(new RunHbck());
+    hbck2 = service.submit(new RunHbck());
+    service.shutdown();
+    // wait up to 15 seconds for both hbck calls to finish
+    service.awaitTermination(15, TimeUnit.SECONDS);
+    HBaseFsck h1 = hbck1.get();
+    HBaseFsck h2 = hbck2.get();
+    // Make sure at most one of the calls was successful
+    assert(h1 == null || h2 == null);
+    if (h1 != null) {
+      assert(h1.getRetCode() >= 0);
+    }
+    if (h2 != null) {
+      assert(h2.getRetCode() >= 0);
+    }
+  }
+
+  /**
    * This creates and fixes a bad table with regions that have a duplicate
    * start key
    */
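
One operational caveat falls out of the design: if hbck dies without running its shutdown hook (kill -9, power loss), the lock file survives and later runs will refuse to start. As the error message in connect() says, the operator must then delete the lock file by hand (hbase-hbck.lock under the HBase temp directory beneath the root dir) and rerun the tool.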
