Author: sershe
Date: Fri Apr 5 00:40:58 2013
New Revision: 1464799

URL: http://svn.apache.org/r1464799
Log:
HBASE-7649 client retry timeout doesn't need to do x2 fallback when going to different server
Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
    hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1464799&r1=1464798&r2=1464799&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Apr 5 00:40:58 2013
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -167,6 +168,8 @@ public class HConnectionManager {
   /** Default admin protocol class name. */
   public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
 
+  public static final String RETRIES_BY_SERVER = "hbase.client.retries.by.server";
+
   private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
 
   static {
@@ -513,10 +516,12 @@ public class HConnectionManager {
     private final Class<? extends AdminProtocol> adminClass;
     private final Class<? extends ClientProtocol> clientClass;
     private final long pause;
-    private final int numRetries;
+    private final int numTries;
     private final int maxRPCAttempts;
     private final int rpcTimeout;
     private final int prefetchRegionLimit;
+    private final boolean useServerTrackerForRetries;
+    private final long serverTrackerTimeout;
 
     private volatile boolean closed;
     private volatile boolean aborted;
@@ -602,7 +607,7 @@ public class HConnectionManager {
       }
       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
           HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-      this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
       this.maxRPCAttempts = conf.getInt(
           HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
@@ -613,7 +618,21 @@
       this.prefetchRegionLimit = conf.getInt(
           HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
-
+      this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER, true);
+      long serverTrackerTimeout = 0;
+      if (this.useServerTrackerForRetries) {
+        // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+        // However, if we are too useful, we might fail very quickly due to retry count limit.
+        // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+        // retry time if normal retries were used. Then we will retry until this time runs out.
+        // If we keep hitting one server, the net effect will be the incremental backoff, and
+        // essentially the same number of retries as planned. If we have to do faster retries,
+        // we will do more retries in aggregate, but the user will be none the wiser.
+        for (int i = 0; i < this.numTries; ++i) {
+          serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
+        }
+      }
+      this.serverTrackerTimeout = serverTrackerTimeout;
       retrieveClusterId();
 
       // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
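For a sense of the time budget the loop above produces: a minimal standalone sketch of the computation, assuming the 0.95-era HConstants.RETRY_BACKOFF table of {1, 1, 1, 2, 2, 4, 4, 8, 16, 32} and the then-default pause (1000ms) and retry count (10); the ~1% random jitter that ConnectionUtils.getPauseTime adds is ignored here for clarity.

    public class TrackerTimeoutSketch {
      // Assumed copy of the 0.95-era HConstants.RETRY_BACKOFF multipliers.
      private static final int[] RETRY_BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};

      // Mirrors ConnectionUtils.getPauseTime, minus its random jitter.
      static long getPauseTime(long pause, int tries) {
        return pause * RETRY_BACKOFF[Math.min(tries, RETRY_BACKOFF.length - 1)];
      }

      public static void main(String[] args) {
        long pause = 1000; // hbase.client.pause
        int numTries = 10; // hbase.client.retries.number
        long timeout = 0;
        for (int i = 0; i < numTries; ++i) {
          timeout += getPauseTime(pause, i);
        }
        // (1+1+1+2+2+4+4+8+16+32) * 1000 = 71000 ms: the tracker gets roughly the
        // same wall-clock budget as the plain incremental-backoff schedule.
        System.out.println("serverTrackerTimeout = " + timeout + " ms");
      }
    }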
@@ -772,10 +791,10 @@ public class HConnectionManager {
 
         if (exceptionCaught != null)
           // It failed. If it's not the last try, we're going to wait a little
-          if (tries < numRetries) {
+          if (tries < numTries) {
             // tries at this point is 1 or more; decrement to start from 0.
             long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
-            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+            LOG.info("getMaster attempt " + tries + " of " + numTries +
               " failed; retrying after sleep of " +pauseTime + ", exception=" + exceptionCaught);
 
             try {
@@ -788,7 +807,7 @@
 
           } else {
             // Enough tries, we stop now
-            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+            LOG.info("getMaster attempt " + tries + " of " + numTries +
               " failed; no more retrying.", exceptionCaught);
             throw new MasterNotRunningException(exceptionCaught);
           }
@@ -1103,7 +1122,7 @@
           return location;
         }
       }
-      int localNumRetries = retry ? numRetries : 1;
+      int localNumRetries = retry ? numTries : 1;
       // build the key of the meta region we should be looking for.
       // the extra 9's on the end are necessary to allow "exact" matches
       // without knowing the precise region names.
@@ -1112,7 +1131,7 @@
       for (int tries = 0; true; tries++) {
         if (tries >= localNumRetries) {
           throw new NoServerForRegionException("Unable to find region for "
-            + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
+            + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
         }
 
         HRegionLocation metaLocation = null;
@@ -1210,13 +1229,13 @@
           if (e instanceof RemoteException) {
             e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
           }
-          if (tries < numRetries - 1) {
+          if (tries < numTries - 1) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("locateRegionInMeta parentTable=" +
                 Bytes.toString(parentTable) + ", metaLocation=" +
                 ((metaLocation == null)? "null": "{" + metaLocation + "}") +
                 ", attempt=" + tries + " of " +
-                this.numRetries + " failed; retrying after sleep of " +
+                this.numTries + " failed; retrying after sleep of " +
                 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
             }
           } else {
@@ -1969,6 +1988,8 @@
     private final List<Action<R>> toReplay;
     private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> inProgress;
+
+    private ServerErrorTracker errorsByServer = null;
 
     private int curNumRetries;
 
     // Notified when a tasks is done
@@ -1994,10 +2015,11 @@
      * Group a list of actions per region servers, and send them. The created MultiActions are
      * added to the inProgress list.
      * @param actionsList
-     * @param sleepTime - sleep time before actually executing the actions. Can be zero.
+     * @param isRetry Whether we are retrying these actions. If yes, backoff
+     *                time may be applied before new requests.
      * @throws IOException - if we can't locate a region after multiple retries.
      */
-    private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
+    private void submit(List<Action<R>> actionsList, final boolean isRetry) throws IOException {
       // group per location => regions server
       final Map<HRegionLocation, MultiAction<R>> actionsByServer =
           new HashMap<HRegionLocation, MultiAction<R>>();
@@ -2022,15 +2044,25 @@
 
       // Send the queries and add them to the inProgress list
       for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
+        long backoffTime = 0;
+        if (isRetry) {
+          if (hci.isUsingServerTrackerForRetries()) {
+            assert this.errorsByServer != null;
+            backoffTime = this.errorsByServer.calculateBackoffTime(e.getKey(), hci.pause);
+          } else {
+            // curNumRetries starts with one, subtract to start from 0.
+            backoffTime = ConnectionUtils.getPauseTime(hci.pause, curNumRetries - 1);
+          }
+        }
         Callable<MultiResponse> callable =
-          createDelayedCallable(sleepTime, e.getKey(), e.getValue());
-        if (LOG.isTraceEnabled() && (sleepTime > 0)) {
+          createDelayedCallable(backoffTime, e.getKey(), e.getValue());
+        if (LOG.isTraceEnabled() && isRetry) {
           StringBuilder sb = new StringBuilder();
           for (Action<R> action : e.getValue().allActions()) {
             sb.append(Bytes.toStringBinary(action.getAction().getRow())).append(';');
           }
-          LOG.trace("Sending requests to [" + e.getKey().getHostnamePort()
-            + "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]");
+          LOG.trace("Will retry requests to [" + e.getKey().getHostnamePort()
+            + "] after delay of [" + backoffTime + "] for rows [" + sb.toString() + "]");
         }
         Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
           new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
@@ -2044,9 +2076,7 @@
      * @throws IOException
      */
     private void doRetry() throws IOException{
-      // curNumRetries at this point is 1 or more; decrement to start from 0.
-      final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries - 1);
-      submit(this.toReplay, sleepTime);
+      submit(this.toReplay, true);
       this.toReplay.clear();
     }
 
@@ -2085,7 +2115,7 @@
       }
 
       // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
-      submit(listActions, 0);
+      submit(listActions, false);
 
       // LastRetry is true if, either:
       //  we had an exception 'DoNotRetry'
@@ -2094,7 +2124,7 @@
       boolean lastRetry = false;
       // despite its name numRetries means number of tries. So if numRetries == 1 it means we
       // won't retry. And we compare vs. 2 in case someone set it to zero.
-      boolean noRetry = (hci.numRetries < 2);
+      boolean noRetry = (hci.numTries < 2);
 
       // Analyze and resubmit until all actions are done successfully or failed after numRetries
       while (!this.inProgress.isEmpty()) {
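The net effect of the submit() change above: on a retry, the delay is chosen per destination server rather than from the global attempt counter, so actions routed to a server that has never failed on this operation are resent with no delay. A toy model of that decision, with hypothetical names and a hardcoded copy of the assumed backoff table (the real tracker also credits elapsed time and clamps at the deadline; both are omitted here):

    import java.util.HashMap;
    import java.util.Map;

    public class PerServerBackoffSketch {
      private static final int[] RETRY_BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};
      private final Map<String, Integer> errorsByServer = new HashMap<String, Integer>();

      void reportServerError(String server) {
        Integer errors = errorsByServer.get(server);
        errorsByServer.put(server, errors == null ? 1 : errors + 1);
      }

      long calculateBackoffTime(String server, long pause) {
        Integer errors = errorsByServer.get(server);
        if (errors == null) {
          return 0; // this server never failed on us: retry immediately
        }
        // First error pauses for pause * 1, then the schedule ramps up.
        return pause * RETRY_BACKOFF[Math.min(errors - 1, RETRY_BACKOFF.length - 1)];
      }

      public static void main(String[] args) {
        PerServerBackoffSketch tracker = new PerServerBackoffSketch();
        tracker.reportServerError("rs1:60020");
        tracker.reportServerError("rs1:60020");
        System.out.println(tracker.calculateBackoffTime("rs1:60020", 1000)); // 1000 (two errors)
        System.out.println(tracker.calculateBackoffTime("rs2:60020", 1000)); // 0 (clean server)
      }
    }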
@@ -2112,7 +2142,7 @@
         } catch (ExecutionException e) {
           exception = e;
         }
-
+        HRegionLocation location = currentTask.getSecond();
         // Error case: no result at all for this multi action. We need to redo all actions
         if (responses == null) {
           for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
             for (Action<R> action : actions) {
               Row row = action.getAction();
               // Do not use the exception for updating cache because it might be coming from
               // any of the regions in the MultiAction.
-              hci.updateCachedLocations(tableName, row, null, currentTask.getSecond());
+              hci.updateCachedLocations(tableName, row, null, location);
               if (noRetry) {
-                errors.add(exception, row, currentTask);
+                errors.add(exception, row, location);
               } else {
                 if (isTraceEnabled) {
-                  retriedErrors.add(exception, row, currentTask);
+                  retriedErrors.add(exception, row, location);
                 }
-                lastRetry = addToReplay(nbRetries, action);
+                lastRetry = addToReplay(nbRetries, action, location);
               }
             }
           }
         }
@@ -2146,14 +2176,14 @@
             // Failure: retry if it makes sense, else update the errors lists
             if (result == null || result instanceof Throwable) {
               Row row = correspondingAction.getAction();
-              hci.updateCachedLocations(this.tableName, row, result, currentTask.getSecond());
+              hci.updateCachedLocations(this.tableName, row, result, location);
               if (result instanceof DoNotRetryIOException || noRetry) {
-                errors.add((Exception)result, row, currentTask);
+                errors.add((Exception)result, row, location);
               } else {
                 if (isTraceEnabled) {
-                  retriedErrors.add((Exception)result, row, currentTask);
+                  retriedErrors.add((Exception)result, row, location);
                 }
-                lastRetry = addToReplay(nbRetries, correspondingAction);
+                lastRetry = addToReplay(nbRetries, correspondingAction, location);
               }
             } else // success
               if (callback != null) {
@@ -2186,11 +2216,10 @@
       private List<Row> actions = new ArrayList<Row>();
       private List<String> addresses = new ArrayList<String>();
 
-      public void add(Exception ex, Row row,
-          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> obj) {
+      public void add(Exception ex, Row row, HRegionLocation location) {
         exceptions.add(ex);
         actions.add(row);
-        addresses.add(obj.getSecond().getHostnamePort());
+        addresses.add(location.getHostnamePort());
       }
 
       public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
@@ -2219,17 +2248,24 @@
      * Put the action that has to be retried in the Replay list.
      * @return true if we're out of numRetries and it's the last retry.
      */
-    private boolean addToReplay(int[] nbRetries, Action<R> action) {
+    private boolean addToReplay(int[] nbRetries, Action<R> action, HRegionLocation source) {
       this.toReplay.add(action);
       nbRetries[action.getOriginalIndex()]++;
       if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
         this.curNumRetries = nbRetries[action.getOriginalIndex()];
       }
-      // numRetries means number of tries, while curNumRetries means current number of retries. So
-      // we need to add 1 to make them comparable. And as we look for the last try we compare
-      // with '>=' and no '>'. And we need curNumRetries to means what it says as we don't want
-      // to initialize it to 1.
-      return ( (this.curNumRetries +1) >= hci.numRetries);
+      if (hci.isUsingServerTrackerForRetries()) {
+        if (this.errorsByServer == null) {
+          this.errorsByServer = hci.createServerErrorTracker();
+        }
+        this.errorsByServer.reportServerError(source);
+        return !this.errorsByServer.canRetryMore();
+      } else {
+        // We need to add 1 to make tries and retries comparable. And as we look for
+        // the last try we compare with '>=' and not '>'. And we need curNumRetries
+        // to mean what it says as we don't want to initialize it to 1.
+        return ((this.curNumRetries + 1) >= hci.numTries);
+      }
     }
 
     /**
@@ -2521,8 +2557,102 @@
     void setRpcEngine(RpcClientEngine engine) {
       this.rpcEngine = engine;
     }
-  }
+    /**
+     * The record of errors for servers. Visible for testing.
+     */
+    @VisibleForTesting
+    static class ServerErrorTracker {
+      private final Map<HRegionLocation, ServerErrors> errorsByServer =
+          new HashMap<HRegionLocation, ServerErrors>();
+      private long canRetryUntil = 0;
+
+      public ServerErrorTracker(long timeout) {
+        LOG.info("Server tracker timeout is " + timeout + "ms");
+        this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+      }
+
+      boolean canRetryMore() {
+        return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+      }
+
+      /**
+       * Calculates the back-off time for a retrying request to a particular server.
+       * This is here, and package private, for testing (no good way to get at it).
+       * @param server The server in question.
+       * @param basePause The default hci pause.
+       * @return The time to wait before sending the next request.
+       */
+      long calculateBackoffTime(HRegionLocation server, long basePause) {
+        long result = 0;
+        ServerErrors errorStats = errorsByServer.get(server);
+        if (errorStats != null) {
+          result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
+          // Adjust by the time we already waited since last talking to this server.
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          long timeSinceLastError = now - errorStats.getLastErrorTime();
+          if (timeSinceLastError > 0) {
+            result = Math.max(0, result - timeSinceLastError);
+          }
+          // Finally, see if the backoff time overshoots the timeout.
+          if (result > 0 && (now + result > this.canRetryUntil)) {
+            result = Math.max(0, this.canRetryUntil - now);
+          }
+        }
+        return result;
+      }
+
+      /**
+       * Reports that there was an error on the server, and does whatever bean-counting
+       * is necessary. This is here, and package private, for testing (no good way to get at it).
+       * @param server The server in question.
+       */
+      void reportServerError(HRegionLocation server) {
+        ServerErrors errors = errorsByServer.get(server);
+        if (errors != null) {
+          errors.addError();
+        } else {
+          errorsByServer.put(server, new ServerErrors());
+        }
+      }
+
+      /**
+       * The record of errors for a server.
+       */
+      private static class ServerErrors {
+        public long lastErrorTime;
+        public int retries;
+
+        public ServerErrors() {
+          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+          this.retries = 0;
+        }
+
+        public void addError() {
+          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+          ++this.retries;
+        }
+
+        public long getLastErrorTime() {
+          return this.lastErrorTime;
+        }
+      }
+    }
+
+    public boolean isUsingServerTrackerForRetries() {
+      return this.useServerTrackerForRetries;
+    }
+
+    /**
+     * Creates the server error tracker to use inside the process.
+     * Currently, to preserve the main assumption about current retries, and to work well with
+     * the retry-limit-based calculation, the calculation is local per Process object.
+     * We may benefit from connection-wide tracking of server errors.
+     * @return ServerErrorTracker to use.
+     */
+    ServerErrorTracker createServerErrorTracker() {
+      return new ServerErrorTracker(this.serverTrackerTimeout);
+    }
+  }
 
   /**
    * Set the number of retries to use serverside when trying to communicate
    * with another server over {@link HConnection}.  Used updating catalog
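To summarize the tracker's semantics before the tests: the first error against a server costs roughly one pause, time already spent since the last error is credited against the next backoff, and the delay is clamped so the final retry lands on the deadline. A compact walk-through under a controlled clock, patterned on the new TestHCM test below; it assumes it runs in the org.apache.hadoop.hbase.client package, where the package-private tracker is visible, and the HRegionInfo/ServerName values are illustrative.

    ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(clock);
    try {
      HConnectionManager.HConnectionImplementation.ServerErrorTracker tracker =
          new HConnectionManager.HConnectionImplementation.ServerErrorTracker(60000); // 60s budget
      HRegionLocation server = new HRegionLocation(
          new HRegionInfo(Bytes.toBytes("t")), new ServerName("127.0.0.1", 1, 0));

      tracker.reportServerError(server);
      long d1 = tracker.calculateBackoffTime(server, 1000); // ~1000ms, plus up to 1% jitter

      clock.setValue(clock.currentTimeMillis() + 400);      // we already waited 400ms...
      long d2 = tracker.calculateBackoffTime(server, 1000); // ...so only ~600ms remains
    } finally {
      EnvironmentEdgeManager.reset();
    }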
Modified: hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java?rev=1464799&r1=1464798&r2=1464799&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java (original)
+++ hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java Fri Apr 5 00:40:58 2013
@@ -24,6 +24,7 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
 
@@ -42,9 +43,9 @@ public abstract class IngestIntegrationT
   protected HBaseCluster cluster;
   private LoadTestTool loadTool;
 
-  protected void setUp(int numSlavesBase) throws Exception {
+  protected void setUp(int numSlavesBase, Configuration conf) throws Exception {
     tableName = this.getClass().getSimpleName();
-    util = new IntegrationTestingUtility();
+    util = (conf == null) ? new IntegrationTestingUtility() : new IntegrationTestingUtility(conf);
     LOG.info("Initializing cluster with " + numSlavesBase + " servers");
     util.initializeCluster(numSlavesBase);
     LOG.info("Done initializing cluster");
@@ -58,6 +59,10 @@ public abstract class IngestIntegrationT
     Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
   }
 
+  protected void setUp(int numSlavesBase) throws Exception {
+    setUp(numSlavesBase, null);
+  }
+
   protected void tearDown() throws Exception {
     LOG.info("Restoring the cluster");
     util.restoreCluster();

Modified: hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java?rev=1464799&r1=1464798&r2=1464799&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java (original)
+++ hbase/branches/0.95/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java Fri Apr 5 00:40:58 2013
@@ -27,6 +27,8 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChaosMonkey;
 import org.apache.hadoop.hbase.util.Pair;
@@ -100,7 +102,9 @@ public class IntegrationTestRebalanceAnd
   @Before
   @SuppressWarnings("unchecked")
   public void setUp() throws Exception {
-    super.setUp(NUM_SLAVES_BASE);
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HConnectionManager.RETRIES_BY_SERVER, "true");
+    super.setUp(NUM_SLAVES_BASE, conf);
     ChaosMonkey.Policy chaosPolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
       CHAOS_EVERY_MS, new UnbalanceKillAndRebalanceAction());
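The integration test above pins hbase.client.retries.by.server to its default of true, which also makes the flag usable as a kill switch. A minimal sketch of opting a client out of the per-server tracker (table name hypothetical), falling back to the old global incremental backoff:

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConnectionManager.RETRIES_BY_SERVER, false);
    HTable table = new HTable(conf, "myTable"); // this connection uses the old retry behavior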
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1464799&r1=1464798&r2=1464799&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Fri Apr 5 00:40:58 2013
@@ -18,10 +18,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -42,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
@@ -55,6 +53,8 @@ import org.apache.hadoop.hbase.master.HM
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
@@ -304,13 +304,13 @@ public class TestHCM {
 
     // Hijack the number of retry to fail immediately instead of retrying: there will be no new
     // connection to the master
-    Field numRetries = conn.getClass().getDeclaredField("numRetries");
-    numRetries.setAccessible(true);
+    Field numTries = conn.getClass().getDeclaredField("numTries");
+    numTries.setAccessible(true);
     Field modifiersField = Field.class.getDeclaredField("modifiers");
     modifiersField.setAccessible(true);
-    modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
-    final int prevNumRetriesVal = (Integer)numRetries.get(conn);
-    numRetries.set(conn, 1);
+    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numTries.get(conn);
+    numTries.set(conn, 1);
 
     // We do a put and expect the cache to be updated, even if we don't retry
     LOG.info("Put starting");
@@ -379,7 +379,7 @@ public class TestHCM {
       "Previous server was "+destServer.getServerName().getHostAndPort(),
       curServer.getServerName().getPort(),
       conn.getCachedLocation(TABLE_NAME, ROW).getPort());
 
-    numRetries.set(conn, prevNumRetriesVal);
+    numTries.set(conn, prevNumRetriesVal);
     table.close();
   }
 
@@ -705,13 +705,13 @@ public class TestHCM {
       conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
 
     // Hijack the number of retry to fail after 2 tries
-    Field numRetries = conn.getClass().getDeclaredField("numRetries");
-    numRetries.setAccessible(true);
+    Field numTries = conn.getClass().getDeclaredField("numTries");
+    numTries.setAccessible(true);
     Field modifiersField = Field.class.getDeclaredField("modifiers");
     modifiersField.setAccessible(true);
-    modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
-    final int prevNumRetriesVal = (Integer)numRetries.get(conn);
-    numRetries.set(conn, 2);
+    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numTries.get(conn);
+    numTries.set(conn, 2);
 
     Put put3 = new Put(ROW_X);
     put3.add(FAM_NAM, ROW_X, ROW_X);
@@ -722,10 +722,83 @@ public class TestHCM {
 
     table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
     // second we get RegionMovedException.
 
-    numRetries.set(conn, prevNumRetriesVal);
+    numTries.set(conn, prevNumRetriesVal);
     table.close();
     conn.close();
   }
 
+  @Test
+  public void testErrorBackoffTimeCalculation() throws Exception {
+    final long ANY_PAUSE = 1000;
+    HRegionInfo ri = new HRegionInfo(TABLE_NAME);
+    HRegionLocation location = new HRegionLocation(ri, new ServerName("127.0.0.1", 1, 0));
+    HRegionLocation diffLocation = new HRegionLocation(ri, new ServerName("127.0.0.1", 2, 0));
+
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    try {
+      long timeBase = timeMachine.currentTimeMillis();
+      long largeAmountOfTime = ANY_PAUSE * 1000;
+      HConnectionImplementation.ServerErrorTracker tracker =
+          new HConnectionImplementation.ServerErrorTracker(largeAmountOfTime);
+
+      // The default backoff is 0.
+      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // Check some backoff values from HConstants sequence.
+      tracker.reportServerError(location);
+      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      tracker.reportServerError(location);
+      tracker.reportServerError(location);
+      tracker.reportServerError(location);
+      assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // All of this shouldn't affect backoff for different location.
+      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+      tracker.reportServerError(diffLocation);
+      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+
+      // But should still work for a different region in the same location.
+      HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2);
+      HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName());
+      assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE));
+
+      // Check with different base.
+      assertEqualsWithJitter(ANY_PAUSE * 4,
+          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
+
+      // See that time from last error is taken into account. Time shift is applied after jitter,
+      // so pass the original expected backoff as the base for jitter.
+      long timeShift = (long)(ANY_PAUSE * 0.5);
+      timeMachine.setValue(timeBase + timeShift);
+      assertEqualsWithJitter(ANY_PAUSE * 2 - timeShift,
+          tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
+
+      // However we should not go into negative.
+      timeMachine.setValue(timeBase + ANY_PAUSE * 100);
+      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // We also should not go over the boundary; last retry would be on it.
+      long timeLeft = (long)(ANY_PAUSE * 0.5);
+      timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
+      assertTrue(tracker.canRetryMore());
+      tracker.reportServerError(location);
+      assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      timeMachine.setValue(timeBase + largeAmountOfTime);
+      assertFalse(tracker.canRetryMore());
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  private static void assertEqualsWithJitter(long expected, long actual) {
+    assertEqualsWithJitter(expected, actual, expected);
+  }
+
+  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
+    assertTrue("Value not within jitter: " + expected + " vs " + actual,
+        Math.abs(actual - expected) <= (0.01f * jitterBase));
+  }
 }
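A note on the tolerance used above, assuming ConnectionUtils.getPauseTime adds up to 1% of the un-jittered pause as random jitter: assertEqualsWithJitter accepts any value within 0.01 * jitterBase of the expectation. The three-argument form exists because the time-shifted checks subtract the shift after jitter is applied, so the window must be derived from the original backoff (jitterBase) rather than from the shifted expectation. Worked numbers, with illustrative values:

    // For expected = 2000 and jitterBase = 2000, the assertion accepts [1980, 2020].
    long expected = 2000, jitterBase = 2000;
    long actual = 2015; // e.g. 2000 plus 0.75% jitter
    assert Math.abs(actual - expected) <= (long) (0.01f * jitterBase); // passes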