Author: stack
Date: Wed May 25 20:50:32 2011
New Revision: 1127680
URL: http://svn.apache.org/viewvc?rev=1127680&view=rev
Log:
HBASE-2937 Facilitate Timeouts In HBase Client
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed May 25 20:50:32 2011
@@ -231,6 +231,7 @@ Release 0.91.0 - Unreleased
HBASE-3811 Allow adding attributes to Scan (Alex Baranau)
HBASE-3841 HTable and HTableInterface docs are inconsistent with
one another (Harsh J Chouraria)
+ HBASE-2937 Facilitate Timeouts In HBase Client (Karthick Sankarachary)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed May
25 20:50:32 2011
@@ -155,6 +155,12 @@ public final class HConstants {
/** Parameter name for HBase client IPC pool size */
public static final String HBASE_CLIENT_IPC_POOL_SIZE =
"hbase.client.ipc.pool.size";
+ /** Parameter name for HBase client operation timeout, which overrides RPC
timeout */
+ public static final String HBASE_CLIENT_OPERATION_TIMEOUT =
"hbase.client.operation.timeout";
+
+ /** Default HBase client operation timeout, which is tantamount to a
blocking call */
+ public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT =
Integer.MAX_VALUE;
+
/** Used to construct the name of the log directory for a region server
* Use '.' as a special character to seperate the log files from table data
*/
public static final String HREGION_LOGDIR_NAME = ".logs";
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Wed May 25 20:50:32 2011
@@ -1269,14 +1269,18 @@ public class HConnectionManager {
for(int tries = 0; tries < numRetries; tries++) {
try {
callable.instantiateServer(tries != 0);
+ callable.beforeCall();
return callable.call();
} catch (Throwable t) {
+ callable.shouldRetry(t);
t = translateException(t);
exceptions.add(t);
if (tries == numRetries - 1) {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries,
exceptions);
}
+ } finally {
+ callable.afterCall();
}
try {
Thread.sleep(getPauseTime(tries));
@@ -1292,6 +1296,7 @@ public class HConnectionManager {
throws IOException, RuntimeException {
try {
callable.instantiateServer(false);
+ callable.beforeCall();
return callable.call();
} catch (Throwable t) {
Throwable t2 = translateException(t);
@@ -1300,6 +1305,8 @@ public class HConnectionManager {
} else {
throw new RuntimeException(t2);
}
+ } finally {
+ callable.afterCall();
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed
May 25 20:50:32 2011
@@ -111,6 +111,7 @@ public class HTable implements HTableInt
private ExecutorService pool; // For Multi
private long maxScannerResultSize;
private boolean closed;
+ private int operationTimeout;
/**
* Creates an object to access a HBase table.
@@ -180,6 +181,9 @@ public class HTable implements HTableInt
this.connection = HConnectionManager.getConnection(conf);
this.scannerTimeout =
(int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+ this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ?
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
+ : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.configuration = conf;
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
@@ -549,7 +553,7 @@ public class HTable implements HTableInt
public Result getRowOrBefore(final byte[] row, final byte[] family)
throws IOException {
return connection.getRegionServerWithRetries(
- new ServerCallable<Result>(connection, tableName, row) {
+ new ServerCallable<Result>(connection, tableName, row,
operationTimeout) {
public Result call() throws IOException {
return
server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
row, family);
@@ -594,7 +598,7 @@ public class HTable implements HTableInt
@Override
public Result get(final Get get) throws IOException {
return connection.getRegionServerWithRetries(
- new ServerCallable<Result>(connection, tableName, get.getRow()) {
+ new ServerCallable<Result>(connection, tableName, get.getRow(),
operationTimeout) {
public Result call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), get);
}
@@ -650,7 +654,7 @@ public class HTable implements HTableInt
public void delete(final Delete delete)
throws IOException {
connection.getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection, tableName, delete.getRow()) {
+ new ServerCallable<Boolean>(connection, tableName, delete.getRow(),
operationTimeout) {
public Boolean call() throws IOException {
server.delete(location.getRegionInfo().getRegionName(), delete);
return null; // FindBugs NP_BOOLEAN_RETURN_NULL
@@ -720,7 +724,7 @@ public class HTable implements HTableInt
"Invalid arguments to increment, no columns specified");
}
return connection.getRegionServerWithRetries(
- new ServerCallable<Result>(connection, tableName, increment.getRow()) {
+ new ServerCallable<Result>(connection, tableName, increment.getRow(),
operationTimeout) {
public Result call() throws IOException {
return server.increment(
location.getRegionInfo().getRegionName(), increment);
@@ -757,7 +761,7 @@ public class HTable implements HTableInt
"Invalid arguments to incrementColumnValue", npe);
}
return connection.getRegionServerWithRetries(
- new ServerCallable<Long>(connection, tableName, row) {
+ new ServerCallable<Long>(connection, tableName, row, operationTimeout)
{
public Long call() throws IOException {
return server.incrementColumnValue(
location.getRegionInfo().getRegionName(), row, family,
@@ -776,7 +780,7 @@ public class HTable implements HTableInt
final Put put)
throws IOException {
return connection.getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection, tableName, row) {
+ new ServerCallable<Boolean>(connection, tableName, row,
operationTimeout) {
public Boolean call() throws IOException {
return server.checkAndPut(location.getRegionInfo().getRegionName(),
row, family, qualifier, value, put) ? Boolean.TRUE :
Boolean.FALSE;
@@ -795,7 +799,7 @@ public class HTable implements HTableInt
final Delete delete)
throws IOException {
return connection.getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection, tableName, row) {
+ new ServerCallable<Boolean>(connection, tableName, row,
operationTimeout) {
public Boolean call() throws IOException {
return server.checkAndDelete(
location.getRegionInfo().getRegionName(),
@@ -812,7 +816,7 @@ public class HTable implements HTableInt
@Override
public boolean exists(final Get get) throws IOException {
return connection.getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection, tableName, get.getRow()) {
+ new ServerCallable<Boolean>(connection, tableName, get.getRow(),
operationTimeout) {
public Boolean call() throws IOException {
return server.
exists(location.getRegionInfo().getRegionName(), get);
@@ -876,7 +880,7 @@ public class HTable implements HTableInt
public RowLock lockRow(final byte [] row)
throws IOException {
return connection.getRegionServerWithRetries(
- new ServerCallable<RowLock>(connection, tableName, row) {
+ new ServerCallable<RowLock>(connection, tableName, row,
operationTimeout) {
public RowLock call() throws IOException {
long lockId =
server.lockRow(location.getRegionInfo().getRegionName(), row);
@@ -893,7 +897,7 @@ public class HTable implements HTableInt
public void unlockRow(final RowLock rl)
throws IOException {
connection.getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection, tableName, rl.getRow()) {
+ new ServerCallable<Boolean>(connection, tableName, rl.getRow(),
operationTimeout) {
public Boolean call() throws IOException {
server.unlockRow(location.getRegionInfo().getRegionName(),
rl.getLockId());
@@ -1477,4 +1481,12 @@ public class HTable implements HTableInt
return rangeKeys;
}
+ public void setOperationTimeout(int operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ }
+
+ public int getOperationTimeout() {
+ return operationTimeout;
+ }
+
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
Wed May 25 20:50:32 2011
@@ -20,10 +20,17 @@
package org.apache.hadoop.hbase.client;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.retry.RetryPolicy;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;
/**
@@ -36,6 +43,8 @@ public abstract class ServerCallable<T>
protected final byte [] row;
protected HRegionLocation location;
protected HRegionInterface server;
+ protected int callTimeout;
+ protected long startTime, endTime;
/**
* @param connection connection callable is on
@@ -43,11 +52,15 @@ public abstract class ServerCallable<T>
* @param row row we are querying
*/
public ServerCallable(HConnection connection, byte [] tableName, byte []
row) {
+ this(connection, tableName, row,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+ }
+
+ public ServerCallable(HConnection connection, byte [] tableName, byte []
row, int callTimeout) {
this.connection = connection;
this.tableName = tableName;
this.row = row;
+ this.callTimeout = callTimeout;
}
-
/**
*
* @param reload set this to true if connection should re-find the region
@@ -78,4 +91,28 @@ public abstract class ServerCallable<T>
public byte [] getRow() {
return row;
}
+
+ public void beforeCall() {
+ HBaseRPC.setRpcTimeout(this.callTimeout);
+ this.startTime = System.currentTimeMillis();
+ }
+
+ public void afterCall() {
+ HBaseRPC.resetRpcTimeout();
+ this.endTime = System.currentTimeMillis();
+ }
+
+ public void shouldRetry(Throwable throwable) throws IOException {
+ if (this.callTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
+ if (throwable instanceof SocketTimeoutException
+ || (this.endTime - this.startTime > this.callTimeout)) {
+ throw (SocketTimeoutException) (SocketTimeoutException) new
SocketTimeoutException(
+ "Call to access row '" + Bytes.toString(row) + "' on table '"
+ + Bytes.toString(tableName)
+ + "' failed on socket timeout exception: " + throwable)
+ .initCause(throwable);
+ } else {
+ this.callTimeout = ((int) (this.endTime - this.startTime));
+ }
+ }
}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed
May 25 20:50:32 2011
@@ -36,6 +36,7 @@ import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -72,7 +73,7 @@ public class HBaseClient {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
- protected final Map<ConnectionId, Connection> connections;
+ protected final PoolMap<ConnectionId, Connection> connections;
protected final Class<? extends Writable> valueClass; // class of call
values
protected int counter; // counter for call ids
@@ -560,9 +561,25 @@ public class HBaseClient {
} else {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
- call.setValue(value);
+ // it's possible that this call may have been cleaned up due to a RPC
+ // timeout, so check if it still exists before setting the value.
+ if (call != null) {
+ call.setValue(value);
+ }
calls.remove(id);
}
+ } catch (SocketTimeoutException ste) {
+ if (remoteId.rpcTimeout > 0) {
+ // Clean up open calls but don't treat this as a fatal condition,
+ // since we expect certain responses to not make it by the specified
+ // {@link ConnectionId#rpcTimeout}.
+ closeException = ste;
+ cleanupCalls();
+ } else {
+ // Since the server did not respond within the default ping interval
+ // time, treat this as a fatal condition and close this connection
+ markClosed(ste);
+ }
} catch (IOException e) {
markClosed(e);
}
@@ -585,9 +602,7 @@ public class HBaseClient {
// release the resources
// first thing to do;take the connection out of the connection list
synchronized (connections) {
- if (connections.get(remoteId) == this) {
- connections.remove(remoteId);
- }
+ connections.remove(remoteId, this);
}
// close the streams and therefore the socket
@@ -624,6 +639,10 @@ public class HBaseClient {
while (itor.hasNext()) {
Call c = itor.next().getValue();
c.setException(closeException); // local exception
+ // Notify the open calls, so they are aware of what just happened
+ synchronized (c) {
+ c.notifyAll();
+ }
itor.remove();
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Wed May
25 20:50:32 2011
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.VersionedProtocol;
@@ -87,6 +88,14 @@ public class HBaseRPC {
private static final Map<Class,RpcEngine> PROXY_ENGINES
= new HashMap<Class,RpcEngine>();
+ // thread-specific RPC timeout, which may override that of RpcEngine
+ private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
+ @Override
+ protected Integer initialValue() {
+ return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
+ }
+ };
+
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
@@ -285,7 +294,7 @@ public class HBaseRPC {
throws IOException {
VersionedProtocol proxy =
getProtocolEngine(protocol,conf)
- .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout);
+ .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
@@ -379,4 +388,16 @@ public class HBaseRPC {
return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, ifaces, bindAddress, port, numHandlers,
metaHandlerCount, verbose, conf, highPriorityLevel);
}
+
+ public static void setRpcTimeout(int rpcTimeout) {
+ HBaseRPC.rpcTimeout.set(rpcTimeout);
+ }
+
+ public static int getRpcTimeout() {
+ return HBaseRPC.rpcTimeout.get();
+ }
+
+ public static void resetRpcTimeout() {
+ HBaseRPC.rpcTimeout.remove();
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java?rev=1127680&r1=1127679&r2=1127680&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java Wed May
25 20:50:32 2011
@@ -77,11 +77,12 @@ public class PoolMap<K, V> implements Ma
return pool != null ? pool.put(value) : null;
}
+ @SuppressWarnings("unchecked")
@Override
public V remove(Object key) {
Pool<V> pool = pools.remove(key);
if (pool != null) {
- pool.clear();
+ remove((K) key, pool.get());
}
return null;
}