Author: cutting Date: Tue Dec 12 15:00:31 2006 New Revision: 486399 URL: http://svn.apache.org/viewvc?view=rev&rev=486399 Log: HADOOP-571. Extend the syntax of Path to be a URI.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Dec 12 15:00:31 2006 @@ -74,6 +74,15 @@ 22. HADOOP-673. Give each task its own working directory again. (Mahadev Konar via cutting) +23. HADOOP-571. Extend the syntax of Path to be a URI; to be + optionally qualified with a scheme and authority. The scheme + determines the FileSystem implementation, while the authority + determines the FileSystem instance. 
New FileSystem + implementations may be provided by defining an fs.<scheme>.impl + property, naming the FileSystem implementation class. This + permits easy integration of new FileSystem implementations. + (cutting) + Release 0.9.1 - 2006-12-06 Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Dec 12 15:00:31 2006 @@ -98,9 +98,24 @@ <property> <name>fs.default.name</name> - <value>local</value> - <description>The name of the default file system. Either the - literal string "local" or a host:port for DFS.</description> + <value>file:///</value> + <description>The name of the default file system. A URI whose + scheme and authority determine the FileSystem implementation. The + uri's scheme determines the config property (fs.SCHEME.impl) naming + the FileSystem implementation class. The uri's authority is used to + determine the host, port, etc. 
for a filesystem.</description> +</property> + +<property> + <name>fs.file.impl</name> + <value>org.apache.hadoop.fs.LocalFileSystem</value> + <description>The FileSystem for file: uris.</description> +</property> + +<property> + <name>fs.hdfs.impl</name> + <value>org.apache.hadoop.dfs.DistributedFileSystem</value> + <description>The FileSystem for hdfs: uris.</description> </property> <property> Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Tue Dec 12 15:00:31 2006 @@ -497,13 +497,15 @@ doc = builder.parse(url.toString()); } } else if (name instanceof Path) { // a file resource - Path file = (Path)name; - FileSystem fs = FileSystem.getNamed("local", this); - if (fs.exists(file)) { + // Can't use FileSystem API or we get an infinite loop + // since FileSystem uses Configuration API. Use java.io.File instead. 
+ File file = new File(((Path)name).toUri().getPath()) + .getAbsoluteFile(); + if (file.exists()) { if (!quiet) { LOG.info("parsing " + file); } - InputStream in = new BufferedInputStream(fs.openRaw(file)); + InputStream in = new BufferedInputStream(new FileInputStream(file)); try { doc = builder.parse(in); } finally { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Dec 12 15:00:31 2006 @@ -165,11 +165,7 @@ return defaultBlockSize; } - public long getBlockSize(Path f) throws IOException { - // if we already know the answer, use it. - if (f instanceof DfsPath) { - return ((DfsPath) f).getBlockSize(); - } + public long getBlockSize(UTF8 f) throws IOException { int retries = 4; while (true) { try { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Dec 12 15:00:31 2006 @@ -37,21 +37,35 @@ private Path workingDir = new Path("/user", System.getProperty("user.name")); - private String name; + private URI uri; private FileSystem localFs; DFSClient dfs; - /** Construct a client for the filesystem at <code>namenode</code>. 
- */ - public DistributedFileSystem(InetSocketAddress namenode, Configuration conf) throws IOException { - super(conf); - this.dfs = new DFSClient(namenode, conf); - this.name = namenode.getHostName() + ":" + namenode.getPort(); - this.localFs = getNamed("local", conf); + public DistributedFileSystem() {} + + /** @deprecated */ + public DistributedFileSystem(InetSocketAddress namenode, + Configuration conf) throws IOException { + initialize(URI.create("hdfs://"+ + namenode.getHostName()+":"+ + namenode.getPort()), + conf); } - public String getName() { return name; } + /** @deprecated */ + public String getName() { return uri.getAuthority(); } + + public URI getUri() { return uri; } + + public void initialize(URI uri, Configuration conf) throws IOException { + setConf(conf); + String host = uri.getHost(); + int port = uri.getPort(); + this.dfs = new DFSClient(new InetSocketAddress(host,port), conf); + this.uri = URI.create("hdfs://"+host+":"+port); + this.localFs = getNamed("file:///", conf); + } public Path getWorkingDirectory() { return workingDir; @@ -62,7 +76,11 @@ } public long getBlockSize(Path f) throws IOException { - return dfs.getBlockSize(makeAbsolute(f)); + // if we already know the answer, use it. 
+ if (f instanceof DfsPath) { + return ((DfsPath) f).getBlockSize(); + } + return dfs.getBlockSize(getPath(f)); } public short getDefaultReplication() { @@ -82,7 +100,8 @@ } private UTF8 getPath(Path file) { - return new UTF8(makeAbsolute(file).toString()); + checkPath(file); + return new UTF8(makeAbsolute(file).toUri().getPath()); } public String[][] getFileCacheHints(Path f, long start, long len) throws IOException { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Dec 12 15:00:31 2006 @@ -26,7 +26,7 @@ import org.apache.hadoop.dfs.*; import org.apache.hadoop.conf.*; -import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.*; /**************************************************************** * An abstract base class for a fairly generic filesystem. It @@ -50,7 +50,9 @@ public abstract class FileSystem extends Configured { public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DistributedFileSystem"); - private static final HashMap NAME_TO_FS = new HashMap(); + // cache indexed by URI scheme and authority + private static final Map<String,Map<String,FileSystem>> CACHE + = new HashMap<String,Map<String,FileSystem>>(); /** * Parse the cmd-line args, starting at i. Remove consumed args * from array. We expect param in the form: @@ -89,25 +91,100 @@ return getNamed(conf.get("fs.default.name", "local"), conf); } - /** Returns a name for this filesystem, suitable to pass to [EMAIL PROTECTED] - * FileSystem#getNamed(String,Configuration)}.*/ + /** Called after a new FileSystem instance is constructed. 
+ * @param name a uri whose authority section names the host, port, etc. + * for this FileSystem + * @param conf the configuration + */ + public abstract void initialize(URI name, Configuration conf) + throws IOException; + + /** Returns a URI whose scheme and authority identify this FileSystem.*/ + public abstract URI getUri(); + + /** @deprecated call #getUri() instead.*/ public abstract String getName(); + + /** @deprecated call #get(URI,Configuration) instead. */ + public static FileSystem getNamed(String name, Configuration conf) + throws IOException { + + // convert old-format name to new-format name + if (name.equals("local")) { // "local" is now "file:///". + name = "file:///"; + } else if (name.indexOf('/')==-1) { // unqualified is "hdfs://" + name = "hdfs://"+name; + } + + return get(URI.create(name), conf); + } - /** Returns a named filesystem. Names are either the string "local" or a - * host:port pair, naming an DFS name server.*/ - public static FileSystem getNamed(String name, Configuration conf) throws IOException { - FileSystem fs = (FileSystem)NAME_TO_FS.get(name); + /** Returns the FileSystem for this URI's scheme and authority. The scheme + * of the URI determines a configuration property name, + * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class. + * The entire URI is passed to the FileSystem instance's initialize method. 
+ */ + public static synchronized FileSystem get(URI uri, Configuration conf) + throws IOException { + + String scheme = uri.getScheme(); + String authority = uri.getAuthority(); + + if (scheme == null) { // no scheme: use default FS + return get(conf); + } + + Map<String,FileSystem> authorityToFs = CACHE.get(scheme); + if (authorityToFs == null) { + authorityToFs = new HashMap<String,FileSystem>(); + CACHE.put(scheme, authorityToFs); + } + + FileSystem fs = authorityToFs.get(authority); if (fs == null) { - if ("local".equals(name)) { - fs = new LocalFileSystem(conf); - } else { - fs = new DistributedFileSystem(DataNode.createSocketAddr(name), conf); - } - NAME_TO_FS.put(name, fs); + Class fsClass = conf.getClass("fs."+scheme+".impl", null); + if (fsClass == null) { + throw new IOException("No FileSystem for scheme: " + scheme); + } + fs = (FileSystem)ReflectionUtils.newInstance(fsClass, conf); + fs.initialize(uri, conf); + authorityToFs.put(authority, fs); } + return fs; } + /** Make sure that a path specifies a FileSystem. 
*/ + public Path makeQualified(Path path) { + checkPath(path); + + if (!path.isAbsolute()) + path = new Path(getWorkingDirectory(), path); + + URI pathUri = path.toUri(); + URI fsUri = getUri(); + + String scheme = pathUri.getScheme(); + String authority = pathUri.getAuthority(); + + if (scheme != null && + (authority != null || fsUri.getAuthority() == null)) + return path; + + if (scheme == null) { + scheme = fsUri.getScheme(); + } + + if (authority == null) { + authority = fsUri.getAuthority(); + if (authority == null) { + authority = ""; + } + } + + return new Path(scheme+":"+"//"+authority + pathUri.getPath()); + } + /** Return the name of the checksum file associated with a file.*/ public static Path getChecksumFile(Path file) { return new Path(file.getParent(), "."+file.getName()+".crc"); @@ -123,10 +200,29 @@ // FileSystem /////////////////////////////////////////////////////////////// + /** @deprecated */ protected FileSystem(Configuration conf) { super(conf); } + protected FileSystem() { + super(null); + } + + /** Check that a Path belongs to this FileSystem. */ + protected void checkPath(Path path) { + URI uri = path.toUri(); + if (uri.getScheme() == null) // fs is relative + return; + String thisAuthority = this.getUri().getAuthority(); + String thatAuthority = uri.getAuthority(); + if (!(this.getUri().getScheme().equals(uri.getScheme()) && + (thisAuthority == null && thatAuthority == null) + || thisAuthority.equals(thatAuthority))) + throw new IllegalArgumentException("Wrong FS: "+path+ + ", expected: "+this.getUri()); + } + /** * Return a 2D array of size 1x1 or greater, containing hostnames * where portions of the given file can be found. For a nonexistent @@ -766,7 +862,13 @@ * release any held locks. 
*/ public void close() throws IOException { - NAME_TO_FS.remove(getName()); + URI uri = getUri(); + synchronized (FileSystem.class) { + Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme()); + if (authorityToFs != null) { + authorityToFs.remove(uri.getAuthority()); + } + } } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Tue Dec 12 15:00:31 2006 @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.nio.channels.*; +import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Progressable; @@ -31,6 +32,8 @@ * @author Mike Cafarella *****************************************************************/ public class LocalFileSystem extends FileSystem { + private static final URI NAME = URI.create("file:///"); + private Path workingDir = new Path(System.getProperty("user.dir")); TreeMap sharedLockDataSet = new TreeMap(); @@ -39,15 +42,11 @@ // by default use copy/delete instead of rename boolean useCopyForRename = true; - /** Construct a local filesystem client. */ + public LocalFileSystem() {} + + /** @deprecated. */ public LocalFileSystem(Configuration conf) throws IOException { - super(conf); - // if you find an OS which reliably supports non-POSIX - // rename(2) across filesystems / volumes, you can - // uncomment this. 
- // String os = System.getProperty("os.name"); - // if (os.toLowerCase().indexOf("os-with-super-rename") != -1) - // useCopyForRename = false; + initialize(NAME, conf); } /** @@ -65,14 +64,22 @@ } } + /** @deprecated */ public String getName() { return "local"; } + public URI getUri() { return NAME; } + + public void initialize(URI uri, Configuration conf) { + setConf(conf); + } + /** Convert a path to a File. */ public File pathToFile(Path path) { + checkPath(path); if (!path.isAbsolute()) { path = new Path(workingDir, path); } - return new File(path.toString()); + return new File(path.toUri().getPath()); } /******************************************************* Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java Tue Dec 12 15:00:31 2006 @@ -19,6 +19,10 @@ package org.apache.hadoop.fs; import java.util.*; +import java.net.*; +import java.io.*; + +import org.apache.hadoop.conf.Configuration; /** Names a file or directory in a [EMAIL PROTECTED] FileSystem}. * Path strings use slash as the directory separator. A path string is @@ -32,10 +36,7 @@ static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows"); - private boolean isAbsolute; // if path starts with sepr - private String[] elements; // tokenized path elements - private String drive; // Windows drive letter - private String asString; // cached toString() value + private URI uri; // a hierarchical uri /** Resolve a child path against a parent path. */ public Path(String parent, String child) { @@ -55,72 +56,142 @@ /** Resolve a child path against a parent path. 
*/ public Path(Path parent, Path child) { if (child.isAbsolute()) { - this.isAbsolute = child.isAbsolute; - this.elements = child.elements; + this.uri = child.uri; } else { - this.isAbsolute = parent.isAbsolute; - ArrayList list = new ArrayList(parent.elements.length+child.elements.length); - for (int i = 0; i < parent.elements.length; i++) { - list.add(parent.elements[i]); - } - for (int i = 0; i < child.elements.length; i++) { - list.add(child.elements[i]); - } - normalize(list); - this.elements = (String[])list.toArray(new String[list.size()]); + // Add a slash to parent's path so resolution is compatible with URI's + URI parentUri = parent.uri; + String parentPath = parentUri.getPath(); + if (!(parentPath.equals("/") || parentPath.equals(""))) + try { + parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), + parentUri.getPath()+"/", null, null); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + URI resolved = parentUri.resolve(child.uri); + initialize(resolved.getScheme(), resolved.getAuthority(), + normalizePath(resolved.getPath())); } - this.drive = child.drive == null ? parent.drive : child.drive; } - /** Construct a path from a String. */ + /** Construct a path from a String. Path strings are URIs, but with + * unescaped elements and some additional normalization. */ public Path(String pathString) { - if (WINDOWS) { // parse Windows path - int colon = pathString.indexOf(':'); - if (colon == 1) { // parse Windows drive letter - this.drive = pathString.substring(0, 1); - pathString = pathString.substring(2); - } - pathString = pathString.replace('\\','/'); // convert backslash to slash + // We can't use 'new URI(String)' directly, since it assumes things are + // escaped, which we don't require of Paths. 
+ + // add a slash in front of paths with Windows drive letters + if (hasWindowsDrive(pathString, false)) + pathString = "/"+pathString; + + // parse uri components + String scheme = null; + String authority = null; + + int start = 0; + + // parse uri scheme, if any + int colon = pathString.indexOf(':'); + int slash = pathString.indexOf('/'); + if ((colon != -1) && + ((slash == -1) || (colon < slash))) { // has a scheme + scheme = pathString.substring(0, colon); + start = colon+1; + } + + // parse uri authority, if any + if (pathString.startsWith("//", start) && + (pathString.length()-start > 2)) { // has authority + int nextSlash = pathString.indexOf('/', start+2); + int authEnd = nextSlash > 0 ? nextSlash : pathString.length(); + authority = pathString.substring(start+2, authEnd); + start = authEnd; } - // determine whether the path is absolute - this.isAbsolute = pathString.startsWith(SEPARATOR); + // uri path is the rest of the string -- query & fragment not supported + String path = pathString.substring(start, pathString.length()); + + initialize(scheme, authority, path); + } + + /** Construct a Path from components. */ + public Path(String scheme, String authority, String path) { + initialize(scheme, authority, path); + } + + private void initialize(String scheme, String authority, String path) { + try { + this.uri = new URI(scheme, authority, normalizePath(path), null, null) + .normalize(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + private String normalizePath(String path) { + // remove double slashes & backslashes + path = path.replace("//", "/"); + path = path.replace("\\", "/"); + + // trim trailing slash from non-root path (ignoring windows drive) + int minLength = hasWindowsDrive(path, true) ? 
4 : 1; + if (path.length() > minLength && path.endsWith("/")) { + path = path.substring(0, path.length()-1); + } + + return path; + } - // tokenize the path into elements - Enumeration tokens = new StringTokenizer(pathString, SEPARATOR); - ArrayList list = Collections.list(tokens); - normalize(list); - this.elements = (String[])list.toArray(new String[list.size()]); + private boolean hasWindowsDrive(String path, boolean slashed) { + if (!WINDOWS) return false; + int start = slashed ? 1 : 0; + return + path.length() >= start+2 && + (slashed ? path.charAt(0) == '/' : true) && + path.charAt(start+1) == ':' && + ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || + (path.charAt(start) >= 'a' && path.charAt(start) <= 'z')); } - private Path(boolean isAbsolute, String[] elements, String drive) { - this.isAbsolute = isAbsolute; - this.elements = elements; - this.drive = drive; + + /** Convert this to a URI. */ + public URI toUri() { return uri; } + + /** Return the FileSystem that owns this Path. */ + public FileSystem getFileSystem(Configuration conf) throws IOException { + return FileSystem.get(this.toUri(), conf); } - /** True if this path is absolute. */ - public boolean isAbsolute() { return isAbsolute; } + /** True if the directory of this path is absolute. */ + public boolean isAbsolute() { + int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0; + return uri.getPath().startsWith(SEPARATOR, start); + } /** Returns the final component of this path.*/ public String getName() { - if (elements.length == 0) { - return ""; - } else { - return elements[elements.length-1]; - } + String path = uri.getPath(); + int slash = path.lastIndexOf(SEPARATOR); + return path.substring(slash+1); } - /** Returns the parent of a path. */ + /** Returns the parent of a path or null if at root. */ public Path getParent() { - if (elements.length == 0) { + String path = uri.getPath(); + int lastSlash = path.lastIndexOf('/'); + int start = hasWindowsDrive(path,true) ? 
3 : 0; + if ((path.length() == start) || // empty path + (lastSlash == start && path.length() == start+1)) { // at root return null; } - String[] newElements = new String[elements.length-1]; - for (int i = 0; i < newElements.length; i++) { - newElements[i] = elements[i]; + String parent; + if (lastSlash==-1) { + parent = ""; + } else { + int end = hasWindowsDrive(path, true) ? 3 : 0; + parent = path.substring(0,lastSlash==end?end+1:lastSlash); } - return new Path(isAbsolute, newElements, drive); + return new Path(uri.getScheme(), uri.getAuthority(), parent); } /** Adds a suffix to the final name in the path.*/ @@ -129,27 +200,27 @@ } public String toString() { - if (asString == null) { - StringBuffer buffer = new StringBuffer(); - - if (drive != null) { - buffer.append(drive); - buffer.append(':'); - } - - if (elements.length == 0 && isAbsolute) { - buffer.append(SEPARATOR); - } - - for (int i = 0; i < elements.length; i++) { - if (i !=0 || isAbsolute) { - buffer.append(SEPARATOR); - } - buffer.append(elements[i]); - } - asString = buffer.toString(); + // we can't use uri.toString(), which escapes everything, because we want + // illegal characters unescaped in the string, for glob processing, etc. 
+ StringBuffer buffer = new StringBuffer(); + if (uri.getScheme() != null) { + buffer.append(uri.getScheme()); + buffer.append(":"); + } + if (uri.getAuthority() != null) { + buffer.append("//"); + buffer.append(uri.getAuthority()); + } + if (uri.getPath() != null) { + String path = uri.getPath(); + if (path.indexOf('/')==0 && + hasWindowsDrive(path, true) && // has windows drive + uri.getScheme() == null && // but no scheme + uri.getAuthority() == null) // or authority + path = path.substring(1); // remove slash before drive + buffer.append(path); } - return asString; + return buffer.toString(); } public boolean equals(Object o) { @@ -157,83 +228,28 @@ return false; } Path that = (Path)o; - return - this.isAbsolute == that.isAbsolute && - Arrays.equals(this.elements, that.elements) && - (this.drive == null ? true : this.drive.equals(that.drive)); + return this.uri.equals(that.uri); } public int hashCode() { - int hashCode = isAbsolute ? 1 : -1; - for (int i = 0; i < elements.length; i++) { - hashCode ^= elements[i].hashCode(); - } - if (drive != null) { - hashCode ^= drive.hashCode(); - } - return hashCode; + return uri.hashCode(); } public int compareTo(Object o) { Path that = (Path)o; - return this.toString().compareTo(that.toString()); + return this.uri.compareTo(that.uri); } /** Return the number of elements in this path. */ public int depth() { - return elements.length; - } - - /* - * Removes '.' and '..' - */ - private void normalize(ArrayList list) { - boolean canNormalize = this.isAbsolute; - boolean changed = false; // true if we have detected any . or .. - int index = 0; - int listSize = list.size(); - for (int i = 0; i < listSize; i++) { - // Invariant: (index >= 0) && (index <= i) - if (list.get(i).equals(".")) { - changed = true; - } else { - if (canNormalize) { - if (list.get(i).equals("..")) { - if ((index > 0) && !list.get(index-1).equals("..")) { - index--; // effectively deletes the last element currently in list. 
- changed = true; - } else { // index == 0 - // the first element is now going to be '..' - canNormalize = false; - list.set(index, ".."); - isAbsolute = false; - index++; - } - } else { // list.get(i) != ".." or "." - if (changed) { - list.set(index, list.get(i)); - } - index++; - } - } else { // !canNormalize - if (changed) { - list.set(index, list.get(i)); - } - index++; - if (!list.get(i).equals("..")) { - canNormalize = true; - } - } // else !canNormalize - } - } // for - - // Remove the junk at the end of the list. - for (int j = listSize-1; j >= index; j--) { - list.remove(j); + String path = uri.getPath(); + int depth = 0; + int slash = path.length()==1 && path.charAt(0)=='/' ? -1 : 0; + while (slash != -1) { + depth++; + slash = path.indexOf(SEPARATOR, slash+1); } - + return depth; } - } - Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java Tue Dec 12 15:00:31 2006 @@ -43,22 +43,20 @@ /** Splits a set of input files. One split is created per map task. * - * @param fs the filesystem containing the files to be split * @param job the job whose input files are to be split * @param numSplits the desired number of splits * @return the splits */ - FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) + FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits) throws IOException; /** Construct a [EMAIL PROTECTED] RecordReader} for a [EMAIL PROTECTED] FileSplit}. 
* - * @param fs the [EMAIL PROTECTED] FileSystem} * @param split the [EMAIL PROTECTED] FileSplit} * @param job the job that this split belongs to * @return a [EMAIL PROTECTED] RecordReader} */ - RecordReader getRecordReader(FileSystem fs, FileSplit split, + RecordReader getRecordReader(FileSystem ignored, FileSplit split, JobConf job, Reporter reporter) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Tue Dec 12 15:00:31 2006 @@ -71,12 +71,13 @@ * @return array of Path objects, never zero length. * @throws IOException if zero items. */ - protected Path[] listPaths(FileSystem fs, JobConf job) + protected Path[] listPaths(FileSystem ignored, JobConf job) throws IOException { Path[] dirs = job.getInputPaths(); String subdir = job.get("mapred.input.subdir"); ArrayList result = new ArrayList(); for (int i = 0; i < dirs.length; i++) { + FileSystem fs = dirs[i].getFileSystem(job); Path[] dir = fs.listPaths(dirs[i]); if (dir != null) { for (int j = 0; j < dir.length; j++) { @@ -85,11 +86,11 @@ Path[] subFiles = fs.listPaths(new Path(file, subdir)); if (subFiles != null) { for (int k = 0; k < subFiles.length; k++) { - result.add(subFiles[k]); + result.add(fs.makeQualified(subFiles[k])); } } } else { - result.add(file); + result.add(fs.makeQualified(file)); } } } @@ -101,26 +102,28 @@ return (Path[])result.toArray(new Path[result.size()]); } - public boolean[] areValidInputDirectories(FileSystem fileSys, - Path[] inputDirs - ) throws IOException { + // NOTE: should really pass a Configuration here, not a FileSystem + public 
boolean[] areValidInputDirectories(FileSystem fs, Path[] inputDirs) + throws IOException { boolean[] result = new boolean[inputDirs.length]; for(int i=0; i < inputDirs.length; ++i) { - result[i] = fileSys.isDirectory(inputDirs[i]); + result[i] = + inputDirs[i].getFileSystem(fs.getConf()).isDirectory(inputDirs[i]); } return result; } /** Splits files returned by [EMAIL PROTECTED] #listPaths(FileSystem,JobConf)} when * they're too big.*/ - public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) + public FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits) throws IOException { - Path[] files = listPaths(fs, job); + Path[] files = listPaths(ignored, job); long totalSize = 0; // compute total size for (int i = 0; i < files.length; i++) { // check we have valid files Path file = files[i]; + FileSystem fs = file.getFileSystem(job); if (fs.isDirectory(file) || !fs.exists(file)) { throw new IOException("Not a file: "+files[i]); } @@ -135,6 +138,7 @@ ArrayList splits = new ArrayList(numSplits); // generate splits for (int i = 0; i < files.length; i++) { Path file = files[i]; + FileSystem fs = file.getFileSystem(job); long length = fs.getLength(file); if (isSplitable(fs, file)) { long blockSize = fs.getBlockSize(file); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Dec 12 15:00:31 2006 @@ -304,8 +304,15 @@ FileSystem userFileSys = FileSystem.get(job); Path[] inputDirs = job.getInputPaths(); + + // make sure directories are fully qualified before checking them + for(int i=0; i < inputDirs.length; ++i) { + if 
(inputDirs[i].toUri().getScheme() == null) { + inputDirs[i] = userFileSys.makeQualified(inputDirs[i]); + } + } + // input paths should exist. - boolean[] validDirs = job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs); for(int i=0; i < validDirs.length; ++i) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Tue Dec 12 15:00:31 2006 @@ -34,7 +34,7 @@ /** An {@link OutputFormat} that writes {@link MapFile}s. */ public class MapFileOutputFormat extends OutputFormatBase { - public RecordWriter getRecordWriter(FileSystem fs, JobConf job, + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { @@ -42,7 +42,7 @@ // ignore the progress parameter, since MapFile is local final MapFile.Writer out = - new MapFile.Writer(job, fs, file.toString(), + new MapFile.Writer(job, file.getFileSystem(job), file.toString(), job.getMapOutputKeyClass(), job.getMapOutputValueClass(), SequenceFile.getCompressionType(job), @@ -61,11 +61,12 @@ } /** Open the output generated by this format. 
*/ - public static MapFile.Reader[] getReaders(FileSystem fs, Path dir, + public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir, Configuration conf) throws IOException { + FileSystem fs = dir.getFileSystem(conf); Path[] names = fs.listPaths(dir); - + // sort names, so that hash partitioning works Arrays.sort(names); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Tue Dec 12 15:00:31 2006 @@ -29,13 +29,12 @@ /** Construct a {@link RecordWriter} with Progressable. * - * @param fs the file system to write to * @param job the job whose output is being written * @param name the unique name for this part of the output * @param progress mechanism for reporting progress while writing to file * @return a {@link RecordWriter} */ - RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, + RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException; @@ -47,6 +46,6 @@ * @param job the job whose output will be written * @throws IOException when output should not be attempted */ - void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException; + void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Tue Dec 12 15:00:31 2006 @@ -79,12 +79,12 @@ } } - public abstract RecordWriter getRecordWriter(FileSystem fs, + public abstract RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException; - public void checkOutputSpecs(FileSystem fs, JobConf job) + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException { // Ensure that the output directory is set and not already there @@ -92,9 +92,9 @@ if (outDir == null && job.getNumReduceTasks() != 0) { throw new InvalidJobConfException("Output directory not set in JobConf."); } - if (outDir != null && fs.exists(outDir)) { + if (outDir != null && outDir.getFileSystem(job).exists(outDir)) { throw new FileAlreadyExistsException("Output directory " + outDir + - " already exists in " + fs.getName() ); + " already exists"); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Dec 12 15:00:31 2006 @@ -3,6 +3,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.net.URI; +import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -28,6 +30,8 @@ public class PhasedFileSystem extends FileSystem { private FileSystem baseFS ; + private URI uri; + // Map from final file name to temporary file name 
private Map<Path, FileInfo> finalNameToFileInfo = new HashMap(); @@ -110,6 +114,14 @@ return tempPath ; } + public URI getUri() { + return baseFS.getUri(); + } + + public void initialize(URI uri, Configuration conf) throws IOException { + baseFS.initialize(uri, conf); + } + @Override public FSOutputStream createRaw( Path f, boolean overwrite, short replication, long blockSize) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Tue Dec 12 15:00:31 2006 @@ -36,11 +36,12 @@ /** An {@link OutputFormat} that writes {@link SequenceFile}s. */ public class SequenceFileOutputFormat extends OutputFormatBase { - public RecordWriter getRecordWriter(FileSystem fs, JobConf job, + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { Path file = new Path(job.getOutputPath(), name); + FileSystem fs = file.getFileSystem(job); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { @@ -75,7 +76,7 @@ /** Open the output generated by this format. 
*/ public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = dir.getFileSystem(conf); Path[] names = fs.listPaths(dir); // sort names, so that hash partitioning works Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Tue Dec 12 15:00:31 2006 @@ -53,10 +53,12 @@ } } - public RecordWriter getRecordWriter(FileSystem fs, JobConf job, - String name, Progressable progress) throws IOException { + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, + String name, Progressable progress) + throws IOException { Path dir = job.getOutputPath(); + FileSystem fs = dir.getFileSystem(job); boolean isCompressed = getCompressOutput(job); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Tue Dec 12 15:00:31 2006 @@ -132,8 +132,8 @@ */ public static Path makeRelative(Path root, Path absPath) { if (!absPath.isAbsolute()) { return absPath; } - String sRoot = root.toString(); - String sPath = absPath.toString(); + String sRoot = root.toUri().getPath(); + String 
sPath = absPath.toUri().getPath(); Enumeration rootTokens = new StringTokenizer(sRoot, "/"); ArrayList rList = Collections.list(rootTokens); Enumeration pathTokens = new StringTokenizer(sPath, "/"); @@ -815,7 +815,7 @@ try { copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures); } catch (Exception e) { - System.out.println("Caught: " + e); + System.err.println("Copy failed: "+StringUtils.stringifyException(e)); return -1; } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java?view=diff&rev=486399&r1=486398&r2=486399 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java Tue Dec 12 15:00:31 2006 @@ -90,7 +90,6 @@ assertEquals(new Path("foo/bar/baz"), new Path("foo/bar", "baz")); assertEquals(new Path("/foo"), new Path("/bar", "/foo")); if (Path.WINDOWS) { - assertEquals(new Path("c:/foo"), new Path("c:/bar", "/foo")); assertEquals(new Path("c:/foo"), new Path("/bar", "c:/foo")); assertEquals(new Path("c:/foo"), new Path("d:/bar", "c:/foo")); } @@ -103,6 +102,7 @@ public void testDots() { // Test Path(String) assertEquals(new Path("/foo/bar/baz").toString(), "/foo/bar/baz"); + assertEquals(new Path("/foo/bar", ".").toString(), "/foo/bar"); assertEquals(new Path("/foo/bar/../baz").toString(), "/foo/baz"); assertEquals(new Path("/foo/bar/./baz").toString(), "/foo/bar/baz"); assertEquals(new Path("/foo/bar/baz/../../fud").toString(), "/foo/fud"); @@ -111,7 +111,6 @@ assertEquals(new Path(".././../foo/bar").toString(), "../../foo/bar"); assertEquals(new Path("./foo/bar/baz").toString(), "foo/bar/baz"); assertEquals(new Path("/foo/bar/../../baz/boo").toString(), "/baz/boo"); - assertEquals(new Path("/foo/bar/../../../baz").toString(), "../baz"); assertEquals(new 
Path("foo/bar/").toString(), "foo/bar"); assertEquals(new Path("foo/bar/../baz").toString(), "foo/baz"); assertEquals(new Path("foo/bar/../../baz/boo").toString(), "baz/boo"); @@ -128,7 +127,6 @@ assertEquals(new Path("/foo/bar/baz","../../boo/bud").toString(), "/foo/boo/bud"); assertEquals(new Path("foo/bar/baz","../../boo/bud").toString(), "foo/boo/bud"); - assertEquals(new Path("/foo/bar/","../../../baz/boo").toString(), "../baz/boo"); assertEquals(new Path("../../","../../boo/bud").toString(), "../../../../boo/bud"); assertEquals(new Path("../../foo","../../../boo/bud").toString(), "../../../../boo/bud");