HIVE-16119: HiveMetaStoreChecker: remove singleThread logic duplication (Zoltan Haindrich reviewed by Vihang Karajgaonkar, Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed2f46aa Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed2f46aa Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed2f46aa Branch: refs/heads/hive-14535 Commit: ed2f46aa737efb859f23d357fbeafe1b42e7d404 Parents: 7437225 Author: Zoltan Haindrich <[email protected]> Authored: Thu Mar 9 08:32:35 2017 +0100 Committer: Zoltan Haindrich <[email protected]> Committed: Thu Mar 9 08:55:43 2017 +0100 ---------------------------------------------------------------------- .../hive/ql/metadata/HiveMetaStoreChecker.java | 90 ++++---------------- .../ql/metadata/TestHiveMetaStoreChecker.java | 39 +++++---- 2 files changed, 35 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ed2f46aa/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java index 3420ef8..6805c17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java @@ -28,11 +28,12 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.slf4j.Logger; @@ -52,6 +53,7 @@ import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.thrift.TException; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -411,35 +413,19 @@ public class HiveMetaStoreChecker { // pool here the smaller sized pool of the two becomes a bottleneck int poolSize = conf.getInt(ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT.varname, 15); - // Check if too low config is provided for move files. 2x CPU is reasonable max count. - poolSize = poolSize == 0 ? poolSize : Math.max(poolSize, - getMinPoolSize()); - - // Fixed thread pool on need basis - final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor) - Executors.newFixedThreadPool(poolSize, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) : null; - - if (pool == null) { - LOG.debug("Not-using threaded version of MSCK-GetPaths"); - Queue<Path> basePaths = new LinkedList<>(); - basePaths.add(basePath); - checkPartitionDirsSingleThreaded(basePaths, allDirs, basePath.getFileSystem(conf), maxDepth, - maxDepth); + ExecutorService executor; + if (poolSize <= 1) { + LOG.debug("Using single-threaded version of MSCK-GetPaths"); + executor = MoreExecutors.sameThreadExecutor(); } else { - LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " - + pool.getMaximumPoolSize()); - checkPartitionDirsInParallel((ThreadPoolExecutor) pool, basePath, allDirs, - basePath.getFileSystem(conf), maxDepth); + LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build(); + executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory); } - if (pool != null) { - pool.shutdown(); - } - } + checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), maxDepth); - @VisibleForTesting - int getMinPoolSize() { - return Runtime.getRuntime().availableProcessors() * 2; + executor.shutdown(); } private final class PathDepthInfoCallable implements Callable<Path> { @@ -515,7 +501,7 @@ public class HiveMetaStoreChecker { } } - private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool, + private void checkPartitionDirs(final ExecutorService executor, final Path basePath, final Set<Path> result, final FileSystem fs, final int maxDepth) throws HiveException { try { @@ -534,7 +520,7 @@ public class HiveMetaStoreChecker { //process each level in parallel while(!nextLevel.isEmpty()) { futures.add( - pool.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue))); + executor.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue))); } while(!futures.isEmpty()) { Path p = futures.poll().get(); @@ -547,52 +533,8 @@ public class HiveMetaStoreChecker { } } catch (InterruptedException | ExecutionException e) { LOG.error(e.getMessage()); - pool.shutdownNow(); + executor.shutdownNow(); throw new HiveException(e.getCause()); } } - - /* - * Original recursive implementation works well for single threaded use-case but has limitations - * if we attempt to parallelize this directly - */ - private void checkPartitionDirsSingleThreaded(Queue<Path> basePaths, final Set<Path> allDirs, - final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException { - for (final Path path : basePaths) { - FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); - final Queue<Path> nextLevel = new LinkedList<>(); - boolean fileFound = false; - for (FileStatus status : statuses) { - if (status.isDirectory()) { - nextLevel.add(status.getPath()); - } else { - fileFound = true; - } - } - if (depth != 0) { - // we are in the middle of the search and we find a file - if (fileFound) { - if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException( - "MSCK finds a file rather than a folder when it searches for " + path.toString()); - } else { - LOG.warn("MSCK finds a file rather than a folder when it searches for " - + path.toString()); - } - } - if (!nextLevel.isEmpty()) { - checkPartitionDirsSingleThreaded(nextLevel, allDirs, fs, depth - 1, maxDepth); - } else if (depth != maxDepth) { - // since nextLevel is empty, we are missing partition columns. - if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException("MSCK is missing partition columns under " + path.toString()); - } else { - LOG.warn("MSCK is missing partition columns under " + path.toString()); - } - } - } else { - allDirs.add(path); - } - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/ed2f46aa/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java index f9bcc52..21bc8ee 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.metadata; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -25,9 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.common.collect.Lists; -import junit.framework.TestCase; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -43,6 +39,10 @@ import org.apache.hadoop.mapred.TextInputFormat; import org.apache.thrift.TException; import org.mockito.Mockito; +import com.google.common.collect.Lists; + +import junit.framework.TestCase; + /** * TestHiveMetaStoreChecker. * @@ -359,11 +359,7 @@ public class TestHiveMetaStoreChecker extends TestCase { throws HiveException, AlreadyExistsException, IOException { // set num of threads to 0 so that single-threaded checkMetastore is called hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 0); - // currently HiveMetastoreChecker uses a minimum pool size of 2*numOfProcs - // no other easy way to set it deterministically for this test case - checker = Mockito.spy(checker); - Mockito.when(checker.getMinPoolSize()).thenReturn(2); - int poolSize = checker.getMinPoolSize(); + int poolSize = 2; // create a deeply nested table which has more partition keys than the pool size Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0); // add 10 partitions on the filesystem @@ -385,11 +381,8 @@ public class TestHiveMetaStoreChecker extends TestCase { */ public void testDeeplyNestedPartitionedTables() throws HiveException, AlreadyExistsException, IOException { - // currently HiveMetastoreChecker uses a minimum pool size of 2*numOfProcs - // no other easy way to set it deterministically for this test case - int poolSize = checker.getMinPoolSize(); - checker = Mockito.spy(checker); - Mockito.when(checker.getMinPoolSize()).thenReturn(2); + hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 2); + int poolSize = 2; // create a deeply nested table which has more partition keys than the pool size Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0); // add 10 partitions on the filesystem @@ -420,18 +413,22 @@ public class TestHiveMetaStoreChecker extends TestCase { createDirectory(sb.toString()); //check result now CheckResult result = new CheckResult(); + Exception exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); createFile(sb.toString(), "dummyFile"); result = new CheckResult(); + exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); } /* @@ -452,20 +449,22 @@ public class TestHiveMetaStoreChecker extends TestCase { createDirectory(sb.toString()); // check result now CheckResult result = new CheckResult(); + Exception exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), - e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); createFile(sb.toString(), "dummyFile"); result = new CheckResult(); + exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), - e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); } /** * Creates a test partitioned table with the required level of nested partitions and number of
