Author: stack Date: Tue May 21 20:28:55 2013 New Revision: 1484939 URL: http://svn.apache.org/r1484939 Log: HBASE-8581 rpc refactor dropped passing the operation timeout through to the rpcclient
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1484939&r1=1484938&r2=1484939&view=diff ============================================================================== --- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue May 21 20:28:55 2013 @@ -243,9 +243,11 @@ public class HTable implements HTableInt */ private void finishSetup() throws IOException { this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); - this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT - : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? + this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT): + this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.writeBufferSize = this.configuration.getLong( "hbase.client.write.buffer", 2097152); this.clearBufferOnFail = true; @@ -547,8 +549,7 @@ public class HTable implements HTableInt if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } - return new ClientScanner(getConfiguration(), scan, getTableName(), - this.connection); + return new ClientScanner(getConfiguration(), scan, getTableName(), this.connection); } /** @@ -888,6 +889,7 @@ public class HTable implements HTableInt try { GetRequest request = RequestConverter.buildGetRequest( location.getRegionInfo().getRegionName(), get, true); + GetResponse response = stub.get(null, request); return response.getExists(); } catch (ServiceException se) { Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1484939&r1=1484938&r2=1484939&view=diff ============================================================================== --- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original) +++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Tue May 21 20:28:55 2013 @@ -206,13 +206,13 @@ public abstract class ServerCallable<T> } // If, after the planned sleep, there won't be enough time left, we stop now. - if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) > - this.callTimeout) { + long duration = singleCallDuration(expectedSleep); + if (duration > this.callTimeout) { throw (SocketTimeoutException) new SocketTimeoutException( "Call to access row '" + Bytes.toString(row) + "' on table '" + Bytes.toString(tableName) + "' failed on timeout. " + " callTimeout=" + this.callTimeout + - ", time=" + (this.endTime - this.startTime)).initCause(t); + ", callDuration=" + duration).initCause(t); } } finally { afterCall(); @@ -227,6 +227,14 @@ public abstract class ServerCallable<T> } /** + * @param expectedSleep + * @return Calculate how long a single call took + */ + private long singleCallDuration(final long expectedSleep) { + return (this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep; + } + + /** * Run this instance against the server once. * @return an object of type T * @throws IOException if a remote or network exception occurs Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1484939&r1=1484938&r2=1484939&view=diff ============================================================================== --- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original) +++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Tue May 21 20:28:55 2013 @@ -1491,6 +1491,14 @@ public class RpcClient { return rpcTimeout.get(); } + /** + * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given + * default timeout. + */ + public static int getRpcTimeout(int defaultTimeout) { + return Math.min(defaultTimeout, rpcTimeout.get()); + } + public static void resetRpcTimeout() { rpcTimeout.remove(); } @@ -1575,7 +1583,9 @@ public class RpcClient { final User ticket, final int rpcTimeout) { this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); this.rpcClient = rpcClient; - this.rpcTimeout = rpcTimeout; + // Set the rpc timeout to be the minimum of configured timeout and whatever the current + // thread local setting is. + this.rpcTimeout = getRpcTimeout(rpcTimeout); this.ticket = ticket; } Modified: hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java?rev=1484939&r1=1484938&r2=1484939&view=diff ============================================================================== --- hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java (original) +++ hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java Tue May 21 20:28:55 2013 @@ -17,7 +17,10 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.*; + import java.io.IOException; +import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.exception import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -87,6 +91,41 @@ public class TestClientNoCluster { } } + /** + * Test that operation timeout prevails over rpc default timeout and retries, etc. + * @throws IOException + */ + @Test + public void testRocTimeout() throws IOException { + Configuration localConfig = HBaseConfiguration.create(this.conf); + // This override mocks up our exists/get call to throw a RegionServerStoppedException. + localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); + int pause = 10; + localConfig.setInt("hbase.client.pause", pause); + localConfig.setInt("hbase.client.retries.number", 10); + // Set the operation timeout to be < the pause. Expectation is that after first pause, we will + // fail out of the rpc because the rpc timeout will have been set to the operation tiemout + // and it has expired. Otherwise, if this functionality is broke, all retries will be run -- + // all ten of them -- and we'll get the RetriesExhaustedException exception. + localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1); + HTable table = new HTable(localConfig, HConstants.META_TABLE_NAME); + Throwable t = null; + try { + // An exists call turns into a get w/ a flag. + table.exists(new Get(Bytes.toBytes("abc"))); + } catch (SocketTimeoutException e) { + // I expect this exception. + LOG.info("Got expected exception", e); + t = e; + } catch (RetriesExhaustedException e) { + // This is the old, unwanted behavior. If we get here FAIL!!! + fail(); + } finally { + table.close(); + } + assertTrue(t != null); + } + @Test public void testDoNotRetryMetaScanner() throws IOException { this.conf.set("hbase.client.connection.impl", @@ -197,4 +236,31 @@ public class TestClientNoCluster { return this.stub; } } -} \ No newline at end of file + + /** + * Override to check we are setting rpc timeout right. + */ + static class RpcTimeoutConnection + extends HConnectionManager.HConnectionImplementation { + final ClientService.BlockingInterface stub; + + RpcTimeoutConnection(Configuration conf, boolean managed) + throws IOException { + super(conf, managed); + // Mock up my stub so an exists call -- which turns into a get -- throws an exception + this.stub = Mockito.mock(ClientService.BlockingInterface.class); + try { + Mockito.when(stub.get((RpcController)Mockito.any(), + (ClientProtos.GetRequest)Mockito.any())). + thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))); + } catch (ServiceException e) { + throw new IOException(e); + } + } + + @Override + public BlockingInterface getClient(ServerName sn) throws IOException { + return this.stub; + } + } +} Modified: hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1484939&r1=1484938&r2=1484939&view=diff ============================================================================== --- hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original) +++ hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue May 21 20:28:55 2013 @@ -264,6 +264,10 @@ public final class HConstants { /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout"; + /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ + public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT = + "hbase.client.meta.operation.timeout"; + /** Default HBase client operation timeout, which is tantamount to a blocking call */ public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;