HBASE-20285 Delete all last pushed sequence ids when removing a peer or removing the serial flag for a peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ead569c9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ead569c9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ead569c9 Branch: refs/heads/branch-2 Commit: ead569c9515368c2c7e1932561fd05c77ccd9482 Parents: 83488b8 Author: zhangduo <zhang...@apache.org> Authored: Mon Mar 26 22:17:00 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Mon Apr 9 15:18:44 2018 +0800 ---------------------------------------------------------------------- .../src/main/protobuf/MasterProcedure.proto | 10 +++ .../replication/ReplicationQueueStorage.java | 5 ++ .../replication/ZKReplicationQueueStorage.java | 37 ++++++++++- .../TestZKReplicationQueueStorage.java | 31 ++++++++- .../replication/DisablePeerProcedure.java | 15 +++++ .../master/replication/EnablePeerProcedure.java | 15 +++++ .../master/replication/RemovePeerProcedure.java | 31 ++++++++- .../replication/ReplicationPeerManager.java | 8 ++- .../replication/UpdatePeerConfigProcedure.java | 3 + .../replication/SerialReplicationTestBase.java | 19 +++++- .../TestAddToSerialReplicationPeer.java | 28 ++------ .../replication/TestSerialReplication.java | 68 ++++++++++++++++---- 12 files changed, 227 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index f710759..b37557c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -421,3 +421,13 @@ message UpdatePeerConfigStateData { required ReplicationPeer peer_config = 1; optional ReplicationPeer old_peer_config = 2; } + +message RemovePeerStateData { + optional ReplicationPeer peer_config = 1; +} + +message EnablePeerStateData { +} + +message DisablePeerStateData { +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java index 99a1e97..cd37ac2 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -87,6 +87,11 @@ public interface ReplicationQueueStorage { void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException; /** + * Remove all the max sequence id record for the given peer. + * @param peerId peer id + */ + void removeLastSequenceIds(String peerId) throws ReplicationException; + /** * Get the current position for a specific WAL in a given queue for a given regionserver. * @param serverName the name of the regionserver * @param queueId a String that identifies the queue http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 2e7a012..96b0b91 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -102,7 +102,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase */ private final String hfileRefsZNode; - private final String regionsZNode; + @VisibleForTesting + final String regionsZNode; public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { super(zookeeper, conf); @@ -312,6 +313,40 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } @Override + public void removeLastSequenceIds(String peerId) throws ReplicationException { + String suffix = "-" + peerId; + try { + StringBuilder sb = new StringBuilder(regionsZNode); + int regionsZNodeLength = regionsZNode.length(); + int levelOneLength = regionsZNodeLength + 3; + int levelTwoLength = levelOneLength + 3; + List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode); + // it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids + // yet, so we need an extra check here. + if (CollectionUtils.isEmpty(levelOneDirs)) { + return; + } + for (String levelOne : levelOneDirs) { + sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne); + for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) { + sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo); + for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) { + if (znode.endsWith(suffix)) { + sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode); + ZKUtil.deleteNode(zookeeper, sb.toString()); + sb.setLength(levelTwoLength); + } + } + sb.setLength(levelOneLength); + } + sb.setLength(regionsZNodeLength); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e); + } + } + + @Override public long getWALPosition(ServerName serverName, String queueId, String fileName) throws ReplicationException { byte[] bytes; http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java index 5821271..74a24ac 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -32,15 +32,17 @@ import java.util.SortedSet; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MD5Hash; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -71,7 +73,7 @@ public class TestZKReplicationQueueStorage { } @After - public void tearDownAfterTest() throws ReplicationException { + public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException { for (ServerName serverName : STORAGE.getListOfReplicators()) { for (String queue : STORAGE.getAllQueues(serverName)) { STORAGE.removeQueue(serverName, queue); @@ -301,6 +303,29 @@ public class TestZKReplicationQueueStorage { String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7"; String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId; String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId); - Assert.assertEquals(expectedPath, path); + assertEquals(expectedPath, path); + } + + @Test + public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception { + String peerId = "1"; + String peerIdToDelete = "2"; + for (int i = 0; i < 100; i++) { + String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); + STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i)); + STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i)); + } + for (int i = 0; i < 100; i++) { + String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); + assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId)); + assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete)); + } + STORAGE.removeLastSequenceIds(peerIdToDelete); + for (int i = 0; i < 100; i++) { + String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); + assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId)); + assertEquals(HConstants.NO_SEQNUM, + STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete)); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java index 0871575..7bda1d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisablePeerStateData; + /** * The procedure for disabling a replication peer. */ @@ -67,4 +70,16 @@ public class DisablePeerProcedure extends ModifyPeerProcedure { cpHost.postDisableReplicationPeer(peerId); } } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(DisablePeerStateData.getDefaultInstance()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + serializer.deserialize(DisablePeerStateData.class); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java index 890462f..530d4cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnablePeerStateData; + /** * The procedure for enabling a replication peer. */ @@ -67,4 +70,16 @@ public class EnablePeerProcedure extends ModifyPeerProcedure { cpHost.postEnableReplicationPeer(peerId); } } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(EnablePeerStateData.getDefaultInstance()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + serializer.deserialize(EnablePeerStateData.class); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 64faf2b..82dc07e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -18,13 +18,18 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData; + /** * The procedure for removing a replication peer. */ @@ -33,6 +38,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class); + private ReplicationPeerConfig peerConfig; + public RemovePeerProcedure() { } @@ -51,7 +58,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preRemoveReplicationPeer(peerId); } - env.getReplicationPeerManager().preRemovePeer(peerId); + peerConfig = env.getReplicationPeerManager().preRemovePeer(peerId); } @Override @@ -63,10 +70,32 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { protected void postPeerModification(MasterProcedureEnv env) throws IOException, ReplicationException { env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId); + if (peerConfig.isSerial()) { + env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId); + } LOG.info("Successfully removed peer {}", peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postRemoveReplicationPeer(peerId); } } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + RemovePeerStateData.Builder builder = RemovePeerStateData.newBuilder(); + if (peerConfig != null) { + builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + RemovePeerStateData data = serializer.deserialize(RemovePeerStateData.class); + if (data.hasPeerConfig()) { + this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index a0e01e0..87d0111 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -109,8 +109,8 @@ public class ReplicationPeerManager { return desc; } - void preRemovePeer(String peerId) throws DoNotRetryIOException { - checkPeerExists(peerId); + ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { + return checkPeerExists(peerId).getPeerConfig(); } void preEnablePeer(String peerId) throws DoNotRetryIOException { @@ -220,6 +220,10 @@ public class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } + void removeAllLastPushedSeqIds(String peerId) throws ReplicationException { + queueStorage.removeLastSequenceIds(peerId); + } + void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // on-going when the refresh peer config procedure is done, if a RS which has already been http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index b7e670a..ccfd4a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -107,6 +107,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException, ReplicationException { + if (oldPeerConfig.isSerial() && !peerConfig.isSerial()) { + env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId); + } LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index b5aae85..4b7fa87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -26,8 +26,13 @@ import java.util.UUID; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; @@ -129,7 +134,10 @@ public class SerialReplicationTestBase { @After public void tearDown() throws Exception { - UTIL.getAdmin().removeReplicationPeer(PEER_ID); + Admin admin = UTIL.getAdmin(); + for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { + admin.removeReplicationPeer(pd.getPeerId()); + } rollAllWALs(); if (WRITER != null) { WRITER.close(); @@ -233,4 +241,13 @@ public class SerialReplicationTestBase { assertEquals(expectedEntries, count); } } + + protected final TableName createTable() throws IOException, InterruptedException { + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); + UTIL.waitTableAvailable(tableName); + return tableName; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java index 64b5bb1..317c120 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java @@ -21,14 +21,11 @@ import java.io.IOException; import java.util.Collections; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.replication.regionserver.Replication; @@ -88,11 +85,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { @Test public void testAddPeer() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); - UTIL.getAdmin().createTable( - TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); - UTIL.waitTableAvailable(tableName); + TableName tableName = createTable(); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); @@ -118,12 +111,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(); UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); - TableName tableName = TableName.valueOf(name.getMethodName()); - - UTIL.getAdmin().createTable( - TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); - UTIL.waitTableAvailable(tableName); + TableName tableName = createTable(); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); @@ -159,11 +147,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { .setReplicateAllUserTables(false).setSerial(true).build(); UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); - TableName tableName = TableName.valueOf(name.getMethodName()); - UTIL.getAdmin().createTable( - TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); - UTIL.waitTableAvailable(tableName); + TableName tableName = createTable(); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); @@ -190,11 +174,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { @Test public void testDisabledTable() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); - UTIL.getAdmin().createTable( - TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); - UTIL.waitTableAvailable(tableName); + TableName tableName = createTable(); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index bedb2ec..07e626b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -65,11 +65,7 @@ public class TestSerialReplication extends SerialReplicationTestBase { @Test public void testRegionMove() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); - UTIL.getAdmin().createTable( - TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); - UTIL.waitTableAvailable(tableName); + TableName tableName = createTable(); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); @@ -89,11 +85,7 @@ public class TestSerialReplication extends SerialReplicationTestBase { @Test public void testRegionSplit() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); - UTIL.getAdmin().createTable( - TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); - UTIL.waitTableAvailable(tableName); + TableName tableName = createTable(); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); @@ -148,7 +140,7 @@ public class TestSerialReplication extends SerialReplicationTestBase { TableName tableName = TableName.valueOf(name.getMethodName()); UTIL.getAdmin().createTable( TableDescriptorBuilder.newBuilder(tableName) - .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF) .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .build(), new byte[][] { splitKey }); @@ -204,4 +196,58 @@ public class TestSerialReplication extends SerialReplicationTestBase { assertEquals(200, count); } } + + @Test + public void testRemovePeerNothingReplicated() throws Exception { + TableName tableName = createTable(); + String encodedRegionName = + UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); + ReplicationQueueStorage queueStorage = + UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); + assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); + UTIL.getAdmin().removeReplicationPeer(PEER_ID); + assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); + } + + @Test + public void testRemovePeer() throws Exception { + TableName tableName = createTable(); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + enablePeerAndWaitUntilReplicationDone(100); + checkOrder(100); + String encodedRegionName = + UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); + ReplicationQueueStorage queueStorage = + UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); + assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0); + UTIL.getAdmin().removeReplicationPeer(PEER_ID); + // confirm that we delete the last pushed sequence id + assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); + } + + @Test + public void testRemoveSerialFlag() throws Exception { + TableName tableName = createTable(); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + enablePeerAndWaitUntilReplicationDone(100); + checkOrder(100); + String encodedRegionName = + UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); + ReplicationQueueStorage queueStorage = + UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); + assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0); + ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID); + UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, + ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build()); + // confirm that we delete the last pushed sequence id + assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); + } }