Author: cutting Date: Fri Mar 2 14:12:24 2007 New Revision: 513988 URL: http://svn.apache.org/viewvc?view=rev&rev=513988 Log: HADOOP-432. Add a trash feature, disabled by default.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Trash.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513988&r1=513987&r2=513988 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Mar 2 14:12:24 2007 @@ -199,6 +199,11 @@ once. Large lists were causing datenodes to timeout. (Dhruba Borthakur via cutting) +62. HADOOP-432. Add a trash feature, disabled by default. When + enabled, the FSShell 'rm' command will move things to a trash + directory in the filesystem. In HDFS, a thread periodically + checkpoints the trash and removes old checkpoints. (cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=513988&r1=513987&r2=513988 ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Mar 2 14:12:24 2007 @@ -107,6 +107,21 @@ </property> <property> + <name>fs.trash.root</name> + <value>${hadoop.tmp.dir}/Trash</value> + <description>The trash directory, used by FsShell's 'rm' command. + </description> +</property> + +<property> + <name>fs.trash.interval</name> + <value>0</value> + <description>Number of minutes between trash checkpoints. + If zero, the trash feature is disabled. 
+ </description> +</property> + +<property> <name>fs.file.impl</name> <value>org.apache.hadoop.fs.LocalFileSystem</value> <description>The FileSystem for file: uris.</description> Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=513988&r1=513987&r2=513988 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri Mar 2 14:12:24 2007 @@ -20,6 +20,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Trash; import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.*; import org.apache.hadoop.conf.*; @@ -83,6 +84,7 @@ private FSNamesystem namesystem; private Server server; + private Thread emptier; private int handlerCount = 2; /** only used for testing purposes */ @@ -178,6 +180,10 @@ this.server = RPC.getServer(this, hostname, port, handlerCount, false, conf); this.server.start(); + + this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier"); + this.emptier.setDaemon(true); + this.emptier.start(); } /** @@ -225,8 +231,8 @@ if (! 
stopRequested) { stopRequested = true; namesystem.close(); + emptier.interrupt(); server.stop(); - //this.join(); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?view=diff&rev=513988&r1=513987&r2=513988 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Fri Mar 2 14:12:24 2007 @@ -29,6 +29,7 @@ public class FsShell extends ToolBase { protected FileSystem fs; + private Trash trash; /** */ @@ -38,6 +39,7 @@ public void init() throws IOException { conf.setQuietMode(true); this.fs = FileSystem.get(conf); + this.trash = new Trash(conf); } /** @@ -583,6 +585,10 @@ "\", use -rmr instead"); } + if (trash.moveToTrash(src)) { + System.out.println("Moved to trash: " + src); + return; + } if (fs.delete(src)) { System.out.println("Deleted " + src); } else { @@ -590,6 +596,11 @@ } } + private void expunge() throws IOException { + trash.expunge(); + trash.checkpoint(); + } + /** * Return an abbreviated English-language desc of the byte length */ @@ -737,6 +748,7 @@ System.err.println(" [-cp <src> <dst>]"); System.err.println(" [-rm <path>]"); System.err.println(" [-rmr <path>]"); + System.err.println(" [-expunge]"); System.err.println(" [-put <localsrc> <dst>]"); System.err.println(" [-copyFromLocal <localsrc> <dst>]"); System.err.println(" [-moveFromLocal <localsrc> <dst>]"); @@ -843,6 +855,8 @@ exitCode = doall(cmd, argv, conf, i); } else if ("-rmr".equals(cmd)) { exitCode = doall(cmd, argv, conf, i); + } else if ("-expunge".equals(cmd)) { + expunge(); } else if ("-du".equals(cmd)) { if (i < argv.length) { exitCode = doall(cmd, argv, conf, i); Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Trash.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Trash.java?view=auto&rev=513988 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Trash.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Trash.java Fri Mar 2 14:12:24 2007 @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.text.*; +import java.io.*; +import java.net.URI; +import java.util.Date; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.conf.*; + +/** Provides a <i>trash</i> feature. Files may be moved to a trash directory. + * They're initially stored in a <i>current</i> sub-directory of the trash + * directory. Within that sub-directory their original path is preserved. + * Periodically one may checkpoint the current trash and remove older + * checkpoints. (This design permits trash management without enumeration of + * the full trash content, without date support in the filesystem, and without + * clock synchronization.) 
+ */ +public class Trash extends Configured { + private static final Log LOG = + LogFactory.getLog("org.apache.hadoop.fs.Trash"); + + private static final String CURRENT = "Current"; + private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmm"); + private static final int MSECS_PER_MINUTE = 60*1000; + + private FileSystem fs; + private Path root; + private Path current; + private long interval; + + /** Construct a trash can accessor. + * @param conf a Configuration + */ + public Trash(Configuration conf) throws IOException { + super(conf); + + Path root = new Path(conf.get("fs.trash.root", "/tmp/Trash")); + + this.fs = root.getFileSystem(conf); + + if (!root.isAbsolute()) + root = new Path(fs.getWorkingDirectory(), root); + + this.root = root; + this.current = new Path(root, CURRENT); + this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE; + } + + /** Move a file or directory to the current trash directory. + * @return false if the item is already in the trash or trash is disabled + */ + public boolean moveToTrash(Path path) throws IOException { + if (interval == 0) + return false; + + if (!path.isAbsolute()) // make path absolute + path = new Path(fs.getWorkingDirectory(), path); + + if (!fs.exists(path)) // check that path exists + throw new FileNotFoundException(path.toString()); + + URI rootUri = root.toUri(); + String dirPath = path.toUri().getPath(); + + if (dirPath.startsWith(rootUri.getPath())) { // already in trash + return false; + } + + Path trashPath = // create path in current + new Path(rootUri.getScheme(), rootUri.getAuthority(), + current.toUri().getPath()+dirPath); + + IOException cause = null; + + // try twice, in case checkpoint between the mkdirs() & rename() + for (int i = 0; i < 2; i++) { + Path trashDir = trashPath.getParent(); + if (!fs.mkdirs(trashDir)) { // make parent directory + throw new IOException("Failed to create trash directory: "+trashDir); + } + try { + if (fs.rename(path, trashPath)) // 
move to current trash + return true; + } catch (IOException e) { + cause = e; + } + } + throw (IOException) + new IOException("Failed to move to trash: "+path).initCause(cause); + } + + /** Create a trash checkpoint. */ + public void checkpoint() throws IOException { + if (!fs.exists(current)) // no trash, no checkpoint + return; + + Path checkpoint; + synchronized (CHECKPOINT) { + checkpoint = new Path(root, CHECKPOINT.format(new Date())); + } + + if (fs.rename(current, checkpoint)) { + LOG.info("Created trash checkpoint: "+checkpoint); + } else { + throw new IOException("Failed to checkpoint trash: "+checkpoint); + } + } + + /** Delete old checkpoints. */ + public void expunge() throws IOException { + Path[] dirs = fs.listPaths(root); // scan trash sub-directories + long now = System.currentTimeMillis(); + for (int i = 0; i < dirs.length; i++) { + Path dir = dirs[i]; + String name = dir.getName(); + if (name.equals(CURRENT)) // skip current + continue; + + long time; + try { + synchronized (CHECKPOINT) { + time = CHECKPOINT.parse(name).getTime(); + } + } catch (ParseException e) { + LOG.warn("Unexpected item in trash: "+dir+". Ignoring."); + continue; + } + + if ((now - interval) > time) { + if (fs.delete(dir)) { + LOG.info("Deleted trash checkpoint: "+dir); + } else { + LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring."); + } + } + } + } + + /** Return a {@link Runnable} that periodically empties the trash. + * Only one checkpoint is kept at a time. 
+ */ + public Runnable getEmptier() { + return new Emptier(); + } + + private class Emptier implements Runnable { + + public void run() { + if (interval == 0) + return; // trash disabled + + long now = System.currentTimeMillis(); + long end = ceiling(now, interval); + while (true) { + try { // sleep for interval + Thread.sleep(end - now); + } catch (InterruptedException e) { + return; // exit on interrupt + } + + now = System.currentTimeMillis(); + if (now >= end) { + + try { + expunge(); + } catch (IOException e) { + LOG.warn("Trash expunge caught: "+e+". Ignoring."); + } + + try { + checkpoint(); + } catch (IOException e) { + LOG.warn("Trash checkpoint caught: "+e+". Ignoring."); + } + + end = ceiling(now, interval); + } + } + } + + private long ceiling(long time, long interval) { + return floor(time, interval) + interval; + } + private long floor(long time, long interval) { + return (time / interval) * interval; + } + + } + + /** Run an emptier.*/ + public static void main(String[] args) throws Exception { + new Trash(new Configuration()).getEmptier().run(); + } + +}