HBASE-20377 Deal with table in enabling and disabling state when modifying 
serial replication peer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a633adf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a633adf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a633adf

Branch: refs/heads/HBASE-19064
Commit: 5a633adffead3b979f6e1a607994409978b0ea74
Parents: 826909a
Author: zhangduo <zhang...@apache.org>
Authored: Fri Apr 13 16:21:03 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Apr 13 20:33:29 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/TableState.java  | 14 ++++
 .../master/procedure/DisableTableProcedure.java |  6 +-
 .../master/replication/ModifyPeerProcedure.java | 84 +++++++++++++-------
 .../TestAddToSerialReplicationPeer.java         | 74 +++++++++++++++++
 4 files changed, 146 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
index cc3b765..40612e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
@@ -104,6 +104,13 @@ public class TableState {
   }
 
   /**
+   * @return True if table is {@link State#ENABLING}.
+   */
+  public boolean isEnabling() {
+    return isInStates(State.ENABLING);
+  }
+
+  /**
    * @return True if {@link State#ENABLED} or {@link State#ENABLING}
    */
   public boolean isEnabledOrEnabling() {
@@ -118,6 +125,13 @@ public class TableState {
   }
 
   /**
+   * @return True if table is disabling.
+   */
+  public boolean isDisabling() {
+    return isInStates(State.DISABLING);
+  }
+
+  /**
    * @return True if {@link State#DISABLED} or {@link State#DISABLED}
    */
   public boolean isDisabledOrDisabling() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index f5caff7..685a73e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -107,7 +108,7 @@ public class DisableTableProcedure
           break;
         case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
           
addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
-          
setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
+          
setNextState(DisableTableState.DISABLE_TABLE_ADD_REPLICATION_BARRIER);
           break;
         case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
           if (env.getMasterServices().getTableDescriptors().get(tableName)
@@ -119,7 +120,8 @@ public class DisableTableProcedure
                 .getRegionsOfTable(tableName)) {
                 long maxSequenceId =
                   WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), 
mfs.getRegionDir(region));
-                
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, 
maxSequenceId,
+                long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : 
HConstants.NO_SEQNUM;
+                
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, 
openSeqNum,
                   EnvironmentEdgeManager.currentTime()));
               }
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 8bedeff..0b9efce 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,17 +18,16 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import 
org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -38,7 +37,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +54,9 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
 
   protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
 
+  // The sleep interval when waiting table to be enabled or disabled.
+  protected static final int SLEEP_INTERVAL_MS = 1000;
+
   protected ModifyPeerProcedure() {
   }
 
@@ -126,6 +127,27 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
     throw new UnsupportedOperationException();
   }
 
+  // If the table is in enabling state, we need to wait until it is enabled 
and then reopen all its
+  // regions.
+  private boolean needReopen(TableStateManager tsm, TableName tn) throws 
IOException {
+    for (;;) {
+      try {
+        TableState state = tsm.getTableState(tn);
+        if (state.isEnabled()) {
+          return true;
+        }
+        if (!state.isEnabling()) {
+          return false;
+        }
+        Thread.sleep(SLEEP_INTERVAL_MS);
+      } catch (TableStateNotFoundException e) {
+        return false;
+      } catch (InterruptedException e) {
+        throw (IOException) new 
InterruptedIOException(e.getMessage()).initCause(e);
+      }
+    }
+  }
+
   private void reopenRegions(MasterProcedureEnv env) throws IOException {
     ReplicationPeerConfig peerConfig = getNewPeerConfig();
     ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
@@ -142,15 +164,10 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
         ReplicationUtils.contains(oldPeerConfig, tn)) {
         continue;
       }
-      try {
-        if (!tsm.getTableState(tn).isEnabled()) {
-          continue;
-        }
-      } catch (TableStateNotFoundException e) {
-        continue;
+      if (needReopen(tsm, tn)) {
+        addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+          env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
       }
-      addChildProcedure(env.getAssignmentManager().createReopenProcedures(
-        env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
     }
   }
 
@@ -183,6 +200,26 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
     }
   }
 
