Repository: hive

Updated Branches:
  refs/heads/branch-2.1 5633df3c2 -> dcb8e3956
HIVE-13984: Use multi-threaded approach to listing files for msck (Pengcheng Xiong, reviewed by Prasanth Jayachandran, Hari Sankar Sivarama Subramaniyan, Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dcb8e395
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dcb8e395
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dcb8e395

Branch: refs/heads/branch-2.1
Commit: dcb8e395666a8792bfb62dd36a642443a6476ef3
Parents: 5633df3
Author: Pengcheng Xiong <[email protected]>
Authored: Thu Jun 16 11:30:20 2016 -0700
Committer: Pengcheng Xiong <[email protected]>
Committed: Thu Jun 16 11:33:25 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../hive/ql/metadata/HiveMetaStoreChecker.java  | 106 ++++++++++++++++---
 2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dcb8e395/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 33b0713..c60a193 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2401,8 +2401,9 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
-    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(0L, true, 1024L, true), "Number of threads"
-        + " used to move files in move task. Set it to 0 to disable multi-threaded file moves."),
+    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(0L, true, 1024L, true), "Number of threads"
+        + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
+        + " MSCK to check tables."),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
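Since this is the only configuration change, it is worth spelling out: hive.mv.files.thread now controls both how many threads a move task uses and how many threads MSCK uses to list partition directories, and setting it to 0 disables threading for both. Below is a minimal sketch of turning the setting into an executor the way the patch does; the class name MsckPoolFactory is hypothetical, while the property name, the default of 25, and the thread-name format come from the patch.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.conf.Configuration;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    // Hypothetical helper, not part of the patch: builds the MSCK listing pool
    // from the reused hive.mv.files.thread setting.
    public class MsckPoolFactory {
      // Returns null when hive.mv.files.thread is 0, which callers treat as
      // "list directories serially".
      public static ExecutorService poolFor(Configuration conf) {
        int threads = conf.getInt("hive.mv.files.thread", 25);
        if (threads <= 0) {
          return null;
        }
        // Daemon threads so a stuck listing cannot keep the JVM alive.
        return Executors.newFixedThreadPool(threads,
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build());
      }
    }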
http://git-wip-us.apache.org/repos/asf/hive/blob/dcb8e395/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 10fa561..1122f8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -21,9 +21,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -39,6 +48,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
 import org.apache.thrift.TException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Verify that the information in the metastore matches what is on the
  * filesystem. Return a CheckResult object containing lists of missing and any
@@ -286,9 +297,10 @@ public class HiveMetaStoreChecker {
    *          Result object
    * @throws IOException
    *           Thrown if we fail at fetching listings from the fs.
+   * @throws HiveException
    */
   void findUnknownPartitions(Table table, Set<Path> partPaths,
-      CheckResult result) throws IOException {
+      CheckResult result) throws IOException, HiveException {
     Path tablePath = table.getPath();
     // now check the table folder and see if we find anything
@@ -353,28 +365,88 @@ public class HiveMetaStoreChecker {
    *          This set will contain the leaf paths at the end.
    * @throws IOException
    *           Thrown if we can't get lists from the fs.
+   * @throws HiveException
    */
-  private void getAllLeafDirs(Path basePath, Set<Path> allDirs)
-      throws IOException {
-    getAllLeafDirs(basePath, allDirs, basePath.getFileSystem(conf));
+  private void getAllLeafDirs(Path basePath, Set<Path> allDirs) throws IOException, HiveException {
+    ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
+    basePaths.add(basePath);
+    // we only use the keySet of ConcurrentHashMap
+    Map<Path, Object> dirSet = new ConcurrentHashMap<>();
+    // Here we just reuse the THREAD_COUNT configuration for
+    // HIVE_MOVE_FILES_THREAD_COUNT
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors
+        .newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build())
+        : null;
+    if (pool == null) {
+      LOG.debug("Not-using threaded version of MSCK-GetPaths");
+    } else {
+      LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
+          + ((ThreadPoolExecutor) pool).getPoolSize());
+    }
+    getAllLeafDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf));
+    // pool is null when the thread count is set to 0, so guard the shutdown
+    if (pool != null) {
+      pool.shutdown();
+    }
+    allDirs.addAll(dirSet.keySet());
   }
 
-  private void getAllLeafDirs(Path basePath, Set<Path> allDirs, FileSystem fs)
-      throws IOException {
-
-    FileStatus[] statuses = fs.listStatus(basePath, FileUtils.HIDDEN_FILES_PATH_FILTER);
-    boolean directoryFound=false;
+  // process the basePaths in parallel and then the next level of basePaths
+  private void getAllLeafDirs(final ExecutorService pool, final ConcurrentLinkedQueue<Path> basePaths,
+      final Map<Path, Object> allDirs, final FileSystem fs) throws IOException, HiveException {
+    final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
+    if (null == pool) {
+      for (final Path path : basePaths) {
+        FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        boolean directoryFound = false;
+        for (FileStatus status : statuses) {
+          if (status.isDir()) {
+            directoryFound = true;
+            nextLevel.add(status.getPath());
+          }
+        }
 
-    for (FileStatus status : statuses) {
-      if (status.isDir()) {
-        directoryFound = true;
-        getAllLeafDirs(status.getPath(), allDirs, fs);
+        if (!directoryFound) {
+          allDirs.put(path, null);
+        }
+      }
+      // recurse only after the whole level has been listed, so paths already
+      // handed to the recursive call are not listed again
+      if (!nextLevel.isEmpty()) {
+        getAllLeafDirs(pool, nextLevel, allDirs, fs);
+      }
+    } else {
+      final List<Future<Void>> futures = new LinkedList<>();
+      for (final Path path : basePaths) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+            boolean directoryFound = false;
+
+            for (FileStatus status : statuses) {
+              if (status.isDir()) {
+                directoryFound = true;
+                nextLevel.add(status.getPath());
+              }
+            }
+
+            if (!directoryFound) {
+              allDirs.put(path, null);
+            }
+            return null;
+          }
+        }));
+      }
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.error(e.getMessage());
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
+      }
+      if (!nextLevel.isEmpty()) {
+        getAllLeafDirs(pool, nextLevel, allDirs, fs);
       }
-    }
-
-    if(!directoryFound){
-      allDirs.add(basePath);
     }
   }
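Taken together, the new getAllLeafDirs walks the partition tree one level at a time: every directory in the current level is submitted to the pool, directories with no subdirectories are recorded as leaves, and the subdirectories found become the next level. Below is a self-contained sketch of that level-by-level pattern, written iteratively instead of recursively; the class and method names are illustrative, not Hive's, and the hidden-file filter the real code applies via FileUtils.HIDDEN_FILES_PATH_FILTER is omitted.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LeafDirLister {

      // Collect all leaf directories under basePath, listing each tree level
      // in parallel on a fixed-size pool.
      public static Set<Path> listLeafDirs(FileSystem fs, Path basePath, int threads)
          throws Exception {
        Set<Path> leaves = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
          List<Path> level = new ArrayList<>();
          level.add(basePath);
          while (!level.isEmpty()) {
            // subdirectories found at this level become the next level
            final ConcurrentLinkedQueue<Path> next = new ConcurrentLinkedQueue<>();
            List<Future<?>> futures = new ArrayList<>();
            for (final Path p : level) {
              futures.add(pool.submit(() -> {
                boolean dirFound = false;
                for (FileStatus s : fs.listStatus(p)) {
                  if (s.isDirectory()) {
                    dirFound = true;
                    next.add(s.getPath());
                  }
                }
                if (!dirFound) {
                  leaves.add(p); // no subdirectories: p is a leaf
                }
                return null;
              }));
            }
            for (Future<?> f : futures) {
              f.get(); // propagate the first listing failure
            }
            level = new ArrayList<>(next);
          }
        } finally {
          pool.shutdown();
        }
        return leaves;
      }
    }

Listing a whole level before descending keeps every pool thread busy on wide, date-partitioned layouts, which is exactly the shape where a serial msck spends most of its time waiting on filesystem listings.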
