Author: szetszwo
Date: Tue Mar 6 02:42:46 2012
New Revision: 1297328
URL: http://svn.apache.org/viewvc?rev=1297328&view=rev
Log:
HDFS-3032. Change DFSClient.renewLease() so that it only retries up to the
lease soft-limit. Contributed by Kihwal Lee.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1297328&r1=1297327&r2=1297328&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Mar 6
02:42:46 2012
@@ -656,6 +656,9 @@ Release 0.23.2 - UNRELEASED
HDFS-3012. Exception while renewing delegation token. (Bobby Evans via
jitendra)
+ HDFS-3032. Change DFSClient.renewLease() so that it only retries up to the
+ lease soft-limit. (Kihwal Lee via szetszwo)
+
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1297328&r1=1297327&r2=1297328&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
Tue Mar 6 02:42:46 2012
@@ -17,8 +17,38 @@
*/
package org.apache.hadoop.hdfs;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+
import java.io.BufferedOutputStream;
-import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
@@ -26,11 +56,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
import java.net.Socket;
-import java.net.SocketException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -60,8 +87,6 @@ import org.apache.hadoop.fs.ParentNotDir
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -101,7 +126,6 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
@@ -133,6 +157,7 @@ public class DFSClient implements java.i
final UserGroupInformation ugi;
volatile boolean clientRunning = true;
+ volatile long lastLeaseRenewal;
private volatile FsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate;
final String clientName;
@@ -381,6 +406,12 @@ public class DFSClient implements java.i
void putFileBeingWritten(final String src, final DFSOutputStream out) {
synchronized(filesBeingWritten) {
filesBeingWritten.put(src, out);
+ // update the last lease renewal time only when there was no
+ // writes. once there is one write stream open, the lease renewer
+ // thread keeps it updated well with in anyone's expiration time.
+ if (lastLeaseRenewal == 0) {
+ updateLastLeaseRenewal();
+ }
}
}
@@ -388,6 +419,9 @@ public class DFSClient implements java.i
void removeFileBeingWritten(final String src) {
synchronized(filesBeingWritten) {
filesBeingWritten.remove(src);
+ if (filesBeingWritten.isEmpty()) {
+ lastLeaseRenewal = 0;
+ }
}
}
@@ -403,6 +437,19 @@ public class DFSClient implements java.i
return clientRunning;
}
+ long getLastLeaseRenewal() {
+ return lastLeaseRenewal;
+ }
+
+ void updateLastLeaseRenewal() {
+ synchronized(filesBeingWritten) {
+ if (filesBeingWritten.isEmpty()) {
+ return;
+ }
+ lastLeaseRenewal = System.currentTimeMillis();
+ }
+ }
+
/**
* Renew leases.
* @return true if lease was renewed. May return false if this
@@ -410,8 +457,24 @@ public class DFSClient implements java.i
**/
boolean renewLease() throws IOException {
if (clientRunning && !isFilesBeingWrittenEmpty()) {
- namenode.renewLease(clientName);
- return true;
+ try {
+ namenode.renewLease(clientName);
+ updateLastLeaseRenewal();
+ return true;
+ } catch (IOException e) {
+ // Abort if the lease has already expired.
+ final long elapsed = System.currentTimeMillis() -
getLastLeaseRenewal();
+ if (elapsed > HdfsConstants.LEASE_SOFTLIMIT_PERIOD) {
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (elapsed/1000) + " seconds (>= soft-limit ="
+ + (HdfsConstants.LEASE_SOFTLIMIT_PERIOD/1000) + " seconds.) "
+ + "Closing all files being written ...", e);
+ closeAllFilesBeingWritten(true);
+ } else {
+ // Let the lease renewer handle it and retry.
+ throw e;
+ }
+ }
}
return false;
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1297328&r1=1297327&r2=1297328&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
Tue Mar 6 02:42:46 2012
@@ -430,7 +430,8 @@ class LeaseRenewer {
for(long lastRenewed = System.currentTimeMillis();
clientsRunning() && !Thread.interrupted();
Thread.sleep(getSleepPeriod())) {
- if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
+ final long elapsed = System.currentTimeMillis() - lastRenewed;
+ if (elapsed >= getRenewalTime()) {
try {
renew();
if (LOG.isDebugEnabled()) {
@@ -440,7 +441,7 @@ class LeaseRenewer {
lastRenewed = System.currentTimeMillis();
} catch (SocketTimeoutException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for "
- + (getRenewalTime()/1000) + " seconds. Aborting ...", ie);
+ + (elapsed/1000) + " seconds. Aborting ...", ie);
synchronized (this) {
for(DFSClient c : dfsclients) {
c.abort();
@@ -449,8 +450,7 @@ class LeaseRenewer {
break;
} catch (IOException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for "
- + (getRenewalTime()/1000) + " seconds. Will retry shortly ...",
- ie);
+ + (elapsed/1000) + " seconds. Will retry shortly ...", ie);
}
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java?rev=1297328&r1=1297327&r2=1297328&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
Tue Mar 6 02:42:46 2012
@@ -18,29 +18,113 @@
package org.apache.hadoop.hdfs;
import java.io.DataOutputStream;
+import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doNothing;
public class TestLease {
static boolean hasLease(MiniDFSCluster cluster, Path src) {
return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()
).getLeaseByPath(src.toString()) != null;
}
-
- final Path dir = new Path("/test/lease/");
+
+ static final String dirString = "/test/lease";
+ final Path dir = new Path(dirString);
+ static final Log LOG = LogFactory.getLog(TestLease.class);
+ Configuration conf = new HdfsConfiguration();
+
+ @Test
+ public void testLeaseAbort() throws Exception {
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ try {
+ cluster.waitActive();
+ NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+ NamenodeProtocols spyNN = spy(preSpyNN);
+
+ DFSClient dfs = new DFSClient(null, spyNN, conf, null);
+ byte[] buf = new byte[1024];
+
+ FSDataOutputStream c_out = createFsOut(dfs, dirString + "c");
+ c_out.write(buf, 0, 1024);
+ c_out.close();
+
+ DFSInputStream c_in = dfs.open(dirString + "c");
+ FSDataOutputStream d_out = createFsOut(dfs, dirString + "d");
+
+ // stub the renew method.
+ doThrow(new RemoteException(InvalidToken.class.getName(),
+ "Your token is worthless")).when(spyNN).renewLease(anyString());
+
+ // We don't need to wait the lease renewer thread to act.
+ // call renewLease() manually.
+ // make it look like lease has already expired.
+ dfs.lastLeaseRenewal = System.currentTimeMillis() - 300000;
+ dfs.renewLease();
+
+ // this should not work.
+ try {
+ d_out.write(buf, 0, 1024);
+ d_out.close();
+ Assert.fail("Write did not fail even after the fatal lease renewal
failure");
+ } catch (IOException e) {
+ LOG.info("Write failed as expected. ", e);
+ }
+
+ // unstub
+ doNothing().when(spyNN).renewLease(anyString());
+
+ // existing input streams should work
+ try {
+ int num = c_in.read(buf, 0, 1);
+ if (num != 1) {
+ Assert.fail("Failed to read 1 byte");
+ }
+ c_in.close();
+ } catch (IOException e) {
+ LOG.error("Read failed with ", e);
+ Assert.fail("Read after lease renewal failure failed");
+ }
+
+ // new file writes should work.
+ try {
+ c_out = createFsOut(dfs, dirString + "c");
+ c_out.write(buf, 0, 1024);
+ c_out.close();
+ } catch (IOException e) {
+ LOG.error("Write failed with ", e);
+ Assert.fail("Write failed");
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
@Test
public void testLease() throws Exception {
- Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new
MiniDFSCluster.Builder(conf).numDataNodes(2).build();
try {
FileSystem fs = cluster.getFileSystem();
@@ -94,6 +178,11 @@ public class TestLease {
Assert.assertTrue(c3.leaserenewer != c5.leaserenewer);
}
+ private FSDataOutputStream createFsOut(DFSClient dfs, String path)
+ throws IOException {
+ return new FSDataOutputStream(dfs.create(path, true), null);
+ }
+
static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
static public DFSClient createDFSClientAs(UserGroupInformation ugi,
final Configuration conf) throws Exception {