This is an automated email from the ASF dual-hosted git repository.
janh pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new f2230a8 HBASE-23379 Clean Up FSUtil getRegionLocalityMappingFromFS
f2230a8 is described below
commit f2230a822fd78633ce47766cd83eeb0b4ded0570
Author: belugabehr <[email protected]>
AuthorDate: Tue Dec 10 15:56:53 2019 -0500
HBASE-23379 Clean Up FSUtil getRegionLocalityMappingFromFS
Signed-off-by: Jan Hentschel <[email protected]>
---
.../java/org/apache/hadoop/hbase/util/FSUtils.java | 66 ++++++++--------------
1 file changed, 25 insertions(+), 41 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 7b8fb60..13c308c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -39,7 +39,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Vector;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -1623,8 +1622,7 @@ public abstract class FSUtils extends CommonFSUtils {
final Configuration conf, final String desiredTable, int threadPoolSize)
throws IOException {
Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
- getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
- regionDegreeLocalityMapping);
+ getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
return regionDegreeLocalityMapping;
}
@@ -1640,24 +1638,19 @@ public abstract class FSUtils extends CommonFSUtils {
* the table you wish to scan locality for
* @param threadPoolSize
* the thread pool size to use
- * @param regionToBestLocalityRSMapping
- * the map into which to put the best locality mapping or null
* @param regionDegreeLocalityMapping
* the map into which to put the locality degree mapping or null,
* must be a thread-safe implementation
* @throws IOException
* in case of file system errors or interrupts
*/
- private static void getRegionLocalityMappingFromFS(
- final Configuration conf, final String desiredTable,
- int threadPoolSize,
- Map<String, String> regionToBestLocalityRSMapping,
- Map<String, Map<String, Float>> regionDegreeLocalityMapping)
- throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path rootPath = FSUtils.getRootDir(conf);
- long startTime = EnvironmentEdgeManager.currentTime();
- Path queryPath;
+ private static void getRegionLocalityMappingFromFS(final Configuration conf,
+ final String desiredTable, int threadPoolSize,
+ final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
+ final FileSystem fs = FileSystem.get(conf);
+ final Path rootPath = FSUtils.getRootDir(conf);
+ final long startTime = EnvironmentEdgeManager.currentTime();
+ final Path queryPath;
// The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
if (null == desiredTable) {
queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
@@ -1694,44 +1687,36 @@ public abstract class FSUtils extends CommonFSUtils {
FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
+ }
+
if (null == statusList) {
return;
- } else {
- LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
- statusList.length);
}
// lower the number of threads in case we have very few expected regions
threadPoolSize = Math.min(threadPoolSize, statusList.length);
// run in multiple threads
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
- threadPoolSize, 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(statusList.length),
- Threads.newDaemonThreadFactory("FSRegionQuery"));
+ final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
+ Threads.newDaemonThreadFactory("FSRegionQuery"));
try {
// ignore all file status items that are not of interest
for (FileStatus regionStatus : statusList) {
- if (null == regionStatus) {
+ if (null == regionStatus || !regionStatus.isDirectory()) {
continue;
}
- if (!regionStatus.isDirectory()) {
- continue;
+ final Path regionPath = regionStatus.getPath();
+ if (null != regionPath) {
+ tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
}
-
- Path regionPath = regionStatus.getPath();
- if (null == regionPath) {
- continue;
- }
-
- tpe.execute(new FSRegionScanner(fs, regionPath,
- regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
}
} finally {
tpe.shutdown();
- int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
- 60 * 1000);
+ final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
+ HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
try {
// here we wait until TPE terminates, which is either naturally or by
// exceptions in the execution of the threads
@@ -1740,18 +1725,17 @@ public abstract class FSUtils extends CommonFSUtils {
// printing out rough estimate, so as to not introduce
// AtomicInteger
LOG.info("Locality checking is underway: { Scanned Regions : "
- + tpe.getCompletedTaskCount() + "/"
- + tpe.getTaskCount() + " }");
+ + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
+ + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
}
} catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ Thread.currentThread().interrupt();
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}
long overhead = EnvironmentEdgeManager.currentTime() - startTime;
- String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
-
- LOG.info(overheadMsg);
+ LOG.info("Scan DFS for locality info takes {}ms", overhead);
}
/**