steveloughran commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r591753745
##########
File path:
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
##########
@@ -64,12 +71,22 @@
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
+import org.slf4j.LoggerFactory;
Review comment:
probably needs to go somewhere else in the imports
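For illustration, a sketch of the placement being suggested: keep the new org.slf4j import grouped with the other org.slf4j imports rather than appended after the thirdparty block. The neighbouring imports shown are the ones visible in the hunk above; the rest of the file's import list is assumed.

    import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
    import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;

    import org.slf4j.LoggerFactory;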
##########
File path:
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -18,30 +18,31 @@
package org.apache.hadoop.tools;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
Review comment:
Now, these imports we are trying to leave where they were, because when cherry-picking we're trying to stay consistent with the older branches. It's a PITA, I know.
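As a hedged sketch of that intent (not the PR's actual import list): the pre-existing imports keep the positions they have on the older branches, and only the genuinely new import is added, so the diff stays minimal for cherry-picks.

    // unchanged positions, matching the older branches
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;

    // the only genuinely new import in this hunk
    import org.apache.hadoop.fs.statistics.IOStatisticsLogging;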
##########
File path:
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
##########
@@ -46,8 +50,11 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
Review comment:
Recent trunk changes (#2522) will have broken this; just use direct references to BlockingThreadPoolExecutorService.
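A minimal sketch of such a direct reference, using the BlockingThreadPoolExecutorService.newInstance() factory from org.apache.hadoop.util; the pool sizes, names, and the wrapper class are illustrative, not code from this PR.

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.util.BlockingThreadPoolExecutorService;

    public class PoolHolder {
      // Field typed as the Hadoop class itself, so no relocated-Guava
      // ListenableFuture import is needed.
      private final BlockingThreadPoolExecutorService executor =
          BlockingThreadPoolExecutorService.newInstance(
              4,             // concurrently active tasks
              16,            // additional tasks allowed to queue
              60, TimeUnit.SECONDS,
              "test-pool");  // thread name prefix

      Future<?> run(Runnable task) {
        // submit() returns a Future subtype; callers only need
        // java.util.concurrent.Future.
        return executor.submit(task);
      }
    }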
##########
File path:
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -778,16 +774,13 @@ private void traverseDirectoryLegacy() throws IOException {
}
private void prepareListing(Path path) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recording source-path: " + path + " for copy.");
- }
- RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
+ LOG.debug("Recording source-path: " + path + " for copy.");
+ RemoteIterator<FileStatus> listStatus = RemoteIterators
+ .filteringRemoteIterator(sourceFS.listStatusIterator(path),
+ i -> excludeList == null || !excludeList
+ .contains(i.getPath().toUri().getPath()));
Review comment:
nice to see this at work.
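For readers new to the helper, a self-contained sketch of RemoteIterators.filteringRemoteIterator; the fs/dir names and the directory-only predicate are assumptions for illustration, not code from the PR.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.util.functional.RemoteIterators;

    public class FilteredListing {
      static void printDirs(FileSystem fs, Path dir) throws IOException {
        // Wraps the paged listing; the predicate itself may raise IOException.
        RemoteIterator<FileStatus> dirsOnly =
            RemoteIterators.filteringRemoteIterator(
                fs.listStatusIterator(dir),
                st -> st.isDirectory());
        while (dirsOnly.hasNext()) {
          System.out.println(dirsOnly.next().getPath());
        }
      }
    }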
##########
File path:
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
totalPaths++;
maybePrintStats();
}
+
+ /**
+ * A utility class to traverse a directory.
+ */
+ private final class TraverseDirectory {
+
+ private SequenceFile.Writer fileListWriter;
+ private FileSystem sourceFS;
+ private ArrayList<FileStatus> sourceDirs;
+ private Path sourcePathRoot;
+ private DistCpContext context;
+ private HashSet<String> excludeList;
+ private List<FileStatusInfo> fileStatuses;
+ private final boolean preserveAcls;
+ private final boolean preserveXAttrs;
+ private final boolean preserveRawXattrs;
+
+ private TraverseDirectory(SequenceFile.Writer fileListWriter,
+ FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+ Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+ List<FileStatusInfo> fileStatuses) {
+ this.fileListWriter = fileListWriter;
+ this.sourceFS = sourceFS;
+ this.sourceDirs = sourceDirs;
+ this.sourcePathRoot = sourcePathRoot;
+ this.context = context;
+ this.excludeList = excludeList;
+ this.fileStatuses = fileStatuses;
+ this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+ this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+ this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+ }
+
+ public void traverseDirectory() throws IOException {
+ if (context.shouldUseIterator()) {
+ traverseDirectoryLegacy();
+ } else {
+ traverseDirectoryMultiThreaded();
+ }
+ }
+
+ public void traverseDirectoryMultiThreaded() throws IOException {
+ assert numListstatusThreads > 0;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting thread pool of " + numListstatusThreads
+ + " listStatus workers.");
+ }
+ ProducerConsumer<FileStatus, FileStatus[]> workers =
+ new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+ for (int i = 0; i < numListstatusThreads; i++) {
+ workers.addWorker(
+ new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+ excludeList));
+ }
+
+ for (FileStatus status : sourceDirs) {
+ workers.put(new WorkRequest<FileStatus>(status, 0));
+ }
+
+ while (workers.hasWork()) {
+ try {
+ WorkReport<FileStatus[]> workResult = workers.take();
+ int retry = workResult.getRetry();
+ for (FileStatus child : workResult.getItem()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Recording source-path: " + child.getPath() + " for copy.");
+ }
+ if (workResult.getSuccess()) {
+ LinkedList<CopyListingFileStatus> childCopyListingStatus =
+ DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+ preserveAcls && child.isDirectory(),
+ preserveXAttrs && child.isDirectory(),
+ preserveRawXattrs && child.isDirectory(),
+ context.getBlocksPerChunk());
+
+ for (CopyListingFileStatus fs : childCopyListingStatus) {
+ if (randomizeFileListing) {
+ addToFileListing(fileStatuses,
+ new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+ } else {
+ writeToFileListing(fileListWriter, fs, sourcePathRoot);
+ }
+ }
+ }
+ if (retry < maxRetries) {
+ if (child.isDirectory()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Traversing into source dir: " + child.getPath());
+ }
+ workers.put(new WorkRequest<FileStatus>(child, retry));
+ }
+ } else {
+ LOG.error("Giving up on " + child.getPath() + " after " + retry
+ + " retries.");
+ }
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Could not get item from childQueue. Retrying...");
+ }
+ }
+ workers.shutdown();
+ }
+
+ private void traverseDirectoryLegacy() throws IOException {
+ Stack<FileStatus> pathStack = new Stack<FileStatus>();
+ for (FileStatus fs : sourceDirs) {
+ if (excludeList == null || !excludeList
+ .contains(fs.getPath().toUri().getPath())) {
+ pathStack.add(fs);
+ }
+ }
+ while (!pathStack.isEmpty()) {
+ prepareListing(pathStack.pop().getPath());
+ }
+ }
+
+ private void prepareListing(Path path) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recording source-path: " + path + " for copy.");
+ }
+ RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
+ while (listStatus.hasNext()) {
+ FileStatus child = listStatus.next();
+ if (excludeList != null && excludeList
+ .contains(child.getPath().toUri().getPath())) {
+ continue;
+ }
+ LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
+ .toCopyListingFileStatus(sourceFS, child,
+ preserveAcls && child.isDirectory(),
+ preserveXAttrs && child.isDirectory(),
+ preserveRawXattrs && child.isDirectory(),
+ context.getBlocksPerChunk());
+ for (CopyListingFileStatus fs : childCopyListingStatus) {
+ if (randomizeFileListing) {
+ addToFileListing(fileStatuses,
+ new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+ } else {
+ writeToFileListing(fileListWriter, fs, sourcePathRoot);
+ }
+ }
+ if (child.isDirectory()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Traversing into source dir: " + child.getPath());
+ }
+ prepareListing(child.getPath());
+ }
+ }
+ }
Review comment:
Happy for you to take what's merged up. That CallableSupplier can be moved if you need to.
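For context, a hedged sketch of how CallableSupplier is typically used, assuming the static submit(Executor, Callable) helper it carries in hadoop-common and its org.apache.hadoop.fs.impl location at the time of this review; the executor and task here are illustrative.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.fs.impl.CallableSupplier;

    public class SubmitExample {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
          // Adapts a Callable (which may throw a checked exception) into a
          // CompletableFuture running on the supplied executor.
          CompletableFuture<String> f =
              CallableSupplier.submit(pool, () -> "listed");
          System.out.println(f.join());
        } finally {
          pool.shutdown();
        }
      }
    }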
##########
File path:
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
##########
@@ -43,6 +43,9 @@
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.mapred.CopyMapper;
+import org.apache.hadoop.tools.util.DistCpTestUtils;
+import org.apache.hadoop.util.functional.RemoteIterators;
Review comment:
move these two to the block above
##########
File path:
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
##########
@@ -600,6 +603,50 @@ public void testNonDirectWrite() throws Exception {
directWrite(localFS, localDir, remoteFS, remoteDir, false);
}
+ @Test
+ public void testDistCpWithIterator() throws Exception {
+ describe("Build listing in distCp using the iterator option.");
+ Path source = new Path(remoteDir, "src");
+ Path dest = new Path(localDir, "dest");
+ dest = localFS.makeQualified(dest);
+ mkdirs(remoteFS, source);
+ verifyPathExists(remoteFS, "", source);
+
+ GenericTestUtils.createFiles(remoteFS, source, getDepth(), getWidth());
+
+ DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
+ dest.toString(), "-useiterator", conf);
+
+ // Check that all files got copied.
+ RemoteIterator<LocatedFileStatus> destFileItr =
Review comment:
you don't need this any more
##########
File path:
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -288,10 +288,8 @@ protected void doBuildListingWithSnapshotDiff(
FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
if (sourceStatus.isDirectory()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding source dir for traverse: " +
- sourceStatus.getPath());
- }
+ LOG.debug(
+ "Adding source dir for traverse: " + sourceStatus.getPath());
Review comment:
Sorry, I meant you can use the LOG.debug("Adding source dir for traverse: {}", sourceStatus.getPath()); style of log. The existing code was all written for commons-logging and uses concatenation, so it adds the isDebugEnabled() guards to avoid doing work when debug is false.
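Side by side, the two styles in question; LOG here is an org.slf4j.Logger, and the FileStatus argument mirrors the hunk above.

    import org.apache.hadoop.fs.FileStatus;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogStyles {
      private static final Logger LOG =
          LoggerFactory.getLogger(LogStyles.class);

      static void demo(FileStatus sourceStatus) {
        // commons-logging era: concatenation always builds the string,
        // hence the guard to skip that work when debug is off.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Adding source dir for traverse: "
              + sourceStatus.getPath());
        }

        // SLF4J placeholder style: the message is only assembled when debug
        // is enabled, so no guard is needed.
        LOG.debug("Adding source dir for traverse: {}",
            sourceStatus.getPath());
      }
    }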