ayushtkn commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r591382388



##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    // Traversal inputs; all are fixed for the lifetime of one traversal.
+    private final SequenceFile.Writer fileListWriter;
+    private final FileSystem sourceFS;
+    private final ArrayList<FileStatus> sourceDirs;
+    private final Path sourcePathRoot;
+    private final DistCpContext context;
+    // URI paths to skip; null means nothing is excluded.
+    private final HashSet<String> excludeList;
+    private final List<FileStatusInfo> fileStatuses;
+    // Preservation options are read from the context once, up front, rather
+    // than being re-queried for every child that is recorded.
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    /**
+     * Captures the state shared by both traversal strategies.
+     *
+     * @param fileListWriter writer that receives the listing entries
+     * @param sourceFS file system the source directories are listed from
+     * @param sourceDirs directories whose contents are traversed
+     * @param sourcePathRoot root path passed along with every recorded entry
+     * @param context DistCp context; ACL/XAttr/raw-XAttr preservation flags,
+     *     the iterator switch and blocks-per-chunk are read from it
+     * @param excludeList URI paths to skip, or {@code null} for none
+     * @param fileStatuses collection handed to {@code addToFileListing} when
+     *     the file listing is randomized
+     */
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context,
+        HashSet<String> excludeList, List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    /**
+     * Walks every directory in {@code sourceDirs} and records its contents in
+     * the copy listing. When the context asks for the iterator-based listing,
+     * the single-threaded walk is used; otherwise the work is spread over the
+     * listStatus thread pool.
+     *
+     * @throws IOException if a directory cannot be listed or an entry cannot
+     *     be recorded
+     */
+    public void traverseDirectory() throws IOException {
+      if (!context.shouldUseIterator()) {
+        traverseDirectoryMultiThreaded();
+        return;
+      }
+      traverseDirectoryLegacy();
+    }
+
+    /**
+     * Lists {@code sourceDirs} using a pool of {@code numListstatusThreads}
+     * listStatus workers. Each child from a successful work report is written
+     * to the copy listing. Child directories are queued back to the pool while
+     * the report's retry count is below {@code maxRetries}. Once the retry
+     * count reaches {@code maxRetries}, the child is logged as given up.
+     *
+     * <p>The worker pool is shut down on every exit path, including when
+     * building or writing an entry throws. An interrupt received while
+     * waiting for a result does not abort the walk (the wait is retried), but
+     * the thread's interrupt status is restored once the walk has finished.
+     *
+     * @throws IOException if a worker file system cannot be obtained or a
+     *     listing entry cannot be built or written
+     */
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      boolean interrupted = false;
+      try {
+        for (int i = 0; i < numListstatusThreads; i++) {
+          workers.addWorker(
+              new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                  excludeList));
+        }
+
+        for (FileStatus status : sourceDirs) {
+          workers.put(new WorkRequest<FileStatus>(status, 0));
+        }
+
+        while (workers.hasWork()) {
+          try {
+            WorkReport<FileStatus[]> workResult = workers.take();
+            int retry = workResult.getRetry();
+            for (FileStatus child : workResult.getItem()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Recording source-path: " + child.getPath() + " for copy.");
+              }
+              if (workResult.getSuccess()) {
+                LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                    DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                        preserveAcls && child.isDirectory(),
+                        preserveXAttrs && child.isDirectory(),
+                        preserveRawXattrs && child.isDirectory(),
+                        context.getBlocksPerChunk());
+
+                for (CopyListingFileStatus fs : childCopyListingStatus) {
+                  if (randomizeFileListing) {
+                    addToFileListing(fileStatuses,
+                        new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                  } else {
+                    writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                  }
+                }
+              }
+              if (retry < maxRetries) {
+                if (child.isDirectory()) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Traversing into source dir: " + child.getPath());
+                  }
+                  workers.put(new WorkRequest<FileStatus>(child, retry));
+                }
+              } else {
+                LOG.error("Giving up on " + child.getPath() + " after " + retry
+                    + " retries.");
+              }
+            }
+          } catch (InterruptedException ie) {
+            // Throwing InterruptedException normally clears the interrupt
+            // status, so keep draining queued work. Remember the interrupt and
+            // restore it afterwards; restoring it here would make take() fail
+            // immediately on every iteration.
+            interrupted = true;
+            LOG.error("Could not get item from childQueue. Retrying...");
+          }
+        }
+      } finally {
+        // Always release the worker pool, even if recording an entry threw.
+        workers.shutdown();
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    /**
+     * Single-threaded walk built on {@code listStatusIterator}. The source
+     * directories are processed from last to first, and any whose URI path
+     * appears in {@code excludeList} is skipped.
+     *
+     * @throws IOException if a directory cannot be listed or an entry cannot
+     *     be recorded
+     */
+    private void traverseDirectoryLegacy() throws IOException {
+      // Walking the list backwards yields the same last-in-first-out order as
+      // pushing every root onto a stack and popping them one by one.
+      for (int idx = sourceDirs.size() - 1; idx >= 0; idx--) {
+        FileStatus root = sourceDirs.get(idx);
+        boolean excluded = excludeList != null
+            && excludeList.contains(root.getPath().toUri().getPath());
+        if (!excluded) {
+          prepareListing(root.getPath());
+        }
+      }
+    }
+
+    /**
+     * Records every entry beneath {@code path} in the copy listing and
+     * descends into sub-directories depth-first. Entries whose URI path is in
+     * {@code excludeList} are skipped and not descended into.
+     *
+     * <p>Open directory listings are kept on an explicit stack instead of the
+     * call stack, so the walk's stack depth no longer grows with the depth of
+     * the source tree. The visiting order is the same pre-order a recursive
+     * walk produces: each child is recorded before that child's own contents
+     * are listed, and then the parent listing resumes.
+     *
+     * @param path directory whose descendants are recorded
+     * @throws IOException if a directory cannot be listed or an entry cannot
+     *     be built or written
+     */
+    private void prepareListing(Path path) throws IOException {
+      List<RemoteIterator<FileStatus>> openListings =
+          new ArrayList<RemoteIterator<FileStatus>>();
+      openListings.add(listDirectory(path));
+      while (!openListings.isEmpty()) {
+        RemoteIterator<FileStatus> listStatus =
+            openListings.get(openListings.size() - 1);
+        if (!listStatus.hasNext()) {
+          // This directory is exhausted; resume its parent's listing.
+          openListings.remove(openListings.size() - 1);
+          continue;
+        }
+        FileStatus child = listStatus.next();
+        if (excludeList != null && excludeList
+            .contains(child.getPath().toUri().getPath())) {
+          continue;
+        }
+        LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
+            .toCopyListingFileStatus(sourceFS, child,
+                preserveAcls && child.isDirectory(),
+                preserveXAttrs && child.isDirectory(),
+                preserveRawXattrs && child.isDirectory(),
+                context.getBlocksPerChunk());
+        for (CopyListingFileStatus fs : childCopyListingStatus) {
+          if (randomizeFileListing) {
+            addToFileListing(fileStatuses,
+                new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+          } else {
+            writeToFileListing(fileListWriter, fs, sourcePathRoot);
+          }
+        }
+        if (child.isDirectory()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Traversing into source dir: " + child.getPath());
+          }
+          openListings.add(listDirectory(child.getPath()));
+        }
+      }
+    }
+
+    /**
+     * Opens an iterator over the direct children of {@code dir}, logging the
+     * directory at debug level first.
+     *
+     * @param dir directory to list
+     * @return iterator over the children of {@code dir}
+     * @throws IOException if the directory cannot be listed
+     */
+    private RemoteIterator<FileStatus> listDirectory(Path dir)
+        throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recording source-path: " + dir + " for copy.");
+      }
+      return sourceFS.listStatusIterator(dir);
+    }

Review comment:
       Yep, that is cool. I extracted part of it to `hadoop-common`; let me 
know if you have any objections to that. I wanted to move all of it to 
`common`, but held back because of the `CallableSupplier` class: I thought 
moving it might cause incompatibility problems, since it is also used in 
production code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to