HBASE-19935 Only allow table replication for sync replication for now

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/785a77c1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/785a77c1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/785a77c1

Branch: refs/heads/HBASE-19064
Commit: 785a77c115d5088f397ac96e6d758ad3fff4af8d
Parents: 00d450e
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Feb 6 16:00:59 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Feb 8 16:49:19 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfig.java      |  9 +++
 .../replication/ReplicationPeerManager.java     | 34 ++++++++-
 .../replication/TestReplicationAdmin.java       | 73 ++++++++++++++------
 .../wal/TestCombinedAsyncWriter.java            |  6 ++
 .../wal/TestSyncReplicationWALProvider.java     |  6 ++
 5 files changed, 102 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/785a77c1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 4c10c46..69565a7 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -217,6 +219,13 @@ public class ReplicationPeerConfig {
     return this.remoteWALDir;
   }
 
+  /**
+   * Use remote wal dir to decide whether a peer is sync replication peer
+   */
+  public boolean isSyncReplication() {
+    return !StringUtils.isBlank(this.remoteWALDir);
+  }
+
   public static ReplicationPeerConfigBuilder newBuilder() {
     return new ReplicationPeerConfigBuilderImpl();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/785a77c1/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 9336fbd..6bfd9c9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -167,7 +167,7 @@ public class ReplicationPeerManager {
               " does not match new remote wal dir '" + 
peerConfig.getRemoteWALDir() + "'");
     }
 
