Author: omalley
Date: Wed Jun 4 13:31:04 2008
New Revision: 663370
URL: http://svn.apache.org/viewvc?rev=663370&view=rev
Log:
HADOOP-3095. Speed up split generation in FileInputFormat,
especially for non-HDFS file systems. Deprecates
InputFormat.validateInput. Contributed by Tom White.
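
For custom input formats, the practical upshot of this change is to override the
new listStatus hook instead of the deprecated listPaths, so that getSplits can
reuse the length and block size already carried by each FileStatus rather than
issuing extra file-system calls per file. A minimal sketch (the subclass name
and the ".log" filter are hypothetical, not part of this commit):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    // Hypothetical subclass shown only to illustrate the new override point.
    public class LogFileInputFormat extends FileInputFormat<LongWritable, Text> {

      // Override listStatus (added by this change) rather than the deprecated
      // listPaths: the returned FileStatus objects feed getSplits directly.
      @Override
      protected FileStatus[] listStatus(JobConf job) throws IOException {
        List<FileStatus> keep = new ArrayList<FileStatus>();
        for (FileStatus stat : super.listStatus(job)) {
          if (stat.getPath().getName().endsWith(".log")) { // example filter
            keep.add(stat);
          }
        }
        return keep.toArray(new FileStatus[keep.size()]);
      }

      public RecordReader<LongWritable, Text> getRecordReader(
          InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new LineRecordReader(job, (FileSplit) split);
      }
    }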
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun 4 13:31:04 2008
@@ -248,6 +248,10 @@
HADOOP-1702. Reduce buffer copies when data is written to DFS.
DataNodes take 30% less CPU while writing data. (rangadi)
+ HADOOP-3095. Speed up split generation in FileInputFormat,
+ especially for non-HDFS file systems. Deprecates
+ InputFormat.validateInput. (tomwhite via omalley)
+
BUG FIXES
HADOOP-2905. 'fsck -move' triggers NPE in NameNode.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Wed Jun 4 13:31:04 2008
@@ -114,9 +114,12 @@
}
- public BlockLocation[] getFileBlockLocations(Path f, long start,
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
long len) throws IOException {
- return dfs.getBlockLocations(getPathName(f), start, len);
+ if (file == null) {
+ return null;
+ }
+ return dfs.getBlockLocations(getPathName(file.getPath()), start, len);
}
public void setVerifyChecksum(boolean verifyChecksum) {
@@ -352,11 +355,15 @@
DfsPath p = (DfsPath) f;
return p.info;
}
- FileStatus fs = dfs.getFileInfo(getPathName(f));
- if (fs != null)
- return fs;
- else
+ DFSFileInfo fi = dfs.getFileInfo(getPathName(f));
+ if (fi != null) {
+ return new FileStatus(fi.getLen(), fi.isDir(), fi.getReplication(),
+ fi.getBlockSize(), fi.getModificationTime(),
+ fi.getPermission(), fi.getOwner(), fi.getGroup(),
+ new DfsPath(fi, this)); // fully-qualify path;
+ } else {
throw new FileNotFoundException("File does not exist: " + f);
+ }
}
/** {@inheritDoc} */
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java Wed Jun 4 13:31:04 2008
@@ -161,11 +161,13 @@
Long.valueOf(attrs.getValue("blocksize")).longValue(),
modif, FsPermission.valueOf(attrs.getValue("permission")),
attrs.getValue("owner"), attrs.getValue("group"),
- new Path(getUri().toString(), attrs.getValue("path")))
+ new Path(getUri().toString(), attrs.getValue("path"))
+ .makeQualified(HftpFileSystem.this))
: new FileStatus(0L, true, 0, 0L,
modif, FsPermission.valueOf(attrs.getValue("permission")),
attrs.getValue("owner"), attrs.getValue("group"),
- new Path(getUri().toString(), attrs.getValue("path")));
+ new Path(getUri().toString(), attrs.getValue("path"))
+ .makeQualified(HftpFileSystem.this));
fslist.add(fs);
}
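
The makeQualified change above (repeated for the other FileSystem
implementations below) matters because split generation now passes FileStatus
objects around without the originating Path context; a fully-qualified path
keeps the scheme and authority attached. Roughly (the host, port, and class
name are made-up examples):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifyDemo {                          // hypothetical demo
      public static void main(String[] args) throws Exception {
        // Host and port are made-up; any FileSystem works the same way.
        FileSystem fs = FileSystem.get(
            URI.create("hftp://nn.example.com:50070"), new Configuration());
        Path p = new Path("/user/tom/data");            // scheme-less path
        // makeQualified attaches the file system's scheme and authority:
        System.out.println(p.makeQualified(fs));
        // prints: hftp://nn.example.com:50070/user/tom/data
      }
    }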
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Wed Jun 4 13:31:04 2008
@@ -326,21 +326,35 @@
* hostnames of machines that contain the given file.
*
* The FileSystem will simply return an elt containing 'localhost'.
+ * @deprecated use {@link #getFileBlockLocations(FileStatus, long, long)}
*/
+ @Deprecated
public BlockLocation[] getFileBlockLocations(Path f,
long start, long len) throws IOException {
- if (!exists(f)) {
+
+ return getFileBlockLocations(getFileStatus(f), start, len);
+ }
+
+ /**
+ * Return an array containing hostnames, offset and size of
+ * portions of the given file. For a nonexistent
+ * file or regions, null will be returned.
+ *
+ * This call is most helpful with DFS, where it returns
+ * hostnames of machines that contain the given file.
+ *
+ * The FileSystem will simply return an elt containing 'localhost'.
+ */
+ public BlockLocation[] getFileBlockLocations(FileStatus file,
+ long start, long len) throws IOException {
+ if (file == null) {
return null;
- } else {
- BlockLocation result[] = new BlockLocation[1];
- String[] name = new String[1];
- name[0] = "localhost:50010";
- String[] host = new String[1];
- host[0] = "localhost";
- result[0] = new BlockLocation(name, host, 0, len);
- return result;
}
+ String[] name = { "localhost:50010" };
+ String[] host = { "localhost" };
+ return new BlockLocation[] { new BlockLocation(name, host, 0, len) };
}
+
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
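
A sketch of the caller-side migration implied by the deprecation above (the
fs, path, and len variables are hypothetical, assumed already in scope):

    // Old, deprecated: internally re-fetches the file's status.
    BlockLocation[] before = fs.getFileBlockLocations(path, 0, len);

    // New: reuse a FileStatus already in hand (e.g. from listStatus),
    // saving one round trip per file on remote file systems.
    FileStatus stat = fs.getFileStatus(path);
    BlockLocation[] after = fs.getFileBlockLocations(stat, 0, stat.getLen());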
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java Wed Jun 4 13:31:04 2008
@@ -96,6 +96,11 @@
return fs.getFileBlockLocations(f, start, len);
}
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+ long len) throws IOException {
+ return fs.getFileBlockLocations(file, start, len);
+ }
+
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Wed Jun 4 13:31:04 2008
@@ -81,30 +81,6 @@
return uri;
}
- /**
- * Return an array containing hostnames, offset and size.
- *
- * This call is most helpful with DFS, where it returns
- * hostnames for machines that contain the given file.
-
- * The InMemoryFileSystem will simply return an elt
- * containing 'inmemory'
- */
- public BlockLocation[] getFileBlockLocations (Path f,
- long start, long len) throws IOException {
- if (!exists(f)) {
- return null;
- } else {
- BlockLocation result[] = new BlockLocation[1];
- String[] name = new String[1];
- name[0] = "inmemory:50010";
- String[] host = new String[1];
- host[0] = "inmemory";
- result[0] = new BlockLocation(name, host, 0, len);
- return result;
- }
- }
-
private class InMemoryInputStream extends FSInputStream {
private DataInputBuffer din = new DataInputBuffer();
private FileAttributes fAttr;
@@ -309,7 +285,7 @@
if (attr==null) {
throw new FileNotFoundException("File " + f + " does not exist.");
}
- return new InMemoryFileStatus(f, attr);
+ return new InMemoryFileStatus(f.makeQualified(this), attr);
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java Wed Jun 4 13:31:04 2008
@@ -403,7 +403,7 @@
long modTime = -1; // Modification time of root dir not known.
Path root = new Path("/");
return new FileStatus(length, isDir, blockReplication, blockSize,
- modTime, root);
+ modTime, root.makeQualified(this));
}
String pathName = parentPath.toUri().getPath();
FTPFile[] ftpFiles = client.listFiles(pathName);
@@ -443,7 +443,7 @@
String group = ftpFile.getGroup();
Path filePath = new Path(parentPath, ftpFile.getName());
return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
- permission, user, group, filePath);
+ permission, user, group, filePath.makeQualified(this));
}
@Override
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Wed Jun 4 13:31:04 2008
@@ -173,13 +173,14 @@
}
if (kfsImpl.isDirectory(srep)) {
// System.out.println("Status of path: " + path + " is dir");
- return new FileStatus(0, true, 1, 0, 0, path);
+ return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
} else {
// System.out.println("Status of path: " + path + " is file");
return new FileStatus(kfsImpl.filesize(srep), false,
kfsImpl.getReplication(srep),
getDefaultBlockSize(),
- kfsImpl.getModificationTime(srep), path);
+ kfsImpl.getModificationTime(srep),
+ path.makeQualified(this));
}
}
@@ -308,12 +309,14 @@
* Return null if the file doesn't exist; otherwise, get the
* locations of the various chunks of the file from KFS.
*/
- public BlockLocation[] getBlockLocations(Path f, long start, long len
- ) throws IOException {
- if (!exists(f)) {
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+ long len) throws IOException {
+
+ if (file == null) {
return null;
}
- String srep = makeAbsolute(f).toUri().getPath();
+ String srep = makeAbsolute(file.getPath()).toUri().getPath();
String[][] hints = kfsImpl.getDataLocation(srep, start, len);
BlockLocation[] result = new BlockLocation[hints.length];
long blockSize = getDefaultBlockSize();
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Wed Jun 4 13:31:04 2008
@@ -170,7 +170,9 @@
return null;
}
if (inode.isFile()) {
- return new FileStatus[] { new S3FileStatus(f, inode) };
+ return new FileStatus[] {
+ new S3FileStatus(f.makeQualified(this), inode)
+ };
}
ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
for (Path p : store.listSubPaths(absolutePath)) {
@@ -314,7 +316,7 @@
if (inode == null) {
throw new FileNotFoundException(f + ": No such file or directory.");
}
- return new S3FileStatus(f, inode);
+ return new S3FileStatus(f.makeQualified(this), inode);
}
// diagnostic methods
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Wed Jun 4 13:31:04 2008
@@ -18,18 +18,18 @@
package org.apache.hadoop.mapred;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -38,9 +38,9 @@
* A base class for file-based {@link InputFormat}.
*
* <p><code>FileInputFormat</code> is the base class for all file-based
- * <code>InputFormat</code>s. This provides generic implementations of
- * {@link #validateInput(JobConf)} and {@link #getSplits(JobConf, int)}.
- * Implementations fo <code>FileInputFormat</code> can also override the
+ * <code>InputFormat</code>s. This provides a generic implementation of
+ * {@link #getSplits(JobConf, int)}.
+ * Subclasses of <code>FileInputFormat</code> can also override the
* {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
* not split-up and are processed as a whole by {@link Mapper}s.
*/
@@ -121,7 +121,7 @@
* @return the PathFilter instance set for the job, NULL if none has been set.
*/
public static PathFilter getInputPathFilter(JobConf conf) {
- Class filterClass = conf.getClass("mapred.input.pathFilter.class", null,
+ Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
PathFilter.class);
return (filterClass != null) ?
(PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
@@ -132,18 +132,18 @@
* expression.
*
* @param job the job to list input paths for
- * @return array of Path objects
+ * @return array of FileStatus objects
* @throws IOException if zero items.
*/
- protected Path[] listPaths(JobConf job)
- throws IOException {
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
- List<Path> result = new ArrayList<Path>();
-
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ List<IOException> errors = new ArrayList<IOException>();
+
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -156,73 +156,119 @@
for (Path p: dirs) {
FileSystem fs = p.getFileSystem(job);
- FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(p,
- inputFilter)), inputFilter);
-
- for (FileStatus match: matches) {
- result.add(fs.makeQualified(match.getPath()));
- }
- }
-
- return result.toArray(new Path[result.size()]);
- }
-
- public void validateInput(JobConf job) throws IOException {
- Path[] inputDirs = getInputPaths(job);
- if (inputDirs.length == 0) {
- throw new IOException("No input paths specified in input");
- }
-
- List<IOException> result = new ArrayList<IOException>();
- int totalFiles = 0;
- for (Path p: inputDirs) {
- FileSystem fs = p.getFileSystem(job);
- if (fs.exists(p)) {
- // make sure all paths are files to avoid exception
- // while generating splits
- Path[] subPaths = FileUtil.stat2Paths(fs.listStatus(p,
- hiddenFileFilter));
- for (Path subPath : subPaths) {
- FileSystem subFS = subPath.getFileSystem(job);
- if (!subFS.exists(subPath)) {
- result.add(new IOException(
- "Input path does not exist: " + subPath));
- } else {
- totalFiles++;
- }
- }
+ FileStatus[] matches = fs.globStatus(p, inputFilter);
+ if (matches == null) {
+ errors.add(new IOException("Input path does not exist: " + p));
+ } else if (matches.length == 0) {
+ errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
} else {
- Path [] paths = FileUtil.stat2Paths(fs.globStatus(p,
- hiddenFileFilter), p);
- if (paths.length == 0) {
- result.add(
- new IOException("Input Pattern " + p + " matches 0
files"));
- } else {
- // validate globbed paths
- for (Path gPath : paths) {
- FileSystem gPathFS = gPath.getFileSystem(job);
- if (!gPathFS.exists(gPath)) {
- result.add(
- new FileNotFoundException(
- "Input path doesnt exist :
" + gPath));
- }
+ for (FileStatus globStat: matches) {
+ if (globStat.isDir()) {
+ for(FileStatus stat: fs.listStatus(globStat.getPath(),
+ inputFilter)) {
+ result.add(stat);
+ }
+ } else {
+ result.add(globStat);
}
- totalFiles += paths.length;
}
}
}
- if (!result.isEmpty()) {
- throw new InvalidInputException(result);
+
+ if (!errors.isEmpty()) {
+ throw new InvalidInputException(errors);
}
- // send output to client.
- LOG.info("Total input paths to process : " + totalFiles);
+ LOG.info("Total input paths to process : " + result.size());
+ return result.toArray(new FileStatus[result.size()]);
+ }
+
+ /** List input directories.
+ * Subclasses may override to, e.g., select only files matching a regular
+ * expression.
+ *
+ * @param job the job to list input paths for
+ * @return array of Path objects
+ * @throws IOException if zero items.
+ * @deprecated Use {@link #listStatus(JobConf)} instead.
+ */
+ @Deprecated
+ protected Path[] listPaths(JobConf job)
+ throws IOException {
+ return FileUtil.stat2Paths(listStatus(job));
+ }
+
+ @Deprecated
+ public void validateInput(JobConf job) throws IOException {
+ // handled by getSplits
}
- /** Splits files returned by {@link #listPaths(JobConf)} when
+ /** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
+ @SuppressWarnings("deprecation")
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
- Path[] files = listPaths(job);
+ FileStatus[] files = listStatus(job);
+
+ // Applications may have overridden listPaths so we need to check if
+ // it returns a different set of paths to listStatus.
+ // If it does we revert to the old behavior using Paths not FileStatus
+ // objects.
+ // When listPaths is removed, this check can be removed too.
+ Path[] paths = listPaths(job);
+ if (!Arrays.equals(paths, FileUtil.stat2Paths(files))) {
+ LOG.warn("FileInputFormat#listPaths is deprecated, override listStatus "
+
+ "instead.");
+ return getSplitsForPaths(job, numSplits, paths);
+ }
+ long totalSize = 0; // compute total size
+ for (FileStatus file: files) { // check we have valid files
+ if (file.isDir()) {
+ throw new IOException("Not a file: "+ file.getPath());
+ }
+ totalSize += file.getLen();
+ }
+
+ long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
+ long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
+ minSplitSize);
+
+ // generate splits
+ ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+ for (FileStatus file: files) {
+ Path path = file.getPath();
+ FileSystem fs = path.getFileSystem(job);
+ long length = file.getLen();
+ BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ if ((length != 0) && isSplitable(fs, path)) {
+ long blockSize = file.getBlockSize();
+ long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+ long bytesRemaining = length;
+ while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+ int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+ splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
+ blkLocations[blkIndex].getHosts()));
+ bytesRemaining -= splitSize;
+ }
+
+ if (bytesRemaining != 0) {
+ splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
+ blkLocations[blkLocations.length-1].getHosts()));
+ }
+ } else if (length != 0) {
+ splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+ } else {
+ //Create empty hosts array for zero length files
+ splits.add(new FileSplit(path, 0, length, new String[0]));
+ }
+ }
+ LOG.debug("Total # of splits: " + splits.size());
+ return splits.toArray(new FileSplit[splits.size()]);
+ }
+
+ @Deprecated
+ private InputSplit[] getSplitsForPaths(JobConf job, int numSplits,
+ Path[] files) throws IOException {
long totalSize = 0; // compute total size
for (int i = 0; i < files.length; i++) { // check we have valid files
Path file = files[i];
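
The split arithmetic in the new getSplits is unchanged from the Path-based
version: computeSplitSize returns max(minSize, min(goalSize, blockSize)).
A worked example with made-up numbers:

    long totalSize = 10L * 1024 * 1024 * 1024;   // 10 GB of input (example)
    int  numSplits = 100;                        // requested map tasks
    long goalSize  = totalSize / numSplits;      // ~107 MB per split
    long minSize   = 1;                          // mapred.min.split.size
    long blockSize = 64L * 1024 * 1024;          // 64 MB block size (example)
    // max(minSize, min(goalSize, blockSize)) == 64 MB, so each file is cut
    // into 64 MB splits while more than SPLIT_SLOP * splitSize bytes remain;
    // the final remainder becomes its own split.
    long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));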
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java Wed Jun 4 13:31:04 2008
@@ -74,6 +74,8 @@
*
* @param job job configuration.
* @throws InvalidInputException if the job does not have valid input
+ * @deprecated getSplits is called in the client and can perform any
+ * necessary validation of the input
*/
void validateInput(JobConf job) throws IOException;
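
With validateInput deprecated, a format that still wants custom checks can
fold them into getSplits, which now runs in the client anyway. A minimal
sketch (the class name and the emptiness check are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.mapred.*;

    public class StrictTextInputFormat extends TextInputFormat {
      @Override
      public InputSplit[] getSplits(JobConf job, int numSplits)
          throws IOException {
        // listStatus() already rejects missing or empty input patterns with
        // InvalidInputException; add any extra validation here.
        InputSplit[] splits = super.getSplits(job, numSplits);
        if (splits.length == 0) {
          throw new IOException("Input matched no data"); // example check
        }
        return splits;
      }
    }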
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=663370&r1=663369&r2=663370&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Jun 4 13:31:04 2008
@@ -672,7 +672,8 @@
// Check the input specification
- job.getInputFormat().validateInput(job);
+ InputFormat inFormat = job.getInputFormat();
+ inFormat.validateInput(job);
// Check the output specification
job.getOutputFormat().checkOutputSpecs(fs, job);
@@ -680,7 +681,7 @@
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
InputSplit[] splits =
- job.getInputFormat().getSplits(job, job.getNumMapTasks());
+ inFormat.getSplits(job, job.getNumMapTasks());
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new Comparator<InputSplit>() {