HIVE-16949: Leak of threads from Get-Input-Paths and Get-Input-Summary thread pool (Sahil Takiar, reviewed by Vihang Karajgaonkar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3fc131c7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3fc131c7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3fc131c7

Branch: refs/heads/hive-14535
Commit: 3fc131c79a5329ea509f0c125e789ddd042b1797
Parents: 6be50b7
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Mon Aug 28 11:19:41 2017 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Mon Aug 28 11:19:41 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Utilities.java  | 305 ++++++++++---------
 .../hadoop/hive/ql/exec/TestUtilities.java     |  99 ++++++
 2 files changed, 265 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3fc131c7/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 99c85f2..aca99f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2233,153 +2233,164 @@ public final class Utilities {
       // Process the case when name node call is needed
       final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
-      ArrayList<Future<?>> results = new ArrayList<Future<?>>();
       final ExecutorService executor;
       int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size());
       if (numExecutors > 1) {
         LOG.info("Using " + numExecutors + " threads for getContentSummary");
         executor = Executors.newFixedThreadPool(numExecutors,
-            new ThreadFactoryBuilder().setDaemon(true)
-                .setNameFormat("Get-Input-Summary-%d").build());
+              new ThreadFactoryBuilder().setDaemon(true)
+                  .setNameFormat("Get-Input-Summary-%d").build());
       } else {
         executor = null;
       }
+      ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
+      return cs;
+    }
+  }
 
-      HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
-        @Override
-        public void interrupt() {
-          for (Path path : pathNeedProcess) {
-            try {
-              path.getFileSystem(ctx.getConf()).close();
-            } catch (IOException ignore) {
-              LOG.debug("Failed to close filesystem", ignore);
-            }
-          }
-          if (executor != null) {
-            executor.shutdownNow();
+  @VisibleForTesting
+  static ContentSummary getInputSummaryWithPool(final Context ctx, Set<Path> pathNeedProcess, MapWork work,
+                                                long[] summary, ExecutorService executor) throws IOException {
+    List<Future<?>> results = new ArrayList<Future<?>>();
+    final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
+
+    HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
+      @Override
+      public void interrupt() {
+        for (Path path : pathNeedProcess) {
+          try {
+            path.getFileSystem(ctx.getConf()).close();
+          } catch (IOException ignore) {
+            LOG.debug("Failed to close filesystem", ignore);
           }
         }
-      });
-      try {
-        Configuration conf = ctx.getConf();
-        JobConf jobConf = new JobConf(conf);
-        for (Path path : pathNeedProcess) {
-          final Path p = path;
-          final String pathStr = path.toString();
-          // All threads share the same Configuration and JobConf based on the
-          // assumption that they are thread safe if only read operations are
-          // executed. It is not stated in Hadoop's javadoc, the sourcce codes
-          // clearly showed that they made efforts for it and we believe it is
-          // thread safe. Will revisit this piece of codes if we find the assumption
-          // is not correct.
-          final Configuration myConf = conf;
-          final JobConf myJobConf = jobConf;
-          final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
-          final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
-          final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p);
-          Runnable r = new Runnable() {
-            @Override
-            public void run() {
-              try {
-                Class<? extends InputFormat> inputFormatCls = partDesc
-                    .getInputFileFormatClass();
-                InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
-                    inputFormatCls, myJobConf);
-                if (inputFormatObj instanceof ContentSummaryInputFormat) {
-                  ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
-                  resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
-                  return;
-                }
+        if (executor != null) {
+          executor.shutdownNow();
+        }
+      }
+    });
+    try {
+      Configuration conf = ctx.getConf();
+      JobConf jobConf = new JobConf(conf);
+      for (Path path : pathNeedProcess) {
+        final Path p = path;
+        final String pathStr = path.toString();
+        // All threads share the same Configuration and JobConf based on the
+        // assumption that they are thread safe if only read operations are
+        // executed. It is not stated in Hadoop's javadoc, the sourcce codes
+        // clearly showed that they made efforts for it and we believe it is
+        // thread safe. Will revisit this piece of codes if we find the assumption
+        // is not correct.
+        final Configuration myConf = conf;
+        final JobConf myJobConf = jobConf;
+        final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
+        final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
+        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p);
+        Runnable r = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              Class<? extends InputFormat> inputFormatCls = partDesc
+                  .getInputFileFormatClass();
+              InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
+                  inputFormatCls, myJobConf);
+              if (inputFormatObj instanceof ContentSummaryInputFormat) {
+                ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
+                resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
+                return;
+              }
-                String metaTableStorage = null;
-                if (partDesc.getTableDesc() != null &&
-                    partDesc.getTableDesc().getProperties() != null) {
-                  metaTableStorage = partDesc.getTableDesc().getProperties()
-                      .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null);
-                }
-                if (partDesc.getProperties() != null) {
-                  metaTableStorage = partDesc.getProperties()
-                      .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage);
-                }
+              String metaTableStorage = null;
+              if (partDesc.getTableDesc() != null &&
+                  partDesc.getTableDesc().getProperties() != null) {
+                metaTableStorage = partDesc.getTableDesc().getProperties()
+                    .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null);
+              }
+              if (partDesc.getProperties() != null) {
+                metaTableStorage = partDesc.getProperties()
+                    .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage);
+              }
-                HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage);
-                if (handler instanceof InputEstimator) {
-                  long total = 0;
-                  TableDesc tableDesc = partDesc.getTableDesc();
-                  InputEstimator estimator = (InputEstimator) handler;
-                  for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
-                    JobConf jobConf = new JobConf(myJobConf);
-                    TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
-                    Utilities.setColumnNameList(jobConf, scanOp, true);
-                    Utilities.setColumnTypeList(jobConf, scanOp, true);
-                    PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
-                    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-                    total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
-                  }
-                  resultMap.put(pathStr, new ContentSummary(total, -1, -1));
-                } else {
-                  // todo: should nullify summary for non-native tables,
-                  // not to be selected as a mapjoin target
-                  FileSystem fs = p.getFileSystem(myConf);
-                  resultMap.put(pathStr, fs.getContentSummary(p));
+              HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage);
+              if (handler instanceof InputEstimator) {
+                long total = 0;
+                TableDesc tableDesc = partDesc.getTableDesc();
+                InputEstimator estimator = (InputEstimator) handler;
+                for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
+                  JobConf jobConf = new JobConf(myJobConf);
+                  TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
+                  Utilities.setColumnNameList(jobConf, scanOp, true);
+                  Utilities.setColumnTypeList(jobConf, scanOp, true);
+                  PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
+                  Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
+                  total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
                 }
-              } catch (Exception e) {
-                // We safely ignore this exception for summary data.
-                // We don't update the cache to protect it from polluting other
-                // usages. The worst case is that IOException will always be
-                // retried for another getInputSummary(), which is fine as
-                // IOException is not considered as a common case.
-                LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
+                resultMap.put(pathStr, new ContentSummary(total, -1, -1));
+              } else {
+                // todo: should nullify summary for non-native tables,
+                // not to be selected as a mapjoin target
+                FileSystem fs = p.getFileSystem(myConf);
+                resultMap.put(pathStr, fs.getContentSummary(p));
               }
+            } catch (Exception e) {
+              // We safely ignore this exception for summary data.
+              // We don't update the cache to protect it from polluting other
+              // usages. The worst case is that IOException will always be
+              // retried for another getInputSummary(), which is fine as
+              // IOException is not considered as a common case.
+              LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
             }
-          };
-
-          if (executor == null) {
-            r.run();
-          } else {
-            Future<?> result = executor.submit(r);
-            results.add(result);
           }
+        };
+
+        if (executor == null) {
+          r.run();
+        } else {
+          Future<?> result = executor.submit(r);
+          results.add(result);
         }
+      }
-        if (executor != null) {
-          for (Future<?> result : results) {
-            boolean executorDone = false;
-            do {
-              try {
-                result.get();
-                executorDone = true;
-              } catch (InterruptedException e) {
-                LOG.info("Interrupted when waiting threads: ", e);
-                Thread.currentThread().interrupt();
-                break;
-              } catch (ExecutionException e) {
-                throw new IOException(e);
-              }
-            } while (!executorDone);
-          }
-          executor.shutdown();
+      if (executor != null) {
+        for (Future<?> result : results) {
+          boolean executorDone = false;
+          do {
+            try {
+              result.get();
+              executorDone = true;
+            } catch (InterruptedException e) {
+              LOG.info("Interrupted when waiting threads: ", e);
+              Thread.currentThread().interrupt();
+              break;
+            } catch (ExecutionException e) {
+              throw new IOException(e);
+            }
+          } while (!executorDone);
         }
-        HiveInterruptUtils.checkInterrupted();
-        for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
-          ContentSummary cs = entry.getValue();
+        executor.shutdown();
+      }
+      HiveInterruptUtils.checkInterrupted();
+      for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
+        ContentSummary cs = entry.getValue();
-          summary[0] += cs.getLength();
-          summary[1] += cs.getFileCount();
-          summary[2] += cs.getDirectoryCount();
+        summary[0] += cs.getLength();
+        summary[1] += cs.getFileCount();
+        summary[2] += cs.getDirectoryCount();
-          ctx.addCS(entry.getKey(), cs);
-          LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
-              + " file count: "
-              + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
-        }
+        ctx.addCS(entry.getKey(), cs);
+        LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
+            + " file count: "
+            + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
+      }
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
-        return new ContentSummary(summary[0], summary[1], summary[2]);
-      } finally {
-        HiveInterruptUtils.remove(interrup);
+      return new ContentSummary(summary[0], summary[1], summary[2]);
+    } finally {
+      if (executor != null) {
+        executor.shutdownNow();
       }
+      HiveInterruptUtils.remove(interrup);
     }
   }
@@ -3144,7 +3155,7 @@ public final class Utilities {
     // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
     for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
       if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
-        throw new IOException("Operation is Canceled. ");
+        throw new IOException("Operation is Canceled.");
 
       List<String> aliases = work.getPathToAliases().get(file);
       if (aliases.contains(alias)) {
@@ -3188,41 +3199,57 @@
       }
     }
-    ExecutorService pool = null;
+    List<Path> finalPathsToAdd = new LinkedList<>();
+    int numExecutors = getMaxExecutorsForInputListing(job, pathsToAdd.size());
     if (numExecutors > 1) {
-      pool = Executors.newFixedThreadPool(numExecutors,
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
-    }
+      ExecutorService pool = Executors.newFixedThreadPool(numExecutors,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
-    List<Path> finalPathsToAdd = new LinkedList<>();
-    Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
-    for (final Path path : pathsToAdd) {
-      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
-        throw new IOException("Operation is Canceled. ");
-      }
-      if (pool == null) {
+      finalPathsToAdd.addAll(getInputPathsWithPool(job, work, hiveScratchDir, ctx, skipDummy, pathsToAdd, pool));
+    } else {
+      for (final Path path : pathsToAdd) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+          throw new IOException("Operation is Canceled.");
+        }
         Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call();
         updatePathForMapWork(newPath, work, path);
         finalPathsToAdd.add(newPath);
-      } else {
+      }
+    }
+
+    return finalPathsToAdd;
+  }
+
+  @VisibleForTesting
+  static List<Path> getInputPathsWithPool(JobConf job, MapWork work, Path hiveScratchDir,
+                                          Context ctx, boolean skipDummy, List<Path> pathsToAdd,
+                                          ExecutorService pool) throws IOException, ExecutionException, InterruptedException {
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
+    List<Path> finalPathsToAdd = new ArrayList<>();
+    try {
+      Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
+      for (final Path path : pathsToAdd) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+          throw new IOException("Operation is Canceled.");
+        }
         GetInputPathsCallable callable = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy);
         getPathsCallableToFuture.put(callable, pool.submit(callable));
       }
-    }
+      pool.shutdown();
-    if (pool != null) {
       for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) {
        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
-          throw new IOException("Operation is Canceled. ");
+          throw new IOException("Operation is Canceled.");
        }
        Path newPath = future.getValue().get();
        updatePathForMapWork(newPath, work, future.getKey().path);
        finalPathsToAdd.add(newPath);
      }
+    } finally {
+      pool.shutdownNow();
     }
-
     return finalPathsToAdd;
   }
http://git-wip-us.apache.org/repos/asf/hive/blob/3fc131c7/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index dcd8f95..1a464c8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -26,8 +26,11 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -35,12 +38,17 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -532,6 +540,97 @@ public class TestUtilities {
   }
 
   @Test
+  public void testGetInputSummaryPool() throws ExecutionException, InterruptedException, IOException {
+    ExecutorService pool = mock(ExecutorService.class);
+    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+    Set<Path> pathNeedProcess = new HashSet<>();
+    pathNeedProcess.add(new Path("dummy-path1"));
+    pathNeedProcess.add(new Path("dummy-path2"));
+    pathNeedProcess.add(new Path("dummy-path3"));
+
+    SessionState.start(new HiveConf());
+    JobConf jobConf = new JobConf();
+    Context context = new Context(jobConf);
+
+    Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
+    verify(pool, times(3)).submit(any(Runnable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @Test
+  public void testGetInputSummaryPoolAndFailure() throws ExecutionException, InterruptedException, IOException {
+    ExecutorService pool = mock(ExecutorService.class);
+    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+    Set<Path> pathNeedProcess = new HashSet<>();
+    pathNeedProcess.add(new Path("dummy-path1"));
+    pathNeedProcess.add(new Path("dummy-path2"));
+    pathNeedProcess.add(new Path("dummy-path3"));
+
+    SessionState.start(new HiveConf());
+    JobConf jobConf = new JobConf();
+    Context context = new Context(jobConf);
+
+    Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
+    verify(pool, times(3)).submit(any(Runnable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @Test
+  public void testGetInputPathsPool() throws IOException, ExecutionException, InterruptedException {
+    List<Path> pathsToAdd = new ArrayList<>();
+    Path path = new Path("dummy-path");
+
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+
+    ExecutorService pool = mock(ExecutorService.class);
+    Future mockFuture = mock(Future.class);
+
+    when(mockFuture.get()).thenReturn(path);
+    when(pool.submit(any(Callable.class))).thenReturn(mockFuture);
+
+    Utilities.getInputPathsWithPool(mock(JobConf.class), mock(MapWork.class), mock(Path.class), mock(Context.class),
+        false, pathsToAdd, pool);
+
+    verify(pool, times(3)).submit(any(Callable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @Test
+  public void testGetInputPathsPoolAndFailure() throws IOException, ExecutionException, InterruptedException {
+    List<Path> pathsToAdd = new ArrayList<>();
+    Path path = new Path("dummy-path");
+
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+
+    ExecutorService pool = mock(ExecutorService.class);
+    Future mockFuture = mock(Future.class);
+
+    when(mockFuture.get()).thenThrow(new RuntimeException());
+    when(pool.submit(any(Callable.class))).thenReturn(mockFuture);
+
+    Exception e = null;
+    try {
+      Utilities.getInputPathsWithPool(mock(JobConf.class), mock(MapWork.class), mock(Path.class), mock(Context.class),
+          false, pathsToAdd, pool);
+    } catch (Exception thrownException) {
+      e = thrownException;
+    }
+    assertNotNull(e);
+
+    verify(pool, times(3)).submit(any(Callable.class));
+    verify(pool).shutdownNow();
+  }
+
+  @Test
   public void testGetInputSummaryWithASingleThread() throws IOException {
     final int NUM_PARTITIONS = 5;
     final int BYTES_PER_FILE = 5;
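Not part of the patch, but one rough way to confirm the leak this change fixes in a running process is to count live threads by the name formats used above; something along these lines, where the helper class and method names are purely illustrative:

  public class ThreadLeakCheck {
    // Counts live JVM threads whose names start with the given prefix,
    // e.g. "Get-Input-Paths" or "Get-Input-Summary".
    static long countThreads(String namePrefix) {
      long count = 0;
      for (Thread t : Thread.getAllStackTraces().keySet()) {
        if (t.getName().startsWith(namePrefix)) {
          count++;
        }
      }
      return count;
    }

    public static void main(String[] args) {
      System.out.println("Get-Input-Paths threads:   " + countThreads("Get-Input-Paths"));
      System.out.println("Get-Input-Summary threads: " + countThreads("Get-Input-Summary"));
    }
  }

Before this fix, queries that hit an exception inside getInputPaths() or getInputSummary() could leave such threads behind on every call; with shutdownNow() guaranteed in the finally blocks, the counts should drop back to zero shortly after each call returns.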