Author: jgray
Date: Mon Nov 1 18:01:57 2010
New Revision: 1029776
URL: http://svn.apache.org/viewvc?rev=1029776&view=rev
Log:
HBASE-3154 HBase RPC should support timeout (Hairong via jgray)
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Nov 1 18:01:57 2010
@@ -1079,6 +1079,7 @@ Release 0.21.0 - Unreleased
cacheBlocks=true
HBASE-3126 Force use of 'mv -f' when moving aside hbase logfiles
HBASE-3176 Remove compile warnings in HRegionServer
+ HBASE-3154 HBase RPC should support timeout (Hairong via jgray)
NEW FEATURES
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Mon Nov
1 18:01:57 2010
@@ -365,6 +365,16 @@ public final class HConstants {
* Default value of {...@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}.
*/
public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
+
+ /**
+ * timeout for each RPC
+ */
+ public static String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
+
+ /**
+ * Default value of {...@link #HBASE_RPC_TIMEOUT_KEY}
+ */
+ public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
public static final String
REPLICATION_ENABLE_KEY = "hbase.replication";
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Mon Nov 1 18:01:57 2010
@@ -224,7 +224,7 @@ public class HConnectionManager {
private final long pause;
private final int numRetries;
private final int maxRPCAttempts;
- private final long rpcTimeout;
+ private final int rpcTimeout;
private final int prefetchRegionLimit;
private final Object masterLock = new Object();
@@ -282,9 +282,9 @@ public class HConnectionManager {
this.pause = conf.getLong("hbase.client.pause", 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
- this.rpcTimeout = conf.getLong(
- HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
- HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+ this.rpcTimeout = conf.getInt(
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
10);
@@ -341,7 +341,7 @@ public class HConnectionManager {
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
- masterLocation.getInetSocketAddress(), this.conf);
+ masterLocation.getInetSocketAddress(), this.conf,
this.rpcTimeout);
if (tryMaster.isMasterRunning()) {
this.master = tryMaster;
@@ -936,7 +936,7 @@ public class HConnectionManager {
server = (HRegionInterface)HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
- this.maxRPCAttempts, this.rpcTimeout);
+ this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
} catch (RemoteException e) {
LOG.warn("Remove exception connecting to RS", e);
throw RemoteExceptionHandler.decodeRemoteException(e);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Mon
Nov 1 18:01:57 2010
@@ -79,7 +79,7 @@ public class HBaseClient {
final protected long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
- protected final int pingInterval; // how often sends ping to the server in
msecs
+ protected int pingInterval; // how often sends ping to the server in msecs
protected final SocketFactory socketFactory; // how to create
sockets
private int refCount = 1;
@@ -194,7 +194,7 @@ public class HBaseClient {
private IOException closeException; // close reason
public Connection(InetSocketAddress address) throws IOException {
- this(new ConnectionId(address, null));
+ this(new ConnectionId(address, null, 0));
}
public Connection(ConnectionId remoteId) throws IOException {
@@ -245,7 +245,8 @@ public class HBaseClient {
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection.get() || !running.get()) {
+ if (shouldCloseConnection.get() || !running.get() ||
+ remoteId.rpcTimeout > 0) {
throw e;
}
sendPing();
@@ -308,6 +309,9 @@ public class HBaseClient {
this.socket.setKeepAlive(tcpKeepAlive);
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+ if (remoteId.rpcTimeout > 0) {
+ pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
+ }
this.socket.setSoTimeout(pingInterval);
break;
} catch (SocketTimeoutException toe) {
@@ -718,14 +722,14 @@ public class HBaseClient {
*/
public Writable call(Writable param, InetSocketAddress address)
throws IOException {
- return call(param, address, null);
+ return call(param, address, null, 0);
}
public Writable call(Writable param, InetSocketAddress addr,
- UserGroupInformation ticket)
+ UserGroupInformation ticket, int rpcTimeout)
throws IOException {
Call call = new Call(param);
- Connection connection = getConnection(addr, ticket, call);
+ Connection connection = getConnection(addr, ticket, rpcTimeout, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
@@ -808,7 +812,7 @@ public class HBaseClient {
for (int i = 0; i < params.length; i++) {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
- Connection connection = getConnection(addresses[i], null, call);
+ Connection connection = getConnection(addresses[i], null, 0, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
@@ -831,6 +835,7 @@ public class HBaseClient {
* pool. Connections to a given host/port are reused. */
private Connection getConnection(InetSocketAddress addr,
UserGroupInformation ticket,
+ int rpcTimeout,
Call call)
throws IOException {
if (!running.get()) {
@@ -842,7 +847,7 @@ public class HBaseClient {
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
- ConnectionId remoteId = new ConnectionId(addr, ticket);
+ ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
do {
synchronized (connections) {
connection = connections.get(remoteId);
@@ -868,10 +873,13 @@ public class HBaseClient {
private static class ConnectionId {
final InetSocketAddress address;
final UserGroupInformation ticket;
+ final private int rpcTimeout;
- ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+ ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
+ int rpcTimeout) {
this.address = address;
this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
}
InetSocketAddress getAddress() {
@@ -885,7 +893,8 @@ public class HBaseClient {
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
- return address.equals(id.address) && ticket == id.ticket;
+ return address.equals(id.address) && ticket == id.ticket &&
+ rpcTimeout == id.rpcTimeout;
//Note : ticket is a ref comparision.
}
return false;
@@ -893,7 +902,7 @@ public class HBaseClient {
@Override
public int hashCode() {
- return address.hashCode() ^ System.identityHashCode(ticket);
+ return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
}
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Mon Nov
1 18:01:57 2010
@@ -231,6 +231,7 @@ public class HBaseRPC {
private UserGroupInformation ticket;
private HBaseClient client;
private boolean isClosed = false;
+ final private int rpcTimeout;
/**
* @param address address for invoker
@@ -239,10 +240,11 @@ public class HBaseRPC {
* @param factory socket factory
*/
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) {
+ Configuration conf, SocketFactory factory, int rpcTimeout) {
this.address = address;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
+ this.rpcTimeout = rpcTimeout;
}
public Object invoke(Object proxy, Method method, Object[] args)
@@ -253,7 +255,7 @@ public class HBaseRPC {
startTime = System.currentTimeMillis();
}
HbaseObjectWritable value = (HbaseObjectWritable)
- client.call(new Invocation(method, args), address, ticket);
+ client.call(new Invocation(method, args), address, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -324,6 +326,7 @@ public class HBaseRPC {
* @param addr address of remote service
* @param conf configuration
* @param maxAttempts max attempts
+ * @param rpcTimeout timeout for each RPC
* @param timeout timeout in milliseconds
* @return proxy
* @throws IOException e
@@ -334,6 +337,7 @@ public class HBaseRPC {
InetSocketAddress addr,
Configuration conf,
int maxAttempts,
+ int rpcTimeout,
long timeout
) throws IOException {
// HBase does limited number of reconnects which is different from hadoop.
@@ -342,7 +346,7 @@ public class HBaseRPC {
int reconnectAttempts = 0;
while (true) {
try {
- return getProxy(protocol, clientVersion, addr, conf);
+ return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
ioe = se;
if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
@@ -379,13 +383,15 @@ public class HBaseRPC {
* @param addr remote address
* @param conf configuration
* @param factory socket factory
+ * @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
- SocketFactory factory) throws IOException {
- return getProxy(protocol, clientVersion, addr, null, conf, factory);
+ SocketFactory factory, int rpcTimeout) throws IOException {
+ return getProxy(protocol, clientVersion, addr, null, conf, factory,
+ rpcTimeout);
}
/**
@@ -398,17 +404,18 @@ public class HBaseRPC {
* @param ticket ticket
* @param conf configuration
* @param factory socket factory
+ * @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory)
+ Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(addr, ticket, conf, factory));
+ new Invoker(addr, ticket, conf, factory, rpcTimeout));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
@@ -425,15 +432,17 @@ public class HBaseRPC {
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
+ * @param rpcTimeout timeout for each RPC
* @return a proxy instance
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
- long clientVersion, InetSocketAddress addr, Configuration conf)
+ long clientVersion, InetSocketAddress addr, Configuration conf,
+ int rpcTimeout)
throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils
- .getDefaultSocketFactory(conf));
+ .getDefaultSocketFactory(conf), rpcTimeout);
}
/**
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1029776&r1=1029775&r2=1029776&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Mon Nov 1 18:01:57 2010
@@ -240,7 +240,7 @@ public class HRegionServer implements HR
// A sleeper that sleeps for msgInterval.
private final Sleeper sleeper;
- private final long rpcTimeout;
+ private final int rpcTimeout;
// The main region server thread.
@SuppressWarnings("unused")
@@ -292,9 +292,9 @@ public class HRegionServer implements HR
this.numRegionsToReport = conf.getInt(
"hbase.regionserver.numregionstoreport", 10);
- this.rpcTimeout = conf.getLong(
- HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
- HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+ this.rpcTimeout = conf.getInt(
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.abortRequested = false;
this.stopped = false;
@@ -1363,7 +1363,7 @@ public class HRegionServer implements HR
master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
masterAddress.getInetSocketAddress(), this.conf, -1,
- this.rpcTimeout);
+ this.rpcTimeout, this.rpcTimeout);
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();