Author: hairong
Date: Fri Jul 25 16:42:56 2008
New Revision: 679930
URL: http://svn.apache.org/viewvc?rev=679930&view=rev
Log:
HADOOP-3169. LeaseChecker daemon should not be started in DFSClient
constructor. Contributed by Tsz Wo (Nicholas), SZE.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=679930&r1=679929&r2=679930&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 25 16:42:56 2008
@@ -112,6 +112,9 @@
HADOOP-3747. Adds counter suport for MultipleOutputs.
(Alejandro Abdelnur via ddas)
+ HADOOP-3169. LeaseChecker daemon should not be started in DFSClient
+ constructor. (TszWo (Nicholas), SZE via hairong)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
@@ -855,7 +858,7 @@
HADOOP-3588. Fixed usability issues with archives. (mahadev)
- HADOOP-3536. Uncaught exception in DataBlockScanner.
+ HADOOP-3635. Uncaught exception in DataBlockScanner.
(Tsz Wo (Nicholas), SZE via hairong)
HADOOP-3539. Exception when closing DFSClient while multiple files are
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=679930&r1=679929&r2=679930&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Jul 25 16:42:56 2008
@@ -61,7 +61,7 @@
* filesystem tasks.
*
********************************************************/
-public class DFSClient implements FSConstants {
+public class DFSClient implements FSConstants, java.io.Closeable {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -71,7 +71,7 @@
volatile boolean clientRunning = true;
Random r = new Random();
String clientName;
- Daemon leaseChecker;
+ private final LeaseChecker leasechecker = new LeaseChecker();
private Configuration conf;
private long defaultBlockSize;
private short defaultReplication;
@@ -81,12 +81,6 @@
final int writePacketSize;
private FileSystem.Statistics stats;
- /**
- * A map from name -> DFSOutputStream of files that are currently being
- * written by this client.
- */
- private TreeMap<String, OutputStream> pendingCreates =
- new TreeMap<String, OutputStream>();
public static ClientProtocol createNamenode(Configuration conf) throws
IOException {
return createNamenode(NameNode.getAddress(conf), conf);
@@ -186,8 +180,6 @@
}
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);
- this.leaseChecker = new Daemon(new LeaseChecker());
- this.leaseChecker.start();
}
public DFSClient(InetSocketAddress nameNodeAddr,
@@ -207,33 +199,13 @@
* Close the file system, abandoning all of the leases and files being
* created and close connections to the namenode.
*/
- public void close() throws IOException {
- // synchronize in here so that we don't need to change the API
- synchronized (this) {
- checkOpen();
- synchronized (pendingCreates) {
- while (!pendingCreates.isEmpty()) {
- String name = pendingCreates.firstKey();
- OutputStream out = pendingCreates.remove(name);
- if (out != null) {
- try {
- out.close();
- } catch (IOException ie) {
- System.err.println("Exception closing file " + name);
- ie.printStackTrace();
- }
- }
- }
- }
- this.clientRunning = false;
- try {
- leaseChecker.join();
- } catch (InterruptedException ie) {
- }
-
- // close connections to the namenode
- RPC.stopProxy(rpcNamenode);
- }
+ public synchronized void close() throws IOException {
+ checkOpen();
+ clientRunning = false;
+ leasechecker.close();
+
+ // close connections to the namenode
+ RPC.stopProxy(rpcNamenode);
}
/**
@@ -477,9 +449,7 @@
OutputStream result = new DFSOutputStream(src, masked,
overwrite, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
- synchronized (pendingCreates) {
- pendingCreates.put(src, result);
- }
+ leasechecker.put(src, result);
return result;
}
@@ -508,9 +478,7 @@
}
OutputStream result = new DFSOutputStream(src, buffersize, progress,
lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
- synchronized(pendingCreates) {
- pendingCreates.put(src, result);
- }
+ leasechecker.put(src, result);
return result;
}
@@ -803,35 +771,98 @@
throw new IOException("No live nodes contain current block");
}
- /***************************************************************
- * Periodically check in with the namenode and renew all the leases
- * when the lease period is half over.
- ***************************************************************/
- class LeaseChecker implements Runnable {
+ boolean isLeaseCheckerStarted() {
+ return leasechecker.daemon != null;
+ }
+
+ /** Lease management*/
+ private class LeaseChecker implements Runnable {
+ /** A map from src -> DFSOutputStream of files that are currently being
+ * written by this client.
+ */
+ private final SortedMap<String, OutputStream> pendingCreates
+ = new TreeMap<String, OutputStream>();
+
+ private Daemon daemon = null;
+
+ synchronized void put(String src, OutputStream out) {
+ if (clientRunning) {
+ if (daemon == null) {
+ daemon = new Daemon(this);
+ daemon.start();
+ }
+ pendingCreates.put(src, out);
+ }
+ }
+
+ synchronized void remove(String src) {
+ pendingCreates.remove(src);
+ }
+
+ synchronized void close() {
+ while (!pendingCreates.isEmpty()) {
+ String src = pendingCreates.firstKey();
+ OutputStream out = pendingCreates.remove(src);
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException ie) {
+ System.err.println("Exception closing file " + src);
+ ie.printStackTrace();
+ }
+ }
+ }
+
+ if (daemon != null) {
+ daemon.interrupt();
+ }
+ }
+
+ private void renew() throws IOException {
+ synchronized(this) {
+ if (pendingCreates.isEmpty()) {
+ return;
+ }
+ }
+ namenode.renewLease(clientName);
+ }
+
/**
+ * Periodically check in with the namenode and renew all the leases
+ * when the lease period is half over.
*/
public void run() {
long lastRenewed = 0;
while (clientRunning) {
if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD
/ 2)) {
try {
- synchronized (pendingCreates) {
- if (pendingCreates.size() > 0)
- namenode.renewLease(clientName);
- }
+ renew();
lastRenewed = System.currentTimeMillis();
} catch (IOException ie) {
- String err = StringUtils.stringifyException(ie);
- LOG.warn("Problem renewing lease for " + clientName +
- ": " + err);
+ LOG.warn("Problem renewing lease for " + clientName, ie);
}
}
+
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + " is interrupted.", ie);
+ }
+ return;
}
}
}
+
+  /** {@inheritDoc} */
+ public String toString() {
+ String s = getClass().getSimpleName();
+ if (LOG.isTraceEnabled()) {
+ return s + "@" + DFSClient.this + ": "
+ + StringUtils.stringifyException(new Throwable("for testing"));
+ }
+ return s;
+ }
}
/** Utility class to encapsulate data node info and its ip address. */
@@ -2792,10 +2823,7 @@
@Override
public void close() throws IOException {
closeInternal();
-
- synchronized (pendingCreates) {
- pendingCreates.remove(src);
- }
+ leasechecker.remove(src);
if (s != null) {
s.close();
@@ -2918,4 +2946,10 @@
+ StringUtils.stringifyException(ie));
}
}
+
+  /** {@inheritDoc} */
+ public String toString() {
+ return getClass().getSimpleName() + "[clientName=" + clientName
+ + ", ugi=" + ugi + "]";
+ }
}
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=679930&r1=679929&r2=679930&view=diff
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
(original)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Fri Jul 25 16:42:56 2008
@@ -21,6 +21,8 @@
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -65,4 +67,47 @@
}
}
+ public void testDFSClient() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = null;
+
+ try {
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ final Path filepath = new Path("/test/LeaseChecker/foo");
+ final long millis = System.currentTimeMillis();
+
+ {
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ assertFalse(dfs.dfs.isLeaseCheckerStarted());
+
+ //create a file
+ FSDataOutputStream out = dfs.create(filepath);
+ assertTrue(dfs.dfs.isLeaseCheckerStarted());
+
+ //write something and close
+ out.writeLong(millis);
+ assertTrue(dfs.dfs.isLeaseCheckerStarted());
+ out.close();
+ assertTrue(dfs.dfs.isLeaseCheckerStarted());
+ dfs.close();
+ }
+
+ {
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ assertFalse(dfs.dfs.isLeaseCheckerStarted());
+
+ //open and check the file
+ FSDataInputStream in = dfs.open(filepath);
+ assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertEquals(millis, in.readLong());
+ assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ in.close();
+ assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ dfs.close();
+ }
+ }
+ finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
}