Repository: hadoop Updated Branches: refs/heads/trunk 18de6f204 -> 0d898b7bb
HADOOP-12502 SetReplication OutOfMemoryError. Contributed by Vinayakumar B. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d898b7b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d898b7b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d898b7b Branch: refs/heads/trunk Commit: 0d898b7bb8d8d616133236da979a4316be4c1a6f Parents: 18de6f2 Author: Aaron Fabbri <fab...@apache.org> Authored: Wed Apr 11 17:19:56 2018 -0700 Committer: Aaron Fabbri <fab...@apache.org> Committed: Wed Apr 11 17:19:56 2018 -0700 ---------------------------------------------------------------------- .../apache/hadoop/fs/ChecksumFileSystem.java | 9 ++- .../java/org/apache/hadoop/fs/FileSystem.java | 2 +- .../org/apache/hadoop/fs/shell/Command.java | 69 ++++++++++++++++++-- .../apache/hadoop/fs/shell/CopyCommands.java | 6 ++ .../java/org/apache/hadoop/fs/shell/Ls.java | 26 +++++++- .../org/apache/hadoop/fs/shell/PathData.java | 27 ++++++++ .../apache/hadoop/fs/shell/find/TestFind.java | 34 +++++++++- 7 files changed, 161 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 14c1905..663c910 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -677,7 +677,14 @@ public abstract class ChecksumFileSystem extends FilterFileSystem { public FileStatus[] listStatus(Path f) throws 
IOException { return fs.listStatus(f, DEFAULT_FILTER); } - + + @Override + public RemoteIterator<FileStatus> listStatusIterator(final Path p) + throws IOException { + // Not-using fs#listStatusIterator() since it includes crc files as well + return new DirListingIterator<>(p); + } + /** * List the statuses of the files/directories in the given path if the path is * a directory. http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index facfe03..707b921 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2147,7 +2147,7 @@ public abstract class FileSystem extends Configured implements Closeable { /** * Generic iterator for implementing {@link #listStatusIterator(Path)}. 
*/ - private class DirListingIterator<T extends FileStatus> implements + protected class DirListingIterator<T extends FileStatus> implements RemoteIterator<T> { private final Path path; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java index c292cf6..a4746cf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -325,14 +326,9 @@ abstract public class Command extends Configured { */ protected void processPaths(PathData parent, PathData ... items) throws IOException { - // TODO: this really should be iterative for (PathData item : items) { try { - processPath(item); - if (recursive && isPathRecursable(item)) { - recursePath(item); - } - postProcessPath(item); + processPathInternal(item); } catch (IOException e) { displayError(e); } @@ -340,6 +336,59 @@ abstract public class Command extends Configured { } /** + * Iterates over the given expanded paths and invokes + * {@link #processPath(PathData)} on each element. If "recursive" is true, + * will do a post-visit DFS on directories. 
+ * @param parent if called via a recurse, will be the parent dir, else null + * @param itemsIterator an iterator of {@link PathData} objects to process + * @throws IOException if anything goes wrong... + */ + protected void processPaths(PathData parent, + RemoteIterator<PathData> itemsIterator) throws IOException { + int groupSize = getListingGroupSize(); + if (groupSize == 0) { + // No grouping of contents required. + while (itemsIterator.hasNext()) { + processPaths(parent, itemsIterator.next()); + } + } else { + List<PathData> items = new ArrayList<PathData>(groupSize); + while (itemsIterator.hasNext()) { + items.add(itemsIterator.next()); + if (!itemsIterator.hasNext() || items.size() == groupSize) { + processPaths(parent, items.toArray(new PathData[items.size()])); + items.clear(); + } + } + } + } + + private void processPathInternal(PathData item) throws IOException { + processPath(item); + if (recursive && isPathRecursable(item)) { + recursePath(item); + } + postProcessPath(item); + } + + /** + * Whether the directory listing for a path should be sorted? + * @return true/false. + */ + protected boolean isSorted() { + return false; + } + + /** + * While using iterator method for listing for a path, whether to group items + * and process as array? If so what is the size of array? + * @return size of the grouping array. + */ + protected int getListingGroupSize() { + return 0; + } + + /** + * Determines whether a {@link PathData} item is recursable. Default * implementation is to recurse directories but can be overridden to recurse * through symbolic links. @@ -384,7 +433,13 @@ abstract public class Command extends Configured { protected void recursePath(PathData item) throws IOException { try { depth++; - processPaths(item, item.getDirectoryContents()); + if (isSorted()) { + // use the non-iterative method for listing because explicit sorting is + // required. 
Iterators not guaranteed to return sorted elements + processPaths(item, item.getDirectoryContents()); + } else { + processPaths(item, item.getDirectoryContentsIterator()); + } } finally { depth--; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index 11cb3d6..da7a2b2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -142,6 +142,12 @@ class CopyCommands { srcs.add(src); } } + + @Override + protected boolean isSorted() { + //Sort the children for merge + return true; + } } static class Cp extends CommandWithDestination { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java index a2d5017..56504e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java @@ -230,8 +230,30 @@ class Ls extends FsCommand { } @Override - protected void processPaths(PathData parent, PathData ... 
items) - throws IOException { + protected boolean isSorted() { + // use the non-iterative method for listing because explicit sorting is + // required based on time/size/reverse or Total number of entries + // required to print summary first when non-recursive. + return !isRecursive() || isOrderTime() || isOrderSize() || isOrderReverse(); + } + + @Override + protected int getListingGroupSize() { + if (pathOnly) { + // If there is a need of printing only paths, then no grouping required + return 0; + } + /* + * LS output should be formatted properly. Grouping 100 items and formatting + * the output to reduce the creation of huge sized arrays. This method will + * be called only when recursive is set. + */ + return 100; + } + + @Override + protected void processPaths(PathData parent, PathData... items) + throws IOException { if (parent != null && !isRecursive() && items.length != 0) { if (!pathOnly) { out.println("Found " + items.length + " items"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java index 3cf3273..adf17df 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathIsDirectoryException; import org.apache.hadoop.fs.PathIsNotDirectoryException; import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.fs.RemoteIterator; /** * Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs). 
@@ -277,6 +278,32 @@ public class PathData implements Comparable<PathData> { } /** + * Returns a RemoteIterator for PathData objects of the items contained in the + * given directory. + * @return remote iterator of PathData objects for its children + * @throws IOException if anything else goes wrong... + */ + public RemoteIterator<PathData> getDirectoryContentsIterator() + throws IOException { + checkIfExists(FileTypeRequirement.SHOULD_BE_DIRECTORY); + final RemoteIterator<FileStatus> stats = this.fs.listStatusIterator(path); + return new RemoteIterator<PathData>() { + + @Override + public boolean hasNext() throws IOException { + return stats.hasNext(); + } + + @Override + public PathData next() throws IOException { + FileStatus file = stats.next(); + String child = getStringForChildPath(file.getPath()); + return new PathData(fs, child, file); + } + }; + } + + /** * Creates a new object for a child entry in this directory * @param child the basename will be appended to this object's path * @return PathData for the child http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java index 716230a..de0e512 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java @@ -19,18 +19,19 @@ package org.apache.hadoop.fs.shell.find; import static org.junit.Assert.*; import static org.mockito.Mockito.*; -import static org.mockito.Matchers.*; import java.io.IOException; import java.io.PrintStream; import java.util.Arrays; import java.util.Collections; import 
java.util.LinkedList; +import java.util.NoSuchElementException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.shell.PathData; import org.apache.hadoop.fs.shell.find.BaseExpression; import org.apache.hadoop.fs.shell.find.Expression; @@ -42,6 +43,9 @@ import org.junit.Rule; import org.junit.rules.Timeout; import org.junit.Test; import org.mockito.InOrder; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestFind { @@ -861,6 +865,34 @@ public class TestFind { when(mockFs.listStatus(eq(item5c.path))).thenReturn( new FileStatus[] { item5ca.stat }); + when(mockFs.listStatusIterator(Mockito.any(Path.class))) + .thenAnswer(new Answer<RemoteIterator<FileStatus>>() { + + @Override + public RemoteIterator<FileStatus> answer(InvocationOnMock invocation) + throws Throwable { + final Path p = (Path) invocation.getArguments()[0]; + final FileStatus[] stats = mockFs.listStatus(p); + + return new RemoteIterator<FileStatus>() { + private int i = 0; + + @Override + public boolean hasNext() throws IOException { + return i < stats.length; + } + + @Override + public FileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more entry in " + p); + } + return stats[i++]; + } + }; + } + }); + when(item1.stat.isSymlink()).thenReturn(false); when(item1a.stat.isSymlink()).thenReturn(false); when(item1aa.stat.isSymlink()).thenReturn(false); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org