Repository: hbase
Updated Branches:
  refs/heads/branch-1 895e9228a -> 60e7e6aed


HBASE-11261. Handle splitting/merging of regions that have region_replication 
greater than one


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/60e7e6ae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/60e7e6ae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/60e7e6ae

Branch: refs/heads/branch-1
Commit: 60e7e6aed5b624828e52ca21e9ee64f99c1e0fa1
Parents: 895e922
Author: Devaraj Das <[email protected]>
Authored: Fri Jan 16 10:54:52 2015 -0800
Committer: Devaraj Das <[email protected]>
Committed: Fri Jan 16 10:54:52 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/master/AssignmentManager.java  | 200 ++++++++++++++++++-
 .../master/handler/ClosedRegionHandler.java     |   3 +-
 .../master/handler/ServerShutdownHandler.java   |   3 +-
 .../hbase/regionserver/StoreFileInfo.java       |  18 ++
 .../hbase/util/ServerRegionReplicaUtil.java     |   6 +
 .../hadoop/hbase/client/TestReplicasClient.java |  34 ++--
 .../TestRegionMergeTransactionOnCluster.java    |  52 ++++-
 .../TestSplitTransactionOnCluster.java          |  86 +++++++-
 8 files changed, 372 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 262ffee..b95b50c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -31,8 +31,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadFactory;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.client.Result;
