Repository: hive Updated Branches: refs/heads/master 441b29e10 -> 19a6831b9
HIVE-15803 : msck can hang when nested partitions are present (Rajesh Balamohan via Ashutosh Chauhan) Signed-off-by: Ashutosh Chauhan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19a6831b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19a6831b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19a6831b Branch: refs/heads/master Commit: 19a6831b9c5fe3911872bcab1b974f3baa4e1db9 Parents: 441b29e Author: Rajesh Balamohan <[email protected]> Authored: Tue Feb 7 19:16:00 2017 -0800 Committer: Ashutosh Chauhan <[email protected]> Committed: Wed Feb 8 13:35:52 2017 -0800 ---------------------------------------------------------------------- .../hive/ql/metadata/HiveMetaStoreChecker.java | 45 +++++++++++++++----- .../clientpositive/msck_repair_batchsize.q | 10 +++++ .../clientpositive/msck_repair_batchsize.q.out | 25 +++++++++++ 3 files changed, 70 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java index 57f731f..7c94c95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @@ -400,24 +399,31 @@ public class HiveMetaStoreChecker { * Specify how deep the search goes. * @throws IOException * Thrown if we can't get lists from the fs. - * @throws HiveException + * @throws HiveException */ private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException { ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>(); basePaths.add(basePath); - Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>()); + Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>()); // Here we just reuse the THREAD_COUNT configuration for // HIVE_MOVE_FILES_THREAD_COUNT - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors - .newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) - : null; + int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15); + + // Check if too low config is provided for move files. 2x CPU is reasonable max count. + poolSize = poolSize == 0 ? poolSize : Math.max(poolSize, + Runtime.getRuntime().availableProcessors() * 2); + + // Fixed thread pool on need basis + final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor) + Executors.newFixedThreadPool(poolSize, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) : null; + if (pool == null) { LOG.debug("Not-using threaded version of MSCK-GetPaths"); } else { LOG.debug("Using threaded version of MSCK-GetPaths with number of threads " - + ((ThreadPoolExecutor) pool).getPoolSize()); + + pool.getMaximumPoolSize()); } checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth); if (pool != null) { @@ -427,11 +433,30 @@ public class HiveMetaStoreChecker { } // process the basePaths in parallel and then the next level of basePaths - private void checkPartitionDirs(final ExecutorService pool, + private void checkPartitionDirs(final ThreadPoolExecutor pool, final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs, final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException { final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>(); - if (null == pool) { + + // Check if thread pool can be used. + boolean useThreadPool = false; + if (pool != null) { + synchronized (pool) { + // In case of recursive calls, it is possible to deadlock with TP. Check TP usage here. + if (pool.getActiveCount() < pool.getMaximumPoolSize()) { + useThreadPool = true; + } + + if (!useThreadPool) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not using threadPool as active count:" + pool.getActiveCount() + + ", max:" + pool.getMaximumPoolSize()); + } + } + } + } + + if (null == pool || !useThreadPool) { for (final Path path : basePaths) { FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); boolean fileFound = false; http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/msck_repair_batchsize.q b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q index 06e4507..e56e97a 100644 --- a/ql/src/test/queries/clientpositive/msck_repair_batchsize.q +++ b/ql/src/test/queries/clientpositive/msck_repair_batchsize.q @@ -20,3 +20,13 @@ MSCK REPAIR TABLE default.repairtable; MSCK TABLE repairtable; DROP TABLE default.repairtable; + + +dfs ${system:test.dfs.mkdir} -p ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b; +CREATE TABLE `repairtable`( `col` string) PARTITIONED BY ( `p1` string, `p2` string) location '${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/'; + +dfs -touchz ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b/datafile; +set hive.mv.files.thread=1; +MSCK TABLE repairtable; + +DROP TABLE default.repairtable; http://git-wip-us.apache.org/repos/asf/hive/blob/19a6831b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out index a0180b7..ba99024 100644 --- a/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out +++ b/ql/src/test/results/clientpositive/msck_repair_batchsize.q.out @@ -47,3 +47,28 @@ POSTHOOK: query: DROP TABLE default.repairtable POSTHOOK: type: DROPTABLE POSTHOOK: Input: default@repairtable POSTHOOK: Output: default@repairtable +#### A masked pattern was here #### +PREHOOK: type: CREATETABLE +#### A masked pattern was here #### +PREHOOK: Output: database:default +PREHOOK: Output: default@repairtable +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +#### A masked pattern was here #### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@repairtable +PREHOOK: query: MSCK TABLE repairtable +PREHOOK: type: MSCK +PREHOOK: Output: default@repairtable +POSTHOOK: query: MSCK TABLE repairtable +POSTHOOK: type: MSCK +POSTHOOK: Output: default@repairtable +Partitions not in metastore: repairtable:p1=c/p2=a +PREHOOK: query: DROP TABLE default.repairtable +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@repairtable +PREHOOK: Output: default@repairtable +POSTHOOK: query: DROP TABLE default.repairtable +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@repairtable +POSTHOOK: Output: default@repairtable
