Repository: hadoop
Updated Branches:
  refs/heads/trunk 18de6f204 -> 0d898b7bb


HADOOP-12502 SetReplication OutOfMemoryError. Contributed by Vinayakumar B.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d898b7b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d898b7b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d898b7b

Branch: refs/heads/trunk
Commit: 0d898b7bb8d8d616133236da979a4316be4c1a6f
Parents: 18de6f2
Author: Aaron Fabbri <fab...@apache.org>
Authored: Wed Apr 11 17:19:56 2018 -0700
Committer: Aaron Fabbri <fab...@apache.org>
Committed: Wed Apr 11 17:19:56 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/fs/ChecksumFileSystem.java    |  9 ++-
 .../java/org/apache/hadoop/fs/FileSystem.java   |  2 +-
 .../org/apache/hadoop/fs/shell/Command.java     | 69 ++++++++++++++++++--
 .../apache/hadoop/fs/shell/CopyCommands.java    |  6 ++
 .../java/org/apache/hadoop/fs/shell/Ls.java     | 26 +++++++-
 .../org/apache/hadoop/fs/shell/PathData.java    | 27 ++++++++
 .../apache/hadoop/fs/shell/find/TestFind.java   | 34 +++++++++-
 7 files changed, 161 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 14c1905..663c910 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -677,7 +677,14 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   public FileStatus[] listStatus(Path f) throws IOException {
     return fs.listStatus(f, DEFAULT_FILTER);
   }
-  
+
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+      throws IOException {
+    // Not using fs#listStatusIterator() since it includes crc files as well
+    return new DirListingIterator<>(p);
+  }
+
   /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
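
The override above matters because FileSystem's DirListingIterator is ultimately backed by the enclosing FileSystem's listStatus, which in ChecksumFileSystem applies DEFAULT_FILTER to hide the hidden .crc files; delegating to fs#listStatusIterator would bypass that filter. A minimal caller-side sketch (not part of this commit; printChildren is a hypothetical helper) of streaming a large directory instead of materializing a full FileStatus[]:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    // Stream directory entries one at a time; memory use stays bounded
    // instead of growing with a FileStatus[] of the whole listing.
    static void printChildren(FileSystem fs, Path dir) throws IOException {
      RemoteIterator<FileStatus> it = fs.listStatusIterator(dir);
      while (it.hasNext()) {
        System.out.println(it.next().getPath());
      }
    }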

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index facfe03..707b921 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -2147,7 +2147,7 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Generic iterator for implementing {@link #listStatusIterator(Path)}.
    */
-  private class DirListingIterator<T extends FileStatus> implements
+  protected class DirListingIterator<T extends FileStatus> implements
       RemoteIterator<T> {
 
     private final Path path;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
index c292cf6..a4746cf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -325,14 +326,9 @@ abstract public class Command extends Configured {
    */
   protected void processPaths(PathData parent, PathData ... items)
   throws IOException {
-    // TODO: this really should be iterative
     for (PathData item : items) {
       try {
-        processPath(item);
-        if (recursive && isPathRecursable(item)) {
-          recursePath(item);
-        }
-        postProcessPath(item);
+        processPathInternal(item);
       } catch (IOException e) {
         displayError(e);
       }
@@ -340,6 +336,59 @@ abstract public class Command extends Configured {
   }
 
   /**
+   * Iterates over the given expanded paths and invokes
+   * {@link #processPath(PathData)} on each element. If "recursive" is true,
+   * will do a post-visit DFS on directories.
+   * @param parent if called via a recurse, will be the parent dir, else null
+   * @param itemsIterator an iterator of {@link PathData} objects to process
+   * @throws IOException if anything goes wrong...
+   */
+  protected void processPaths(PathData parent,
+      RemoteIterator<PathData> itemsIterator) throws IOException {
+    int groupSize = getListingGroupSize();
+    if (groupSize == 0) {
+      // No grouping of contents required.
+      while (itemsIterator.hasNext()) {
+        processPaths(parent, itemsIterator.next());
+      }
+    } else {
+      List<PathData> items = new ArrayList<PathData>(groupSize);
+      while (itemsIterator.hasNext()) {
+        items.add(itemsIterator.next());
+        if (!itemsIterator.hasNext() || items.size() == groupSize) {
+          processPaths(parent, items.toArray(new PathData[items.size()]));
+          items.clear();
+        }
+      }
+    }
+  }
+
+  private void processPathInternal(PathData item) throws IOException {
+    processPath(item);
+    if (recursive && isPathRecursable(item)) {
+      recursePath(item);
+    }
+    postProcessPath(item);
+  }
+
+  /**
+   * Whether the directory listing for a path should be sorted.
+   * @return true if the listing must be sorted, false otherwise.
+   */
+  protected boolean isSorted() {
+    return false;
+  }
+
+  /**
+   * When the iterator-based listing is used for a path, whether to group items
+   * into batches and process each batch as an array, and if so, how large each
+   * batch should be.
+   * @return the batch size; 0 means no grouping.
+   */
+  protected int getListingGroupSize() {
+    return 0;
+  }
+
+  /**
    * Determines whether a {@link PathData} item is recursable. Default
    * implementation is to recurse directories but can be overridden to recurse
    * through symbolic links.
@@ -384,7 +433,13 @@ abstract public class Command extends Configured {
   protected void recursePath(PathData item) throws IOException {
     try {
       depth++;
-      processPaths(item, item.getDirectoryContents());
+      if (isSorted()) {
+        // Use the non-iterative method for listing because explicit sorting is
+        // required; iterators are not guaranteed to return sorted elements.
+        processPaths(item, item.getDirectoryContents());
+      } else {
+        processPaths(item, item.getDirectoryContentsIterator());
+      }
     } finally {
       depth--;
     }
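
The grouped iteration added above is the core of the OOM fix: instead of asking getDirectoryContents() for one array holding every child of a huge directory, children are drained from a RemoteIterator in fixed-size batches. A standalone sketch of the same pattern (processInGroups is hypothetical, not a Hadoop API):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Consumer;

    // Drain an iterator in fixed-size groups so that at most groupSize
    // elements are ever held in memory at once.
    static <T> void processInGroups(Iterator<T> items, int groupSize,
        Consumer<List<T>> processor) {
      List<T> batch = new ArrayList<>(groupSize);
      while (items.hasNext()) {
        batch.add(items.next());
        if (!items.hasNext() || batch.size() == groupSize) {
          processor.accept(batch);
          batch = new ArrayList<>(groupSize);
        }
      }
    }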

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index 11cb3d6..da7a2b2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -142,6 +142,12 @@ class CopyCommands {
         srcs.add(src);
       }
     }
+
+    @Override
+    protected boolean isSorted() {
+      // Sort the children for merge
+      return true;
+    }
   }
 
   static class Cp extends CommandWithDestination {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
index a2d5017..56504e4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
@@ -230,8 +230,30 @@ class Ls extends FsCommand {
   }
 
   @Override
-  protected void processPaths(PathData parent, PathData ... items)
-  throws IOException {
+  protected boolean isSorted() {
+    // Use the non-iterative method for listing when explicit sorting is
+    // required (by time, size, or reverse order), or when non-recursive,
+    // since the total number of entries must be known to print the summary
+    // line first.
+    return !isRecursive() || isOrderTime() || isOrderSize() || isOrderReverse();
+  }
+
+  @Override
+  protected int getListingGroupSize() {
+    if (pathOnly) {
+      // When only paths are printed, no grouping is required.
+      return 0;
+    }
+    /*
+     * Ls output must be column-formatted, so group 100 items at a time and
+     * format each batch; this avoids creating one huge array. This method is
+     * called only when recursive is set.
+     */
+    return 100;
+  }
+
+  @Override
+  protected void processPaths(PathData parent, PathData... items)
+      throws IOException {
     if (parent != null && !isRecursive() && items.length != 0) {
       if (!pathOnly) {
         out.println("Found " + items.length + " items");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index 3cf3273..adf17df 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.PathIsDirectoryException;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.RemoteIterator;
 
 /**
  * Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
@@ -277,6 +278,32 @@ public class PathData implements Comparable<PathData> {
   }
 
   /**
+   * Returns a RemoteIterator for PathData objects of the items contained in
+   * this directory.
+   * @return remote iterator of PathData objects for its children
+   * @throws IOException if anything goes wrong...
+   */
+  public RemoteIterator<PathData> getDirectoryContentsIterator()
+      throws IOException {
+    checkIfExists(FileTypeRequirement.SHOULD_BE_DIRECTORY);
+    final RemoteIterator<FileStatus> stats = this.fs.listStatusIterator(path);
+    return new RemoteIterator<PathData>() {
+
+      @Override
+      public boolean hasNext() throws IOException {
+        return stats.hasNext();
+      }
+
+      @Override
+      public PathData next() throws IOException {
+        FileStatus file = stats.next();
+        String child = getStringForChildPath(file.getPath());
+        return new PathData(fs, child, file);
+      }
+    };
+  }
+
+  /**
    * Creates a new object for a child entry in this directory
    * @param child the basename will be appended to this object's path
    * @return PathData for the child
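
getDirectoryContentsIterator wraps each FileStatus in a PathData lazily, as it is consumed, rather than converting a whole array up front. The same adapter idiom in generic form (IOFunction and mapRemote are hypothetical helpers, not Hadoop APIs):

    import java.io.IOException;
    import org.apache.hadoop.fs.RemoteIterator;

    interface IOFunction<A, B> {
      B apply(A input) throws IOException;
    }

    // Lazily transform a RemoteIterator<A> into a RemoteIterator<B>; each
    // element is converted only when next() is called.
    static <A, B> RemoteIterator<B> mapRemote(RemoteIterator<A> src,
        IOFunction<A, B> fn) {
      return new RemoteIterator<B>() {
        @Override
        public boolean hasNext() throws IOException {
          return src.hasNext();
        }

        @Override
        public B next() throws IOException {
          return fn.apply(src.next());
        }
      };
    }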

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d898b7b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java
index 716230a..de0e512 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java
@@ -19,18 +19,19 @@ package org.apache.hadoop.fs.shell.find;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
-import static org.mockito.Matchers.*;
 
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.shell.PathData;
 import org.apache.hadoop.fs.shell.find.BaseExpression;
 import org.apache.hadoop.fs.shell.find.Expression;
@@ -42,6 +43,9 @@ import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.junit.Test;
 import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestFind {
 
@@ -861,6 +865,34 @@ public class TestFind {
     when(mockFs.listStatus(eq(item5c.path))).thenReturn(
         new FileStatus[] { item5ca.stat });
 
+    when(mockFs.listStatusIterator(Mockito.any(Path.class)))
+        .thenAnswer(new Answer<RemoteIterator<FileStatus>>() {
+
+          @Override
+          public RemoteIterator<FileStatus> answer(InvocationOnMock invocation)
+              throws Throwable {
+            final Path p = (Path) invocation.getArguments()[0];
+            final FileStatus[] stats = mockFs.listStatus(p);
+
+            return new RemoteIterator<FileStatus>() {
+              private int i = 0;
+
+              @Override
+              public boolean hasNext() throws IOException {
+                return i < stats.length;
+              }
+
+              @Override
+              public FileStatus next() throws IOException {
+                if (!hasNext()) {
+                  throw new NoSuchElementException("No more entries in " + p);
+                }
+                return stats[i++];
+              }
+            };
+          }
+        });
+
     when(item1.stat.isSymlink()).thenReturn(false);
     when(item1a.stat.isSymlink()).thenReturn(false);
     when(item1aa.stat.isSymlink()).thenReturn(false);

