Repository: hadoop Updated Branches: refs/heads/branch-2 caae0a0a0 -> c8648bd27
HADOOP-10987. Provide an iterator-based listing API for FileSystem. Contributed by Kihwal Lee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8648bd2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8648bd2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8648bd2 Branch: refs/heads/branch-2 Commit: c8648bd27e4954d43b8c02dc020184a1a7698340 Parents: caae0a0 Author: Kihwal Lee <kih...@apache.org> Authored: Mon Nov 3 08:23:34 2014 -0600 Committer: Kihwal Lee <kih...@apache.org> Committed: Mon Nov 3 08:23:34 2014 -0600 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/fs/FileSystem.java | 30 ++++ .../org/apache/hadoop/fs/FilterFileSystem.java | 7 + .../apache/hadoop/fs/FileSystemTestWrapper.java | 28 +-- .../org/apache/hadoop/fs/TestHarFileSystem.java | 1 + .../hadoop/hdfs/DistributedFileSystem.java | 175 ++++++++++++++----- .../org/apache/hadoop/hdfs/TestFileStatus.java | 23 +++ 7 files changed, 191 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8648bd2/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index bd9d391..bcafb12 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -6,6 +6,8 @@ Release 2.7.0 - UNRELEASED NEW FEATURES + HADOOP-10987. Provide an iterator-based listing API for FileSystem (kihwal) + IMPROVEMENTS HADOOP-11156. 
DelegateToFileSystem should implement http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8648bd2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 894e9e0..745238c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -1702,6 +1702,36 @@ public abstract class FileSystem extends Configured implements Closeable { } /** + * Returns a remote iterator so that followup calls are made on demand + * while consuming the entries. Each file system implementation should + * override this method and provide a more efficient implementation, if + * possible. + * + * @param p target path + * @return remote iterator + */ + public RemoteIterator<FileStatus> listStatusIterator(final Path p) + throws FileNotFoundException, IOException { + return new RemoteIterator<FileStatus>() { + private final FileStatus[] stats = listStatus(p); + private int i = 0; + + @Override + public boolean hasNext() { + return i<stats.length; + } + + @Override + public FileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more entry in " + p); + } + return stats[i++]; + } + }; + } + + /** * List the statuses and block locations of the files in the given path. 
* * If the path is a directory, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8648bd2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index e729e67..3d5a753 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -251,6 +251,13 @@ public class FilterFileSystem extends FileSystem { return fs.listLocatedStatus(f); } + /** Return a remote iterator for listing in a directory */ + @Override + public RemoteIterator<FileStatus> listStatusIterator(Path f) + throws IOException { + return fs.listStatusIterator(f); + } + @Override public Path getHomeDirectory() { return fs.getHomeDirectory(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8648bd2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java index 9a5f40e..933ad1a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java @@ -334,37 +334,11 @@ public final class FileSystemTestWrapper extends FSTestWrapper { return fs.getFileChecksum(f); } - private class FakeRemoteIterator<E> implements RemoteIterator<E> { - - private E[] elements; - private int count; - - 
FakeRemoteIterator(E[] elements) { - this.elements = elements; - count = 0; - } - - @Override - public boolean hasNext() throws IOException { - return count < elements.length; - } - - @Override - public E next() throws IOException { - if (hasNext()) { - return elements[count++]; - } - return null; - } - } - @Override public RemoteIterator<FileStatus> listStatusIterator(Path f) throws AccessControlException, FileNotFoundException, UnsupportedFileSystemException, IOException { - // Fake the RemoteIterator, because FileSystem has no such thing - FileStatus[] statuses = fs.listStatus(f); - return new FakeRemoteIterator<FileStatus>(statuses); + return fs.listStatusIterator(f); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8648bd2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index 1e86439..374bb2e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -125,6 +125,7 @@ public class TestHarFileSystem { public Iterator<LocatedFileStatus> listLocatedStatus(Path f); public Iterator<LocatedFileStatus> listLocatedStatus(Path f, PathFilter filter); + public Iterator<FileStatus> listStatusIterator(Path f); public void copyFromLocalFile(Path src, Path dst); public void moveFromLocalFile(Path[] srcs, Path dst); public void moveFromLocalFile(Path src, Path dst); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8648bd2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 067adce..8152527 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -766,66 +766,145 @@ public class DistributedFileSystem extends FileSystem { protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p, final PathFilter filter) throws IOException { - final Path absF = fixRelativePart(p); - return new RemoteIterator<LocatedFileStatus>() { - private DirectoryListing thisListing; - private int i; - private String src; - private LocatedFileStatus curStat = null; - - { // initializer - // Fully resolve symlinks in path first to avoid additional resolution - // round-trips as we fetch more batches of listings - src = getPathName(resolvePath(absF)); - // fetch the first batch of entries in the directory - thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true); - statistics.incrementReadOps(1); - if (thisListing == null) { // the directory does not exist - throw new FileNotFoundException("File " + p + " does not exist."); + Path absF = fixRelativePart(p); + return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() { + @Override + public RemoteIterator<LocatedFileStatus> doCall(final Path p) + throws IOException, UnresolvedLinkException { + return new DirListingIterator<LocatedFileStatus>(p, filter, true); + } + + @Override + public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + return ((DistributedFileSystem)fs).listLocatedStatus(p, filter); } + // symlink resolution for this method does not work across file systems + // because it is a protected method.
+ throw new IOException("Link resolution does not work with multiple " + + "file systems for listLocatedStatus(): " + p); + } + }.resolve(this, absF); + } + + + /** + * Returns a remote iterator so that followup calls are made on demand + * while consuming the entries. This reduces memory consumption during + * listing of a large directory. + * + * @param p target path + * @return remote iterator + */ + @Override + public RemoteIterator<FileStatus> listStatusIterator(final Path p) + throws IOException { + Path absF = fixRelativePart(p); + return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() { + @Override + public RemoteIterator<FileStatus> doCall(final Path p) + throws IOException, UnresolvedLinkException { + return new DirListingIterator<FileStatus>(p, false); } @Override - public boolean hasNext() throws IOException { - while (curStat == null && hasNextNoFilter()) { - LocatedFileStatus next = - ((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++]) - .makeQualifiedLocated(getUri(), absF); - if (filter.accept(next.getPath())) { - curStat = next; - } + public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p) + throws IOException { + return ((DistributedFileSystem)fs).listStatusIterator(p); + } + }.resolve(this, absF); + + } + + /** + * This class defines an iterator that returns + * the file status of each file/subdirectory of a directory + * + * if needLocation, status contains block location if it is a file + * throws a RuntimeException with the error as its cause. 
+ * + * @param <T> the type of the file status + */ + private class DirListingIterator<T extends FileStatus> + implements RemoteIterator<T> { + private DirectoryListing thisListing; + private int i; + private Path p; + private String src; + private T curStat = null; + private PathFilter filter; + private boolean needLocation; + + private DirListingIterator(Path p, PathFilter filter, + boolean needLocation) throws IOException { + this.p = p; + this.src = getPathName(p); + this.filter = filter; + this.needLocation = needLocation; + // fetch the first batch of entries in the directory + thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, + needLocation); + statistics.incrementReadOps(1); + if (thisListing == null) { // the directory does not exist + throw new FileNotFoundException("File " + p + " does not exist."); + } + i = 0; + } + + private DirListingIterator(Path p, boolean needLocation) + throws IOException { + this(p, null, needLocation); + } + + @Override + @SuppressWarnings("unchecked") + public boolean hasNext() throws IOException { + while (curStat == null && hasNextNoFilter()) { + T next; + HdfsFileStatus fileStat = thisListing.getPartialListing()[i++]; + if (needLocation) { + next = (T)((HdfsLocatedFileStatus)fileStat) + .makeQualifiedLocated(getUri(), p); + } else { + next = (T)fileStat.makeQualified(getUri(), p); + } + // apply filter if not null + if (filter == null || filter.accept(next.getPath())) { + curStat = next; } - return curStat != null; } + return curStat != null; + } - /** Check if there is a next item before applying the given filter */ - private boolean hasNextNoFilter() throws IOException { + /** Check if there is a next item before applying the given filter */ + private boolean hasNextNoFilter() throws IOException { + if (thisListing == null) { + return false; + } + if (i >= thisListing.getPartialListing().length + && thisListing.hasMore()) { + // current listing is exhausted & fetch a new listing + thisListing = 
dfs.listPaths(src, thisListing.getLastName(), + needLocation); + statistics.incrementReadOps(1); if (thisListing == null) { return false; } - if (i>=thisListing.getPartialListing().length - && thisListing.hasMore()) { - // current listing is exhausted & fetch a new listing - thisListing = dfs.listPaths(src, thisListing.getLastName(), true); - statistics.incrementReadOps(1); - if (thisListing == null) { - return false; - } - i = 0; - } - return (i<thisListing.getPartialListing().length); + i = 0; } + return (i < thisListing.getPartialListing().length); + } - @Override - public LocatedFileStatus next() throws IOException { - if (hasNext()) { - LocatedFileStatus tmp = curStat; - curStat = null; - return tmp; - } - throw new java.util.NoSuchElementException("No more entry in " + p); - } - }; + @Override + public T next() throws IOException { + if (hasNext()) { + T tmp = curStat; + curStat = null; + return tmp; + } + throw new java.util.NoSuchElementException("No more entry in " + p); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8648bd2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java index 1c9aff0..bc01190 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java @@ -227,6 +227,9 @@ public class TestFileStatus { RemoteIterator<FileStatus> itor = fc.listStatus(dir); assertFalse(dir + " should be empty", itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertFalse(dir + " should be empty", itor.hasNext()); + // create another file that is smaller than a block. 
Path file2 = new Path(dir, "filestatus2.dat"); writeFile(fs, file2, 1, blockSize/4, blockSize); @@ -264,6 +267,12 @@ public class TestFileStatus { assertEquals(file3.toString(), itor.next().getPath().toString()); assertFalse("Unexpected addtional file", itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertEquals(file2.toString(), itor.next().getPath().toString()); + assertEquals(file3.toString(), itor.next().getPath().toString()); + assertFalse("Unexpected addtional file", itor.hasNext()); + + // Test iterative listing. Now dir has 2 entries, create one more. Path dir3 = fs.makeQualified(new Path(dir, "dir3")); fs.mkdirs(dir3); @@ -280,6 +289,12 @@ public class TestFileStatus { assertEquals(file3.toString(), itor.next().getPath().toString()); assertFalse("Unexpected addtional file", itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertEquals(dir3.toString(), itor.next().getPath().toString()); + assertEquals(file2.toString(), itor.next().getPath().toString()); + assertEquals(file3.toString(), itor.next().getPath().toString()); + assertFalse("Unexpected addtional file", itor.hasNext()); + // Now dir has 3 entries, create two more Path dir4 = fs.makeQualified(new Path(dir, "dir4")); fs.mkdirs(dir4); @@ -303,6 +318,14 @@ public class TestFileStatus { assertEquals(file3.toString(), itor.next().getPath().toString()); assertFalse(itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertEquals(dir3.toString(), itor.next().getPath().toString()); + assertEquals(dir4.toString(), itor.next().getPath().toString()); + assertEquals(dir5.toString(), itor.next().getPath().toString()); + assertEquals(file2.toString(), itor.next().getPath().toString()); + assertEquals(file3.toString(), itor.next().getPath().toString()); + assertFalse(itor.hasNext()); + { //test permission error on hftp fs.setPermission(dir, new FsPermission((short)0)); try {