Author: cutting
Date: Fri Jan 5 12:12:14 2007
New Revision: 493169

URL: http://svn.apache.org/viewvc?view=rev&rev=493169
Log:
HADOOP-619.  Extend InputFormatBase to accept individual files and glob patterns as MapReduce inputs.  Contributed by Sanjay.
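For context, a minimal sketch of what this change enables on the job side. It assumes the JobConf#addInputPath API of this release line; the paths are made up for illustration and are not part of this commit:

    // Sketch: with HADOOP-619 applied, an input path may be a directory,
    // a single file, or a glob pattern (all three paths are hypothetical).
    JobConf job = new JobConf();
    job.addInputPath(new Path("/logs/2007-01-05"));        // a directory
    job.addInputPath(new Path("/logs/extra/part-00000"));  // a single file
    job.addInputPath(new Path("/logs/2006-12-*"));         // a glob pattern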
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 5 12:12:14 2007
@@ -200,6 +200,10 @@
     hadoop-${version}-core.jar so that it can be more easily
     identified.  (Nigel Daley via cutting)
 
+57. HADOOP-619.  Extend InputFormatBase to accept individual files and
+    glob patterns as MapReduce inputs, not just directories.  Also
+    change contrib/streaming to use this.  (Sanjay Dahia via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Fri Jan 5 12:12:14 2007
@@ -21,20 +21,13 @@
 import java.io.*;
 import java.lang.reflect.*;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import org.apache.hadoop.mapred.*;
 
 /** An input format that performs globbing on DFS paths and
@@ -48,10 +41,6 @@
 
   protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
 
-  /** This implementation always returns true. */
-  public void validateInput(JobConf job) throws IOException {
-  }
-
   static boolean isGzippedInput(JobConf job) {
     String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
     return "gzip".equals(val);
@@ -77,55 +66,6 @@
       splits.add(new FileSplit(file, 0, splitSize, job));
     }
     return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
-  }
-
-  protected Path[] listPaths(JobConf job) throws IOException {
-    Path[] globs = job.getInputPaths();
-    ArrayList list = new ArrayList();
-    int dsup = globs.length;
-    for (int d = 0; d < dsup; d++) {
-      String leafName = globs[d].getName();
-      LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
-      Path[] paths;
-      Path dir;
-      FileSystem fs = globs[d].getFileSystem(job);
-      PathFilter filter = new GlobFilter(fs, leafName);
-      dir = new Path(globs[d].getParent().toString());
-      if (dir == null) dir = new Path(".");
-      paths = fs.listPaths(dir, filter);
-      list.addAll(Arrays.asList(paths));
-    }
-    return (Path[]) list.toArray(new Path[] {});
-  }
-
-  class GlobFilter implements PathFilter {
-
-    public GlobFilter(FileSystem fs, String glob) {
-      fs_ = fs;
-      pat_ = Pattern.compile(globToRegexp(glob));
-    }
-
-    String globToRegexp(String glob) {
-      String re = glob;
-      re = re.replaceAll("\\.", "\\\\.");
-      re = re.replaceAll("\\+", "\\\\+");
-      re = re.replaceAll("\\*", ".*");
-      re = re.replaceAll("\\?", ".");
-      LOG.info("globToRegexp: |" + glob + "| -> |" + re + "|");
-      return re;
-    }
-
-    public boolean accept(Path pathname) {
-      boolean acc = !fs_.isChecksumFile(pathname);
-      if (acc) {
-        acc = pat_.matcher(pathname.getName()).matches();
-      }
-      LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
-      return acc;
-    }
-
-    Pattern pat_;
-    FileSystem fs_;
   }
 
   public RecordReader getRecordReader(final InputSplit genericSplit,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Fri Jan 5 12:12:14 2007
@@ -652,8 +652,7 @@
     throws IOException {
     Path [] parents = new Path[1];
     int level = 0;
-
-    String filename = filePattern.toString();
+    String filename = filePattern.toUri().getPath();
     if("".equals(filename) || Path.SEPARATOR.equals(filename)) {
       parents[0] = filePattern;
       return parents;
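The FileSystem.java fix above matters because Path.toString() can include a scheme and authority, while the parent-directory walk used for glob expansion should see only the path component. A self-contained sketch of the distinction, using plain java.net.URI (the URI value is illustrative, not from this commit):

    import java.net.URI;

    public class PathComponentDemo {
      public static void main(String[] args) {
        URI u = URI.create("hdfs://namenode:9000/user/data/*");
        // Full form, scheme and authority included:
        System.out.println(u);           // hdfs://namenode:9000/user/data/*
        // Path component only -- what parent computation should operate on:
        System.out.println(u.getPath()); // /user/data/*
      }
    }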
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java Fri Jan 5 12:12:14 2007
@@ -32,6 +32,7 @@
 
   /** The directory separator, a slash. */
   public static final String SEPARATOR = "/";
+  public static final char SEPARATOR_CHAR = '/';
 
   static final boolean WINDOWS
     = System.getProperty("os.name").startsWith("Windows");

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=493169&r1=493168&r2=493169
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Fri Jan 5 12:12:14 2007
@@ -22,12 +22,14 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
 /** A base class for {@link InputFormat}. */
 public abstract class InputFormatBase implements InputFormat {
@@ -38,7 +40,12 @@
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
   private long minSplitSize = 1;
-
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+      public boolean accept( Path p ){
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    };
   protected void setMinSplitSize(long minSplitSize) {
     this.minSplitSize = minSplitSize;
   }
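The new hiddenFileFilter excludes any path whose final component starts with "_" or ".", which keeps job-internal directories (e.g. _logs) and checksum files (e.g. .part-00000.crc) out of input listings and glob matches. A runnable sketch of the same accept() rule applied to plain strings (the file names are illustrative):

    public class HiddenFilterDemo {
      // Same test hiddenFileFilter applies to a Path's final component.
      static boolean accept(String name) {
        return !name.startsWith("_") && !name.startsWith(".");
      }

      public static void main(String[] args) {
        System.out.println(accept("part-00000"));      // true:  regular output file
        System.out.println(accept("_logs"));           // false: job-internal directory
        System.out.println(accept(".part-00000.crc")); // false: checksum file
      }
    }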
@@ -63,39 +70,23 @@
    * Subclasses may override to, e.g., select only files matching a regular
    * expression.
    *
-   * <p>Property <code>mapred.input.subdir</code>, if set, names a subdirectory
-   * that is appended to all input dirs specified by job, and if the given fs
-   * lists those too, each is added to the returned array of Path.
-   *
    * @param job the job to list input paths for
    * @return array of Path objects
    * @throws IOException if zero items.
    */
   protected Path[] listPaths(JobConf job) throws IOException {
-    String subdir = job.get("mapred.input.subdir");
     Path[] dirs = job.getInputPaths();
     if (dirs.length == 0) {
-      throw new IOException("No input directories specified in job");
+      throw new IOException("No input paths specified in job");
     }
-    ArrayList result = new ArrayList();
-    for (int i = 0; i < dirs.length; i++) {
-      FileSystem fs = dirs[i].getFileSystem(job);
-      Path[] dir = fs.listPaths(dirs[i]);
-      if (dir != null) {
-        for (int j = 0; j < dir.length; j++) {
-          Path file = dir[j];
-          if (subdir != null) {
-            Path[] subFiles = fs.listPaths(new Path(file, subdir));
-            if (subFiles != null) {
-              for (int k = 0; k < subFiles.length; k++) {
-                result.add(fs.makeQualified(subFiles[k]));
-              }
-            }
-          } else {
-            result.add(fs.makeQualified(file));
-          }
-        }
+    List<Path> result = new ArrayList();
+    for (Path p: dirs) {
+      FileSystem fs = p.getFileSystem(job);
+      Path[] matches =
+        fs.listPaths(fs.globPaths(p, hiddenFileFilter),hiddenFileFilter);
+      for (Path match: matches) {
+        result.add(fs.makeQualified(match));
       }
     }
 
@@ -104,22 +95,50 @@
 
   public void validateInput(JobConf job) throws IOException {
     Path[] inputDirs = job.getInputPaths();
+    if (inputDirs.length == 0) {
+      throw new IOException("No input paths specified in input");
+    }
+
     List<IOException> result = new ArrayList();
-    for(int i=0; i < inputDirs.length; ++i) {
-      FileSystem fs = inputDirs[i].getFileSystem(job);
-      if (!fs.exists(inputDirs[i])) {
-        result.add(new FileNotFoundException("Input directory " +
-                                             inputDirs[i] +
-                                             " doesn't exist."));
-      } else if (!fs.isDirectory(inputDirs[i])) {
-        result.add(new InvalidFileTypeException
-                   ("Invalid input path, expecting directory : " +
-                    inputDirs[i]));
+    int totalFiles = 0;
+    for (Path p: inputDirs) {
+      FileSystem fs = p.getFileSystem(job);
+      if (fs.exists(p)) {
+        // make sure all paths are files to avoid exception
+        // while generating splits
+        for (Path subPath : fs.listPaths(p, hiddenFileFilter)) {
+          FileSystem subFS = subPath.getFileSystem(job);
+          if (!subFS.isFile(subPath)) {
+            result.add(new IOException(
+                "Input path is not a file : " + subPath));
+          } else {
+            totalFiles++;
+          }
+        }
+      } else {
+        Path [] paths = fs.globPaths(p, hiddenFileFilter);
+        if (paths.length == 0) {
+          result.add(
+              new IOException("Input Pattern " + p + " matches 0 files"));
+        } else {
+          // validate globbed paths
+          for (Path gPath : paths) {
+            FileSystem gPathFS = gPath.getFileSystem(job);
+            if (!gPathFS.exists(gPath)) {
+              result.add(
+                  new FileNotFoundException(
+                      "Input path doesnt exist : " + gPath));
+            }
+          }
+          totalFiles += paths.length ;
+        }
       }
     }
     if (!result.isEmpty()) {
       throw new InvalidInputException(result);
     }
+    // send output to client.
+    LOG.info("Total input paths to process : " + totalFiles);
   }
 
   /** Splits files returned by {@link #listPaths(JobConf)} when
@@ -146,7 +165,7 @@
       Path file = files[i];
       FileSystem fs = file.getFileSystem(job);
       long length = fs.getLength(file);
-      if (isSplitable(fs, file)) { 
+      if (isSplitable(fs, file)) {
         long blockSize = fs.getBlockSize(file);
         long splitSize = computeSplitSize(goalSize, minSize, blockSize);
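For reference, the glob handling that listPaths() now delegates to FileSystem.globPaths() follows the same glob-to-regex idea as the GlobFilter removed from StreamInputFormat above. A standalone sketch of that translation (the test strings are illustrative; note that '.' must be escaped before '*' is expanded, or the generated '.*' would itself get escaped):

    import java.util.regex.Pattern;

    public class GlobDemo {
      // Glob-to-regex translation, as in the removed GlobFilter.globToRegexp().
      static String globToRegexp(String glob) {
        String re = glob;
        re = re.replaceAll("\\.", "\\\\.");  // escape literal dots first
        re = re.replaceAll("\\+", "\\\\+");  // escape literal pluses
        re = re.replaceAll("\\*", ".*");     // '*' matches any run of characters
        re = re.replaceAll("\\?", ".");      // '?' matches exactly one character
        return re;
      }

      public static void main(String[] args) {
        Pattern p = Pattern.compile(globToRegexp("part-*.txt"));
        System.out.println(p.matcher("part-00000.txt").matches()); // true
        System.out.println(p.matcher("part-00000.crc").matches()); // false
      }
    }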