This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 5be450393c9c417b05ad0d4bf96b68e772aa6a3d
Author: Steve Loughran <[email protected]>
AuthorDate: Thu Dec 31 16:02:10 2020 +0000

    MAPREDUCE-7315. LocatedFileStatusFetcher to collect/publish IOStatistics. (#2579)
    
    Part of the HADOOP-16830 IOStatistics API feature.
    
    If the source FileSystem's listing RemoteIterators
    implement IOStatisticsSource, these are collected and served through
    the IOStatisticsSource API. If they are not: getIOStatistics() returns
    null.
    
    Only the listing statistics are collected; FileSystem.globStatus() doesn't
    provide any, so IO use there is not included in the aggregate results.
    
    Contributed by Steve Loughran.
    
    Change-Id: Iff1485297c2c7e181b54eaf1d2c4f80faeee7cfa
---
 .../hadoop/mapred/LocatedFileStatusFetcher.java    | 60 +++++++++++++++++++++-
 1 file changed, 58 insertions(+), 2 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
index 74ade77..5810698 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.StringJoiner;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -37,6 +38,9 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -52,6 +56,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
+
 /**
  * Utility class to fetch block locations for specified Input paths using a
  * configured number of threads.
@@ -60,7 +67,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
  * configuration.
  */
 @Private
-public class LocatedFileStatusFetcher {
+public class LocatedFileStatusFetcher implements IOStatisticsSource {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName());
@@ -88,6 +95,12 @@ public class LocatedFileStatusFetcher {
   private volatile Throwable unknownError;
 
   /**
+   * Demand created IO Statistics: only if the filesystem
+   * returns statistics does this fetch collect them.
+   */
+  private IOStatisticsSnapshot iostats;
+
+  /**
    * Instantiate.
    * The newApi switch is only used to configure what exception is raised
    * on failure of {@link #getFileStatuses()}, it does not change the algorithm.
@@ -226,7 +239,46 @@ public class LocatedFileStatusFetcher {
       lock.unlock();
     }
   }
-  
+
+  /**
+   * Return the aggregate IOStatistics collected during listing so far.
+   * @return IO stats accrued; null if no statistics have been added yet.
+   */
+  @Override
+  public synchronized IOStatistics getIOStatistics() {
+    return iostats;
+  }
+
+  /**
+   * Merge the statistics of one directory scan into the aggregate.
+   * @param stats possibly null statistics; a null value is ignored.
+   */
+  private void addResultStatistics(IOStatistics stats) {
+    if (stats != null) {
+      // lock on this: the same monitor the synchronized getIOStatistics() uses.
+      synchronized (this) {
+        LOG.debug("Adding IOStatistics: {}", stats);
+        if (iostats == null) {
+          // first non-null result: start the aggregate from a snapshot of it
+          iostats = snapshotIOStatistics(stats);
+        } else {
+          iostats.aggregate(stats);
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    final IOStatistics ioStatistics = getIOStatistics();  // null until stats are added
+    StringJoiner stringJoiner = new StringJoiner(", ",
+        LocatedFileStatusFetcher.class.getSimpleName() + "[", "]");
+    if (ioStatistics != null) {  // leave the entry out when nothing was collected
+      stringJoiner.add("IOStatistics=" + ioStatistics);
+    }
+    return stringJoiner.toString();
+  }
+
   /**
    * Retrieves block locations for the given @link {@link FileStatus}, and adds
    * additional paths to the process queue if required.
@@ -265,6 +317,8 @@ public class LocatedFileStatusFetcher {
             }
           }
         }
+        // aggregate any stats
+        result.stats = retrieveIOStatistics(iter);
       } else {
         result.locatedFileStatuses.add(fileStatus);
       }
@@ -275,6 +329,7 @@ public class LocatedFileStatusFetcher {
       private List<FileStatus> locatedFileStatuses = new LinkedList<>();
       private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<>();
       private FileSystem fs;
+      private IOStatistics stats;
     }
   }
 
@@ -289,6 +344,7 @@ public class LocatedFileStatusFetcher {
     @Override
     public void onSuccess(ProcessInputDirCallable.Result result) {
       try {
+        addResultStatistics(result.stats);
         if (!result.locatedFileStatuses.isEmpty()) {
           resultQueue.add(result.locatedFileStatuses);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to