@@ -148,6 +151,8 @@ public class AssignmentManager extends ZooKeeperListener {
 
   final private KeyLocker<String> locker = new KeyLocker<String>();
 
+  Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new 
HashSet<HRegionInfo>());
+
   /**
    * Map of regions to reopen after the schema of a table is changed. Key -
    * encoded region name, value - HRegionInfo
@@ -633,6 +638,13 @@ public class AssignmentManager extends ZooKeeperListener {
       LOG.info("Clean cluster startup. Assigning user regions");
       assignAllUserRegions(allRegions);
     }
+    // unassign replicas of the split parents and the merged regions
+    // the daughter replicas are opened in assignAllUserRegions if it was
+    // not already opened.
+    for (HRegionInfo h : replicasToClose) {
+      unassign(h);
+    }
+    replicasToClose.clear();
     return failover;
   }
 
@@ -816,7 +828,11 @@ public class AssignmentManager extends ZooKeeperListener {
       case RS_ZK_REGION_FAILED_OPEN:
         // Region is closed, insert into RIT and handle it
         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
-        invokeAssign(regionInfo);
+        if (!replicasToClose.contains(regionInfo)) {
+          invokeAssign(regionInfo);
+        } else {
+          offlineDisabledRegion(regionInfo);
+        }
         break;
 
       case M_ZK_REGION_OFFLINE:
@@ -1519,6 +1535,7 @@ public class AssignmentManager extends ZooKeeperListener {
       deleteNodeInStates(encodedName, "closed", null,
         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
     }
+    replicasToClose.remove(regionInfo);
     regionOffline(regionInfo);
   }
 
@@ -2264,7 +2281,7 @@ public class AssignmentManager extends ZooKeeperListener {
   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
     if (this.tableStateManager.isTableState(region.getTable(),
         ZooKeeperProtos.Table.State.DISABLED,
-        ZooKeeperProtos.Table.State.DISABLING)) {
+        ZooKeeperProtos.Table.State.DISABLING) || 
replicasToClose.contains(region)) {
       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
         + " skipping assign of " + region.getRegionNameAsString());
       offlineDisabledRegion(region);
@@ -2530,7 +2547,7 @@ public class AssignmentManager extends ZooKeeperListener {
       lock.unlock();
 
       // Region is expected to be reassigned afterwards
-      if (reassign && regionStates.isRegionOffline(region)) {
+      if (!replicasToClose.contains(region) && reassign && 
regionStates.isRegionOffline(region)) {
         assign(region, true);
       }
     }
@@ -2843,6 +2860,19 @@ public class AssignmentManager extends ZooKeeperListener 
{
         LOG.debug("null result from meta - ignoring but this is strange.");
         continue;
       }
+      // keep a track of replicas to close. These were the replicas of the 
originally
+      // unmerged regions. The master might have closed them before but it 
mightn't
+      // maybe because it crashed.
+      PairOfSameType<HRegionInfo> p = 
MetaTableAccessor.getMergeRegions(result);
+      if (p.getFirst() != null && p.getSecond() != null) {
+        int numReplicas = 
((MasterServices)server).getTableDescriptors().get(p.getFirst().
+            getTable()).getRegionReplication();
+        for (HRegionInfo merge : p) {
+          for (int i = 1; i < numReplicas; i++) {
+            
replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
+          }
+        }
+      }
       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
       if (rl == null) continue;
       HRegionLocation[] locations = rl.getRegionLocations();
@@ -2852,6 +2882,14 @@ public class AssignmentManager extends ZooKeeperListener 
{
         if (regionInfo == null) continue;
         int replicaId = regionInfo.getReplicaId();
         State state = RegionStateStore.getRegionState(result, replicaId);
+        // keep a track of replicas to close. These were the replicas of the 
split parents
+        // from the previous life of the master. The master should have closed 
them before
+        // but it couldn't maybe because it crashed
+        if (replicaId == 0 && state.equals(State.SPLIT)) {
+          for (HRegionLocation h : locations) {
+            replicasToClose.add(h.getRegionInfo());
+          }
+        }
         ServerName lastHost = hrl.getServerName();
         ServerName regionLocation = RegionStateStore.getRegionServer(result, 
replicaId);
         regionStates.createRegionState(regionInfo, state, regionLocation, 
lastHost);
@@ -3489,7 +3527,8 @@ public class AssignmentManager extends ZooKeeperListener {
         // When there are more than one region server a new RS is selected as 
the
         // destination and the same is updated in the region plan. (HBASE-5546)
         if (getTableStateManager().isTableState(hri.getTable(),
-            ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING)) {
+            ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING) ||
+            replicasToClose.contains(hri)) {
           offlineDisabledRegion(hri);
           return;
         }
@@ -3529,7 +3568,8 @@ public class AssignmentManager extends ZooKeeperListener {
 
   private void onRegionClosed(final HRegionInfo hri) {
     if (getTableStateManager().isTableState(hri.getTable(),
-        ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING)) {
+        ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING) ||
+        replicasToClose.contains(hri)) {
       offlineDisabledRegion(hri);
       return;
     }
@@ -3541,8 +3581,8 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   private String onRegionSplit(ServerName sn, TransitionCode code,
-      HRegionInfo p, HRegionInfo a, HRegionInfo b) {
-    RegionState rs_p = regionStates.getRegionState(p);
+      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
+    final RegionState rs_p = regionStates.getRegionState(p);
     RegionState rs_a = regionStates.getRegionState(a);
     RegionState rs_b = regionStates.getRegionState(b);
     if (!(rs_p.isOpenOrSplittingOnServer(sn)
@@ -3568,6 +3608,15 @@ public class AssignmentManager extends ZooKeeperListener 
{
           ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING)) {
         invokeUnAssign(a);
         invokeUnAssign(b);
+      } else {
+        Callable<Object> splitReplicasCallable = new Callable<Object>() {
+          @Override
+          public Object call() {
+            doSplittingOfReplicas(p, a, b);
+            return null;
+          }
+        };
+        threadPoolExecutorService.submit(splitReplicasCallable);
       }
     } else if (code == TransitionCode.SPLIT_PONR) {
       try {
@@ -3590,7 +3639,7 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   private String onRegionMerge(ServerName sn, TransitionCode code,
-      HRegionInfo p, HRegionInfo a, HRegionInfo b) {
+      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
     RegionState rs_p = regionStates.getRegionState(p);
     RegionState rs_a = regionStates.getRegionState(a);
     RegionState rs_b = regionStates.getRegionState(b);
@@ -3617,6 +3666,15 @@ public class AssignmentManager extends ZooKeeperListener 
{
       if (getTableStateManager().isTableState(p.getTable(),
           ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING)) {
         invokeUnAssign(p);
+      } else {
+        Callable<Object> mergeReplicasCallable = new Callable<Object>() {
+          @Override
+          public Object call() {
+            doMergingOfReplicas(p, a, b);
+            return null;
+          }
+        };
+        threadPoolExecutorService.submit(mergeReplicasCallable);
       }
     } else if (code == TransitionCode.MERGE_PONR) {
       try {
@@ -3723,6 +3781,7 @@ public class AssignmentManager extends ZooKeeperListener {
     }
 
     if (et == EventType.RS_ZK_REGION_MERGED) {
+      doMergingOfReplicas(p, hri_a, hri_b);
       LOG.debug("Handling MERGED event for " + encodedName + "; deleting 
node");
       // Remove region from ZK
       try {
@@ -3851,6 +3910,8 @@ public class AssignmentManager extends ZooKeeperListener {
     }
 
     if (et == EventType.RS_ZK_REGION_SPLIT) {
+      // split replicas
+      doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
       // Remove region from ZK
       try {
@@ -3883,6 +3944,110 @@ public class AssignmentManager extends 
ZooKeeperListener {
     return true;
   }
 
+  private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo 
hri_a,
+      final HRegionInfo hri_b) {
+    // Close replicas for the original unmerged regions. create/assign new 
replicas
+    // for the merged parent.
+    List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
+    unmergedRegions.add(hri_a);
+    unmergedRegions.add(hri_b);
+    Map<ServerName, List<HRegionInfo>> map = 
regionStates.getRegionAssignments(unmergedRegions);
+    Collection<List<HRegionInfo>> c = map.values();
+    for (List<HRegionInfo> l : c) {
+      for (HRegionInfo h : l) {
+        if (!RegionReplicaUtil.isDefaultReplica(h)) {
+          LOG.debug("Unassigning un-merged replica " + h);
+          unassign(h);
+        }
+      }
+    }
+    int numReplicas = 1;
+    try {
+      numReplicas = 
((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
+          getRegionReplication();
+    } catch (IOException e) {
+      LOG.warn("Couldn't get the replication attribute of the table " + 
mergedHri.getTable() +
+          " due to " + e.getMessage() + ". The assignment of replicas for the 
merged region " +
+          "will not be done");
+    }
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    for (int i = 1; i < numReplicas; i++) {
+      regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
+    }
+    try {
+      assign(regions);
+    } catch (IOException ioe) {
+      LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " 
because of " +
+                ioe.getMessage());
+    } catch (InterruptedException ie) {
+      LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " 
because of " +
+                ie.getMessage());
+    }
+  }
+
+  private void doSplittingOfReplicas(final HRegionInfo parentHri, final 
HRegionInfo hri_a,
+      final HRegionInfo hri_b) {
+    // create new regions for the replica, and assign them to match with the
+    // current replica assignments. If replica1 of parent is assigned to RS1,
+    // the replica1s of daughters will be on the same machine
+    int numReplicas = 1;
+    try {
+      numReplicas = 
((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
+          getRegionReplication();
+    } catch (IOException e) {
+      LOG.warn("Couldn't get the replication attribute of the table " + 
parentHri.getTable() +
+          " due to " + e.getMessage() + ". The assignment of daughter replicas 
" +
+          "replicas will not be done");
+    }
+    // unassign the old replicas
+    List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
+    parentRegion.add(parentHri);
+    Map<ServerName, List<HRegionInfo>> currentAssign =
+        regionStates.getRegionAssignments(parentRegion);
+    Collection<List<HRegionInfo>> c = currentAssign.values();
+    for (List<HRegionInfo> l : c) {
+      for (HRegionInfo h : l) {
+        if (!RegionReplicaUtil.isDefaultReplica(h)) {
+          LOG.debug("Unassigning parent's replica " + h);
+          unassign(h);
+        }
+      }
+    }
+    // assign daughter replicas
+    Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
+    for (int i = 1; i < numReplicas; i++) {
+      prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
+      prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
+    }
+    try {
+      assign(map);
+    } catch (IOException e) {
+      LOG.warn("Caught exception " + e + " while trying to assign replica(s) 
of daughter(s)");
+    } catch (InterruptedException e) {
+      LOG.warn("Caught exception " + e + " while trying to assign replica(s) 
of daughter(s)");
+    }
+  }
+
+  private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, 
HRegionInfo parentHri,
+      int replicaId, Map<HRegionInfo, ServerName> map) {
+    HRegionInfo parentReplica = 
RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
+    HRegionInfo daughterReplica = 
RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
+        replicaId);
+    LOG.debug("Created replica region for daughter " + daughterReplica);
+    ServerName sn;
+    if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
+      map.put(daughterReplica, sn);
+    } else {
+      List<ServerName> servers = serverManager.getOnlineServersList();
+      sn = servers.get((new 
Random(System.currentTimeMillis())).nextInt(servers.size()));
+      map.put(daughterReplica, sn);
+    }
+  }
+
+  public Set<HRegionInfo> getReplicasToClose() {
+    return replicasToClose;
+  }
+
   /**
    * A region is offline.  The new state should be the specified one,
    * if not null.  If the specified state is null, the new state is Offline.
@@ -3897,6 +4062,25 @@ public class AssignmentManager extends ZooKeeperListener 
{
 
     // Tell our listeners that a region was closed
     sendRegionClosedNotification(regionInfo);
+    // also note that all the replicas of the primary should be closed
+    if (state != null && state.equals(State.SPLIT)) {
+      Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
+      c.add(regionInfo);
+      Map<ServerName, List<HRegionInfo>> map = 
regionStates.getRegionAssignments(c);
+      Collection<List<HRegionInfo>> allReplicas = map.values();
+      for (List<HRegionInfo> list : allReplicas) {
+        replicasToClose.addAll(list);
+      }
+    }
+    else if (state != null && state.equals(State.MERGED)) {
+      Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
+      c.add(regionInfo);
+      Map<ServerName, List<HRegionInfo>> map = 
regionStates.getRegionAssignments(c);
+      Collection<List<HRegionInfo>> allReplicas = map.values();
+      for (List<HRegionInfo> list : allReplicas) {
+        replicasToClose.addAll(list);
+      }
+    }
   }
 
   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,

http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
index ac61dc0..e0e4ee4 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
@@ -93,7 +93,8 @@ public class ClosedRegionHandler extends EventHandler 
implements TotesHRegionInf
     LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName());
     // Check if this table is being disabled or not
     if 
(this.assignmentManager.getTableStateManager().isTableState(this.regionInfo.getTable(),
-        ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING)) {
+        ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING) ||
+        assignmentManager.getReplicasToClose().contains(regionInfo)) {
       assignmentManager.offlineDisabledRegion(regionInfo);
       return;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index 907d5ca..2cc7ec5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -275,7 +275,8 @@ public class ServerShutdownHandler extends EventHandler {
             } else if (rit != null) {
               if ((rit.isPendingCloseOrClosing() || rit.isOffline())
                   && am.getTableStateManager().isTableState(hri.getTable(),
-                  ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING)) {
+                  ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING) ||
+                  am.getReplicasToClose().contains(hri)) {
                 // If the table was partially disabled and the RS went down, 
we should clear the RIT
                 // and remove the node for the region.
                 // The rit that we use may be stale in case the table was in 
DISABLING state

http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 5a66433..0a360e2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -158,6 +158,24 @@ public class StoreFileInfo {
   }
 
   /**
+   * Create a Store File Info from an HFileLink
+   * @param conf
+   * @param fs
+   * @param fileStatus
+   * @param reference
+   * @throws IOException
+   */
+  public StoreFileInfo(final Configuration conf, final FileSystem fs, final 
FileStatus fileStatus,
+      final Reference reference)
+      throws IOException {
+    this.fs = fs;
+    this.conf = conf;
+    this.initialPath = fileStatus.getPath();
+    this.reference = reference;
+    this.link = null;
+  }
+
+  /**
    * Sets the region coprocessor env.
    * @param coprocessorHost
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 0ea49ad..7dafa68 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 
@@ -83,6 +84,11 @@ public class ServerRegionReplicaUtil extends 
RegionReplicaUtil {
       return new StoreFileInfo(conf, fs, status);
     }
 
+    if (StoreFileInfo.isReference(status.getPath())) {
+      Reference reference = Reference.read(fs, status.getPath());
+      return new StoreFileInfo(conf, fs, status, reference);
+    }
+
     // else create a store file link. The link file does not exists on 
filesystem though.
     HFileLink link = HFileLink.build(conf, regionInfoForFs.getTable(),
             regionInfoForFs.getEncodedName(), familyName, 
status.getPath().getName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index b56fe3c..a8b21ba 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -97,7 +97,7 @@ public class TestReplicasClient {
     static final AtomicLong sleepTime = new AtomicLong(0);
     static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
     static final AtomicInteger countOfNext = new AtomicInteger(0);
-    static final AtomicReference<CountDownLatch> cdl =
+    private static final AtomicReference<CountDownLatch> cdl =
         new AtomicReference<CountDownLatch>(new CountDownLatch(0));
     Random r = new Random();
     public SlowMeCopro() {
@@ -134,7 +134,7 @@ public class TestReplicasClient {
 
     private void slowdownCode(final 
ObserverContext<RegionCoprocessorEnvironment> e) {
       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
-        CountDownLatch latch = cdl.get();
+        CountDownLatch latch = getCdl().get();
         try {
           if (sleepTime.get() > 0) {
             LOG.info("Sleeping for " + sleepTime.get() + " ms");
@@ -153,6 +153,10 @@ public class TestReplicasClient {
         LOG.info("We're not the primary replicas.");
       }
     }
+
+    public static AtomicReference<CountDownLatch> getCdl() {
+      return cdl;
+    }
   }
 
   @BeforeClass
@@ -288,7 +292,7 @@ public class TestReplicasClient {
   public void testUseRegionWithoutReplica() throws Exception {
     byte[] b1 = "testUseRegionWithoutReplica".getBytes();
     openRegion(hriSecondary);
-    SlowMeCopro.cdl.set(new CountDownLatch(0));
+    SlowMeCopro.getCdl().set(new CountDownLatch(0));
     try {
       Get g = new Get(b1);
       Result r = table.get(g);
@@ -344,14 +348,14 @@ public class TestReplicasClient {
     byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
     openRegion(hriSecondary);
 
-    SlowMeCopro.cdl.set(new CountDownLatch(1));
+    SlowMeCopro.getCdl().set(new CountDownLatch(1));
     try {
       Get g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       Result r = table.get(g);
       Assert.assertTrue(r.isStale());
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       closeRegion(hriSecondary);
     }
   }
@@ -462,13 +466,13 @@ public class TestReplicasClient {
       LOG.info("sleep and is not stale done");
 
       // But if we ask for stale we will get it
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
 
       LOG.info("stale done");
 
@@ -481,14 +485,14 @@ public class TestReplicasClient {
       LOG.info("exists not stale done");
 
       // exists works on stale but don't see the put
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setCheckExistenceOnly(true);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertFalse("The secondary has stale data", r.getExists());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       LOG.info("exists stale before flush done");
 
       flushRegion(hriPrimary);
@@ -497,28 +501,28 @@ public class TestReplicasClient {
       Thread.sleep(1000 + REFRESH_PERIOD * 2);
 
       // get works and is not stale
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertFalse(r.isEmpty());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       LOG.info("stale done");
 
       // exists works on stale and we see the put after the flush
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setCheckExistenceOnly(true);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertTrue(r.getExists());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       LOG.info("exists stale after flush done");
 
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       Delete d = new Delete(b1);
       table.delete(d);
@@ -632,7 +636,7 @@ public class TestReplicasClient {
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 9337786..8391782 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang.math.RandomUtils;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -288,6 +290,45 @@ public class TestRegionMergeTransactionOnCluster {
     }
   }
 
+  @Test
+  public void testMergeWithReplicas() throws Exception {
+    final TableName tableName = TableName.valueOf("testMergeWithReplicas");
+    // Create table and load data.
+    createTableAndLoadData(master, tableName, 5, 2);
+    List<Pair<HRegionInfo, ServerName>> initialRegionToServers =
+        MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(), 
master.getConnection(),
+           tableName);
+    // Merge 1st and 2nd region
+    PairOfSameType<HRegionInfo> mergedRegions = 
mergeRegionsAndVerifyRegionNum(master, tableName,
+        0, 2, 5 * 2 - 2);
+    List<Pair<HRegionInfo, ServerName>> currentRegionToServers =
+        MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(), 
master.getConnection(),
+           tableName);
+    List<HRegionInfo> initialRegions = new ArrayList<HRegionInfo>();
+    for (Pair<HRegionInfo, ServerName> p : initialRegionToServers) {
+      initialRegions.add(p.getFirst());
+    }
+    List<HRegionInfo> currentRegions = new ArrayList<HRegionInfo>();
+    for (Pair<HRegionInfo, ServerName> p : currentRegionToServers) {
+      currentRegions.add(p.getFirst());
+    }
+    assertTrue(initialRegions.contains(mergedRegions.getFirst())); //this is 
the first region
+    
assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+        mergedRegions.getFirst(), 1))); //this is the replica of the first 
region
+    assertTrue(initialRegions.contains(mergedRegions.getSecond())); //this is 
the second region
+    
assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+        mergedRegions.getSecond(), 1))); //this is the replica of the second 
region
+    assertTrue(!initialRegions.contains(currentRegions.get(0))); //this is the 
new region
+    
assertTrue(!initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+        currentRegions.get(0), 1))); //replica of the new region
+    
assertTrue(currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+        currentRegions.get(0), 1))); //replica of the new region
+    
assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+        mergedRegions.getFirst(), 1))); //replica of the merged region
+    
assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+        mergedRegions.getSecond(), 1))); //replica of the merged region
+  }
+
   private PairOfSameType<HRegionInfo> mergeRegionsAndVerifyRegionNum(
       HMaster master, TableName tablename,
       int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
@@ -336,11 +377,11 @@ public class TestRegionMergeTransactionOnCluster {
 
   private Table createTableAndLoadData(HMaster master, TableName tablename)
       throws Exception {
-    return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM);
+    return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1);
   }
 
   private Table createTableAndLoadData(HMaster master, TableName tablename,
-      int numRegions) throws Exception {
+      int numRegions, int replication) throws Exception {
     assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > 
numRegions);
     byte[][] splitRows = new byte[numRegions - 1][];
     for (int i = 0; i < splitRows.length; i++) {
@@ -348,6 +389,9 @@ public class TestRegionMergeTransactionOnCluster {
     }
 
     Table table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
+    if (replication > 1) {
+      HBaseTestingUtility.setReplicas(admin, tablename, replication);
+    }
     loadData(table);
     verifyRowCount(table, ROWSIZE);
 
@@ -357,7 +401,7 @@ public class TestRegionMergeTransactionOnCluster {
     while (System.currentTimeMillis() < timeout) {
       tableRegions = 
MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(),
         master.getConnection(), tablename);
-      if (tableRegions.size() == numRegions)
+      if (tableRegions.size() == numRegions * replication)
         break;
       Thread.sleep(250);
     }
@@ -366,7 +410,7 @@ public class TestRegionMergeTransactionOnCluster {
       master.getZooKeeper(),
       master.getConnection(), tablename);
     LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
-    assertEquals(numRegions, tableRegions.size());
+    assertEquals(numRegions * replication, tableRegions.size());
     return table;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/60e7e6ae/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 7248535..45ee2d6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -60,7 +60,9 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro;
 import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination;
 import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
@@ -966,6 +969,87 @@ public class TestSplitTransactionOnCluster {
     }
   }
 
+  @Test
+  public void testSplitWithRegionReplicas() throws Exception {
+    ZooKeeperWatcher zkw = 
HBaseTestingUtility.getZooKeeperWatcher(TESTING_UTIL);
+    final TableName tableName =
+        TableName.valueOf("foobar");
+    HTableDescriptor htd = TESTING_UTIL.createTableDescriptor("foobar");
+    htd.setRegionReplication(2);
+    htd.addCoprocessor(SlowMeCopro.class.getName());
+    // Create table then get the single region for our new table.
+    HTable t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")},
+        TESTING_UTIL.getConfiguration());
+    int count;
+    List<HRegion> oldRegions;
+    do {
+      oldRegions = cluster.getRegions(tableName);
+      Thread.sleep(10);
+    } while (oldRegions.size() != 2);
+    for (HRegion h : oldRegions) LOG.debug("OLDREGION " + h.getRegionInfo());
+    try {
+      int regionServerIndex = 
cluster.getServerWith(oldRegions.get(0).getRegionName());
+      HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
+      insertData(tableName, admin, t);
+      // Turn off balancer so it doesn't cut in and mess up our placements.
+      admin.setBalancerRunning(false, true);
+      // Turn off the meta scanner so it don't remove parent on us.
+      cluster.getMaster().setCatalogJanitorEnabled(false);
+      boolean tableExists = 
MetaTableAccessor.tableExists(regionServer.getConnection(),
+          tableName);
+      assertEquals("The specified table should be present.", true, 
tableExists);
+      final HRegion region = findSplittableRegion(oldRegions);
+      regionServerIndex = cluster.getServerWith(region.getRegionName());
+      regionServer = cluster.getRegionServer(regionServerIndex);
+      assertTrue("not able to find a splittable region", region != null);
+      String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
+          region.getRegionInfo().getEncodedName());
+      regionServer.getZooKeeper().sync(node);
+      SplitTransaction st = new SplitTransaction(region, 
Bytes.toBytes("row2"));
+      try {
+        st.prepare();
+        st.execute(regionServer, regionServer);
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail("Split execution should have succeeded with no exceptions thrown 
" + e);
+      }
+      //TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
+      List<HRegion> newRegions;
+      do {
+        newRegions = cluster.getRegions(tableName);
+        for (HRegion h : newRegions) LOG.debug("NEWREGION " + 
h.getRegionInfo());
+        Thread.sleep(1000);
+      } while ((newRegions.contains(oldRegions.get(0)) || 
newRegions.contains(oldRegions.get(1)))
+          || newRegions.size() != 4);
+      tableExists = MetaTableAccessor.tableExists(regionServer.getConnection(),
+          tableName);
+      assertEquals("The specified table should be present.", true, 
tableExists);
+      // exists works on stale and we see the put after the flush
+      byte[] b1 = "row1".getBytes();
+      Get g = new Get(b1);
+      g.setConsistency(Consistency.STRONG);
+      // The following GET will make a trip to the meta to get the new 
location of the 1st daughter
+      // In the process it will also get the location of the replica of the 
daughter (initially
+      // pointing to the parent's replica)
+      Result r = t.get(g);
+      Assert.assertFalse(r.isStale());
+      LOG.info("exists stale after flush done");
+
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      // This will succeed because in the previous GET we get the location of 
the replica
+      r = t.get(g);
+      Assert.assertTrue(r.isStale());
+      SlowMeCopro.getCdl().get().countDown();
+    } finally {
+      SlowMeCopro.getCdl().get().countDown();
+      admin.setBalancerRunning(true, false);
+      cluster.getMaster().setCatalogJanitorEnabled(true);
+      t.close();
+    }
+  }
+
   private void insertData(final TableName tableName, HBaseAdmin admin, Table 
t) throws IOException,
       InterruptedException {
     Put p = new Put(Bytes.toBytes("row1"));
@@ -1365,7 +1449,7 @@ public class TestSplitTransactionOnCluster {
   private HRegion findSplittableRegion(final List<HRegion> regions) throws 
InterruptedException {
     for (int i = 0; i < 5; ++i) {
       for (HRegion r: regions) {
-        if (r.isSplittable()) {
+        if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) {
           return(r);
         }
       }

Reply via email to