Author: rangadi
Date: Tue May 6 17:39:35 2008
New Revision: 653959
URL: http://svn.apache.org/viewvc?rev=653959&view=rev
Log:
HADOOP-3334. Move lease handling from FSNamesystem into a separate class. (Tsz
Wo (Nicholas), SZE via rangadi)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=653959&r1=653958&r2=653959&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 6 17:39:35 2008
@@ -93,6 +93,9 @@
line length (mapred.linerecordreader.maxlength), thereby avoiding reading
too far into the following split. (Zheng Shao via cdouglas)
+ HADOOP-3334. Move lease handling from FSNamesystem into a separate class.
+ (Tsz Wo (Nicholas), SZE via rangadi)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=653959&r1=653958&r2=653959&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue May 6
17:39:35 2008
@@ -537,7 +537,7 @@
null,
lastLocations);
fsDir.replaceNode(path, node, cons);
- fsNamesys.addLease(path, clientName);
+ fsNamesys.leaseManager.addLease(path, clientName);
} else if (opcode == OP_CLOSE) {
//
// Remove lease if it exists.
@@ -545,7 +545,7 @@
if (old.isUnderConstruction()) {
INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
old;
- fsNamesys.removeLease(path, cons.getClientName());
+ fsNamesys.leaseManager.removeLease(path, cons.getClientName());
}
}
break;
@@ -590,10 +590,11 @@
break;
}
case OP_DELETE: {
- UTF8 src = null;
+ String src = null;
if (logVersion >= -4) {
- src = new UTF8();
- src.readFields(in);
+ UTF8 srcUtf8 = new UTF8();
+ srcUtf8.readFields(in);
+ src = srcUtf8.toString();
} else {
ArrayWritable aw = null;
Writable writables[];
@@ -604,13 +605,13 @@
throw new IOException("Incorrect data format. "
+ "delete operation.");
}
- src = (UTF8) writables[0];
- timestamp = Long.parseLong(((UTF8)writables[1]).toString());
+ src = writables[0].toString();
+ timestamp = Long.parseLong(writables[1].toString());
}
- old = fsDir.unprotectedDelete(src.toString(), timestamp, null);
+ old = fsDir.unprotectedDelete(src, timestamp, null);
if (old != null && old.isUnderConstruction()) {
INodeFileUnderConstruction cons =
(INodeFileUnderConstruction)old;
- fsNamesys.removeLease(src.toString(), cons.getClientName());
+ fsNamesys.leaseManager.removeLease(src, cons.getClientName());
}
break;
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=653959&r1=653958&r2=653959&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Tue May 6
17:39:35 2008
@@ -968,7 +968,7 @@
}
INodeFile oldnode = (INodeFile) old;
fsDir.replaceNode(path, oldnode, cons);
- fs.addLease(path, cons.getClientName());
+ fs.leaseManager.addLease(path, cons.getClientName());
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=653959&r1=653958&r2=653959&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
(original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue May
6 17:39:35 2008
@@ -31,6 +31,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.dfs.LeaseManager.Lease;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.*;
@@ -162,20 +163,7 @@
private UnderReplicatedBlocks neededReplications = new
UnderReplicatedBlocks();
private PendingReplicationBlocks pendingReplications;
- //
- // Used for handling lock-leases
- // Mapping: leaseHolder -> Lease
- //
- private Map<StringBytesWritable, Lease> leases = new
TreeMap<StringBytesWritable, Lease>();
- // Set of: Lease
- private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
-
- //
- // Map path names to leases. It is protected by the sortedLeases lock.
- // The map stores pathnames in lexicographical order.
- //
- private TreeMap<String, Lease> sortedLeasesByPath =
- new TreeMap<String, Lease>();
+ LeaseManager leaseManager = new LeaseManager(this);
//
// Threaded object that checks to see if we have been
@@ -241,9 +229,6 @@
*/
private GenerationStamp generationStamp = new GenerationStamp(1000);
- private long softLimit = LEASE_SOFTLIMIT_PERIOD;
- private long hardLimit = LEASE_HARDLIMIT_PERIOD;
-
// Ask Datanode only up to this many blocks to delete.
private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
@@ -282,7 +267,7 @@
conf.getInt("dfs.replication.pending.timeout.sec",
-1) * 1000L);
this.hbthread = new Daemon(new HeartbeatMonitor());
- this.lmthread = new Daemon(new LeaseMonitor());
+ this.lmthread = new Daemon(leaseManager.createMonitor());
this.replthread = new Daemon(new ReplicationMonitor());
this.resthread = new Daemon(new ResolutionMonitor());
hbthread.start();
@@ -886,7 +871,7 @@
// If the file is under construction , then it must be in our
// leases. Find the appropriate lease record.
//
- Lease lease = getLease(holder);
+ Lease lease = leaseManager.getLease(holder);
//
// We found the lease for this file. And surprisingly the original
// holder is trying to recreate this file. This should never occur.
@@ -900,7 +885,7 @@
//
// Find the original holder.
//
- lease = getLease(pendingFile.getClientName());
+ lease = leaseManager.getLease(pendingFile.getClientName());
if (lease == null) {
throw new AlreadyBeingCreatedException(
"failed to create file " +
src + " for " + holder +
@@ -913,15 +898,7 @@
// to proceed. Otherwise, prevent this request from creating file.
//
if (lease.expiredSoftLimit()) {
- synchronized (sortedLeases) {
- lease.releaseLocks();
- removeLease(lease.getHolder());
- LOG.info("startFile: Removing lease " + lease + " ");
- if (!sortedLeases.remove(lease)) {
- LOG.error("startFile: Unknown failure trying to remove " + lease
+
- " from lease set.");
- }
- }
+ leaseManager.handleExpiredSoftLimit(lease);
} else {
throw new AlreadyBeingCreatedException(
"failed to create file " +
src + " for " + holder +
@@ -950,7 +927,7 @@
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine);
- addLease(src, holder);
+ leaseManager.addLease(src, holder);
//
// Now we can add the name to the filesystem. This file has no
@@ -1071,7 +1048,7 @@
) throws IOException {
INode file = dir.getFileINode(src);
if (file == null) {
- Lease lease = getLease(holder);
+ Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException("No lease on " + src +
" File does not exist. " +
(lease != null ? lease.toString() :
@@ -1079,7 +1056,7 @@
" does not have any open files."));
}
if (!file.isUnderConstruction()) {
- Lease lease = getLease(holder);
+ Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException("No lease on " + src +
" File is not open for writing. " +
(lease != null ? lease.toString() :
@@ -1101,23 +1078,7 @@
String holder
) throws IOException {
NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" +
src);
- synchronized (sortedLeases) {
- // find the lease
- Lease lease = getLease(holder);
- if (lease != null) {
- // remove the file from the lease
- if (lease.completedCreate(src)) {
- // if we found the file in the lease, remove it from pendingCreates
- internalReleaseCreate(src, holder);
- } else {
- LOG.info("Attempt by " + holder +
- " to release someone else's create lock on " + src);
- }
- } else {
- LOG.info("Attempt to release a lock from an unknown lease holder "
- + holder + " for " + src);
- }
- }
+ leaseManager.abandonLease(src, holder);
}
/**
@@ -1168,7 +1129,7 @@
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+ " blocklist persisted");
- removeLease(src, holder);
+ leaseManager.removeLease(src, holder);
//
// REMIND - mjc - this should be done only after we wait a few secs.
@@ -1427,7 +1388,7 @@
}
if (old.isUnderConstruction()) {
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
- removeLease(src, cons.getClientName());
+ leaseManager.removeLease(src, cons.getClientName());
}
return true;
}
@@ -1538,198 +1499,12 @@
}
}
- /************************************************************
- * A Lease governs all the locks held by a single client.
- * For each client there's a corresponding lease, whose
- * timestamp is updated when the client periodically
- * checks in. If the client dies and allows its lease to
- * expire, all the corresponding locks can be released.
- *************************************************************/
- class Lease implements Comparable<Lease> {
- private StringBytesWritable holder;
- private long lastUpdate;
- private Collection<StringBytesWritable> locks = new
TreeSet<StringBytesWritable>();
- private Collection<StringBytesWritable> creates = new
TreeSet<StringBytesWritable>();
-
- public Lease(String holder) throws IOException {
- this.holder = new StringBytesWritable(holder);
- renew();
- }
- public void renew() {
- this.lastUpdate = now();
- }
- /**
- * Returns true if the Hard Limit Timer has expired
- */
- public boolean expiredHardLimit() {
- if (now() - lastUpdate > hardLimit) {
- return true;
- }
- return false;
- }
- /**
- * Returns true if the Soft Limit Timer has expired
- */
- public boolean expiredSoftLimit() {
- if (now() - lastUpdate > softLimit) {
- return true;
- }
- return false;
- }
- public void obtained(String src) throws IOException {
- locks.add(new StringBytesWritable(src));
- }
- public void released(String src) throws IOException {
- locks.remove(new StringBytesWritable(src));
- }
- public void startedCreate(String src) throws IOException {
- creates.add(new StringBytesWritable(src));
- }
- public boolean completedCreate(String src) throws IOException {
- return creates.remove(new StringBytesWritable(src));
- }
- public boolean hasLocks() {
- return (locks.size() + creates.size()) > 0;
- }
- public void releaseLocks() throws IOException {
- String holderStr = holder.getString();
- locks.clear();
- for (Iterator<StringBytesWritable> it = creates.iterator();
it.hasNext();)
- internalReleaseCreate(it.next().getString(), holderStr);
- creates.clear();
- }
-
- /**
- */
- public String toString() {
- return "[Lease. Holder: " + holder.toString() + ", heldlocks: " +
- locks.size() + ", pendingcreates: " + creates.size() + "]";
- }
-
- /**
- */
- public int compareTo(Lease o) {
- Lease l1 = this;
- Lease l2 = o;
- long lu1 = l1.lastUpdate;
- long lu2 = l2.lastUpdate;
- if (lu1 < lu2) {
- return -1;
- } else if (lu1 > lu2) {
- return 1;
- } else {
- return l1.holder.compareTo(l2.holder);
- }
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof Lease)) {
- return false;
- }
- Lease obj = (Lease) o;
- if (lastUpdate == obj.lastUpdate &&
- holder.equals(obj.holder)) {
- return true;
- }
- return false;
- }
-
- public int hashCode() {
- return holder.hashCode();
- }
-
- String getHolder() throws IOException {
- return holder.getString();
- }
-
- Collection<StringBytesWritable> getPaths() throws IOException {
- return creates;
- }
-
- // If a file with the specified prefix exists, then replace
- // it with the new prefix.
- //
- void replacePrefix(String src, String overwrite,
- String replaceBy) throws IOException {
- List<StringBytesWritable> toAdd = new ArrayList<StringBytesWritable>();
- for (Iterator<StringBytesWritable> f = creates.iterator();
- f.hasNext();){
- String path = f.next().getString();
- if (!path.startsWith(src)) {
- continue;
- }
- // remove this filename from this lease.
- f.remove();
-
- // remember new filename
- String newPath = path.replaceFirst(overwrite, replaceBy);
- toAdd.add(new StringBytesWritable(newPath));
- LOG.info("Modified Lease for file " + path +
- " to new path " + newPath);
- }
- // add modified filenames back into lease.
- for (Iterator<StringBytesWritable> f = toAdd.iterator();
- f.hasNext();) {
- creates.add(f.next());
- }
- }
- }
-
- /******************************************************
- * LeaseMonitor checks for leases that have expired,
- * and disposes of them.
- ******************************************************/
- class LeaseMonitor implements Runnable {
- public void run() {
- try {
- while (fsRunning) {
- synchronized (FSNamesystem.this) {
- synchronized (sortedLeases) {
- Lease top;
- while ((sortedLeases.size() > 0) &&
- ((top = sortedLeases.first()) != null)) {
- if (top.expiredHardLimit()) {
- top.releaseLocks();
- leases.remove(top.holder);
- LOG.info("Removing lease " + top + ", leases remaining: " +
sortedLeases.size());
- if (!sortedLeases.remove(top)) {
- LOG.info("Unknown failure trying to remove " + top + "
from lease set.");
- }
- } else {
- break;
- }
- }
- }
- }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ie) {
- }
- }
- } catch (Exception e) {
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
- }
- }
- }
-
- private Lease getLease(String holder) throws IOException {
- return leases.get(new StringBytesWritable(holder));
- }
-
- private void putLease(String holder, Lease lease) throws IOException {
- leases.put(new StringBytesWritable(holder), lease);
- }
-
- private void removeLease(String holder) throws IOException {
- leases.remove(new StringBytesWritable(holder));
- }
-
/**
* Move a file that is being written to be immutable.
* @param src The filename
* @param holder The datanode that was creating the file
*/
- private void internalReleaseCreate(String src, String holder) throws
IOException {
+ void internalReleaseCreate(String src, String holder) throws IOException {
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
@@ -1776,17 +1551,10 @@
/**
* Renew the lease(s) held by the given client
*/
- public void renewLease(String holder) throws IOException {
- synchronized (sortedLeases) {
- if (isInSafeMode())
- throw new SafeModeException("Cannot renew lease for " + holder,
safeMode);
- Lease lease = getLease(holder);
- if (lease != null) {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
- }
- }
+ void renewLease(String holder) throws IOException {
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot renew lease for " + holder,
safeMode);
+ leaseManager.renewLease(holder);
}
/**
@@ -4104,15 +3872,6 @@
return maxFsObjects;
}
- /**
- * Used by unit tests to change lease periods
- */
- void setLeasePeriod(long softLimit, long hardLimit) {
- this.softLimit = softLimit;
- this.hardLimit = hardLimit;
- this.lmthread.interrupt();
- }
-
public long getFilesTotal() {
return this.dir.totalInodes();
}
@@ -4200,48 +3959,10 @@
return generationStamp.getStamp();
}
- /**
- * deletes the lease for the specified file
- */
- void removeLease(String src, String holder) throws IOException {
- synchronized (sortedLeases) {
- Lease lease = getLease(holder);
- if (lease != null) {
- lease.completedCreate(src);
- if (!lease.hasLocks()) {
- removeLease(holder);
- sortedLeases.remove(lease);
- sortedLeasesByPath.remove(src);
- }
- }
- }
- }
-
- /**
- * Adds (or re-adds) the lease for the specified file.
- */
- void addLease(String src, String holder) throws IOException {
- synchronized (sortedLeases) {
- Lease lease = getLease(holder);
- if (lease == null) {
- lease = new Lease(holder);
- putLease(holder, lease);
- sortedLeases.add(lease);
- } else {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
- }
- sortedLeasesByPath.put(src, lease);
- lease.startedCreate(src);
- }
- }
-
// rename was successful. If any part of the renamed subtree had
// files that were being written to, update with new filename.
//
void changeLease(String src, String dst) throws IOException {
- Map<String, Lease> addTo = new TreeMap<String, Lease>();
String overwrite;
String replaceBy;
@@ -4255,66 +3976,17 @@
replaceBy = dst;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("changelease " +
- " src " + src + " dest " + dst +
- " overwrite " + overwrite +
- " replaceBy " + replaceBy);
- }
-
- synchronized (sortedLeases) {
- SortedMap<String, Lease> myset = sortedLeasesByPath.tailMap(src);
- for (Iterator<Map.Entry<String, Lease>> iter =
myset.entrySet().iterator();
- iter.hasNext();) {
- Map.Entry<String, Lease> value = iter.next();
- String path = (String)value.getKey();
- if (!path.startsWith(src)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("changelease comparing " + path +
- " with " + src + " and terminating.");
- }
- break;
- }
- Lease lease = (Lease)value.getValue();
-
- // Fix up all the pathnames in this lease.
- if (LOG.isDebugEnabled()) {
- LOG.debug("changelease comparing " + path +
- " with " + src + " and replacing ");
- }
- lease.replacePrefix(src, overwrite, replaceBy);
-
- // Remove this lease from sortedLeasesByPath because the
- // pathname has changed.
- String newPath = path.replaceFirst(overwrite, replaceBy);
- addTo.put(newPath, lease);
- iter.remove();
- }
- // re-add entries back in sortedLeasesByPath
- sortedLeasesByPath.putAll(addTo);
- }
+ leaseManager.changeLease(src, dst, overwrite, replaceBy);
}
/**
- * Returns the number of leases currently in the system
- */
- int countLease() {
- synchronized (sortedLeases) {
- return sortedLeases.size();
- }
- }
-
- /**
* Serializes leases.
*/
void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
- synchronized (sortedLeases) {
- int count = 0;
- for (Lease lease : sortedLeases) {
- count += lease.getPaths().size();
- }
- out.writeInt(count); // write the size
- for (Lease lease : sortedLeases) {
+ synchronized (leaseManager) {
+ out.writeInt(leaseManager.countPath()); // write the size
+
+ for (Lease lease : leaseManager.getSortedLeases()) {
Collection<StringBytesWritable> files = lease.getPaths();
for (Iterator<StringBytesWritable> i = files.iterator(); i.hasNext();){
String path = i.next().getString();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=653959&r1=653958&r2=653959&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
(original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue
May 6 17:39:35 2008
@@ -763,7 +763,8 @@
* Set the softLimit and hardLimit of client lease periods
*/
void setLeasePeriod(long soft, long hard) {
- nameNode.namesystem.setLeasePeriod(soft, hard);
+ nameNode.namesystem.leaseManager.setLeasePeriod(soft, hard);
+ nameNode.namesystem.lmthread.interrupt();
}
/**
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java?rev=653959&r1=653958&r2=653959&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java Tue May
6 17:39:35 2008
@@ -139,12 +139,9 @@
File editFile = fsimage.getEditFile(i);
System.out.println("Verifying file: " + editFile);
int numEdits = editLog.loadFSEdits(editFile);
- System.out.println("Number of outstanding leases " +
- FSNamesystem.getFSNamesystem().countLease());
-
- assertTrue("Found " + FSNamesystem.getFSNamesystem().countLease() +
- " leases but expected 0",
- FSNamesystem.getFSNamesystem().countLease() == 0);
+ int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease();
+ System.out.println("Number of outstanding leases " + numLeases);
+ assertEquals(0, numLeases);
assertTrue("Verification for " + editFile + " failed. " +
"Expected " + (numThreads * 2 * numberTransactions) + "
transactions. "+
"Found " + numEdits + " transactions.",