steveloughran commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r827986651



##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/TaskAttemptScanDirectoryStage.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.DurationInfo;
+
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_COUNT_MEAN;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_DEPTH_MEAN;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_FILE_COUNT_MEAN;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_FILE_SIZE_MEAN;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_SCAN_DIRECTORY;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry.dirEntry;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createTaskManifest;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics;
+
+/**
+ * Stage to scan a directory tree and build a task manifest.
+ * This is executed by the task committer.
+ */
+public final class TaskAttemptScanDirectoryStage
+    extends AbstractJobCommitStage<Void, TaskManifest> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TaskAttemptScanDirectoryStage.class);
+
+  public TaskAttemptScanDirectoryStage(
+      final StageConfig stageConfig) {
+    super(true, stageConfig, OP_STAGE_TASK_SCAN_DIRECTORY, false); // NOTE(review): flag meanings are set by AbstractJobCommitStage
+  }
+
+  /**
+   * Build the Manifest.
+   * Scans the required task attempt directory tree into a new task
+   * manifest, logs the file count, summed file size, directory count
+   * and maximum depth, and adds those four values as samples to the
+   * IOStatistics store returned by getIOStatistics().
+   * @param arguments not referenced by this stage.
+   * @return the manifest
+   * @throws IOException failure.
+   */
+  @Override
+  protected TaskManifest executeStage(final Void arguments)
+      throws IOException {
+
+    final Path taskAttemptDir = getRequiredTaskAttemptDir();
+    final TaskManifest manifest = createTaskManifest(getStageConfig());
+
+    LOG.info("{}: scanning directory {}",
+        getName(), taskAttemptDir);
+
+    final int depth = scanDirectoryTree(manifest, taskAttemptDir,
+        getDestinationDir(),
+        0); // the scan starts at depth 0, the task attempt dir itself
+    List<FileOrDirEntry> filesToCommit = manifest.getFilesToCommit();
+    LongSummaryStatistics fileSummary = filesToCommit.stream()
+        .mapToLong(FileOrDirEntry::getSize)
+        .summaryStatistics();
+    long fileDataSize = fileSummary.getSum(); // sum of the sizes of all files to commit
+    long fileCount = fileSummary.getCount(); // number of files to commit
+    int dirCount = manifest.getDirectoriesToCreate().size(); // number of directories to create
+    LOG.info("{}: directory {} contained {} file(s); data size {}",
+        getName(),
+        taskAttemptDir,
+        fileCount,
+        fileDataSize);
+    LOG.info("{}: Directory count = {}; maximum depth {}",
+        getName(),
+        dirCount,
+        depth);
+    // add statistics about the task output which, when aggregated, provides
+    // insight into structure of job, task skew, etc.
+    IOStatisticsStore iostats = getIOStatistics();
+    iostats.addSample(COMMITTER_TASK_DIRECTORY_COUNT_MEAN, dirCount);
+    iostats.addSample(COMMITTER_TASK_DIRECTORY_DEPTH_MEAN, depth);
+    iostats.addSample(COMMITTER_TASK_FILE_COUNT_MEAN, fileCount);
+    iostats.addSample(COMMITTER_TASK_FILE_SIZE_MEAN, fileDataSize);
+
+    return manifest;
+  }
+
+  /**
+   * Recursively scan a directory tree.
+   * The manifest will contain all files to rename
+   * (source and dest) and directories to create.
+   * All files are processed before any of the subdirs are.
+   * This helps in statistics gathering.
+   * There's some optimizations which could be done with async
+   * fetching of the iterators of those subdirs, but as this
+   * is generally off-critical path then that "enhancement"
+   * can be postponed until data suggests this needs improvement.
+   * @param manifest manifest to update
+   * @param srcDir dir to scan
+   * @param destDir destination directory
+   * @param depth depth from the task attempt dir.
+   * @return the maximum depth of child directories
+   * @throws IOException IO failure.
+   */
+  private int scanDirectoryTree(
+      TaskManifest manifest,
+      Path srcDir,
+      Path destDir,
+      int depth) throws IOException {
+
+    // generate some task progress in case directory scanning is very slow.
+    progress();
+
+    int maxDepth = 0;
+    int files = 0;
+    List<FileStatus> subdirs = new ArrayList<>();
+    try (DurationInfo ignored = new DurationInfo(LOG, false,
+        "Task Attempt %s source dir %s, dest dir %s",
+        getTaskAttemptId(), srcDir, destDir)) {
+
+      // list the directory. This may block until the listing is complete,
+      // or, if the FS does incremental or asynchronous fetching, until the
+      // first page of results is ready.
+      final RemoteIterator<FileStatus> listing = listStatusIterator(srcDir);

Review comment:
       listFiles(recursive=true) is best for s3, but not so good elsewhere:
   
   * it returns LocatedFileStatus, so on HDFS it adds a block lookup
   * abfs and gcs don't have deep tree list operations, so they treewalk internally
   * for them, parallel scanning of subdirs can often be faster.
   
   I don't bother with parallel scans here, though it would be possible to 
add them; with real-world job stats we can assess the cost of this operation 
and decide.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to