Repository: hbase Updated Branches: refs/heads/master d44e7df5d -> e28ec7246
http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java new file mode 100644 index 0000000..d8d576f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.log4j.Level; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying + * async wal replication replays the edits to the secondary region in various scenarios. + */ +@Category(MediumTests.class) +public class TestRegionReplicaReplicationEndpoint { + + private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class); + + static { + ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL); + } + + private static final int NB_SERVERS = 2; + + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = HTU.getConfiguration(); + conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); + conf.setInt("replication.source.size.capacity", 10240); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // less number of retries is needed + conf.setInt("hbase.client.serverside.retries.multiplier", 1); + + HTU.startMiniCluster(NB_SERVERS); + } + + @AfterClass + public static void afterClass() throws Exception { + HTU.shutdownMiniCluster(); + } + + @Test + public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { + // create a table with region replicas. Check whether the replication peer is created + // and replication started. + ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); + String peerId = "region_replica_replication"; + + if (admin.getPeerConfig(peerId) != null) { + admin.removePeer(peerId); + } + + HTableDescriptor htd = HTU.createTableDescriptor( + "testReplicationPeerIsCreated_no_region_replicas"); + HTU.getHBaseAdmin().createTable(htd); + ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); + assertNull(peerConfig); + + htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); + htd.setRegionReplication(2); + HTU.getHBaseAdmin().createTable(htd); + + // assert peer configuration is correct + peerConfig = admin.getPeerConfig(peerId); + assertNotNull(peerConfig); + assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration())); + assertEquals(peerConfig.getReplicationEndpointImpl(), + RegionReplicaReplicationEndpoint.class.getName()); + admin.close(); + } + + + public void testRegionReplicaReplication(int regionReplication) throws Exception { + // test region replica replication. Create a table with single region, write some data + // ensure that data is replicated to the secondary region + TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_" + + regionReplication); + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + htd.setRegionReplication(regionReplication); + HTU.getHBaseAdmin().createTable(htd); + TableName tableNameNoReplicas = + TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS"); + HTU.deleteTableIfAny(tableNameNoReplicas); + HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1); + + HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration()); + HTableInterface table = connection.getTable(tableName); + HTableInterface tableNoReplicas = connection.getTable(tableNameNoReplicas); + + try { + // load some data to the non-replicated table + HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000); + + // load the data to the table + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); + + verifyReplication(tableName, regionReplication, 0, 1000); + + } finally { + table.close(); + tableNoReplicas.close(); + HTU.deleteTableIfAny(tableNameNoReplicas); + connection.close(); + } + } + + private void verifyReplication(TableName tableName, int regionReplication, + final int startRow, final int endRow) throws Exception { + // find the regions + final HRegion[] regions = new HRegion[regionReplication]; + + for (int i=0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List<HRegion> onlineRegions = rs.getOnlineRegions(tableName); + for (HRegion region : onlineRegions) { + regions[region.getRegionInfo().getReplicaId()] = region; + } + } + + for (HRegion region : regions) { + assertNotNull(region); + } + + for (int i = 1; i < regionReplication; i++) { + final HRegion region = regions[i]; + // wait until all the data is replicated to all secondary regions + Waiter.waitFor(HTU.getConfiguration(), 60000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + LOG.info("verifying replication for region replica:" + region.getRegionInfo()); + try { + HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow); + } catch(Throwable ex) { + LOG.warn("Verification from secondary region is not complete yet. Got:" + ex + + " " + ex.getMessage()); + // still wait + return false; + } + return true; + } + }); + } + } + + @Test(timeout = 60000) + public void testRegionReplicaReplicationWith2Replicas() throws Exception { + testRegionReplicaReplication(2); + } + + @Test(timeout = 60000) + public void testRegionReplicaReplicationWith3Replicas() throws Exception { + testRegionReplicaReplication(3); + } + + @Test(timeout = 60000) + public void testRegionReplicaReplicationWith10Replicas() throws Exception { + testRegionReplicaReplication(10); + } + + @Test (timeout = 60000) + public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception { + // Tests a table with region replication 3. Writes some data, and causes flushes and + // compactions. Verifies that the data is readable from the replicas. Note that this + // does not test whether the replicas actually pick up flushed files and apply compaction + // to their stores + int regionReplication = 3; + TableName tableName = TableName.valueOf("testRegionReplicaReplicationForFlushAndCompaction"); + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + htd.setRegionReplication(regionReplication); + HTU.getHBaseAdmin().createTable(htd); + + + HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration()); + HTableInterface table = connection.getTable(tableName); + + try { + // load the data to the table + + for (int i = 0; i < 6000; i += 1000) { + LOG.info("Writing data from " + i + " to " + (i+1000)); + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000); + LOG.info("flushing table"); + HTU.flush(tableName); + LOG.info("compacting table"); + HTU.compact(tableName, false); + } + + verifyReplication(tableName, regionReplication, 0, 6000); + } finally { + table.close(); + connection.close(); + } + } + + @Test (timeout = 60000) + public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception { + testRegionReplicaReplicationIgnoresDisabledTables(false); + } + + @Test (timeout = 60000) + public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception { + testRegionReplicaReplicationIgnoresDisabledTables(true); + } + + public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) + throws Exception { + // tests having edits from a disabled or dropped table is handled correctly by skipping those + // entries and further edits after the edits from dropped/disabled table can be replicated + // without problems. + TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables" + + dropTable); + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + int regionReplication = 3; + htd.setRegionReplication(regionReplication); + HTU.deleteTableIfAny(tableName); + HTU.getHBaseAdmin().createTable(htd); + TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); + HTU.deleteTableIfAny(toBeDisabledTable); + htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); + htd.setRegionReplication(regionReplication); + HTU.getHBaseAdmin().createTable(htd); + + // both tables are created, now pause replication + ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); + admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + + // now that the replication is disabled, write to the table to be dropped, then drop the table. + + HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration()); + HTableInterface table = connection.getTable(tableName); + HTableInterface tableToBeDisabled = connection.getTable(toBeDisabledTable); + + HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); + + AtomicLong skippedEdits = new AtomicLong(); + RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = + mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); + when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); + RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = + new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, + (ClusterConnection) connection, + Executors.newSingleThreadExecutor(), 1000); + + HRegionLocation hrl = connection.locateRegion(toBeDisabledTable, HConstants.EMPTY_BYTE_ARRAY); + byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); + + HLog.Entry entry = new HLog.Entry( + new HLogKey(encodedRegionName, toBeDisabledTable, 1), + new WALEdit()); + + HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table + if (dropTable) { + HTU.getHBaseAdmin().deleteTable(toBeDisabledTable); + } + + sinkWriter.append(toBeDisabledTable, encodedRegionName, + HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); + + assertEquals(2, skippedEdits.get()); + + try { + // load some data to the to-be-dropped table + + // load the data to the table + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); + + // now enable the replication + admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + + verifyReplication(tableName, regionReplication, 0, 1000); + + } finally { + admin.close(); + table.close(); + tableToBeDisabled.close(); + HTU.deleteTableIfAny(toBeDisabledTable); + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java new file mode 100644 index 0000000..4e879e4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; +import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.coprocessor.BaseWALObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext; +import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this + * class contains lower level tests using callables. + */ +@Category(MediumTests.class) +public class TestRegionReplicaReplicationEndpointNoMaster { + + private static final Log LOG = LogFactory.getLog( + TestRegionReplicaReplicationEndpointNoMaster.class); + + private static final int NB_SERVERS = 2; + private static TableName tableName = TableName.valueOf( + TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName()); + private static HTable table; + private static final byte[] row = "TestRegionReplicaReplicator".getBytes(); + + private static HRegionServer rs0; + private static HRegionServer rs1; + + private static HRegionInfo hriPrimary; + private static HRegionInfo hriSecondary; + + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private static final byte[] f = HConstants.CATALOG_FAMILY; + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = HTU.getConfiguration(); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + + // install WALObserver coprocessor for tests + String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); + if (walCoprocs == null) { + walCoprocs = WALEditCopro.class.getName(); + } else { + walCoprocs += "," + WALEditCopro.class.getName(); + } + HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + walCoprocs); + HTU.startMiniCluster(NB_SERVERS); + + // Create table then get the single region for our new table. + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration()); + + hriPrimary = table.getRegionLocation(row, false).getRegionInfo(); + + // mock a secondary region info to open + hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), + hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); + + // No master + TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); + rs0 = HTU.getMiniHBaseCluster().getRegionServer(0); + rs1 = HTU.getMiniHBaseCluster().getRegionServer(1); + } + + @AfterClass + public static void afterClass() throws Exception { + table.close(); + HTU.shutdownMiniCluster(); + } + + @Before + public void before() throws Exception{ + entries.clear(); + } + + @After + public void after() throws Exception { + } + + static ConcurrentLinkedQueue<HLog.Entry> entries = new ConcurrentLinkedQueue<HLog.Entry>(); + + public static class WALEditCopro extends BaseWALObserver { + public WALEditCopro() { + entries.clear(); + } + @Override + public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + // only keep primary region's edits + if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) { + entries.add(new HLog.Entry(logKey, logEdit)); + } + } + } + + @Test + public void testReplayCallable() throws Exception { + // tests replaying the edits to a secondary region replica using the Callable directly + openRegion(HTU, rs0, hriSecondary); + ClusterConnection connection = + (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration()); + + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + + Assert.assertEquals(1000, entries.size()); + // replay the edits to the secondary using replay callable + replicateUsingCallable(connection, entries); + + HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName()); + HTU.verifyNumericRows(region, f, 0, 1000); + + HTU.deleteNumericRows(table, f, 0, 1000); + closeRegion(HTU, rs0, hriSecondary); + connection.close(); + } + + private void replicateUsingCallable(ClusterConnection connection, Queue<HLog.Entry> entries) + throws IOException, RuntimeException { + HLog.Entry entry; + while ((entry = entries.poll()) != null) { + byte[] row = entry.getEdit().getKeyValues().get(0).getRow(); + RegionLocations locations = connection.locateRegion(tableName, row, true, true); + RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, + RpcControllerFactory.instantiate(connection.getConfiguration()), + table.getName(), locations.getRegionLocation(1), + locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry), + new AtomicLong()); + + RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate( + connection.getConfiguration()); + factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000); + } + } + + @Test + public void testReplayCallableWithRegionMove() throws Exception { + // tests replaying the edits to a secondary region replica using the Callable directly while + // the region is moved to another location.It tests handling of RME. + openRegion(HTU, rs0, hriSecondary); + ClusterConnection connection = + (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration()); + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + + Assert.assertEquals(1000, entries.size()); + // replay the edits to the secondary using replay callable + replicateUsingCallable(connection, entries); + + HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName()); + HTU.verifyNumericRows(region, f, 0, 1000); + + HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary + + // move the secondary region from RS0 to RS1 + closeRegion(HTU, rs0, hriSecondary); + openRegion(HTU, rs1, hriSecondary); + + // replicate the new data + replicateUsingCallable(connection, entries); + + region = rs1.getFromOnlineRegions(hriSecondary.getEncodedName()); + // verify the new data. old data may or may not be there + HTU.verifyNumericRows(region, f, 1000, 2000); + + HTU.deleteNumericRows(table, f, 0, 2000); + closeRegion(HTU, rs1, hriSecondary); + connection.close(); + } + + @Test + public void testRegionReplicaReplicationEndpointReplicate() throws Exception { + // tests replaying the edits to a secondary region replica using the RRRE.replicate() + openRegion(HTU, rs0, hriSecondary); + ClusterConnection connection = + (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration()); + RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint(); + + ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); + when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); + + replicator.init(context); + replicator.start(); + + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + + Assert.assertEquals(1000, entries.size()); + // replay the edits to the secondary using replay callable + replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))); + + HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName()); + HTU.verifyNumericRows(region, f, 0, 1000); + + HTU.deleteNumericRows(table, f, 0, 1000); + closeRegion(HTU, rs0, hriSecondary); + connection.close(); + } + +}