Author: liyin Date: Thu May 16 19:20:12 2013 New Revision: 1483520 URL: http://svn.apache.org/r1483520 Log: [HBASE-8524] [0.89-fb] Fix indefinite wait() in HBaseClient
Author: aaiyer Summary: HBaseClient threads wait() indefinitely on the Connection to get a response from the server. However, if the server does not respond (for whatever reason), the client could wait forever. Moreover, it will be difficult to guarantee rpcTimeout settings if we wait forever. This change replaces the indefinite wait with a loop of timed waits, bailing out if we wait for longer than the rpcTimeout. Test Plan: mvn tests TBD: try to get this on ods and re-pro with TF Reviewers: liyintang, rshroff, shaneh Reviewed By: liyintang CC: hbase-eng@, mbm, mycnyc Differential Revision: https://phabricator.fb.com/D806604 Task ID: 2362485 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1483520&r1=1483519&r2=1483520&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Thu May 16 19:20:12 2013 @@ -68,6 +68,7 @@ import org.apache.hadoop.ipc.RemoteExcep import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; /** A client for an IPC service. IPC calls take a single {@link Writable} as a @@ -107,6 +108,13 @@ public class HBaseClient { protected final SocketFactory socketFactory; // how to create sockets + // To avoid indefinite wait, an outstanding RPC will wait in a loop + // and check every min {defaultWaitTime, rpcTimeout}; until the + // rpcTimeout is hit. 
+ final private static String RPC_POLL_INTERVAL_NAME = "rpc.poll.interval"; + final private static int DEFAULT_RPC_POLL_INTERVAL = 50; + private int defaultWaitTime; + final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; final static int DEFAULT_PING_INTERVAL = 60000; // 1 min final static int PING_CALL_ID = -1; @@ -792,6 +800,7 @@ public class HBaseClient { this.socketFactory = factory; this.connectionTimeOutMillSec = conf.getInt("hbase.client.connection.timeout.millsec", 5000); + this.defaultWaitTime = conf.getInt(RPC_POLL_INTERVAL_NAME, DEFAULT_RPC_POLL_INTERVAL); this.numConnectionsPerServer = conf.getInt(NUM_CONNECTIONS_PER_SERVER, DEFAULT_NUM_CONNECTIONS_PER_SERVER); LOG.debug("Created a new HBaseClient with " + numConnectionsPerServer + @@ -865,14 +874,24 @@ public class HBaseClient { connection.sendParam(call); // send the parameter boolean interrupted = false; //noinspection SynchronizationOnLocalVariableOrMethodParameter + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + long waitPeriod = rpcTimeout > 0 ? rpcTimeout : defaultWaitTime; synchronized (call) { while (!call.done) { try { - call.wait(); // wait for the result + call.wait(waitPeriod); // wait for the result } catch (InterruptedException ignored) { // save the fact that we were interrupted interrupted = true; } + + if (rpcTimeout > 0 && + !call.done + && EnvironmentEdgeManager.currentTimeMillis() > startTime + rpcTimeout) { + String msg = "Waited for " + rpcTimeout + " Call " + call.toString() + " still not complete"; + LOG.warn(msg); + call.setException(new InterruptedIOException(msg)); + } } if (interrupted) {
