HBASE-15001 Fix thread-safety issues with replication. ReplicationSinkManager and HBaseInterClusterReplicationEndpoint perform certain thread-unsafe operations, which can lead to undesirable behavior when multiwal is enabled.
Signed-off-by: Elliott Clark <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48113d75 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48113d75 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48113d75 Branch: refs/heads/hbase-12439 Commit: 48113d7572bbea2f05ae619882d43aa41827f147 Parents: ba04e03 Author: Ashu Pachauri <[email protected]> Authored: Thu Dec 17 13:25:39 2015 -0800 Committer: Elliott Clark <[email protected]> Committed: Fri Dec 18 11:48:55 2015 -0800 ---------------------------------------------------------------------- .../HBaseInterClusterReplicationEndpoint.java | 27 +++++++++++++++----- .../regionserver/ReplicationSinkManager.java | 21 ++++++++++----- .../TestReplicationSinkManager.java | 26 +++++++++---------- 3 files changed, 49 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/48113d75/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 70cc420..78e3e00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -143,9 +143,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi int sleepMultiplier = 1; // Connect to peer cluster first, unless we have to stop - while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) { 
+ while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) { replicationSinkMgr.chooseSinks(); - if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) { + if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) { if (sleepForRetries("Waiting for peers", sleepMultiplier)) { sleepMultiplier++; } @@ -180,19 +180,24 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi List<Entry> entries = replicateContext.getEntries(); String walGroupId = replicateContext.getWalGroupId(); int sleepMultiplier = 1; + int numReplicated = 0; if (!peersSelected && this.isRunning()) { connectToPeers(); peersSelected = true; } - if (replicationSinkMgr.getSinks().size() == 0) { + int numSinks = replicationSinkMgr.getNumSinks(); + if (numSinks == 0) { + LOG.warn("No replication sinks found, returning without replicating. The source should retry" + + " with the same set of edits."); return false; } + // minimum of: configured threads, number of 100-waledit batches, // and number of current sinks - int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), - replicationSinkMgr.getSinks().size()); + int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks); + List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n); if (n == 1) { entryLists.add(entries); @@ -237,7 +242,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // wait for all futures, remove successful parts // (only the remaining parts will be retried) Future<Integer> f = pool.take(); - entryLists.set(f.get().intValue(), Collections.<Entry>emptyList()); + int index = f.get().intValue(); + int batchSize = entryLists.get(index).size(); + entryLists.set(index, Collections.<Entry>emptyList()); + // Now, we have marked the batch as done replicating, record its size + numReplicated += batchSize; } catch (InterruptedException ie) { iox = new IOException(ie); } catch (ExecutionException ee) { @@ -249,6 +258,12 @@ public 
class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // if we had any exceptions, try again throw iox; } + if (numReplicated != entries.size()) { + // Something went wrong here and we don't know what, let's just fail and retry. + LOG.warn("The number of edits replicated is different from the number received," + + " failing for now."); + return false; + } // update metrics this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); http://git-wip-us.apache.org/repos/asf/hbase/blob/48113d75/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java index 76fa6c2..0469f9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -105,7 +106,7 @@ public class ReplicationSinkManager { * * @return a replication sink to replicate to */ - public SinkPeer getReplicationSink() throws IOException { + public synchronized SinkPeer getReplicationSink() throws IOException { if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) { LOG.info("Current list of sinks is out of date or empty, updating"); chooseSinks(); @@ -127,7 +128,7 @@ public class ReplicationSinkManager { * @param sinkPeer * The SinkPeer that had a 
failed replication attempt on it */ - public void reportBadSink(SinkPeer sinkPeer) { + public synchronized void reportBadSink(SinkPeer sinkPeer) { ServerName serverName = sinkPeer.getServerName(); int badReportCount = (badReportCounts.containsKey(serverName) ? badReportCounts.get(serverName) : 0) + 1; @@ -146,11 +147,14 @@ public class ReplicationSinkManager { * @param sinkPeer * The SinkPeer that had a failed replication attempt on it */ - public void reportSinkSuccess(SinkPeer sinkPeer) { + public synchronized void reportSinkSuccess(SinkPeer sinkPeer) { badReportCounts.remove(sinkPeer.getServerName()); } - void chooseSinks() { + /** + * Refresh the list of sinks. + */ + public synchronized void chooseSinks() { List<ServerName> slaveAddresses = endpoint.getRegionServers(); Collections.shuffle(slaveAddresses, random); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); @@ -159,8 +163,13 @@ public class ReplicationSinkManager { badReportCounts.clear(); } - List<ServerName> getSinks() { - return sinks; + public synchronized int getNumSinks() { + return sinks.size(); + } + + @VisibleForTesting + protected List<ServerName> getSinksForTesting() { + return Collections.unmodifiableList(sinks); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/48113d75/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java index 57c3196..104753a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java @@ -67,7 +67,7 @@ public class 
TestReplicationSinkManager { sinkManager.chooseSinks(); - assertEquals(2, sinkManager.getSinks().size()); + assertEquals(2, sinkManager.getNumSinks()); } @@ -81,7 +81,7 @@ public class TestReplicationSinkManager { sinkManager.chooseSinks(); - assertEquals(1, sinkManager.getSinks().size()); + assertEquals(1, sinkManager.getNumSinks()); } @Test @@ -93,14 +93,14 @@ public class TestReplicationSinkManager { sinkManager.chooseSinks(); // Sanity check - assertEquals(1, sinkManager.getSinks().size()); + assertEquals(1, sinkManager.getNumSinks()); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); sinkManager.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect - assertEquals(1, sinkManager.getSinks().size()); + assertEquals(1, sinkManager.getNumSinks()); } @@ -120,9 +120,9 @@ public class TestReplicationSinkManager { sinkManager.chooseSinks(); // Sanity check - assertEquals(3, sinkManager.getSinks().size()); + assertEquals(3, sinkManager.getNumSinks()); - ServerName serverName = sinkManager.getSinks().get(0); + ServerName serverName = sinkManager.getSinksForTesting().get(0); SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); @@ -133,12 +133,12 @@ public class TestReplicationSinkManager { // Reporting a bad sink more than the threshold count should remove it // from the list of potential sinks - assertEquals(2, sinkManager.getSinks().size()); + assertEquals(2, sinkManager.getNumSinks()); // // now try a sink that has some successes // - serverName = sinkManager.getSinks().get(0); + serverName = sinkManager.getSinksForTesting().get(0); sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) { @@ -148,17 +148,17 @@ public class TestReplicationSinkManager { sinkManager.reportBadSink(sinkPeer); // did not remove the sink, since we had one successful try - 
assertEquals(2, sinkManager.getSinks().size()); + assertEquals(2, sinkManager.getNumSinks()); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) { sinkManager.reportBadSink(sinkPeer); } // still not remove, since the success reset the counter - assertEquals(2, sinkManager.getSinks().size()); + assertEquals(2, sinkManager.getNumSinks()); sinkManager.reportBadSink(sinkPeer); // but we exhausted the tries - assertEquals(1, sinkManager.getSinks().size()); + assertEquals(1, sinkManager.getNumSinks()); } @Test @@ -174,7 +174,7 @@ public class TestReplicationSinkManager { sinkManager.chooseSinks(); // Sanity check - List<ServerName> sinkList = sinkManager.getSinks(); + List<ServerName> sinkList = sinkManager.getSinksForTesting(); assertEquals(2, sinkList.size()); ServerName serverNameA = sinkList.get(0); @@ -190,7 +190,7 @@ public class TestReplicationSinkManager { // We've gone down to 0 good sinks, so the replication sinks // should have been refreshed now - assertEquals(2, sinkManager.getSinks().size()); + assertEquals(2, sinkManager.getNumSinks()); } }
