http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a8cffba..9ff4b2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -45,8 +45,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
@@ -225,8 +227,16 @@ public class ReplicationSourceManager implements ReplicationListener {
    * old region server wal queues
    */
   protected void init() throws IOException, ReplicationException {
+    boolean replicationForBulkLoadDataEnabled =
+        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
     for (String id : this.replicationPeers.getPeerIds()) {
       addSource(id);
+      if (replicationForBulkLoadDataEnabled) {
+        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
+        // when a peer was added before replication for bulk loaded data was enabled.
+        this.replicationQueues.addPeerToHFileRefs(id);
+      }
     }
     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
     if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -733,4 +743,15 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     return stats.toString();
   }
+
+  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+      throws ReplicationException {
+    for (ReplicationSourceInterface source : this.sources) {
+      source.addHFileRefs(tableName, family, files);
+    }
+  }
+
+  public void cleanUpHFileRefs(String peerId, List<String> files) {
+    this.replicationQueues.removeHFileRefs(peerId, files);
+  }
 }
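The manager changes above are gated by HConstants.REPLICATION_BULKLOAD_ENABLE_KEY and work together with HConstants.REPLICATION_CLUSTER_ID and the hbase.replication.source.fs.conf.provider property that the new SourceFSConfigurationProvider interface (next file) and the tests later in this commit rely on. The following is only a rough sketch of wiring these settings up on a source cluster; the cluster id value and the provider class name are placeholders, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class BulkLoadReplicationConfigSketch {
  // Returns a configuration with bulk load replication enabled; values are illustrative.
  public static Configuration configureSource() {
    Configuration conf = HBaseConfiguration.create();
    // Enable shipping of bulk loaded HFiles to replication peers (the flag read in init() above).
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // Unique id the sink uses to look up this source cluster's file system configuration;
    // the tests in this commit use "12345", the value here is a placeholder.
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "source-cluster-1");
    // Optional custom provider for resolving the source FS configuration on the sink side;
    // the class name below is hypothetical.
    conf.set("hbase.replication.source.fs.conf.provider",
        "org.example.ExampleSourceFSConfigurationProvider");
    return conf;
  }
}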
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java new file mode 100644 index 0000000..8271115 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Interface that defines how a region server in peer cluster will get source cluster file system + * configurations. User can configure their custom implementation implementing this interface by + * setting the value of their custom implementation's fully qualified class name to + * hbase.replication.source.fs.conf.provider property in RegionServer configuration. Default is + * {@link DefaultSourceFSConfigurationProvider} + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface SourceFSConfigurationProvider { + + /** + * Returns the source cluster file system configuration for the given source cluster replication + * ID. + * @param sinkConf sink cluster configuration + * @param replicationClusterId unique ID which identifies the source cluster + * @return source cluster file system configuration + * @throws IOException for invalid directory or for a bad disk. 
+ */ + public Configuration getConf(Configuration sinkConf, String replicationClusterId) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index becc9f3..3541ade 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -217,7 +217,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) { familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath())); } - + Token userToken = null; if (userProvider.isHadoopSecurityEnabled()) { userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken() @@ -375,6 +375,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); + + // In case of Replication for bulk load files, hfiles are already copied in staging directory + if (p.equals(stageP)) { + LOG.debug(p.getName() + + " is already available in staging directory. Skipping copy or rename."); + return stageP.toString(); + } + if (srcFs == null) { srcFs = FileSystem.get(p.toUri(), conf); } @@ -414,6 +422,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); + + // In case of Replication for bulk load files, hfiles are not renamed by end point during + // prepare stage, so no need of rename here again + if (p.equals(stageP)) { + LOG.debug(p.getName() + " is already available in source directory. Skipping rename."); + return; + } + LOG.debug("Moving " + stageP + " back to " + p); if(!fs.rename(stageP, p)) throw new IOException("Failed to move HFile: " + stageP + " to " + p); http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java new file mode 100644 index 0000000..87db386 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.master.cleaner; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; +import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestReplicationHFileCleaner { + private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Server server; + private static ReplicationQueues rq; + private static ReplicationPeers rp; + private static final String peerId = "TestReplicationHFileCleaner"; + private static Configuration conf = TEST_UTIL.getConfiguration(); + static FileSystem fs = null; + Path root; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + server = new DummyServer(); + conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + Replication.decorateMasterConfiguration(conf); + rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); + rp.init(); + + rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + rq.init(server.getServerName().toString()); + try { + fs = FileSystem.get(conf); + } finally { + if (fs != null) { + 
fs.close(); + } + } + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Before + public void setup() throws ReplicationException, IOException { + root = TEST_UTIL.getDataTestDirOnTestFS(); + rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null); + } + + @After + public void cleanup() throws ReplicationException { + try { + fs.delete(root, true); + } catch (IOException e) { + LOG.warn("Failed to delete files recursively from path " + root); + } + rp.removePeer(peerId); + } + + @Test + public void testIsFileDeletable() throws IOException, ReplicationException { + // 1. Create a file + Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs"); + fs.createNewFile(file); + // 2. Assert file is successfully created + assertTrue("Test file not created!", fs.exists(file)); + ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); + cleaner.setConf(conf); + // 3. Assert that file as is should be deletable + assertTrue("Cleaner should allow to delete this file as there is no hfile reference node " + + "for it in the queue.", + cleaner.isFileDeletable(fs.getFileStatus(file))); + + List<String> files = new ArrayList<String>(1); + files.add(file.getName()); + // 4. Add the file to hfile-refs queue + rq.addHFileRefs(peerId, files); + // 5. Assert file should not be deletable + assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node " + + "for it in the queue.", + cleaner.isFileDeletable(fs.getFileStatus(file))); + } + + @Test + public void testGetDeletableFiles() throws Exception { + // 1. Create two files and assert that they do not exist + Path notDeletablefile = new Path(root, "testGetDeletableFiles_1"); + fs.createNewFile(notDeletablefile); + assertTrue("Test file not created!", fs.exists(notDeletablefile)); + Path deletablefile = new Path(root, "testGetDeletableFiles_2"); + fs.createNewFile(deletablefile); + assertTrue("Test file not created!", fs.exists(deletablefile)); + + List<FileStatus> files = new ArrayList<FileStatus>(2); + FileStatus f = new FileStatus(); + f.setPath(deletablefile); + files.add(f); + f = new FileStatus(); + f.setPath(notDeletablefile); + files.add(f); + + List<String> hfiles = new ArrayList<>(1); + hfiles.add(notDeletablefile.getName()); + // 2. Add one file to hfile-refs queue + rq.addHFileRefs(peerId, hfiles); + + ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); + cleaner.setConf(conf); + Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator(); + int i = 0; + while (deletableFilesIterator.hasNext() && i < 2) { + i++; + } + // 5. Assert one file should not be deletable and it is present in the list returned + if (i > 2) { + fail("File " + notDeletablefile + + " should not be deletable as its hfile reference node is not added."); + } + assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile)); + } + + /* + * Test for HBASE-14621. This test will not assert directly anything. Without the fix the test + * will end up in a infinite loop, so it will timeout. + */ + @Test(timeout = 15000) + public void testForDifferntHFileRefsZnodeVersion() throws Exception { + // 1. Create a file + Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion"); + fs.createNewFile(file); + // 2. 
Assert file is successfully created + assertTrue("Test file not created!", fs.exists(file)); + ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); + cleaner.setConf(conf); + + ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class); + //Return different znode version for each call + Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2); + + Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass(); + Field rqc = cleanerClass.getDeclaredField("rqc"); + rqc.setAccessible(true); + rqc.set(cleaner, replicationQueuesClient); + + cleaner.isFileDeletable(fs.getFileStatus(file)); + } + + static class DummyServer implements Server { + + @Override + public Configuration getConfiguration() { + return TEST_UTIL.getConfiguration(); + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + try { + return new ZooKeeperWatcher(getConfiguration(), "dummy server", this); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return null; + } + + @Override + public ClusterConnection getConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + @Override + public ServerName getServerName() { + return ServerName.valueOf("regionserver,60020,000000"); + } + + @Override + public void abort(String why, Throwable e) { + } + + @Override + public boolean isAborted() { + return false; + } + + @Override + public void stop(String why) { + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public ChoreService getChoreService() { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index f463f76..abe484e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -19,12 +19,14 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.util.List; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; @@ -89,4 +91,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { public String getStats() { return ""; } + + @Override + public void addHFileRefs(TableName tableName, byte[] family, List<String> files) + throws ReplicationException { + return; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 455a790..e919c24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -19,15 +19,21 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -35,7 +41,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -48,12 +56,17 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; @@ -79,6 +92,7 @@ public class TestMasterReplication { private static final TableName tableName = TableName.valueOf("test"); private static final byte[] famName = Bytes.toBytes("f"); + private static final byte[] famName1 = Bytes.toBytes("f1"); private static final byte[] row = Bytes.toBytes("row"); private static final byte[] row1 = Bytes.toBytes("row1"); private static final byte[] row2 = Bytes.toBytes("row2"); @@ -103,7 +117,11 @@ public class TestMasterReplication { baseConfiguration.setInt("hbase.regionserver.maxlogs", 10); baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10); baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, - HConstants.REPLICATION_ENABLE_DEFAULT); + HConstants.REPLICATION_ENABLE_DEFAULT); + 
baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + baseConfiguration.set("hbase.replication.source.fs.conf.provider", + TestSourceFSConfigurationProvider.class.getCanonicalName()); + baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); baseConfiguration.setBoolean("dfs.support.append", true); baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); baseConfiguration.setStrings( @@ -114,6 +132,9 @@ public class TestMasterReplication { HColumnDescriptor fam = new HColumnDescriptor(famName); fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); table.addFamily(fam); + fam = new HColumnDescriptor(famName1); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(fam); fam = new HColumnDescriptor(noRepfamName); table.addFamily(fam); } @@ -130,14 +151,7 @@ public class TestMasterReplication { int numClusters = 2; Table[] htables = null; try { - startMiniClusters(numClusters); - createTableOnClusters(table); - - htables = getHTablesOnClusters(tableName); - - // Test the replication scenarios of 0 -> 1 -> 0 - addPeer("1", 0, 1); - addPeer("1", 1, 0); + htables = setUpClusterTablesAndPeers(numClusters); int[] expectedCounts = new int[] { 2, 2 }; @@ -157,12 +171,64 @@ public class TestMasterReplication { } /** - * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and - * deleting rows to a table in each clusters and ensuring that the each of - * these clusters get the appropriate mutations. It also tests the grouping - * scenario where a cluster needs to replicate the edits originating from - * itself and also the edits that it received using replication from a - * different cluster. The scenario is explained in HBASE-9158 + * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of + * HFiles to a table in each cluster, checking if it's replicated. + */ + @Test(timeout = 300000) + public void testHFileCyclicReplication() throws Exception { + LOG.info("testHFileCyclicReplication"); + int numClusters = 2; + Table[] htables = null; + try { + htables = setUpClusterTablesAndPeers(numClusters); + + // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated + // to cluster '1'. + byte[][][] hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, }; + int numOfRows = 100; + int[] expectedCounts = + new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; + + loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row, + famName, htables, hfileRanges, numOfRows, expectedCounts, true); + + // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated + // to cluster '0'. 
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") }, + new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, }; + numOfRows = 200; + int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0], + hfileRanges.length * numOfRows + expectedCounts[1] }; + + loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row, + famName, htables, hfileRanges, numOfRows, newExpectedCounts, true); + + } finally { + close(htables); + shutDownMiniClusters(); + } + } + + private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception { + Table[] htables; + startMiniClusters(numClusters); + createTableOnClusters(table); + + htables = getHTablesOnClusters(tableName); + // Test the replication scenarios of 0 -> 1 -> 0 + addPeer("1", 0, 1); + addPeer("1", 1, 0); + return htables; + } + + /** + * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a + * table in each clusters and ensuring that the each of these clusters get the appropriate + * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits + * originating from itself and also the edits that it received using replication from a different + * cluster. The scenario is explained in HBASE-9158 */ @Test(timeout = 300000) public void testCyclicReplication2() throws Exception { @@ -213,6 +279,119 @@ public class TestMasterReplication { } /** + * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk + * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers. + */ + @Test(timeout = 300000) + public void testHFileMultiSlaveReplication() throws Exception { + LOG.info("testHFileMultiSlaveReplication"); + int numClusters = 3; + Table[] htables = null; + try { + startMiniClusters(numClusters); + createTableOnClusters(table); + + // Add a slave, 0 -> 1 + addPeer("1", 0, 1); + + htables = getHTablesOnClusters(tableName); + + // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated + // to cluster '1'. + byte[][][] hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") }, + new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, }; + int numOfRows = 100; + + int[] expectedCounts = + new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; + + loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row, + famName, htables, hfileRanges, numOfRows, expectedCounts, true); + + // Validate data is not replicated to cluster '2'. + assertEquals(0, utilities[2].countRows(htables[2])); + + rollWALAndWait(utilities[0], htables[0].getName(), row); + + // Add one more slave, 0 -> 2 + addPeer("2", 0, 2); + + // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated + // to cluster '1' and '2'. Previous data should be replicated to cluster '2'. 
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") }, + new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, }; + numOfRows = 200; + + int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0], + hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows }; + + loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row, + famName, htables, hfileRanges, numOfRows, newExpectedCounts, true); + + } finally { + close(htables); + shutDownMiniClusters(); + } + } + + /** + * It tests the bulk loaded hfile replication scenario to only explicitly specified table column + * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set + * only one CF data to replicate. + */ + @Test(timeout = 300000) + public void testHFileReplicationForConfiguredTableCfs() throws Exception { + LOG.info("testHFileReplicationForConfiguredTableCfs"); + int numClusters = 2; + Table[] htables = null; + try { + startMiniClusters(numClusters); + createTableOnClusters(table); + + htables = getHTablesOnClusters(tableName); + // Test the replication scenarios only 'f' is configured for table data replication not 'f1' + addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName)); + + // Load 100 rows for each hfile range in cluster '0' for table CF 'f' + byte[][][] hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, }; + int numOfRows = 100; + int[] expectedCounts = + new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; + + loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables, + hfileRanges, numOfRows, expectedCounts, true); + + // Load 100 rows for each hfile range in cluster '0' for table CF 'f1' + hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") }, + new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, }; + numOfRows = 100; + + int[] newExpectedCounts = + new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] }; + + loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables, + hfileRanges, numOfRows, newExpectedCounts, false); + + // Validate data replication for CF 'f1' + + // Source cluster table should contain data for the families + wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]); + + // Sleep for enough time so that the data is still not replicated for the CF which is not + // configured for replication + Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME); + // Peer cluster should have only configured CF data + wait(1, htables[1], expectedCounts[1]); + } finally { + close(htables); + shutDownMiniClusters(); + } + } + + /** * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1. 
*/ @Test(timeout = 300000) @@ -328,6 +507,17 @@ public class TestMasterReplication { close(replicationAdmin); } } + + private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) + throws Exception { + ReplicationAdmin replicationAdmin = null; + try { + replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]); + replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs); + } finally { + close(replicationAdmin); + } + } private void disablePeer(String id, int masterClusterNumber) throws Exception { ReplicationAdmin replicationAdmin = null; @@ -405,8 +595,56 @@ public class TestMasterReplication { wait(row, target, false); } - private void wait(byte[] row, Table target, boolean isDeleted) - throws Exception { + private void loadAndValidateHFileReplication(String testName, int masterNumber, + int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges, + int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception { + HBaseTestingUtility util = utilities[masterNumber]; + + Path dir = util.getDataTestDirOnTestFS(testName); + FileSystem fs = util.getTestFileSystem(); + dir = dir.makeQualified(fs); + Path familyDir = new Path(dir, Bytes.toString(fam)); + + int hfileIdx = 0; + for (byte[][] range : hfileRanges) { + byte[] from = range[0]; + byte[] to = range[1]; + HFileTestUtil.createHFile(util.getConfiguration(), fs, + new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows); + } + + Table source = tables[masterNumber]; + final TableName tableName = source.getName(); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); + String[] args = { dir.toString(), tableName.toString() }; + loader.run(args); + + if (toValidate) { + for (int slaveClusterNumber : slaveNumbers) { + wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]); + } + } + } + + private void wait(int slaveNumber, Table target, int expectedCount) + throws IOException, InterruptedException { + int count = 0; + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for bulkloaded data replication. Current count=" + count + + ", expected count=" + expectedCount); + } + count = utilities[slaveNumber].countRows(target); + if (count != expectedCount) { + LOG.info("Waiting more time for bulkloaded data replication."); + Thread.sleep(SLEEP_TIME); + } else { + break; + } + } + } + + private void wait(byte[] row, Table target, boolean isDeleted) throws Exception { Get get = new Get(row); for (int i = 0; i < NB_RETRIES; i++) { if (i == NB_RETRIES - 1) { @@ -430,6 +668,47 @@ public class TestMasterReplication { } } + private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table, + final byte[] row) throws IOException { + final Admin admin = utility.getHBaseAdmin(); + final MiniHBaseCluster cluster = utility.getMiniHBaseCluster(); + + // find the region that corresponds to the given row. 
+ HRegion region = null; + for (HRegion candidate : cluster.getRegions(table)) { + if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { + region = candidate; + break; + } + } + assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region); + + final CountDownLatch latch = new CountDownLatch(1); + + // listen for successful log rolls + final WALActionsListener listener = new WALActionsListener.Base() { + @Override + public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + latch.countDown(); + } + }; + region.getWAL().registerWALActionsListener(listener); + + // request a roll + admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(), + region.getRegionInfo().getRegionName())); + + // wait + try { + latch.await(); + } catch (InterruptedException exception) { + LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " + + "replication tests fail, it's probably because we should still be waiting."); + Thread.currentThread().interrupt(); + } + region.getWAL().unregisterWALActionsListener(listener); + } + /** * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same * timestamp there is otherwise no way to count them. http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 4823597..47d2880 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -658,7 +658,8 @@ public class TestReplicationSmallTests extends TestReplicationBase { HRegionInfo hri = new HRegionInfo(htable1.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); - Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit); + Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit, + htable1.getConfiguration(), null); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 696c130..41c3240 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.*; +import java.util.ArrayList; import java.util.List; import java.util.SortedMap; import java.util.SortedSet; @@ -160,6 +161,62 @@ public abstract class TestReplicationStateBasic { } @Test + public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { + rp.init(); + rq1.init(server1); + rqc.init(); + + List<String> files1 = new ArrayList<String>(3); + 
files1.add("file_1"); + files1.add("file_2"); + files1.add("file_3"); + assertNull(rqc.getReplicableHFiles(ID_ONE)); + assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); + rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); + rq1.addHFileRefs(ID_ONE, files1); + assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); + List<String> files2 = new ArrayList<>(files1); + String removedString = files2.remove(0); + rq1.removeHFileRefs(ID_ONE, files2); + assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size()); + files2 = new ArrayList<>(1); + files2.add(removedString); + rq1.removeHFileRefs(ID_ONE, files2); + assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size()); + rp.removePeer(ID_ONE); + } + + @Test + public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { + rq1.init(server1); + rqc.init(); + + rp.init(); + rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); + rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null); + + List<String> files1 = new ArrayList<String>(3); + files1.add("file_1"); + files1.add("file_2"); + files1.add("file_3"); + rq1.addHFileRefs(ID_ONE, files1); + rq1.addHFileRefs(ID_TWO, files1); + assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); + assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); + + rp.removePeer(ID_ONE); + assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); + assertNull(rqc.getReplicableHFiles(ID_ONE)); + assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); + + rp.removePeer(ID_TWO); + assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); + assertNull(rqc.getReplicableHFiles(ID_TWO)); + } + + @Test public void testReplicationPeers() throws Exception { rp.init(); http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 4587c61..3b7402a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -64,6 +64,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { utility = new HBaseTestingUtility(); utility.startMiniZKCluster(); conf = utility.getConfiguration(); + conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index 13545b5..b36bb9a 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -52,15 +52,15 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { private static final TableName t1_su = TableName.valueOf("t1_syncup"); private static final TableName t2_su = TableName.valueOf("t2_syncup"); - private static final byte[] famName = Bytes.toBytes("cf1"); + protected static final byte[] famName = Bytes.toBytes("cf1"); private static final byte[] qualName = Bytes.toBytes("q1"); - private static final byte[] noRepfamName = Bytes.toBytes("norep"); + protected static final byte[] noRepfamName = Bytes.toBytes("norep"); private HTableDescriptor t1_syncupSource, t1_syncupTarget; private HTableDescriptor t2_syncupSource, t2_syncupTarget; - private Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1; + protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1; @Before public void setUp() throws Exception { @@ -179,7 +179,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { } - private void setupReplication() throws Exception { + protected void setupReplication() throws Exception { ReplicationAdmin admin1 = new ReplicationAdmin(conf1); ReplicationAdmin admin2 = new ReplicationAdmin(conf2); @@ -418,7 +418,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { } } - private void syncUp(HBaseTestingUtility ut) throws Exception { + protected void syncUp(HBaseTestingUtility ut) throws Exception { ReplicationSyncUp.setConfigure(ut.getConfiguration()); String[] arguments = new String[] { null }; new ReplicationSyncUp().run(arguments); http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java new file mode 100644 index 0000000..f54c632 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool { + + private static final Log LOG = LogFactory + .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); + conf1.set("hbase.replication.source.fs.conf.provider", + TestSourceFSConfigurationProvider.class.getCanonicalName()); + String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) { + classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"; + conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes); + } + + TestReplicationBase.setUpBeforeClass(); + } + + @Override + public void testSyncUpTool() throws Exception { + /** + * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: + * 'cf1' : replicated 'norep': not replicated + */ + setupReplication(); + + /** + * Prepare 16 random hfile ranges required for creating hfiles + */ + Iterator<String> randomHFileRangeListIterator = null; + Set<String> randomHFileRanges = new HashSet<String>(16); + for (int i = 0; i < 16; i++) { + randomHFileRanges.add(UUID.randomUUID().toString()); + } + List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges); + Collections.sort(randomHFileRangeList); + randomHFileRangeListIterator = randomHFileRangeList.iterator(); + + /** + * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows + * into cf1, and 3 rows into norep verify correctly replicated to slave + */ + loadAndReplicateHFiles(true, randomHFileRangeListIterator); + + /** + * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load + * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and + * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave + * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step + * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not + * 
t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to + * Slave + */ + mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator); + + } + + private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) + throws Exception { + LOG.debug("mimicSyncUpAfterBulkLoad"); + utility2.shutdownMiniHBaseCluster(); + + loadAndReplicateHFiles(false, randomHFileRangeListIterator); + + int rowCount_ht1Source = utility1.countRows(ht1Source); + assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206, + rowCount_ht1Source); + + int rowCount_ht2Source = utility1.countRows(ht2Source); + assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, + rowCount_ht2Source); + + utility1.shutdownMiniHBaseCluster(); + utility2.restartHBaseCluster(1); + + Thread.sleep(SLEEP_TIME); + + // Before sync up + int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); + int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); + + // Run sync up tool + syncUp(utility1); + + // After syun up + for (int i = 0; i < NB_RETRIES; i++) { + syncUp(utility1); + rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); + rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + if (i == NB_RETRIES - 1) { + if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) { + // syncUP still failed. Let's look at the source in case anything wrong there + utility1.restartHBaseCluster(1); + rowCount_ht1Source = utility1.countRows(ht1Source); + LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); + rowCount_ht2Source = utility1.countRows(ht2Source); + LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); + } + assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, + rowCount_ht1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, + rowCount_ht2TargetAtPeer1); + } + if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) { + LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); + break; + } else { + LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" + + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + + rowCount_ht2TargetAtPeer1); + } + Thread.sleep(SLEEP_TIME); + } + } + + private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, + Iterator<String> randomHFileRangeListIterator) throws Exception { + LOG.debug("loadAndReplicateHFiles"); + + // Load 100 + 3 hfiles to t1_syncup. + byte[][][] hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges, + 100); + + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source, + hfileRanges, 3); + + // Load 200 + 3 hfiles to t2_syncup. 
+ hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges, + 200); + + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source, + hfileRanges, 3); + + if (verifyReplicationOnSlave) { + // ensure replication completed + wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3, + "t1_syncup has 103 rows on source, and 100 on slave1"); + + wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3, + "t2_syncup has 203 rows on source, and 200 on slave1"); + } + } + + private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, + Table source, byte[][][] hfileRanges, int numOfRows) throws Exception { + Path dir = utility1.getDataTestDirOnTestFS(testName); + FileSystem fs = utility1.getTestFileSystem(); + dir = dir.makeQualified(fs); + Path familyDir = new Path(dir, Bytes.toString(fam)); + + int hfileIdx = 0; + for (byte[][] range : hfileRanges) { + byte[] from = range[0]; + byte[] to = range[1]; + HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_" + + hfileIdx++), fam, row, from, to, numOfRows); + } + + final TableName tableName = source.getName(); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration()); + String[] args = { dir.toString(), tableName.toString() }; + loader.run(args); + } + + private void wait(Table target, int expectedCount, String msg) throws IOException, + InterruptedException { + for (int i = 0; i < NB_RETRIES; i++) { + int rowCount_ht2TargetAtPeer1 = utility2.countRows(target); + if (i == NB_RETRIES - 1) { + assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1); + } + if (expectedCount == rowCount_ht2TargetAtPeer1) { + break; + } + Thread.sleep(SLEEP_TIME); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index b87e7ef..f08d2bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -21,32 +21,52 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileTestUtil; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -58,21 +78,18 @@ public class TestReplicationSink { private static final Log LOG = LogFactory.getLog(TestReplicationSink.class); private static final int BATCH_SIZE = 10; - private final static HBaseTestingUtility TEST_UTIL = - new HBaseTestingUtility(); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static ReplicationSink SINK; + protected static ReplicationSink SINK; - private static final TableName TABLE_NAME1 = - TableName.valueOf("table1"); - private static final TableName TABLE_NAME2 = - TableName.valueOf("table2"); + protected static final TableName TABLE_NAME1 = TableName.valueOf("table1"); + protected static final TableName TABLE_NAME2 = TableName.valueOf("table2"); - private static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); - private static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); + protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); + protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); - private static Table table1; - private static Stoppable STOPPABLE = new Stoppable() { + protected static Table table1; + protected static Stoppable STOPPABLE = new Stoppable() { final AtomicBoolean stop = new AtomicBoolean(false); @Override @@ -85,10 +102,13 @@ public class TestReplicationSink { LOG.info("STOPPING BECAUSE: " + why); this.stop.set(true); } - + }; - private static Table table2; + protected static Table table2; + protected static String baseNamespaceDir; + protected static String hfileArchiveDir; + protected static String replicationClusterId; /** * @throws java.lang.Exception @@ -98,11 +118,18 @@ public class TestReplicationSink { TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY, 
HConstants.REPLICATION_ENABLE_DEFAULT); + TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", + TestSourceFSConfigurationProvider.class.getCanonicalName()); + TEST_UTIL.startMiniCluster(3); SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); + Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration()); + baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString(); + hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString(); + replicationClusterId = "12345"; } /** @@ -134,7 +161,8 @@ public class TestReplicationSink { for(int i = 0; i < BATCH_SIZE; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } - SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); @@ -151,7 +179,8 @@ public class TestReplicationSink { for(int i = 0; i < BATCH_SIZE/2; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } - SINK.replicateEntries(entries, CellUtil.createCellScanner(cells)); + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, + baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<WALEntry>(BATCH_SIZE); cells = new ArrayList<Cell>(); @@ -160,7 +189,8 @@ public class TestReplicationSink { i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells)); } - SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length); @@ -179,7 +209,8 @@ public class TestReplicationSink { i, KeyValue.Type.Put, cells)); } - SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table2.getScanner(scan); for(Result res : scanRes) { @@ -198,14 +229,16 @@ public class TestReplicationSink { for(int i = 0; i < 3; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } - SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<WALEntry>(3); cells = new ArrayList<Cell>(); entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells)); entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells)); - SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); @@ -228,12 +261,96 @@ public class 
TestReplicationSink { for(int i = 3; i < 5; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } - SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); Get get = new Get(Bytes.toBytes(1)); Result res = table1.get(get); assertEquals(0, res.size()); } + /** + * Test replicateEntries with a bulk load entry for 25 HFiles + */ + @Test + public void testReplicateEntriesForHFiles() throws Exception { + Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries"); + Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1)); + int numRows = 10; + + List<Path> p = new ArrayList<>(1); + + // 1. Generate 25 hfile ranges + Random rng = new SecureRandom(); + Set<Integer> numbers = new HashSet<>(); + while (numbers.size() < 50) { + numbers.add(rng.nextInt(1000)); + } + List<Integer> numberList = new ArrayList<>(numbers); + Collections.sort(numberList); + + // 2. Create 25 hfiles + Configuration conf = TEST_UTIL.getConfiguration(); + FileSystem fs = dir.getFileSystem(conf); + Iterator<Integer> numbersItr = numberList.iterator(); + for (int i = 0; i < 25; i++) { + Path hfilePath = new Path(familyDir, "hfile_" + i); + HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1, + Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows); + p.add(hfilePath); + } + + // 3. Create a BulkLoadDescriptor and a WALEdit + Map<byte[], List<Path>> storeFiles = new HashMap<>(1); + storeFiles.put(FAM_NAME1, p); + WALEdit edit = null; + WALProtos.BulkLoadDescriptor loadDescriptor = null; + + try (Connection c = ConnectionFactory.createConnection(conf); + RegionLocator l = c.getRegionLocator(TABLE_NAME1)) { + HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo(); + loadDescriptor = + ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1, + ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1); + edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor); + } + List<WALEntry> entries = new ArrayList<WALEntry>(1); + + // 4. Create a WALEntryBuilder + WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1); + + // 5. Copy the hfile to the path as it is in reality + for (int i = 0; i < 25; i++) { + String pathToHfileFromNS = + new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR) + .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR) + .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray())) + .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR) + .append("hfile_" + i).toString(); + String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS; + + FileUtil.copy(fs, p.get(0), fs, new Path(dst), false, conf); + } + + entries.add(builder.build()); + ResultScanner scanRes = null; + try { + Scan scan = new Scan(); + scanRes = table1.getScanner(scan); + // 6. Assert no existing data in table + assertEquals(0, scanRes.next(numRows).length); + // 7. Replicate the bulk loaded entry + SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + scanRes = table1.getScanner(scan); + // 8. 
Assert data is replicated + assertEquals(numRows, scanRes.next(numRows).length); + } finally { + if (scanRes != null) { + scanRes.close(); + } + } + } + private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) { byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row); @@ -256,6 +373,13 @@ public class TestReplicationSink { kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); } + WALEntry.Builder builder = createWALEntryBuilder(table); + cells.add(kv); + + return builder.build(); + } + + private WALEntry.Builder createWALEntryBuilder(TableName table) { WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); @@ -264,13 +388,10 @@ public class TestReplicationSink { uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); keyBuilder.setClusterId(uuidBuilder.build()); keyBuilder.setTableName(ByteStringer.wrap(table.getName())); - keyBuilder.setWriteTime(now); + keyBuilder.setWriteTime(System.currentTimeMillis()); keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setLogSequenceNumber(-1); builder.setKey(keyBuilder.build()); - cells.add(kv); - - return builder.build(); + return builder; } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index d50522c..a208120 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -19,13 +19,17 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; @@ -51,6 +55,8 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -64,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.ByteStringer; import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; @@ -108,6 +115,8 @@ public class TestReplicationSourceManager { private static final byte[] f1 = Bytes.toBytes("f1"); + private static final byte[] f2 = Bytes.toBytes("f2"); + private static final TableName test = TableName.valueOf("test"); @@ -161,10 +170,10 @@ public class TestReplicationSourceManager { manager.addSource(slaveId); htd = new HTableDescriptor(test); - HColumnDescriptor col = new HColumnDescriptor("f1"); + HColumnDescriptor col = new HColumnDescriptor(f1); col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); htd.addFamily(col); - col = new HColumnDescriptor("f2"); + col = new HColumnDescriptor(f2); col.setScope(HConstants.REPLICATION_SCOPE_LOCAL); htd.addFamily(col); @@ -416,6 +425,63 @@ public class TestReplicationSourceManager { s0.abort("", null); } + @Test + public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { + // 1. Create wal key + WALKey logKey = new WALKey(); + // 2. Get the bulk load wal edit event + WALEdit logEdit = getBulkLoadWALEdit(); + + // 3. Get the scopes for the key + Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager); + + // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled + assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", + logKey.getScopes()); + } + + @Test + public void testBulkLoadWALEdits() throws Exception { + // 1. Create wal key + WALKey logKey = new WALKey(); + // 2. Get the bulk load wal edit event + WALEdit logEdit = getBulkLoadWALEdit(); + // 3. Enable bulk load hfile replication + Configuration bulkLoadConf = HBaseConfiguration.create(conf); + bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + + // 4. Get the scopes for the key + Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager); + + NavigableMap<byte[], Integer> scopes = logKey.getScopes(); + // Assert family with replication scope global is present in the key scopes + assertTrue("This family scope is set to global, should be part of replication key scopes.", + scopes.containsKey(f1)); + // Assert family with replication scope local is not present in the key scopes + assertFalse("This family scope is set to local, should not be part of replication key scopes", + scopes.containsKey(f2)); + } + + private WALEdit getBulkLoadWALEdit() { + // 1. Create store files for the families + Map<byte[], List<Path>> storeFiles = new HashMap<>(1); + List<Path> p = new ArrayList<>(1); + p.add(new Path(Bytes.toString(f1))); + storeFiles.put(f1, p); + + p = new ArrayList<>(1); + p.add(new Path(Bytes.toString(f2))); + storeFiles.put(f2, p); + + // 2. Create bulk load descriptor + BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), + ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1); + + // 3.
create bulk load wal edit event + WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc); + return logEdit; + } + static class DummyNodeFailoverWorker extends Thread { private SortedMap<String, SortedSet<String>> logZnodesMap; Server server; http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java new file mode 100644 index 0000000..a14c02b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class TestSourceFSConfigurationProvider implements SourceFSConfigurationProvider { + @Override + public Configuration getConf(Configuration sinkConf, String replicationClusterId) + throws IOException { + return sinkConf; + } +}
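
For reference, the test above wires a trivial provider into TestReplicationSink through the hbase.replication.source.fs.conf.provider property, and that provider (TestSourceFSConfigurationProvider) simply hands back the sink configuration. A real sink cluster typically needs a provider whose returned configuration can actually reach the source cluster's file system. The sketch below shows one possible shape for such a provider, assuming the operator stages the source cluster's core-site.xml and hbase-site.xml under a local directory keyed by the replication cluster id; the class name, the package, and the example.replication.source.fs.conf.dir property are illustrative only and are not defined by this patch.

// Hypothetical package; only SourceFSConfigurationProvider comes from this patch.
package org.example.hbase.replication;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.SourceFSConfigurationProvider;

public class PerClusterDirSourceFSConfigurationProvider implements SourceFSConfigurationProvider {

  // Local directory holding one sub-directory per source cluster; the property name is
  // an example, not a key defined by HBase.
  private static final String SOURCE_CONF_DIR_KEY = "example.replication.source.fs.conf.dir";

  @Override
  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
      throws IOException {
    String baseDir = sinkConf.get(SOURCE_CONF_DIR_KEY, "/etc/hbase/replication-sources");
    // Start from the sink configuration so sink-side settings survive, then overlay the
    // staged source cluster files for this replication cluster id.
    Configuration sourceConf = new Configuration(sinkConf);
    sourceConf.addResource(new Path(baseDir, replicationClusterId + "/core-site.xml"));
    sourceConf.addResource(new Path(baseDir, replicationClusterId + "/hbase-site.xml"));
    return sourceConf;
  }
}

To take effect, such a class would be set as the value of hbase.replication.source.fs.conf.provider on the sink region servers, the same property TestReplicationSink points at TestSourceFSConfigurationProvider, while bulk load replication itself is switched on through HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, as testBulkLoadWALEdits does when building its scoped configuration.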