+  // If the table is currently disabling, then we need to wait until it is 
disabled.We will write
+  // replication barrier for a disabled table. And return whether we need to 
update the last pushed
+  // sequence id, if the table has been deleted already, i.e, we hit 
TableStateNotFoundException,
+  // then we do not need to update last pushed sequence id for this table.
+  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName 
tn)
+      throws IOException {
+    for (;;) {
+      try {
+        if (!tsm.getTableState(tn).isDisabling()) {
+          return true;
+        }
+        Thread.sleep(SLEEP_INTERVAL_MS);
+      } catch (TableStateNotFoundException e) {
+        return false;
+      } catch (InterruptedException e) {
+        throw (IOException) new 
InterruptedIOException(e.getMessage()).initCause(e);
+      }
+    }
+  }
+
   // Will put the encodedRegionName->lastPushedSeqId pair into the map passed 
in, if the map is
   // large enough we will call queueStorage.setLastSequenceIds and clear the 
map. So the caller
   // should not forget to check whether the map is empty at last, if not you 
should call
@@ -192,26 +229,13 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
     ReplicationQueueStorage queueStorage = 
env.getReplicationPeerManager().getQueueStorage();
     Connection conn = env.getMasterServices().getConnection();
-    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
-    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    boolean isTableEnabled;
-    try {
-      isTableEnabled = tsm.getTableState(tableName).isEnabled();
-    } catch (TableStateNotFoundException e) {
+    if (!needSetLastPushedSequenceId(tsm, tableName)) {
       return;
     }
-    if (isTableEnabled) {
-      for (Pair<String, Long> name2Barrier : MetaTableAccessor
-        .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
-        addToMap(lastSeqIds, name2Barrier.getFirst(), 
name2Barrier.getSecond().longValue() - 1,
-          queueStorage);
-      }
-    } else {
-      for (RegionInfo region : regionStates.getRegionsOfTable(tableName, 
true)) {
-        long maxSequenceId =
-          WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), 
mfs.getRegionDir(region));
-        addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, 
queueStorage);
-      }
+    for (Pair<String, Long> name2Barrier : MetaTableAccessor
+      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+      addToMap(lastSeqIds, name2Barrier.getFirst(), 
name2Barrier.getSecond().longValue() - 1,
+        queueStorage);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
index 317c120..d3d6cbe 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.Collections;
 import org.apache.hadoop.fs.Path;
@@ -26,6 +28,8 @@ import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -192,4 +196,74 @@ public class TestAddToSerialReplicationPeer extends 
SerialReplicationTestBase {
     waitUntilReplicationDone(100);
     checkOrder(100);
   }
+
+  @Test
+  public void testDisablingTable() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, 
Bytes.toBytes(i)));
+      }
+    }
+    UTIL.getAdmin().disableTable(tableName);
+    rollAllWALs();
+    TableStateManager tsm = 
UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+    tsm.setTableState(tableName, TableState.State.DISABLING);
+    Thread t = new Thread(() -> {
+      try {
+        addPeer(true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    t.start();
+    Thread.sleep(5000);
+    // we will wait on the disabling table so the thread should still be alive.
+    assertTrue(t.isAlive());
+    tsm.setTableState(tableName, TableState.State.DISABLED);
+    t.join();
+    UTIL.getAdmin().enableTable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, 
Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
+
+  @Test
+  public void testEnablingTable() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, 
Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = 
UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    moveRegionAndArchiveOldWals(region, rs);
+    TableStateManager tsm = 
UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+    tsm.setTableState(tableName, TableState.State.ENABLING);
+    Thread t = new Thread(() -> {
+      try {
+        addPeer(true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    t.start();
+    Thread.sleep(5000);
+    // we will wait on the disabling table so the thread should still be alive.
+    assertTrue(t.isAlive());
+    tsm.setTableState(tableName, TableState.State.ENABLED);
+    t.join();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, 
Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
 }

Reply via email to