-    if (oldPeerConfig.getRemoteWALDir() != null) {
+    if (oldPeerConfig.isSyncReplication()) {
       if (!ReplicationUtils.isKeyConfigEqual(oldPeerConfig, peerConfig)) {
         throw new DoNotRetryIOException(
             "Changing the replicated namespace/table config on a synchronous 
replication "
@@ -195,8 +195,8 @@ public class ReplicationPeerManager {
     }
     ReplicationPeerConfig copiedPeerConfig = 
ReplicationPeerConfig.newBuilder(peerConfig).build();
     SyncReplicationState syncReplicationState =
-        StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? 
SyncReplicationState.NONE
-            : SyncReplicationState.DOWNGRADE_ACTIVE;
+        copiedPeerConfig.isSyncReplication() ? 
SyncReplicationState.DOWNGRADE_ACTIVE
+            : SyncReplicationState.NONE;
     peerStorage.addPeer(peerId, copiedPeerConfig, enabled, 
syncReplicationState);
     peers.put(peerId,
       new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, 
syncReplicationState));
@@ -316,9 +316,37 @@ public class ReplicationPeerManager {
         peerConfig.getTableCFsMap());
     }
 
+    if (peerConfig.isSyncReplication()) {
+      checkPeerConfigForSyncReplication(peerConfig);
+    }
+
     checkConfiguredWALEntryFilters(peerConfig);
   }
 
+  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig 
peerConfig)
+      throws DoNotRetryIOException {
+    // This is used to reduce the difficulty for implementing the sync 
replication state transition
+    // as we need to reopen all the related regions.
+    // TODO: Add namespace, replicat_all flag back
+    if (peerConfig.replicateAllUserTables()) {
+      throw new DoNotRetryIOException(
+          "Only support replicated table config for sync replication peer");
+    }
+    if (peerConfig.getNamespaces() != null && 
!peerConfig.getNamespaces().isEmpty()) {
+      throw new DoNotRetryIOException(
+          "Only support replicated table config for sync replication peer");
+    }
+    if (peerConfig.getTableCFsMap() == null || 
peerConfig.getTableCFsMap().isEmpty()) {
+      throw new DoNotRetryIOException("Need config replicated tables for sync 
replication peer");
+    }
+    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
+      if (cfs != null && !cfs.isEmpty()) {
+        throw new DoNotRetryIOException(
+            "Only support replicated table config for sync replication peer");
+      }
+    }
+  }
+
   /**
    * Set a namespace in the peer config means that all tables in this 
namespace will be replicated
    * to the peer cluster.

http://git-wip-us.apache.org/repos/asf/hbase/blob/785a77c1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index a7710e7..d462dbd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -910,6 +911,8 @@ public class TestReplicationAdmin {
 
   @Test
   public void testPeerRemoteWALDir() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+
     String rootDir = "hdfs://srv1:9999/hbase";
     ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
     builder.setClusterKey(KEY_ONE);
@@ -929,57 +932,74 @@ public class TestReplicationAdmin {
     builder = ReplicationPeerConfig.newBuilder();
     builder.setClusterKey(KEY_SECOND);
     builder.setRemoteWALDir(rootDir);
-    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
 
-    rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
-    assertEquals(rootDir, rpc.getRemoteWALDir());
+    try {
+      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+      fail("Only support replicated table config for sync replication");
+    } catch (Exception e) {
+      // OK
+    }
 
+    builder.setReplicateAllUserTables(false);
     try {
-      builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
-      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
-      fail("Change remote wal dir is not allowed");
+      Set<String> namespaces = new HashSet<String>();
+      namespaces.add("ns1");
+      builder.setNamespaces(namespaces);
+      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+      fail("Only support replicated table config for sync replication");
     } catch (Exception e) {
       // OK
     }
 
+    builder.setNamespaces(null);
     try {
-      builder.setRemoteWALDir(null);
-      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
-      fail("Change remote wal dir is not allowed");
+      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+      fail("Only support replicated table config for sync replication, and 
tables can't be empty");
     } catch (Exception e) {
       // OK
     }
 
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
     try {
-      builder = ReplicationPeerConfig.newBuilder(rpc);
-      builder.setReplicateAllUserTables(false);
+      tableCfs.put(tableName, Arrays.asList("cf1"));
+      builder.setTableCFsMap(tableCfs);
+      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+      fail("Only support replicated table config for sync replication");
+    } catch (Exception e) {
+      // OK
+    }
+
+    tableCfs = new HashMap<>();
+    tableCfs.put(tableName, new ArrayList<>());
+    builder.setTableCFsMap(tableCfs);
+    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+    rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
+    assertEquals(rootDir, rpc.getRemoteWALDir());
+
+    try {
+      builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
       hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
-      fail(
-        "Change replicated namespace/table config on an existing synchronous 
peer is not allowed");
+      fail("Change remote wal dir is not allowed");
     } catch (Exception e) {
       // OK
     }
 
     try {
-      builder = ReplicationPeerConfig.newBuilder(rpc);
-      Set<String> namespaces = new HashSet<>();
-      namespaces.add("ns1");
-      builder.setExcludeNamespaces(namespaces);
+      builder.setRemoteWALDir(null);
       hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
-      fail(
-        "Change replicated namespace/table config on an existing synchronous 
peer is not allowed");
+      fail("Change remote wal dir is not allowed");
     } catch (Exception e) {
       // OK
     }
 
     try {
       builder = ReplicationPeerConfig.newBuilder(rpc);
-      Map<TableName, List<String>> tableCfs = new HashMap<>();
-      tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>());
-      builder.setExcludeTableCFsMap(tableCfs);
+      tableCfs = new HashMap<>();
+      tableCfs.put(TableName.valueOf("ns1:" + name.getMethodName()), new 
ArrayList<>());
+      builder.setTableCFsMap(tableCfs);
       hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
       fail(
-        "Change replicated namespace/table config on an existing synchronous 
peer is not allowed");
+        "Change replicated table config on an existing synchronous peer is not 
allowed");
     } catch (Exception e) {
       // OK
     }
@@ -987,8 +1007,11 @@ public class TestReplicationAdmin {
 
   @Test
   public void testTransitSyncReplicationPeerState() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+
     ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
     builder.setClusterKey(KEY_ONE);
+    builder.setReplicateAllUserTables(false);
     hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
     assertEquals(SyncReplicationState.NONE,
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
@@ -1005,6 +1028,10 @@ public class TestReplicationAdmin {
     builder = ReplicationPeerConfig.newBuilder();
     builder.setClusterKey(KEY_SECOND);
     builder.setRemoteWALDir(rootDir);
+    builder.setReplicateAllUserTables(false);
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, new ArrayList<>());
+    builder.setTableCFsMap(tableCfs);
     hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
     assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));

http://git-wip-us.apache.org/repos/asf/hbase/blob/785a77c1/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index 36dbe0f..07aa6a8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -33,6 +34,7 @@ import 
org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -51,6 +53,10 @@ import 
org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestCombinedAsyncWriter {
 
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
+
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   private static EventLoopGroup EVENT_LOOP_GROUP;

http://git-wip-us.apache.org/repos/asf/hbase/blob/785a77c1/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 60a9e13..f09e51e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat;
 import java.io.IOException;
 import java.util.Optional;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
@@ -41,12 +42,17 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestSyncReplicationWALProvider {
 
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
+
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   private static String PEER_ID = "1";

Reply via email to