Author: cutting Date: Mon May 7 12:43:37 2007 New Revision: 535963 URL: http://svn.apache.org/viewvc?view=rev&rev=535963 Log: HADOOP-1263. Change DFSClient to retry certain namenode calls with an exponential backoff. Contributed by Hairong Kuang.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535963&r1=535962&r2=535963 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon May 7 12:43:37 2007 @@ -332,6 +332,10 @@ 98. HADOOP-1184. Fix HDFS decomissioning to complete when the only copy of a block is on a decommissioned node. (Dhruba Borthakur via cutting) +99. HADOOP-1263. Change DFSClient to retry certain namenode calls + with a random, exponentially increasing backoff time, to avoid + overloading the namenode on, e.g., job start. 
(Hairong Kuang via cutting) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=535963&r1=535962&r2=535963 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May 7 12:43:37 2007 @@ -18,6 +18,9 @@ package org.apache.hadoop.dfs; import org.apache.hadoop.io.*; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.fs.*; import org.apache.hadoop.ipc.*; import org.apache.hadoop.conf.*; @@ -28,6 +31,7 @@ import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.TimeUnit; /******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -94,6 +98,46 @@ Runtime.getRuntime().addShutdownHook(clientFinalizer); } + private static ClientProtocol createNamenode( + InetSocketAddress nameNodeAddr, Configuration conf) + throws IOException { + RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry( + 5, 200, TimeUnit.MILLISECONDS); + RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( + 5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); + + Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = + new HashMap<Class<? 
extends Exception>, RetryPolicy>(); + exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy); + exceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy); + + RetryPolicy methodPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); + + methodNameToPolicyMap.put("open", methodPolicy); + methodNameToPolicyMap.put("setReplication", methodPolicy); + methodNameToPolicyMap.put("abandonBlock", methodPolicy); + methodNameToPolicyMap.put("abandonFileInProgress", methodPolicy); + methodNameToPolicyMap.put("reportBadBlocks", methodPolicy); + methodNameToPolicyMap.put("exists", methodPolicy); + methodNameToPolicyMap.put("isDir", methodPolicy); + methodNameToPolicyMap.put("getListing", methodPolicy); + methodNameToPolicyMap.put("getHints", methodPolicy); + methodNameToPolicyMap.put("renewLease", methodPolicy); + methodNameToPolicyMap.put("getStats", methodPolicy); + methodNameToPolicyMap.put("getDatanodeReport", methodPolicy); + methodNameToPolicyMap.put("getBlockSize", methodPolicy); + methodNameToPolicyMap.put("getEditLogSize", methodPolicy); + methodNameToPolicyMap.put("complete", methodPolicy); + methodNameToPolicyMap.put("getEditLogSize", methodPolicy); + methodNameToPolicyMap.put("create", methodPolicy); + + return (ClientProtocol) RetryProxy.create(ClientProtocol.class, + RPC.getProxy(ClientProtocol.class, + ClientProtocol.versionID, nameNodeAddr, conf), + methodNameToPolicyMap); + } /** * Create a new DFSClient connected to the given namenode server. 
@@ -101,8 +145,7 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { this.conf = conf; - this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, nameNodeAddr, conf); + this.namenode = createNamenode(nameNodeAddr, conf); String taskId = conf.get("mapred.task.id"); if (taskId != null) { this.clientName = "DFSClient_" + taskId; @@ -160,19 +203,12 @@ } public long getBlockSize(UTF8 f) throws IOException { - int retries = 4; - while (true) { - try { - return namenode.getBlockSize(f.toString()); - } catch (IOException ie) { - if (--retries == 0) { - LOG.warn("Problem getting block size: " + - StringUtils.stringifyException(ie)); - throw ie; - } - LOG.debug("Problem getting block size: " + - StringUtils.stringifyException(ie)); - } + try { + return namenode.getBlockSize(f.toString()); + } catch (IOException ie) { + LOG.warn("Problem getting block size: " + + StringUtils.stringifyException(ie)); + throw ie; } } @@ -1133,31 +1169,8 @@ } private LocatedBlock locateNewBlock() throws IOException { - int retries = 3; - while (true) { - while (true) { - try { - return namenode.create(src.toString(), clientName.toString(), - overwrite, replication, blockSize); - } catch (RemoteException e) { - if (--retries == 0 || - !AlreadyBeingCreatedException.class.getName(). - equals(e.getClassName())) { - throw e; - } else { - // because failed tasks take upto LEASE_PERIOD to - // release their pendingCreates files, if the file - // we want to create is already being created, - // wait and try again. 
- LOG.info(StringUtils.stringifyException(e)); - try { - Thread.sleep(LEASE_SOFTLIMIT_PERIOD); - } catch (InterruptedException ie) { - } - } - } - } - } + return namenode.create(src.toString(), clientName.toString(), + overwrite, replication, blockSize); } private LocatedBlock locateFollowingBlock(long start Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?view=diff&rev=535963&r1=535962&r2=535963 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Mon May 7 12:43:37 2007 @@ -67,7 +67,7 @@ } return null; } - LOG.warn("Exception while invoking " + method.getName() + LOG.info("Exception while invoking " + method.getName() + " of " + implementation.getClass() + ". Retrying." 
+ StringUtils.stringifyException(e)); } @@ -76,6 +76,9 @@ private Object invokeMethod(Method method, Object[] args) throws Throwable { try { + if (!method.isAccessible()) { + method.setAccessible(true); + } return method.invoke(implementation, args); } catch (InvocationTargetException e) { throw e.getCause(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java?view=diff&rev=535963&r1=535962&r2=535963 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java Mon May 7 12:43:37 2007 @@ -18,6 +18,7 @@ package org.apache.hadoop.io.retry; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; /** @@ -82,6 +83,10 @@ return new RetryUpToMaximumCountWithProportionalSleep(maxRetries, sleepTime, timeUnit); } + public static final RetryPolicy exponentialBackoffRetry( + int maxRetries, long sleepTime, TimeUnit timeUnit) { + return new ExponentialBackoffRetry(maxRetries, sleepTime, timeUnit); + } /** * <p> * Set a default policy with some explicit handlers for specific exceptions. 
@@ -121,7 +126,7 @@ } public boolean shouldRetry(Exception e, int retries) throws Exception { - if (retries > maxRetries) { + if (retries >= maxRetries) { throw e; } try { @@ -184,5 +189,16 @@ } - + static class ExponentialBackoffRetry extends RetryLimited { + private Random r = new Random(); + public ExponentialBackoffRetry( + int maxRetries, long sleepTime, TimeUnit timeUnit) { + super(maxRetries, sleepTime, timeUnit); + } + + @Override + protected long calculateSleepTime(int retries) { + return sleepTime*r.nextInt(1<<(retries+1)); + } + } } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java?view=diff&rev=535963&r1=535962&r2=535963 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java Mon May 7 12:43:37 2007 @@ -7,6 +7,7 @@ import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep; +import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry; import java.util.Collections; import java.util.Map; @@ -91,6 +92,20 @@ UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create(UnreliableInterface.class, unreliableImpl, retryUpToMaximumCountWithProportionalSleep(8, 1, TimeUnit.NANOSECONDS)); + unreliable.alwaysSucceeds(); + unreliable.failsOnceThenSucceeds(); + try { + unreliable.failsTenTimesThenSucceeds(); + fail("Should fail"); + } catch (UnreliableException e) { + // expected + } + } + + public void testExponentialRetry() throws UnreliableException { + UnreliableInterface 
unreliable = (UnreliableInterface) + RetryProxy.create(UnreliableInterface.class, unreliableImpl, + exponentialBackoffRetry(5, 1L, TimeUnit.NANOSECONDS)); unreliable.alwaysSucceeds(); unreliable.failsOnceThenSucceeds(); try {