Author: szetszwo Date: Tue Jun 12 05:27:15 2012 New Revision: 1349124 URL: http://svn.apache.org/viewvc?rev=1349124&view=rev Log: HDFS-3504. Support configurable retry policy in DFSClient for RPC connections and RPC calls, and add MultipleLinearRandomRetry, a new retry policy.
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Tue Jun 12 05:27:15 2012 @@ -33,7 +33,7 @@ import 
org.apache.hadoop.ipc.RpcInvocati class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); - private FailoverProxyProvider proxyProvider; + private final FailoverProxyProvider proxyProvider; /** * The number of times the associated proxyProvider has ever been failed over. @@ -41,26 +41,25 @@ class RetryInvocationHandler implements private long proxyProviderFailoverCount = 0; private volatile boolean hasMadeASuccessfulCall = false; - private RetryPolicy defaultPolicy; - private Map<String,RetryPolicy> methodNameToPolicyMap; + private final RetryPolicy defaultPolicy; + private final Map<String,RetryPolicy> methodNameToPolicyMap; private Object currentProxy; public RetryInvocationHandler(FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) { - this.proxyProvider = proxyProvider; - this.defaultPolicy = retryPolicy; - this.methodNameToPolicyMap = Collections.emptyMap(); - this.currentProxy = proxyProvider.getProxy(); + this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap()); } - + public RetryInvocationHandler(FailoverProxyProvider proxyProvider, + RetryPolicy defaultPolicy, Map<String, RetryPolicy> methodNameToPolicyMap) { this.proxyProvider = proxyProvider; - this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; + this.defaultPolicy = defaultPolicy; this.methodNameToPolicyMap = methodNameToPolicyMap; this.currentProxy = proxyProvider.getProxy(); } + @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RetryPolicy policy = methodNameToPolicyMap.get(method.getName()); Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1349124&r1=1349123&r2=1349124&view=diff 
============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Tue Jun 12 05:27:15 2012 @@ -22,10 +22,13 @@ import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Map.Entry; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -33,8 +36,6 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; -import com.google.common.annotations.VisibleForTesting; - /** * <p> * A collection of useful implementations of {@link RetryPolicy}. @@ -44,7 +45,12 @@ public class RetryPolicies { public static final Log LOG = LogFactory.getLog(RetryPolicies.class); - private static final Random RAND = new Random(); + private static ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() { + @Override + protected Random initialValue() { + return new Random(); + } + }; /** * <p> @@ -157,17 +163,35 @@ public class RetryPolicies { } } + /** + * Retry up to maxRetries. + * The actual sleep time of the n-th retry is f(n, sleepTime), + * where f is a function provided by the subclass implementation. + * + * The object of the subclasses should be immutable; + * otherwise, the subclass must override hashCode(), equals(..) and toString(). 
+ */ static abstract class RetryLimited implements RetryPolicy { - int maxRetries; - long sleepTime; - TimeUnit timeUnit; - - public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) { + final int maxRetries; + final long sleepTime; + final TimeUnit timeUnit; + + private String myString; + + RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) { + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0"); + } + if (sleepTime < 0) { + throw new IllegalArgumentException("sleepTime = " + sleepTime + " < 0"); + } + this.maxRetries = maxRetries; this.sleepTime = sleepTime; this.timeUnit = timeUnit; } + @Override public RetryAction shouldRetry(Exception e, int retries, int failovers, boolean isMethodIdempotent) throws Exception { if (retries >= maxRetries) { @@ -178,6 +202,30 @@ public class RetryPolicies { } protected abstract long calculateSleepTime(int retries); + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(final Object that) { + if (this == that) { + return true; + } else if (that == null || this.getClass() != that.getClass()) { + return false; + } + return this.toString().equals(that.toString()); + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries + + ", sleepTime=" + sleepTime + " " + timeUnit + ")"; + } + return myString; + } } static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited { @@ -208,6 +256,169 @@ public class RetryPolicies { } } + /** + * Given pairs of number of retries and sleep time (n0, t0), (n1, t1), ..., + * the first n0 retries sleep t0 milliseconds on average, + * the following n1 retries sleep t1 milliseconds on average, and so on. + * + * For all the sleep, the actual sleep time is randomly uniform distributed + * in the close interval [0.5t, 1.5t], where t is the sleep time specified. 
+ * + * The objects of this class are immutable. + */ + public static class MultipleLinearRandomRetry implements RetryPolicy { + /** Pairs of numRetries and sleepMillis */ + public static class Pair { + final int numRetries; + final int sleepMillis; + + public Pair(final int numRetries, final int sleepMillis) { + if (numRetries < 0) { + throw new IllegalArgumentException("numRetries = " + numRetries+" < 0"); + } + if (sleepMillis < 0) { + throw new IllegalArgumentException("sleepMillis = " + sleepMillis + " < 0"); + } + + this.numRetries = numRetries; + this.sleepMillis = sleepMillis; + } + + @Override + public String toString() { + return numRetries + "x" + sleepMillis + "ms"; + } + } + + private final List<Pair> pairs; + private String myString; + + public MultipleLinearRandomRetry(List<Pair> pairs) { + if (pairs == null || pairs.isEmpty()) { + throw new IllegalArgumentException("pairs must be neither null nor empty."); + } + this.pairs = Collections.unmodifiableList(pairs); + } + + @Override + public RetryAction shouldRetry(Exception e, int curRetry, int failovers, + boolean isMethodIdempotent) throws Exception { + final Pair p = searchPair(curRetry); + if (p == null) { + //no more retries. + return RetryAction.FAIL; + } + + //calculate sleep time and return. + final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio <=1.5 + final long sleepTime = Math.round(p.sleepMillis * ratio); + return new RetryAction(RetryAction.RetryDecision.RETRY, sleepTime); + } + + /** + * Given the current number of retry, search the corresponding pair. + * @return the corresponding pair, + * or null if the current number of retry > maximum number of retry. + */ + private Pair searchPair(int curRetry) { + int i = 0; + for(; i < pairs.size() && curRetry > pairs.get(i).numRetries; i++) { + curRetry -= pairs.get(i).numRetries; + } + return i == pairs.size()? 
null: pairs.get(i); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(final Object that) { + if (this == that) { + return true; + } else if (that == null || this.getClass() != that.getClass()) { + return false; + } + return this.toString().equals(that.toString()); + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + pairs; + } + return myString; + } + + /** + * Parse the given string as a MultipleLinearRandomRetry object. + * The format of the string is "t_1, n_1, t_2, n_2, ...", + * where t_i and n_i are the i-th pair of sleep time and number of retries. + * Note that the white spaces in the string are ignored. + * + * @return the parsed object, or null if the parsing fails. + */ + public static MultipleLinearRandomRetry parseCommaSeparatedString(String s) { + final String[] elements = s.split(","); + if (elements.length == 0) { + LOG.warn("Illegal value: there is no element in \"" + s + "\"."); + return null; + } + if (elements.length % 2 != 0) { + LOG.warn("Illegal value: the number of elements in \"" + s + "\" is " + + elements.length + " but an even number of elements is expected."); + return null; + } + + final List<RetryPolicies.MultipleLinearRandomRetry.Pair> pairs + = new ArrayList<RetryPolicies.MultipleLinearRandomRetry.Pair>(); + + for(int i = 0; i < elements.length; ) { + //parse the i-th sleep-time + final int sleep = parsePositiveInt(elements, i++, s); + if (sleep == -1) { + return null; //parse fails + } + + //parse the i-th number-of-retries + final int retries = parsePositiveInt(elements, i++, s); + if (retries == -1) { + return null; //parse fails + } + + pairs.add(new RetryPolicies.MultipleLinearRandomRetry.Pair(retries, sleep)); + } + return new RetryPolicies.MultipleLinearRandomRetry(pairs); + } + + /** + * Parse the i-th element as an integer. 
+ * @return -1 if the parsing fails or the parsed value <= 0; + * otherwise, return the parsed value. + */ + private static int parsePositiveInt(final String[] elements, + final int i, final String originalString) { + final String s = elements[i].trim(); + final int n; + try { + n = Integer.parseInt(s); + } catch(NumberFormatException nfe) { + LOG.warn("Failed to parse \"" + s + "\", which is the index " + i + + " element in \"" + originalString + "\"", nfe); + return -1; + } + + if (n <= 0) { + LOG.warn("The value " + n + " <= 0: it is parsed from the string \"" + + s + "\" which is the index " + i + " element in \"" + + originalString + "\""); + return -1; + } + return n; + } + } + static class ExceptionDependentRetry implements RetryPolicy { RetryPolicy defaultPolicy; @@ -265,6 +476,14 @@ public class RetryPolicies { public ExponentialBackoffRetry( int maxRetries, long sleepTime, TimeUnit timeUnit) { super(maxRetries, sleepTime, timeUnit); + + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0"); + } else if (maxRetries >= Long.SIZE - 1) { + //calculateSleepTime may overflow. 
+ throw new IllegalArgumentException("maxRetries = " + maxRetries + + " >= " + (Long.SIZE - 1)); + } } @Override @@ -353,11 +572,10 @@ public class RetryPolicies { * @param cap value at which to cap the base sleep time * @return an amount of time to sleep */ - @VisibleForTesting - public static long calculateExponentialTime(long time, int retries, + private static long calculateExponentialTime(long time, int retries, long cap) { - long baseTime = Math.min(time * ((long)1 << retries), cap); - return (long) (baseTime * (RAND.nextFloat() + 0.5)); + long baseTime = Math.min(time * (1L << retries), cap); + return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5)); } private static long calculateExponentialTime(long time, int retries) { Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java Tue Jun 12 05:27:15 2012 @@ -60,6 +60,12 @@ public interface RetryPolicy { this.reason = reason; } + @Override + public String toString() { + return getClass().getSimpleName() + "(action=" + action + + ", delayMillis=" + delayMillis + ", reason=" + reason + ")"; + } + public enum RetryDecision { FAIL, RETRY, Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java Tue Jun 12 05:27:15 2012 @@ -75,9 +75,10 @@ public class RetryProxy { */ public static Object create(Class<?> iface, Object implementation, Map<String,RetryPolicy> methodNameToPolicyMap) { - return RetryProxy.create(iface, + return create(iface, new DefaultFailoverProxyProvider(iface, implementation), - methodNameToPolicyMap); + methodNameToPolicyMap, + RetryPolicies.TRY_ONCE_THEN_FAIL); } /** @@ -92,11 +93,13 @@ public class RetryProxy { * @return the retry proxy */ public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider, - Map<String,RetryPolicy> methodNameToPolicyMap) { + Map<String,RetryPolicy> methodNameToPolicyMap, + RetryPolicy defaultPolicy) { return Proxy.newProxyInstance( proxyProvider.getInterface().getClassLoader(), new Class<?>[] { iface }, - new RetryInvocationHandler(proxyProvider, methodNameToPolicyMap) + new RetryInvocationHandler(proxyProvider, defaultPolicy, + methodNameToPolicyMap) ); } } Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original) +++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Tue Jun 12 05:27:15 2012 @@ -18,47 +18,51 @@ package org.apache.hadoop.ipc; -import java.net.InetAddress; -import java.net.Socket; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.io.IOException; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.FilterInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; - +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.Hashtable; import java.util.Iterator; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; -import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; -import org.apache.commons.logging.*; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import 
org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SaslRpcClient; @@ -67,8 +71,8 @@ import org.apache.hadoop.security.Securi import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; @@ -80,8 +84,8 @@ import org.apache.hadoop.util.Reflection */ public class Client { - public static final Log LOG = - LogFactory.getLog(Client.class); + public static final Log LOG = LogFactory.getLog(Client.class); + private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>(); @@ -228,8 +232,7 @@ public class Client { private int rpcTimeout; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs - private int maxRetries; //the max. no. of retries for socket connections - // the max. no. 
of retries for socket connections on time out exceptions + private final RetryPolicy connectionRetryPolicy; private int maxRetriesOnSocketTimeouts; private boolean tcpNoDelay; // if T then disable Nagle's Algorithm private boolean doPing; //do we need to send ping message @@ -253,7 +256,7 @@ public class Client { } this.rpcTimeout = remoteId.getRpcTimeout(); this.maxIdleTime = remoteId.getMaxIdleTime(); - this.maxRetries = remoteId.getMaxRetries(); + this.connectionRetryPolicy = remoteId.connectionRetryPolicy; this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts(); this.tcpNoDelay = remoteId.getTcpNoDelay(); this.doPing = remoteId.getDoPing(); @@ -488,7 +491,7 @@ public class Client { if (updateAddress()) { timeoutFailures = ioFailures = 0; } - handleConnectionFailure(ioFailures++, maxRetries, ie); + handleConnectionFailure(ioFailures++, ie); } } } @@ -680,8 +683,36 @@ public class Client { Thread.sleep(1000); } catch (InterruptedException ignored) {} - LOG.info("Retrying connect to server: " + server + - ". Already tried " + curRetries + " time(s)."); + LOG.info("Retrying connect to server: " + server + ". Already tried " + + curRetries + " time(s); maxRetries=" + maxRetries); + } + + private void handleConnectionFailure(int curRetries, IOException ioe + ) throws IOException { + closeConnection(); + + final RetryAction action; + try { + action = connectionRetryPolicy.shouldRetry(ioe, curRetries, 0, true); + } catch(Exception e) { + throw e instanceof IOException? 
(IOException)e: new IOException(e); + } + if (action.action == RetryAction.RetryDecision.FAIL) { + if (action.reason != null) { + LOG.warn("Failed to connect to server: " + server + ": " + + action.reason, ioe); + } + throw ioe; + } + + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + throw (IOException)new InterruptedIOException("Interrupted: action=" + + action + ", retry policy=" + connectionRetryPolicy).initCause(e); + } + LOG.info("Retrying connect to server: " + server + ". Already tried " + + curRetries + " time(s); retry policy is " + connectionRetryPolicy); } /** @@ -849,6 +880,10 @@ public class Client { try { RpcResponseHeaderProto response = RpcResponseHeaderProto.parseDelimitedFrom(in); + if (response == null) { + throw new IOException("Response is null."); + } + int callId = response.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); @@ -1287,7 +1322,7 @@ public class Client { private final String serverPrincipal; private final int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs - private final int maxRetries; //the max. no. of retries for socket connections + private final RetryPolicy connectionRetryPolicy; // the max. no. 
of retries for socket connections on time out exceptions private final int maxRetriesOnSocketTimeouts; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm @@ -1297,7 +1332,7 @@ public class Client { ConnectionId(InetSocketAddress address, Class<?> protocol, UserGroupInformation ticket, int rpcTimeout, String serverPrincipal, int maxIdleTime, - int maxRetries, int maxRetriesOnSocketTimeouts, + RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts, boolean tcpNoDelay, boolean doPing, int pingInterval) { this.protocol = protocol; this.address = address; @@ -1305,7 +1340,7 @@ public class Client { this.rpcTimeout = rpcTimeout; this.serverPrincipal = serverPrincipal; this.maxIdleTime = maxIdleTime; - this.maxRetries = maxRetries; + this.connectionRetryPolicy = connectionRetryPolicy; this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts; this.tcpNoDelay = tcpNoDelay; this.doPing = doPing; @@ -1336,10 +1371,6 @@ public class Client { return maxIdleTime; } - int getMaxRetries() { - return maxRetries; - } - /** max connection retries on socket time outs */ public int getMaxRetriesOnSocketTimeouts() { return maxRetriesOnSocketTimeouts; @@ -1357,6 +1388,12 @@ public class Client { return pingInterval; } + static ConnectionId getConnectionId(InetSocketAddress addr, + Class<?> protocol, UserGroupInformation ticket, int rpcTimeout, + Configuration conf) throws IOException { + return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf); + } + /** * Returns a ConnectionId object. * @param addr Remote address for the connection. 
@@ -1367,9 +1404,18 @@ public class Client { * @return A ConnectionId instance * @throws IOException */ - public static ConnectionId getConnectionId(InetSocketAddress addr, + static ConnectionId getConnectionId(InetSocketAddress addr, Class<?> protocol, UserGroupInformation ticket, int rpcTimeout, - Configuration conf) throws IOException { + RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException { + + if (connectionRetryPolicy == null) { + final int max = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT); + connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( + max, 1, TimeUnit.SECONDS); + } + String remotePrincipal = getRemotePrincipal(conf, addr, protocol); boolean doPing = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); @@ -1377,8 +1423,7 @@ public class Client { rpcTimeout, remotePrincipal, conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT), - conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT), + connectionRetryPolicy, conf.getInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT), @@ -1421,7 +1466,7 @@ public class Client { return isEqual(this.address, that.address) && this.doPing == that.doPing && this.maxIdleTime == that.maxIdleTime - && this.maxRetries == that.maxRetries + && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy) && this.pingInterval == that.pingInterval && isEqual(this.protocol, that.protocol) && this.rpcTimeout == that.rpcTimeout @@ -1434,11 +1479,10 @@ public class Client { @Override public int hashCode() { - int result = 1; + int result = 
connectionRetryPolicy.hashCode(); result = PRIME * result + ((address == null) ? 0 : address.hashCode()); result = PRIME * result + (doPing ? 1231 : 1237); result = PRIME * result + maxIdleTime; - result = PRIME * result + maxRetries; result = PRIME * result + pingInterval; result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode()); result = PRIME * result + rpcTimeout; Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Tue Jun 12 05:27:15 2012 @@ -36,9 +36,9 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputOutputStream; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; - import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -66,15 +66,24 @@ public class ProtobufRpcEngine implement private static final ClientCache CLIENTS = new ClientCache(); + public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, null); + } + @Override @SuppressWarnings("unchecked") public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { + SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy + ) throws IOException { - return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol - .getClassLoader(), new Class[] { protocol }, new Invoker(protocol, - addr, ticket, conf, factory, rpcTimeout)), false); + final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy); + return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[]{protocol}, invoker), false); } @Override @@ -97,11 +106,12 @@ public class ProtobufRpcEngine implement private final long clientProtocolVersion; private final String protocolName; - public Invoker(Class<?> protocol, InetSocketAddress addr, + private Invoker(Class<?> protocol, InetSocketAddress addr, 
UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) throws IOException { - this(protocol, Client.ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf), conf, factory); + int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { + this(protocol, Client.ConnectionId.getConnectionId( + addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), + conf, factory); } /** Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Tue Jun 12 05:27:15 2012 @@ -41,6 +41,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.*; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.net.NetUtils; @@ -326,7 +327,7 @@ public class RPC { long clientVersion, InetSocketAddress addr, Configuration conf, long connTimeout) throws IOException { - return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout); + return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout); } /** @@ -347,7 +348,7 @@ public class RPC { int rpcTimeout, long timeout) throws IOException { return waitForProtocolProxy(protocol, clientVersion, addr, - conf, rpcTimeout, timeout).getProxy(); + conf, rpcTimeout, null, timeout).getProxy(); } /** @@ -367,6 +368,7 
@@ public class RPC { long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout, + RetryPolicy connectionRetryPolicy, long timeout) throws IOException { long startTime = System.currentTimeMillis(); IOException ioe; @@ -374,7 +376,7 @@ public class RPC { try { return getProtocolProxy(protocol, clientVersion, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils - .getDefaultSocketFactory(conf), rpcTimeout); + .getDefaultSocketFactory(conf), rpcTimeout, connectionRetryPolicy); } catch(ConnectException se) { // namenode has not been started LOG.info("Server at " + addr + " not available yet, Zzzzz..."); ioe = se; @@ -463,7 +465,7 @@ public class RPC { Configuration conf, SocketFactory factory) throws IOException { return getProtocolProxy( - protocol, clientVersion, addr, ticket, conf, factory, 0); + protocol, clientVersion, addr, ticket, conf, factory, 0, null); } /** @@ -489,7 +491,7 @@ public class RPC { SocketFactory factory, int rpcTimeout) throws IOException { return getProtocolProxy(protocol, clientVersion, addr, ticket, - conf, factory, rpcTimeout).getProxy(); + conf, factory, rpcTimeout, null).getProxy(); } /** @@ -512,12 +514,13 @@ public class RPC { UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) throws IOException { + int rpcTimeout, + RetryPolicy connectionRetryPolicy) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } - return getProtocolEngine(protocol,conf).getProxy(protocol, - clientVersion, addr, ticket, conf, factory, rpcTimeout); + return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, + addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); } /** Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java Tue Jun 12 05:27:15 2012 @@ -97,8 +97,9 @@ public class RemoteException extends IOE return new RemoteException(attrs.getValue("class"), attrs.getValue("message")); } - + + @Override public String toString() { - return className + ": " + getMessage(); + return getClass().getName() + "(" + className + "): " + getMessage(); } } Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java Tue Jun 12 05:27:15 2012 @@ -26,6 +26,7 @@ import javax.net.SocketFactory; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -40,7 +41,8 @@ public interface RpcEngine { <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, 
UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException; + SocketFactory factory, int rpcTimeout, + RetryPolicy connectionRetryPolicy) throws IOException; /** Expert: Make multiple, parallel calls to a set of servers. */ Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Tue Jun 12 05:27:15 2012 @@ -31,6 +31,7 @@ import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.VersionedProtocol; @@ -259,9 +260,14 @@ public class WritableRpcEngine implement public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) + int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { + if (connectionRetryPolicy != null) { + throw new UnsupportedOperationException( + "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); + } + T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1349124&r1=1349123&r2=1349124&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Tue Jun 12 05:27:15 2012 @@ -18,50 +18,55 @@ package org.apache.hadoop.ipc; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.Closeable; import java.io.IOException; -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Arrays; import javax.net.SocketFactory; -import org.apache.commons.logging.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import 
org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.Client.ConnectionId; -import org.apache.hadoop.ipc.TestSaslRPC.TestSaslImpl; -import org.apache.hadoop.ipc.TestSaslRPC.TestSaslProtocol; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.MockitoUtil; import org.junit.Test; -import static org.junit.Assert.*; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DescriptorProtos.EnumDescriptorProto; -import static org.apache.hadoop.test.MetricsAsserts.*; - /** Unit tests for RPC. */ @SuppressWarnings("deprecation") public class TestRPC { @@ -250,7 +255,8 @@ public class TestRPC { @Override public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { + SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy + ) throws IOException { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new StoppedInvocationHandler()); return new ProtocolProxy<T>(protocol, proxy, false);