Repository: hbase
Updated Branches:
  refs/heads/master f0e29c49a -> 20e855f28


HBASE-13895 DATALOSS: Region assigned before WAL replay when abort (Enis Soztutar) -- REAPPLY
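
The gist of the change: RSRpcServices.checkOpen() now throws a new
RegionServerAbortedException (a subclass of RegionServerStoppedException) when
the server is aborting rather than cleanly stopping, and the master's
AssignmentManager treats that case specially -- it backs off and retries the
unassign instead of offlining the region, since a region on an aborting server
may still need WAL replay. The patch also adds a 'search' mode to
IntegrationTestLoadAndVerify that greps the cluster's WALs and oldWALs dirs
for keys the Verify step reported missing.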


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/20e855f2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/20e855f2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/20e855f2

Branch: refs/heads/master
Commit: 20e855f2824d3d39c13560fedabbd985f3ae5d13
Parents: f0e29c4
Author: stack <st...@apache.org>
Authored: Wed Jul 1 23:16:30 2015 -0700
Committer: stack <st...@apache.org>
Committed: Wed Jul 1 23:16:30 2015 -0700

----------------------------------------------------------------------
 .../RegionServerAbortedException.java           |  34 ++++
 .../RegionServerStoppedException.java           |   1 +
 .../test/IntegrationTestLoadAndVerify.java      | 198 +++++++++++++++++--
 .../hadoop/hbase/mapreduce/WALPlayer.java       |   6 +-
 .../hadoop/hbase/master/AssignmentManager.java  |  53 ++---
 .../hbase/regionserver/RSRpcServices.java       |   8 +-
 .../hadoop/hbase/mapreduce/TestWALPlayer.java   |   2 +-
 .../master/TestAssignmentManagerOnCluster.java  |  12 --
 8 files changed, 260 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java
new file mode 100644
index 0000000..ddc2270
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAbortedException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown by the region server when it is aborting.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegionServerAbortedException extends RegionServerStoppedException {
+  public RegionServerAbortedException(String s) {
+    super(s);
+  }
+}
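
For context, a minimal sketch of what the split buys a caller; the
sendCloseRegionRpc() helper is hypothetical, and the catch order matters
because RegionServerAbortedException extends RegionServerStoppedException:

    try {
      sendCloseRegionRpc(server, region); // hypothetical close-region RPC
    } catch (RegionServerAbortedException e) {
      // Aborting: the server's WAL may still need replay, so the region
      // must not be offlined yet; back off and retry the unassign.
    } catch (RegionServerStoppedException e) {
      // Clean stop (or not yet running): safe to offline the region.
    }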

http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
index f116869..95f697e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Thrown by the region server when it is in shutting down state.
+ * @see RegionServerAbortedException
  */
 @SuppressWarnings("serial")
 @InterfaceAudience.Public

http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
index c92393f..623a370 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -31,8 +30,11 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -67,6 +69,19 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -91,6 +106,9 @@ import com.google.common.collect.Sets;
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestLoadAndVerify  extends IntegrationTestBase  {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestLoadAndVerify.class);
+
   private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
   private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
   private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");
@@ -112,7 +130,10 @@ public class IntegrationTestLoadAndVerify  extends IntegrationTestBase  {
 
   private static final int SCANNER_CACHING = 500;
 
+  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
+
   private String toRun = null;
+  private String keysDir = null;
 
   private enum Counters {
     ROWS_WRITTEN,
@@ -267,7 +288,6 @@ public void cleanUpCluster() throws Exception {
   }
 
   public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
-    private static final Log LOG = LogFactory.getLog(VerifyReducer.class);
     private Counter refsChecked;
     private Counter rowsWritten;
 
@@ -312,6 +332,7 @@ public void cleanUpCluster() throws Exception {
 
   protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "load-output");
+    LOG.info("Load output dir: " + outputDir);
 
     NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
     conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
@@ -339,6 +360,7 @@ public void cleanUpCluster() throws Exception {
 
   protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "verify-output");
+    LOG.info("Verify output dir: " + outputDir);
 
     Job job = Job.getInstance(conf);
     job.setJarByClass(this.getClass());
@@ -363,6 +385,139 @@ public void cleanUpCluster() throws Exception {
     assertEquals(0, numOutputRecords);
   }
 
+  /**
+   * Tool to search for missing rows in WALs and HFiles.
+   * Pass in a file or directory of keys to search for. The key file must have been written
+   * by the Verify step (we depend on the format it writes out). We read the keys in and then
+   * search for them in the hbase WALs and oldWALs dirs (some of this is TODO).
+   */
+  public static class WALSearcher extends WALPlayer {
+    public WALSearcher(Configuration conf) {
+      super(conf);
+    }
+
+    /**
+     * The actual searcher mapper.
+     */
+    public static class WALMapperSearcher extends WALMapper {
+      private SortedSet<byte []> keysToFind;
+      private AtomicInteger rows = new AtomicInteger(0);
+
+      @Override
+      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+          throws IOException {
+        super.setup(context);
+        try {
+          this.keysToFind = readKeysToSearch(context.getConfiguration());
+          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(e.toString());
+        }
+      }
+
+      @Override
+      protected boolean filter(Context context, Cell cell) {
+        // TODO: Can I do a better compare than this copying out key?
+        byte [] row = new byte [cell.getRowLength()];
+        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
+        boolean b = this.keysToFind.contains(row);
+        if (b) {
+          String keyStr = Bytes.toStringBinary(row);
+          try {
+            LOG.info("Found cell=" + cell + " , walKey=" + 
context.getCurrentKey());
+          } catch (IOException|InterruptedException e) {
+            LOG.warn(e);
+          }
+          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+          }
+          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
+        }
+        return b;
+      }
+    }
+
+    // Put in place the above WALMapperSearcher.
+    @Override
+    public Job createSubmittableJob(String[] args) throws IOException {
+      Job job = super.createSubmittableJob(args);
+      // Call my class instead.
+      job.setJarByClass(WALMapperSearcher.class);
+      job.setMapperClass(WALMapperSearcher.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+      return job;
+    }
+  }
+
+  static final String FOUND_GROUP_KEY = "Found";
+  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
+
+  static SortedSet<byte []> readKeysToSearch(final Configuration conf)
+      throws IOException, InterruptedException {
+    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
+    FileSystem fs = FileSystem.get(conf);
+    SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+    if (!fs.exists(keysInputDir)) {
+      throw new FileNotFoundException(keysInputDir.toString());
+    }
+    if (!fs.isDirectory(keysInputDir)) {
+      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
+      readFileToSearch(conf, fs, keyFileStatus, result);
+    } else {
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
+      while(iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        // Skip "_SUCCESS" file.
+        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
+        readFileToSearch(conf, fs, keyFileStatus, result);
+      }
+    }
+    return result;
+  }
+
+  private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
+      final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte []> result)
+      throws IOException, InterruptedException {
+    // Verify uses file output format and writes <Text, Text>. We can read it as a text file.
+    try (InputStream in = fs.open(keyFileStatus.getPath());
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+      // Extract the key from each line; these are the missing rows to search for.
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (line.isEmpty()) continue;
+
+        String[] parts = line.split("\\s+");
+        if (parts.length >= 1) {
+          String key = parts[0];
+          result.add(Bytes.toBytesBinary(key));
+        } else {
+          LOG.info("Cannot parse key from: " + line);
+        }
+      }
+    }
+    return result;
+  }
+
+  private int doSearch(Configuration conf, String keysDir) throws Exception {
+    Path inputDir = new Path(keysDir);
+
+    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
+    SortedSet<byte []> keys = readKeysToSearch(getConf());
+    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
+    LOG.info("Count of keys to find: " + keys.size());
+    for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
+    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+    // Now read all WALs. In two dirs. Presumes certain layout.
+    Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    LOG.info("Running Search with keys inputDir=" + inputDir +
+      " against " + getConf().get(HConstants.HBASE_DIR));
+    int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
+    if (ret != 0) return ret;
+    return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
+  }
+
   private static void setJobScannerConf(Job job) {
     // Make sure scanners log something useful to make debugging possible.
     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
@@ -371,11 +526,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public Path getTestDir(String testName, String subdir) throws IOException {
-    //HBaseTestingUtility.getDataTestDirOnTestFs() has not been backported.
+    Path testDir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = FileSystem.get(getConf());
-    Path base = new Path(fs.getWorkingDirectory(), "test-data");
-    String randomStr = UUID.randomUUID().toString();
-    Path testDir = new Path(base, randomStr);
     fs.deleteOnExit(testDir);
 
     return new Path(new Path(testDir, testName), subdir);
@@ -398,7 +550,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public void usage() {
-    System.err.println(this.getClass().getSimpleName() + " [-Doptions] <load|verify|loadAndVerify>");
+    System.err.println(this.getClass().getSimpleName()
+      + " [-Doptions] <load|verify|loadAndVerify|search>");
     System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
     System.err.println("Options");
     System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default autogen)");
@@ -417,11 +570,16 @@ public void cleanUpCluster() throws Exception {
     super.processOptions(cmd);
 
     String[] args = cmd.getArgs();
-    if (args == null || args.length < 1 || args.length > 1) {
+    if (args == null || args.length < 1) {
       usage();
       throw new RuntimeException("Incorrect Number of args.");
     }
     toRun = args[0];
+    if (toRun.equalsIgnoreCase("search")) {
+      if (args.length > 1) {
+        keysDir = args[1];
+      }
+    }
   }
 
   @Override
@@ -429,16 +587,25 @@ public void cleanUpCluster() throws Exception {
     IntegrationTestingUtility.setUseDistributedCluster(getConf());
     boolean doLoad = false;
     boolean doVerify = false;
+    boolean doSearch = false;
     boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter",true);
     int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);
 
-    if (toRun.equals("load")) {
+    if (toRun.equalsIgnoreCase("load")) {
       doLoad = true;
-    } else if (toRun.equals("verify")) {
+    } else if (toRun.equalsIgnoreCase("verify")) {
       doVerify= true;
-    } else if (toRun.equals("loadAndVerify")) {
+    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
       doLoad=true;
       doVerify= true;
+    } else if (toRun.equalsIgnoreCase("search")) {
+      doLoad = false;
+      doVerify = false;
+      doSearch = true;
+      if (keysDir == null) {
+        System.err.println("Usage: search <KEYS_DIR>");
+        return 1;
+      }
     } else {
       System.err.println("Invalid argument " + toRun);
       usage();
@@ -450,9 +617,9 @@ public void cleanUpCluster() throws Exception {
     HTableDescriptor htd = new HTableDescriptor(table);
     htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
 
-    try (Connection conn = ConnectionFactory.createConnection(getConf());
-        Admin admin = conn.getAdmin()) {
-      if (doLoad) {
+    if (doLoad) {
+      try (Connection conn = ConnectionFactory.createConnection(getConf());
+          Admin admin = conn.getAdmin()) {
         admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
         doLoad(getConf(), htd);
       }
@@ -463,6 +630,9 @@ public void cleanUpCluster() throws Exception {
         getTestingUtil(getConf()).deleteTable(htd.getTableName());
       }
     }
+    if (doSearch) {
+      return doSearch(getConf(), keysDir);
+    }
     return 0;
   }
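
For reference, a hedged sketch of driving the new search mode, mirroring
doSearch() above. The keys directory is illustrative (it is whatever the
Verify step wrote); the empty second argument is what doSearch() passes for
WALPlayer's table list:

    Configuration conf = HBaseConfiguration.create();
    // "searcher.keys.inputdir" is SEARCHER_INPUTDIR_KEY above; the path is made up.
    conf.set("searcher.keys.inputdir", "/test-data/IntegrationTestLoadAndVerify/verify-output");
    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
    Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    int ret = ToolRunner.run(new IntegrationTestLoadAndVerify.WALSearcher(conf),
        new String[] { walsDir.toString(), "" });

From the command line, the equivalent (illustrative) invocation is:

    hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify search <keys dir>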
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 713ca40..c067fc3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -85,6 +85,10 @@ public class WALPlayer extends Configured implements Tool {
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  protected WALPlayer(final Configuration c) {
+    super(c);
+  }
+
   /**
    * A mapper that just writes out KeyValues.
    * This one can be used together with {@link KeyValueSortReducer}
@@ -327,7 +331,7 @@ public class WALPlayer extends Configured implements Tool {
    * @throws Exception When running the job fails.
    */
   public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(HBaseConfiguration.create(), new WALPlayer(), args);
+    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
     System.exit(ret);
   }
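
The new protected Configuration-taking constructor is what lets WALSearcher
above subclass WALPlayer and hand in its own conf via super(conf); main() now
constructs the tool with the conf directly instead of passing it to ToolRunner.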
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 34db4e4..8426689 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -393,7 +394,7 @@ public class AssignmentManager {
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
-   * @throws CoordinatedStateException 
+   * @throws CoordinatedStateException
    */
   void joinCluster()
   throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
@@ -890,10 +891,18 @@ public class AssignmentManager {
         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
           region.getRegionNameAsString());
       } catch (Throwable t) {
+        long sleepTime = 0;
+        Configuration conf = this.server.getConfiguration();
         if (t instanceof RemoteException) {
           t = ((RemoteException)t).unwrapRemoteException();
         }
-        if (t instanceof NotServingRegionException
+        if (t instanceof RegionServerAbortedException) {
+          // RS is aborting; we cannot offline the region since the region may need to do WAL
+          // recovery. Until we see the RS expiration, we should retry.
+          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+
+        } else if (t instanceof NotServingRegionException
             || t instanceof RegionServerStoppedException
             || t instanceof ServerNotRunningYetException) {
           LOG.debug("Offline " + region.getRegionNameAsString()
@@ -903,27 +912,25 @@ public class AssignmentManager {
         } else if (t instanceof FailedServerException && i < maximumAttempts) {
           // In case the server is in the failed server list, no point to
           // retry too soon. Retry after the failed_server_expiry time
-          try {
-            Configuration conf = this.server.getConfiguration();
-            long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(server + " is on failed server list; waiting "
-                + sleepTime + "ms", t);
-            }
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException ie) {
-            LOG.warn("Failed to unassign "
-              + region.getRegionNameAsString() + " since interrupted", ie);
-            regionStates.updateRegionState(region, State.FAILED_CLOSE);
-            Thread.currentThread().interrupt();
-            return;
+          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
           }
-        }
-
-        LOG.info("Server " + server + " returned " + t + " for "
-          + region.getRegionNameAsString() + ", try=" + i
-          + " of " + this.maximumAttempts, t);
+        }
+        try {
+          if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+          }
+        } catch (InterruptedException ie) {
+          LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
+          Thread.currentThread().interrupt();
+          regionStates.updateRegionState(region, State.FAILED_CLOSE);
+          return;
+        }
+        LOG.info("Server " + server + " returned " + t + " for "
+          + region.getRegionNameAsString() + ", try=" + i
+          + " of " + this.maximumAttempts, t);
       }
     }
     // Run out of attempts
@@ -1320,7 +1327,7 @@ public class AssignmentManager {
           if (state == null || state.getServerName() == null) {
             // We don't know where the region is, offline it.
             // No need to send CLOSE RPC
-            LOG.warn("Attempting to unassign a region not in RegionStates"
+            LOG.warn("Attempting to unassign a region not in RegionStates "
               + region.getRegionNameAsString() + ", offlined");
             regionOffline(region);
             return;
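
Net effect of the restructuring above: the per-attempt sleep is computed in one
place, and an aborted server gets the same backoff as one on the failed-server
list (RpcClient.FAILED_SERVER_EXPIRY_KEY, falling back to
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT) instead of the region being offlined
while its WAL is still unreplayed.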

http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7bcf8e7..d7be4b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1036,9 +1036,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws IOException
    */
   protected void checkOpen() throws IOException {
-    if (regionServer.isStopped() || regionServer.isAborted()) {
-      throw new RegionServerStoppedException("Server " + regionServer.serverName
-        + " not running" + (regionServer.isAborted() ? ", aborting" : ""));
+    if (regionServer.isAborted()) {
+      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
+    }
+    if (regionServer.isStopped()) {
+      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
     }
     if (!regionServer.fsOk) {
       throw new RegionServerStoppedException("File system not available");

http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index e524f38..6dc1d9f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -114,7 +114,7 @@ public class TestWALPlayer {
         .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
 
     Configuration configuration= TEST_UTIL.getConfiguration();
-    WALPlayer player = new WALPlayer();
+    WALPlayer player = new WALPlayer(configuration);
     String optionName="_test_.name";
     configuration.set(optionName, "1000");
     player.setupTime(configuration, optionName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/20e855f2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 6b68bfe..264e62f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -982,12 +982,6 @@ public class TestAssignmentManagerOnCluster {
       assertTrue(regionStates.isRegionOnline(hri));
       assertEquals(oldServerName, regionStates.getRegionServerOfRegion(hri));
 
-      // Try to unassign the dead region before SSH
-      am.unassign(hri);
-      // The region should be moved to offline since the server is dead
-      RegionState state = regionStates.getRegionState(hri);
-      assertTrue(state.isOffline());
-
       // Kill the hosting server, which doesn't have meta on it.
       cluster.killRegionServer(oldServerName);
       cluster.waitForRegionServerToStop(oldServerName, -1);
@@ -1061,12 +1055,6 @@ public class TestAssignmentManagerOnCluster {
       assertTrue(regionStates.isRegionOnline(hri));
       assertEquals(oldServerName, regionStates.getRegionServerOfRegion(hri));
 
-      // Try to unassign the dead region before SSH
-      am.unassign(hri);
-      // The region should be moved to offline since the server is dead
-      RegionState state = regionStates.getRegionState(hri);
-      assertTrue(state.isOffline());
-
       // Disable the table now.
       master.disableTable(hri.getTable());
 
