HBASE-14717 enable_table_replication command should only create specified table for a peer cluster (Ashish)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1a19d94 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1a19d94 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1a19d94 Branch: refs/heads/hbase-12439 Commit: a1a19d94059dc3750b477ca03f89a77d53224655 Parents: e15c48e Author: tedyu <[email protected]> Authored: Thu Dec 24 11:07:49 2015 -0800 Committer: tedyu <[email protected]> Committed: Thu Dec 24 11:07:49 2015 -0800 ---------------------------------------------------------------------- .../client/replication/ReplicationAdmin.java | 13 ++++- .../replication/ReplicationPeerZKImpl.java | 20 +++++++- .../TestReplicationAdminWithClusters.java | 51 +++++++++++++++++--- 3 files changed, 73 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a1a19d94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index a0bea8b..c2e7489 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -599,7 +599,17 @@ public class ReplicationAdmin implements Closeable { if (repPeers == null || repPeers.size() <= 0) { throw new IllegalArgumentException("Found no peer cluster for replication."); } + + final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString()); + for (ReplicationPeer repPeer : repPeers) { + Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs(); + // TODO Currently peer TableCFs will not include namespace so we need to check only for table + // name without namespace in it. Need to correct this logic once we fix HBASE-11386. + if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) { + continue; + } + Configuration peerConf = repPeer.getConfiguration(); HTableDescriptor htd = null; try (Connection conn = ConnectionFactory.createConnection(peerConf); @@ -638,7 +648,8 @@ public class ReplicationAdmin implements Closeable { try { Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId); Configuration peerConf = pair.getSecond(); - ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); + ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(), + parseTableCFsFromConfig(this.getPeerTableCFs(peerId))); s = zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT), null); http://git-wip-us.apache.org/repos/asf/hbase/blob/a1a19d94/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index 3ac8007..39f6ebc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -55,8 +55,8 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea private TableCFsTracker tableCFsTracker; /** - * Constructor that takes all the objects required to communicate with the - * specified peer, except for the region server addresses. + * Constructor that takes all the objects required to communicate with the specified peer, except + * for the region server addresses. * @param conf configuration object to this peer * @param id string representation of this peer's identifier * @param peerConfig configuration for the replication peer @@ -67,6 +67,22 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea this.peerConfig = peerConfig; this.id = id; } + + /** + * Constructor that takes all the objects required to communicate with the specified peer, except + * for the region server addresses. + * @param conf configuration object to this peer + * @param id string representation of this peer's identifier + * @param peerConfig configuration for the replication peer + * @param tableCFs table-cf configuration for this peer + */ + public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, + Map<TableName, List<String>> tableCFs) throws ReplicationException { + this.conf = conf; + this.peerConfig = peerConfig; + this.id = id; + this.tableCFs = tableCFs; + } /** * start a state tracker to check whether this peer is enabled or not http://git-wip-us.apache.org/repos/asf/hbase/blob/a1a19d94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index b5899b8..e7bd72c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -15,6 +15,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; @@ -41,6 +45,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { static Connection connection2; static Admin admin1; static Admin admin2; + static ReplicationAdmin adminExt; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -49,12 +54,14 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { connection2 = ConnectionFactory.createConnection(conf2); admin1 = connection1.getAdmin(); admin2 = connection2.getAdmin(); + adminExt = new ReplicationAdmin(conf1); } @AfterClass public static void tearDownAfterClass() throws Exception { admin1.close(); admin2.close(); + adminExt.close(); connection1.close(); connection2.close(); TestReplicationBase.tearDownAfterClass(); @@ -65,7 +72,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.disableTable(tableName); admin2.deleteTable(tableName); assertFalse(admin2.tableExists(tableName)); - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(tableName); assertTrue(admin2.tableExists(tableName)); } @@ -84,7 +90,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.modifyTable(tableName, table); admin2.enableTable(tableName); - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(tableName); table = admin1.getTableDescriptor(tableName); for (HColumnDescriptor fam : table.getColumnFamilies()) { @@ -101,7 +106,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.modifyTable(tableName, table); admin2.enableTable(tableName); - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); try { adminExt.enableTableRep(tableName); fail("Exception should be thrown if table descriptors in the clusters are not same."); @@ -120,7 +124,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Test(timeout = 300000) public void testDisableAndEnableReplication() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.disableTableRep(tableName); HTableDescriptor table = admin1.getTableDescriptor(tableName); for (HColumnDescriptor fam : table.getColumnFamilies()) { @@ -139,25 +142,57 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Test(timeout = 300000, expected = TableNotFoundException.class) public void testDisableReplicationForNonExistingTable() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.disableTableRep(TableName.valueOf("nonExistingTable")); } @Test(timeout = 300000, expected = TableNotFoundException.class) public void testEnableReplicationForNonExistingTable() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(TableName.valueOf("nonExistingTable")); } @Test(timeout = 300000, expected = IllegalArgumentException.class) public void testDisableReplicationWhenTableNameAsNull() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.disableTableRep(null); } @Test(timeout = 300000, expected = IllegalArgumentException.class) public void testEnableReplicationWhenTableNameAsNull() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(null); } + + /* + * Test enable table replication should create table only in user explicit specified table-cfs. + * HBASE-14717 + */ + @Test(timeout = 300000) + public void testEnableReplicationForExplicitSetTableCfs() throws Exception { + TableName tn = TableName.valueOf("testEnableReplicationForSetTableCfs"); + String peerId = "2"; + if (admin2.isTableAvailable(tableName)) { + admin2.disableTable(tableName); + admin2.deleteTable(tableName); + } + assertFalse("Table should not exists in the peer cluster", admin2.isTableAvailable(tableName)); + + Map<TableName, ? extends Collection<String>> tableCfs = + new HashMap<TableName, Collection<String>>(); + tableCfs.put(tn, null); + try { + adminExt.setPeerTableCFs(peerId, tableCfs); + adminExt.enableTableRep(tableName); + assertFalse("Table should not be created if user has set table cfs explicitly for the " + + "peer and this is not part of that collection", + admin2.isTableAvailable(tableName)); + + tableCfs.put(tableName, null); + adminExt.setPeerTableCFs(peerId, tableCfs); + adminExt.enableTableRep(tableName); + assertTrue( + "Table should be created if user has explicitly added table into table cfs collection", + admin2.isTableAvailable(tableName)); + } finally { + adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId)); + adminExt.disableTableRep(tableName); + } + } }
