HIVE-16949: Leak of threads from Get-Input-Paths and Get-Input-Summary thread pool (Sahil Takiar, reviewed by Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3fc131c7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3fc131c7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3fc131c7

Branch: refs/heads/hive-14535
Commit: 3fc131c79a5329ea509f0c125e789ddd042b1797
Parents: 6be50b7
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Mon Aug 28 11:19:41 2017 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Mon Aug 28 11:19:41 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 305 ++++++++++---------
 .../hadoop/hive/ql/exec/TestUtilities.java      |  99 ++++++
 2 files changed, 265 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
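
The core of this change is that both the Get-Input-Summary and Get-Input-Paths code paths now tear their executor down in a finally block (shutdown() once all work is submitted, shutdownNow() on every exit path), so worker threads can no longer leak when a task fails or the query is cancelled. Below is a minimal, hypothetical sketch of that shutdown-in-finally pattern; the class name and the printed task bodies are illustrative only and are not part of the patch.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Illustrative sketch only; the real code submits per-path summary/listing tasks.
    public class ExecutorShutdownSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
          List<Future<?>> results = new ArrayList<>();
          for (int i = 0; i < 3; i++) {
            final int id = i;
            results.add(pool.submit(() -> System.out.println("processed path " + id)));
          }
          pool.shutdown();        // stop accepting new work once everything is submitted
          for (Future<?> f : results) {
            f.get();              // surface task failures to the caller
          }
        } finally {
          pool.shutdownNow();     // always runs, so no worker threads are left behind
        }
      }
    }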


http://git-wip-us.apache.org/repos/asf/hive/blob/3fc131c7/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 99c85f2..aca99f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2233,153 +2233,164 @@ public final class Utilities {
 
       // Process the case when name node call is needed
       final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
-      ArrayList<Future<?>> results = new ArrayList<Future<?>>();
       final ExecutorService executor;
 
       int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size());
       if (numExecutors > 1) {
         LOG.info("Using " + numExecutors + " threads for getContentSummary");
         executor = Executors.newFixedThreadPool(numExecutors,
-            new ThreadFactoryBuilder().setDaemon(true)
-                .setNameFormat("Get-Input-Summary-%d").build());
+                new ThreadFactoryBuilder().setDaemon(true)
+                        .setNameFormat("Get-Input-Summary-%d").build());
       } else {
         executor = null;
       }
+      ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
+      return cs;
+    }
+  }
 
-      HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
-        @Override
-        public void interrupt() {
-          for (Path path : pathNeedProcess) {
-            try {
-              path.getFileSystem(ctx.getConf()).close();
-            } catch (IOException ignore) {
-                LOG.debug("Failed to close filesystem", ignore);
-            }
-          }
-          if (executor != null) {
-            executor.shutdownNow();
+  @VisibleForTesting
+  static ContentSummary getInputSummaryWithPool(final Context ctx, Set<Path> pathNeedProcess, MapWork work,
+                                                long[] summary, ExecutorService executor) throws IOException {
+    List<Future<?>> results = new ArrayList<Future<?>>();
+    final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
+
+    HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
+      @Override
+      public void interrupt() {
+        for (Path path : pathNeedProcess) {
+          try {
+            path.getFileSystem(ctx.getConf()).close();
+          } catch (IOException ignore) {
+            LOG.debug("Failed to close filesystem", ignore);
           }
         }
-      });
-      try {
-        Configuration conf = ctx.getConf();
-        JobConf jobConf = new JobConf(conf);
-        for (Path path : pathNeedProcess) {
-          final Path p = path;
-          final String pathStr = path.toString();
-          // All threads share the same Configuration and JobConf based on the
-          // assumption that they are thread safe if only read operations are
-          // executed. It is not stated in Hadoop's javadoc, the sourcce codes
-          // clearly showed that they made efforts for it and we believe it is
-          // thread safe. Will revisit this piece of codes if we find the assumption
-          // is not correct.
-          final Configuration myConf = conf;
-          final JobConf myJobConf = jobConf;
-          final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
-          final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
-          final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p);
-          Runnable r = new Runnable() {
-            @Override
-            public void run() {
-              try {
-                Class<? extends InputFormat> inputFormatCls = partDesc
-                    .getInputFileFormatClass();
-                InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
-                    inputFormatCls, myJobConf);
-                if (inputFormatObj instanceof ContentSummaryInputFormat) {
-                  ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
-                  resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
-                  return;
-                }
+        if (executor != null) {
+          executor.shutdownNow();
+        }
+      }
+    });
+    try {
+      Configuration conf = ctx.getConf();
+      JobConf jobConf = new JobConf(conf);
+      for (Path path : pathNeedProcess) {
+        final Path p = path;
+        final String pathStr = path.toString();
+        // All threads share the same Configuration and JobConf based on the
+        // assumption that they are thread safe if only read operations are
+        // executed. It is not stated in Hadoop's javadoc, the sourcce codes
+        // clearly showed that they made efforts for it and we believe it is
+        // thread safe. Will revisit this piece of codes if we find the assumption
+        // is not correct.
+        final Configuration myConf = conf;
+        final JobConf myJobConf = jobConf;
+        final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
+        final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
+        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p);
+        Runnable r = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              Class<? extends InputFormat> inputFormatCls = partDesc
+                      .getInputFileFormatClass();
+              InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
+                      inputFormatCls, myJobConf);
+              if (inputFormatObj instanceof ContentSummaryInputFormat) {
+                ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
+                resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
+                return;
+              }
 
-                String metaTableStorage = null;
-                if (partDesc.getTableDesc() != null &&
-                    partDesc.getTableDesc().getProperties() != null) {
-                  metaTableStorage = partDesc.getTableDesc().getProperties()
-                      .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null);
-                }
-                if (partDesc.getProperties() != null) {
-                  metaTableStorage = partDesc.getProperties()
-                      .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage);
-                }
+              String metaTableStorage = null;
+              if (partDesc.getTableDesc() != null &&
+                      partDesc.getTableDesc().getProperties() != null) {
+                metaTableStorage = partDesc.getTableDesc().getProperties()
+                        .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null);
+              }
+              if (partDesc.getProperties() != null) {
+                metaTableStorage = partDesc.getProperties()
+                        .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage);
+              }
 
-                HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage);
-                if (handler instanceof InputEstimator) {
-                  long total = 0;
-                  TableDesc tableDesc = partDesc.getTableDesc();
-                  InputEstimator estimator = (InputEstimator) handler;
-                  for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
-                    JobConf jobConf = new JobConf(myJobConf);
-                    TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
-                    Utilities.setColumnNameList(jobConf, scanOp, true);
-                    Utilities.setColumnTypeList(jobConf, scanOp, true);
-                    PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
-                    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-                    total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
-                  }
-                  resultMap.put(pathStr, new ContentSummary(total, -1, -1));
-                } else {
-                  // todo: should nullify summary for non-native tables,
-                  // not to be selected as a mapjoin target
-                  FileSystem fs = p.getFileSystem(myConf);
-                  resultMap.put(pathStr, fs.getContentSummary(p));
+              HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage);
+              if (handler instanceof InputEstimator) {
+                long total = 0;
+                TableDesc tableDesc = partDesc.getTableDesc();
+                InputEstimator estimator = (InputEstimator) handler;
+                for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
+                  JobConf jobConf = new JobConf(myJobConf);
+                  TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
+                  Utilities.setColumnNameList(jobConf, scanOp, true);
+                  Utilities.setColumnTypeList(jobConf, scanOp, true);
+                  PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
+                  Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
+                  total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
                 }
-              } catch (Exception e) {
-                // We safely ignore this exception for summary data.
-                // We don't update the cache to protect it from polluting other
-                // usages. The worst case is that IOException will always be
-                // retried for another getInputSummary(), which is fine as
-                // IOException is not considered as a common case.
-                LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
+                resultMap.put(pathStr, new ContentSummary(total, -1, -1));
+              } else {
+                // todo: should nullify summary for non-native tables,
+                // not to be selected as a mapjoin target
+                FileSystem fs = p.getFileSystem(myConf);
+                resultMap.put(pathStr, fs.getContentSummary(p));
               }
+            } catch (Exception e) {
+              // We safely ignore this exception for summary data.
+              // We don't update the cache to protect it from polluting other
+              // usages. The worst case is that IOException will always be
+              // retried for another getInputSummary(), which is fine as
+              // IOException is not considered as a common case.
+              LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
             }
-          };
-
-          if (executor == null) {
-            r.run();
-          } else {
-            Future<?> result = executor.submit(r);
-            results.add(result);
           }
+        };
+
+        if (executor == null) {
+          r.run();
+        } else {
+          Future<?> result = executor.submit(r);
+          results.add(result);
         }
+      }
 
-        if (executor != null) {
-          for (Future<?> result : results) {
-            boolean executorDone = false;
-            do {
-              try {
-                result.get();
-                executorDone = true;
-              } catch (InterruptedException e) {
-                LOG.info("Interrupted when waiting threads: ", e);
-                Thread.currentThread().interrupt();
-                break;
-              } catch (ExecutionException e) {
-                throw new IOException(e);
-              }
-            } while (!executorDone);
-          }
-          executor.shutdown();
+      if (executor != null) {
+        for (Future<?> result : results) {
+          boolean executorDone = false;
+          do {
+            try {
+              result.get();
+              executorDone = true;
+            } catch (InterruptedException e) {
+              LOG.info("Interrupted when waiting threads: ", e);
+              Thread.currentThread().interrupt();
+              break;
+            } catch (ExecutionException e) {
+              throw new IOException(e);
+            }
+          } while (!executorDone);
         }
-        HiveInterruptUtils.checkInterrupted();
-        for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
-          ContentSummary cs = entry.getValue();
+        executor.shutdown();
+      }
+      HiveInterruptUtils.checkInterrupted();
+      for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
+        ContentSummary cs = entry.getValue();
 
-          summary[0] += cs.getLength();
-          summary[1] += cs.getFileCount();
-          summary[2] += cs.getDirectoryCount();
+        summary[0] += cs.getLength();
+        summary[1] += cs.getFileCount();
+        summary[2] += cs.getDirectoryCount();
 
-          ctx.addCS(entry.getKey(), cs);
-          LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
-              + " file count: "
-              + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
-        }
+        ctx.addCS(entry.getKey(), cs);
+        LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
+                + " file count: "
+                + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
+      }
 
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
-        return new ContentSummary(summary[0], summary[1], summary[2]);
-      } finally {
-        HiveInterruptUtils.remove(interrup);
+      return new ContentSummary(summary[0], summary[1], summary[2]);
+    } finally {
+      if (executor != null) {
+        executor.shutdownNow();
       }
+      HiveInterruptUtils.remove(interrup);
     }
   }
 
@@ -3144,7 +3155,7 @@ public final class Utilities {
       // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
       for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
         if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
-          throw new IOException("Operation is Canceled. ");
+          throw new IOException("Operation is Canceled.");
 
         List<String> aliases = work.getPathToAliases().get(file);
         if (aliases.contains(alias)) {
@@ -3188,41 +3199,57 @@ public final class Utilities {
       }
     }
 
-    ExecutorService pool = null;
+    List<Path> finalPathsToAdd = new LinkedList<>();
+
     int numExecutors = getMaxExecutorsForInputListing(job, pathsToAdd.size());
     if (numExecutors > 1) {
-      pool = Executors.newFixedThreadPool(numExecutors,
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
-    }
+      ExecutorService pool = Executors.newFixedThreadPool(numExecutors,
+              new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
 
-    List<Path> finalPathsToAdd = new LinkedList<>();
-    Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
-    for (final Path path : pathsToAdd) {
-      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
-        throw new IOException("Operation is Canceled. ");
-      }
-      if (pool == null) {
+      finalPathsToAdd.addAll(getInputPathsWithPool(job, work, hiveScratchDir, ctx, skipDummy, pathsToAdd, pool));
+    } else {
+      for (final Path path : pathsToAdd) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+          throw new IOException("Operation is Canceled.");
+        }
         Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call();
         updatePathForMapWork(newPath, work, path);
         finalPathsToAdd.add(newPath);
-      } else {
+      }
+    }
+
+    return finalPathsToAdd;
+  }
+
+  @VisibleForTesting
+  static List<Path> getInputPathsWithPool(JobConf job, MapWork work, Path hiveScratchDir,
+                                           Context ctx, boolean skipDummy, List<Path> pathsToAdd,
+                                           ExecutorService pool) throws IOException, ExecutionException, InterruptedException {
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
+    List<Path> finalPathsToAdd = new ArrayList<>();
+    try {
+      Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
+      for (final Path path : pathsToAdd) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+          throw new IOException("Operation is Canceled.");
+        }
         GetInputPathsCallable callable = new GetInputPathsCallable(path, job, 
work, hiveScratchDir, ctx, skipDummy);
         getPathsCallableToFuture.put(callable, pool.submit(callable));
       }
-    }
+      pool.shutdown();
 
-    if (pool != null) {
       for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) {
         if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
-          throw new IOException("Operation is Canceled. ");
+          throw new IOException("Operation is Canceled.");
         }
 
         Path newPath = future.getValue().get();
         updatePathForMapWork(newPath, work, future.getKey().path);
         finalPathsToAdd.add(newPath);
       }
+    } finally {
+      pool.shutdownNow();
     }
-
     return finalPathsToAdd;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3fc131c7/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index dcd8f95..1a464c8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -26,8 +26,11 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -35,12 +38,17 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -532,6 +540,97 @@ public class TestUtilities {
   }
 
   @Test
+  public void testGetInputSummaryPool() throws ExecutionException, InterruptedException, IOException {
+    ExecutorService pool = mock(ExecutorService.class);
+    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+    Set<Path> pathNeedProcess = new HashSet<>();
+    pathNeedProcess.add(new Path("dummy-path1"));
+    pathNeedProcess.add(new Path("dummy-path2"));
+    pathNeedProcess.add(new Path("dummy-path3"));
+
+    SessionState.start(new HiveConf());
+    JobConf jobConf = new JobConf();
+    Context context = new Context(jobConf);
+
+    Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
+    verify(pool, times(3)).submit(any(Runnable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @Test
+  public void testGetInputSummaryPoolAndFailure() throws ExecutionException, InterruptedException, IOException {
+    ExecutorService pool = mock(ExecutorService.class);
+    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+    Set<Path> pathNeedProcess = new HashSet<>();
+    pathNeedProcess.add(new Path("dummy-path1"));
+    pathNeedProcess.add(new Path("dummy-path2"));
+    pathNeedProcess.add(new Path("dummy-path3"));
+
+    SessionState.start(new HiveConf());
+    JobConf jobConf = new JobConf();
+    Context context = new Context(jobConf);
+
+    Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
+    verify(pool, times(3)).submit(any(Runnable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @Test
+  public void testGetInputPathsPool() throws IOException, ExecutionException, InterruptedException {
+    List<Path> pathsToAdd = new ArrayList<>();
+    Path path = new Path("dummy-path");
+
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+
+    ExecutorService pool = mock(ExecutorService.class);
+    Future mockFuture = mock(Future.class);
+
+    when(mockFuture.get()).thenReturn(path);
+    when(pool.submit(any(Callable.class))).thenReturn(mockFuture);
+
+    Utilities.getInputPathsWithPool(mock(JobConf.class), mock(MapWork.class), mock(Path.class), mock(Context.class),
+            false, pathsToAdd, pool);
+
+    verify(pool, times(3)).submit(any(Callable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @Test
+  public void testGetInputPathsPoolAndFailure() throws IOException, ExecutionException, InterruptedException {
+    List<Path> pathsToAdd = new ArrayList<>();
+    Path path = new Path("dummy-path");
+
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+    pathsToAdd.add(path);
+
+    ExecutorService pool = mock(ExecutorService.class);
+    Future mockFuture = mock(Future.class);
+
+    when(mockFuture.get()).thenThrow(new RuntimeException());
+    when(pool.submit(any(Callable.class))).thenReturn(mockFuture);
+
+    Exception e = null;
+    try {
+      Utilities.getInputPathsWithPool(mock(JobConf.class), mock(MapWork.class), mock(Path.class), mock(Context.class),
+              false, pathsToAdd, pool);
+    } catch (Exception thrownException) {
+      e = thrownException;
+    }
+    assertNotNull(e);
+
+    verify(pool, times(3)).submit(any(Callable.class));
+    verify(pool).shutdownNow();
+  }
+
+  @Test
   public void testGetInputSummaryWithASingleThread() throws IOException {
     final int NUM_PARTITIONS = 5;
     final int BYTES_PER_FILE = 5;
