liuxiaocs7 commented on code in PR #8286: URL: https://github.com/apache/hbase/pull/8286#discussion_r3331935203
########## hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationEndpointTestBase.java: ########## @@ -0,0 +1,644 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.KeyValue; +import 
org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALEditInternalHelper; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests ReplicationSource and ReplicationEndpoint interactions + */ +public class ReplicationEndpointTestBase extends 
TestReplicationBaseNoBeforeAll { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationEndpointTestBase.class); + + static int numRegionServers; + + protected static void setUpBeforeClass() throws Exception { + configureClusters(UTIL1, UTIL2); + startClusters(); + numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size(); + } + + @AfterAll + public static void assertStopped() { + // check stop is called + assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0); + } + + @BeforeEach + public void setup() throws Exception { + ReplicationEndpointForTest.contructedCount.set(0); + ReplicationEndpointForTest.startedCount.set(0); + ReplicationEndpointForTest.replicateCount.set(0); + ReplicationEndpointReturningFalse.replicated.set(false); + ReplicationEndpointForTest.lastEntries = null; + final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads(); + for (RegionServerThread rs : rsThreads) { + UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName()); + } + // Wait for all log roll to finish + UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + for (RegionServerThread rs : rsThreads) { + if (!rs.getRegionServer().walRollRequestFinished()) { + return false; + } + } + return true; + } + + @Override + public String explainFailure() throws Exception { + List<String> logRollInProgressRsList = new ArrayList<>(); + for (RegionServerThread rs : rsThreads) { + if (!rs.getRegionServer().walRollRequestFinished()) { + logRollInProgressRsList.add(rs.getRegionServer().toString()); + } + } + return "Still waiting for log roll on regionservers: " + logRollInProgressRsList; + } + }); + } + + @Test + public void testCustomReplicationEndpoint() throws Exception { + // test installing a custom replication endpoint other than the default one. 
+ hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint", + ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build()); + + // check whether the class has been constructed and started + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; + } + }); + + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; + } + }); + + assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); + + // now replicate some data. + doPut(Bytes.toBytes("row42")); + + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return ReplicationEndpointForTest.replicateCount.get() >= 1; + } + }); + + doAssert(Bytes.toBytes("row42")); + + hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint"); + } + + @Test + public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { + assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); + assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); + int peerCount = hbaseAdmin.listReplicationPeers().size(); + final String id = "testReplicationEndpointReturnsFalseOnReplicate"; + hbaseAdmin.addReplicationPeer(id, + ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()).build()); + // This test is flakey and then there is so much stuff flying around in here its, hard to + // debug. Peer needs to be up for the edit to make it across. 
This wait on + // peer count seems to be a hack that has us not progress till peer is up. + if (hbaseAdmin.listReplicationPeers().size() <= peerCount) { + LOG.info("Waiting on peercount to go up from " + peerCount); + Threads.sleep(100); + } + // now replicate some data + doPut(row); + + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + // Looks like replication endpoint returns false unless we put more than 10 edits. We + // only send over one edit. + int count = ReplicationEndpointForTest.replicateCount.get(); + LOG.info("count=" + count); + return ReplicationEndpointReturningFalse.replicated.get(); + } + }); + if (ReplicationEndpointReturningFalse.ex.get() != null) { + throw ReplicationEndpointReturningFalse.ex.get(); + } + + hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate"); + } + + @Test + public void testInterClusterReplication() throws Exception { + final String id = "testInterClusterReplication"; + + List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName); + int totEdits = 0; + + // Make sure edits are spread across regions because we do region based batching + // before shipping edits. 
+ for (HRegion region : regions) { + RegionInfo hri = region.getRegionInfo(); + byte[] row = hri.getStartKey(); + for (int i = 0; i < 100; i++) { + if (row.length > 0) { + Put put = new Put(row); + put.addColumn(famName, row, row); + region.put(put); + totEdits++; + } + } + } + + hbaseAdmin.addReplicationPeer(id, + ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) + .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()) + .build()); + + final int numEdits = totEdits; + Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; + } + + @Override + public String explainFailure() throws Exception { + String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = " + + InterClusterReplicationEndpointForTest.replicateCount.get(); + return failure; + } + }); + + hbaseAdmin.removeReplicationPeer("testInterClusterReplication"); + UTIL1.deleteTableData(tableName); + } + + @Test + public void testWALEntryFilterFromReplicationEndpoint() throws Exception { + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) + // test that we can create mutliple WALFilters reflectively + .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + EverythingPassesWALEntryFilter.class.getName() + "," + + EverythingPassesWALEntryFilterSubclass.class.getName()) + .build(); + + hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc); + // now replicate some data. 
+ try (Connection connection = ConnectionFactory.createConnection(CONF1)) { + doPut(connection, Bytes.toBytes("row1")); + doPut(connection, row); + doPut(connection, Bytes.toBytes("row2")); + } + + Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return ReplicationEndpointForTest.replicateCount.get() >= 1; + } + }); + + assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); + // make sure our reflectively created filter is in the filter chain + assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); + hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint"); + } + + @Test + public void testWALEntryFilterAddValidation() throws Exception { + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) + // test that we can create mutliple WALFilters reflectively + .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + "IAmNotARealWalEntryFilter") + .build(); + assertThrows(IOException.class, + () -> hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc)); + } + + @Test + public void testWALEntryFilterUpdateValidation() throws Exception { + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) + // test that we can create mutliple WALFilters reflectively + .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + "IAmNotARealWalEntryFilter") + .build(); + assertThrows(IOException.class, + () -> hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc)); + } + + @Test + public void testMetricsSourceBaseSourcePassThrough() { + /* + * The replication MetricsSource wraps a 
MetricsReplicationTableSourceImpl, + * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that + * metrics get written to both namespaces. Both of those classes wrap a + * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics. + * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls + * down through the two layers of wrapping to the actual BaseSource. + */ + String id = "id"; + DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); + MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); + when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); + MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); + when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); + + MetricsReplicationSourceSource singleSourceSource = + new MetricsReplicationSourceSourceImpl(singleRms, id); + MetricsReplicationGlobalSourceSource globalSourceSource = + new MetricsReplicationGlobalSourceSourceImpl(globalRms); + MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); + doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); + + Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>(); + MetricsSource source = + new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable); + + String gaugeName = "gauge"; + String singleGaugeName = "source.id." + gaugeName; + String globalGaugeName = "source." + gaugeName; + long delta = 1; + String counterName = "counter"; + String singleCounterName = "source.id." + counterName; + String globalCounterName = "source." 
+ counterName; + long count = 2; + source.decGauge(gaugeName, delta); + source.getMetricsContext(); + source.getMetricsDescription(); + source.getMetricsJmxContext(); + source.getMetricsName(); + source.incCounters(counterName, count); + source.incGauge(gaugeName, delta); + source.init(); + source.removeMetric(gaugeName); + source.setGauge(gaugeName, delta); + source.updateHistogram(counterName, count); + source.incrFailedRecoveryQueue(); + + verify(singleRms).decGauge(singleGaugeName, delta); + verify(globalRms).decGauge(globalGaugeName, delta); + verify(globalRms).getMetricsContext(); + verify(globalRms).getMetricsJmxContext(); + verify(globalRms).getMetricsName(); + verify(singleRms).incCounters(singleCounterName, count); + verify(globalRms).incCounters(globalCounterName, count); + verify(singleRms).incGauge(singleGaugeName, delta); + verify(globalRms).incGauge(globalGaugeName, delta); + verify(globalRms).init(); + verify(singleRms).removeMetric(singleGaugeName); + verify(globalRms).removeMetric(globalGaugeName); + verify(singleRms).setGauge(singleGaugeName, delta); + verify(globalRms).setGauge(globalGaugeName, delta); + verify(singleRms).updateHistogram(singleCounterName, count); + verify(globalRms).updateHistogram(globalCounterName, count); + verify(spyglobalSourceSource).incrFailedRecoveryQueue(); + + // check singleSourceSourceByTable metrics. 
+ // singleSourceSourceByTable map entry will be created only + // after calling #setAgeOfLastShippedOpByTable + boolean containsRandomNewTable = + source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); + assertEquals(false, containsRandomNewTable); + source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable")); + containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); + assertEquals(true, containsRandomNewTable); + MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable"); + + // age should be greater than zero we created the entry with time in the past + assertTrue(msr.getLastShippedAge() > 0); + assertTrue(msr.getShippedBytes() > 0); + + } + + private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) { + List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>(); + byte[] a = new byte[] { 'a' }; + Entry entry = createEntry(tableName, null, a); + walEntriesWithSize.add(new Pair<>(entry, 10L)); + return walEntriesWithSize; + } + + private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... 
kvs) { + WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName), + EnvironmentEdgeManager.currentTime() - 1L, scopes); + WALEdit edit1 = new WALEdit(); + + for (byte[] kv : kvs) { + WALEditInternalHelper.addExtendedCell(edit1, new KeyValue(kv, kv, kv)); + } + return new Entry(key1, edit1); + } + + private void doPut(byte[] row) throws IOException { + try (Connection connection = ConnectionFactory.createConnection(CONF1)) { + doPut(connection, row); + } + } + + private void doPut(final Connection connection, final byte[] row) throws IOException { + try (Table t = connection.getTable(tableName)) { + Put put = new Put(row); + put.addColumn(famName, row, row); + t.put(put); + } + } + + private static void doAssert(byte[] row) throws Exception { + if (ReplicationEndpointForTest.lastEntries == null) { + return; // first call + } + assertEquals(1, ReplicationEndpointForTest.lastEntries.size()); + List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells(); + assertEquals(1, cells.size()); + assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(), + cells.get(0).getRowLength(), row, 0, row.length)); + } + + public static class ReplicationEndpointForTest extends BaseReplicationEndpoint { + static UUID uuid = HBaseTestingUtil.getRandomUUID(); + static AtomicInteger contructedCount = new AtomicInteger(); + static AtomicInteger startedCount = new AtomicInteger(); + static AtomicInteger stoppedCount = new AtomicInteger(); + static AtomicInteger replicateCount = new AtomicInteger(); + static volatile List<Entry> lastEntries = null; + + public ReplicationEndpointForTest() { + replicateCount.set(0); + contructedCount.incrementAndGet(); + } + + @Override + public UUID getPeerUUID() { + return uuid; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + replicateCount.incrementAndGet(); + lastEntries = new ArrayList<>(replicateContext.entries); + return true; + } + + @Override + public 
void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + + @Override + protected void doStart() { + startedCount.incrementAndGet(); + notifyStarted(); + } + + @Override + protected void doStop() { + stoppedCount.incrementAndGet(); + notifyStopped(); + } + + @Override + public boolean canReplicateToSameCluster() { + return true; + } + } + + /** + * Not used by unit tests, helpful for manual testing with replication. + * <p> + * Snippet for `hbase shell`: + * + * <pre> + * create 't', 'f' + * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \ + * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest' Review Comment: Should we update this doc to reference `ReplicationEndpointTestBase$SleepingReplicationEndpointForTest` after the refactoring? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
