This is an automated email from the ASF dual-hosted git repository.

janh pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 631b176  HBASE-23379 Clean Up FSUtil getRegionLocalityMappingFromFS
631b176 is described below

commit 631b1762fc7b1b0db1c2810d9fce064b01794a24
Author: belugabehr <[email protected]>
AuthorDate: Tue Dec 10 15:56:53 2019 -0500

    HBASE-23379 Clean Up FSUtil getRegionLocalityMappingFromFS
    
    Signed-off-by: Jan Hentschel <[email protected]>
---
 .../java/org/apache/hadoop/hbase/util/FSUtils.java | 66 ++++++++--------------
 1 file changed, 25 insertions(+), 41 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 5cd714b..0c11f76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -39,7 +39,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Vector;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -1625,8 +1624,7 @@ public abstract class FSUtils extends CommonFSUtils {
       final Configuration conf, final String desiredTable, int threadPoolSize)
       throws IOException {
     Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
-    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
-        regionDegreeLocalityMapping);
+    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
     return regionDegreeLocalityMapping;
   }
 
@@ -1642,24 +1640,19 @@ public abstract class FSUtils extends CommonFSUtils {
    *          the table you wish to scan locality for
    * @param threadPoolSize
    *          the thread pool size to use
-   * @param regionToBestLocalityRSMapping
-   *          the map into which to put the best locality mapping or null
    * @param regionDegreeLocalityMapping
    *          the map into which to put the locality degree mapping or null,
    *          must be a thread-safe implementation
    * @throws IOException
    *           in case of file system errors or interrupts
    */
-  private static void getRegionLocalityMappingFromFS(
-      final Configuration conf, final String desiredTable,
-      int threadPoolSize,
-      Map<String, String> regionToBestLocalityRSMapping,
-      Map<String, Map<String, Float>> regionDegreeLocalityMapping)
-      throws IOException {
-    FileSystem fs =  FileSystem.get(conf);
-    Path rootPath = FSUtils.getRootDir(conf);
-    long startTime = EnvironmentEdgeManager.currentTime();
-    Path queryPath;
+  private static void getRegionLocalityMappingFromFS(final Configuration conf,
+      final String desiredTable, int threadPoolSize,
+      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
+    final FileSystem fs =  FileSystem.get(conf);
+    final Path rootPath = FSUtils.getRootDir(conf);
+    final long startTime = EnvironmentEdgeManager.currentTime();
+    final Path queryPath;
     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
     if (null == desiredTable) {
       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
@@ -1696,44 +1689,36 @@ public abstract class FSUtils extends CommonFSUtils {
 
     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
+    }
+
     if (null == statusList) {
       return;
-    } else {
-      LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
-          statusList.length);
     }
 
     // lower the number of threads in case we have very few expected regions
     threadPoolSize = Math.min(threadPoolSize, statusList.length);
 
     // run in multiple threads
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
-        threadPoolSize, 60, TimeUnit.SECONDS,
-        new ArrayBlockingQueue<>(statusList.length),
-        Threads.newDaemonThreadFactory("FSRegionQuery"));
+    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
+      Threads.newDaemonThreadFactory("FSRegionQuery"));
     try {
       // ignore all file status items that are not of interest
       for (FileStatus regionStatus : statusList) {
-        if (null == regionStatus) {
+        if (null == regionStatus || !regionStatus.isDirectory()) {
           continue;
         }
 
-        if (!regionStatus.isDirectory()) {
-          continue;
+        final Path regionPath = regionStatus.getPath();
+        if (null != regionPath) {
+          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
         }
-
-        Path regionPath = regionStatus.getPath();
-        if (null == regionPath) {
-          continue;
-        }
-
-        tpe.execute(new FSRegionScanner(fs, regionPath,
-            regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
       }
     } finally {
       tpe.shutdown();
-      int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
-          60 * 1000);
+      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
+        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
       try {
         // here we wait until TPE terminates, which is either naturally or by
         // exceptions in the execution of the threads
@@ -1742,18 +1727,17 @@ public abstract class FSUtils extends CommonFSUtils {
           // printing out rough estimate, so as to not introduce
           // AtomicInteger
           LOG.info("Locality checking is underway: { Scanned Regions : "
-              + tpe.getCompletedTaskCount() + "/"
-              + tpe.getTaskCount() + " }");
+              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
+              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
         }
       } catch (InterruptedException e) {
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+        Thread.currentThread().interrupt();
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
       }
     }
 
     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
-    String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
-
-    LOG.info(overheadMsg);
+    LOG.info("Scan DFS for locality info takes {}ms", overhead);
   }
 
   /**

Reply via email to