Repository: hive
Updated Branches:
  refs/heads/master 38797d212 -> 64c96e1e9
HIVE-13726 : Improve dynamic partition loading VI (Ashutosh Chauhan via Rui Li)

Signed-off-by: Ashutosh Chauhan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/64c96e1e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/64c96e1e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/64c96e1e

Branch: refs/heads/master
Commit: 64c96e1e92a490a069f9cc924b2ec476187f98ea
Parents: 38797d2
Author: Ashutosh Chauhan <[email protected]>
Authored: Mon May 9 18:31:15 2016 -0700
Committer: Ashutosh Chauhan <[email protected]>
Committed: Thu May 12 08:59:12 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    | 58 +++-----------
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 49 ++++++++++++++++-
 2 files changed, 54 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/64c96e1e/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index b65c35b..5cf4d39 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -50,7 +50,6 @@ import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Collection of file manipulation utilities common across Hive.
  */
@@ -575,73 +574,32 @@ public final class FileUtils {
   }
 
   /**
-   * Trashes or deletes all files under a directory. Leaves the directory as is.
-   * @param fs FileSystem to use
-   * @param f path of directory
-   * @param conf hive configuration
-   * @param forceDelete whether to force delete files if trashing does not succeed
-   * @return true if deletion successful
-   * @throws FileNotFoundException
-   * @throws IOException
-   */
-  public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf,
-      boolean forceDelete) throws FileNotFoundException, IOException {
-    FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
-    boolean result = true;
-    for (FileStatus status : statuses) {
-      result = result & moveToTrash(fs, status.getPath(), conf, forceDelete);
-    }
-    return result;
-  }
-
-  /**
-   * Move a particular file or directory to the trash. If for a certain reason the trashing fails
-   * it will force deletes the file or directory
-   * @param fs FileSystem to use
-   * @param f path of file or directory to move to trash.
-   * @param conf
-   * @return true if move successful
-   * @throws IOException
-   */
-  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf) throws IOException {
-    return moveToTrash(fs, f, conf, true);
-  }
-
-  /**
    * Move a particular file or directory to the trash.
    * @param fs FileSystem to use
    * @param f path of file or directory to move to trash.
    * @param conf
-   * @param forceDelete whether force delete the file or directory if trashing fails
    * @return true if move successful
   * @throws IOException
   */
-  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean forceDelete)
+  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf)
       throws IOException {
     LOG.debug("deleting " + f);
-
     boolean result = false;
     try {
       result = Trash.moveToAppropriateTrash(fs, f, conf);
       if (result) {
-        LOG.info("Moved to trash: " + f);
+        LOG.trace("Moved to trash: " + f);
         return true;
       }
     } catch (IOException ioe) {
-      if (forceDelete) {
-        // for whatever failure reason including that trash has lower encryption zone
-        // retry with force delete
-        LOG.warn(ioe.getMessage() + "; Force to delete it.");
-      } else {
-        throw ioe;
-      }
+      // for whatever failure reason including that trash has lower encryption zone
+      // retry with force delete
+      LOG.warn(ioe.getMessage() + "; Force to delete it.");
     }
-    if (forceDelete) {
-      result = fs.delete(f, true);
-      if (!result) {
-        LOG.error("Failed to delete " + f);
-      }
+    result = fs.delete(f, true);
+    if (!result) {
+      LOG.error("Failed to delete " + f);
     }
     return result;

http://git-wip-us.apache.org/repos/asf/hive/blob/64c96e1e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index b5e660b..dcfc2b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -1528,7 +1529,7 @@ public class Hive {
     }
     List<Path> newFiles = null;
     if (replace || (oldPart == null && !isAcid)) {
-      Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
+      replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
          isSrcLocal);
     } else {
       if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
@@ -3080,7 +3081,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
   * @param isSrcLocal
   *          If the source directory is LOCAL
   */
-  protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
+  protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
          boolean isSrcLocal) throws HiveException {
     try {
 
@@ -3114,7 +3115,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
           // existing content might result in incorrect (extra) data.
           // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
           // not the destf or its subdir?
-          oldPathDeleted = FileUtils.trashFilesUnderDir(fs2, oldPath, conf, true);
+          oldPathDeleted = trashFilesUnderDir(fs2, oldPath, conf);
         }
       }
     } catch (IOException e) {
@@ -3162,6 +3163,48 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
+
+  /**
+   * Trashes or deletes all files under a directory. Leaves the directory as is.
+   * @param fs FileSystem to use
+   * @param f path of directory
+   * @param conf hive configuration
+   * @param forceDelete whether to force delete files if trashing does not succeed
+   * @return true if deletion successful
+   * @throws IOException
+   */
+  private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
+      throws IOException {
+    FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
+    boolean result = true;
+    final List<Future<Boolean>> futures = new LinkedList<>();
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build());
+    final SessionState parentSession = SessionState.get();
+    for (final FileStatus status : statuses) {
+      futures.add(pool.submit(new Callable<Boolean>() {
+
+        @Override
+        public Boolean call() throws Exception {
+          SessionState.setCurrentSessionState(parentSession);
+          return FileUtils.moveToTrash(fs, status.getPath(), conf);
+        }
+      }));
+    }
+    pool.shutdown();
+    for (Future<Boolean> future : futures) {
+      try {
+        result &= future.get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Failed to delete: ",e);
+        pool.shutdownNow();
+        throw new IOException(e);
+      }
+    }
+    return result;
+  }
+
   public static boolean isHadoop1() {
     return ShimLoader.getMajorVersion().startsWith("0.20");
   }
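
----------------------------------------------------------------------

The heart of this change is the new private Hive.trashFilesUnderDir, which fans the per-file
FileUtils.moveToTrash calls out to a bounded pool of daemon threads and ANDs the Future<Boolean>
results, while moveToTrash itself now always falls back to a force delete when trashing throws.
The sketch below is a minimal illustration of that same pattern using only java.nio.file and
java.util.concurrent, so it compiles on its own; the trashOrDelete helper, the ParallelDeleteSketch
class name, the example path, and the pool size of 25 are stand-ins for the Hadoop FileSystem/Trash
calls and the HIVE_MOVE_FILES_THREAD_COUNT setting, not the committed code.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

public class ParallelDeleteSketch {

  // Stand-in for FileUtils.moveToTrash(fs, path, conf): the committed method first tries
  // Trash.moveToAppropriateTrash and, if that throws, falls back to fs.delete(path, true).
  // A plain local delete keeps this sketch self-contained.
  static boolean trashOrDelete(Path p) {
    try {
      return Files.deleteIfExists(p);
    } catch (IOException e) {
      return false;
    }
  }

  // Deletes every entry directly under dir in parallel and reports overall success,
  // mirroring the new Hive.trashFilesUnderDir: a bounded daemon pool, one task per
  // file, and the per-file results combined with &= over the returned Futures.
  static boolean deleteFilesUnderDir(Path dir, int poolSize) throws IOException {
    ThreadFactory daemonFactory = new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "Delete-Thread");
        t.setDaemon(true); // the patch builds the same kind of factory via Guava's ThreadFactoryBuilder
        return t;
      }
    };
    ExecutorService pool = Executors.newFixedThreadPool(poolSize, daemonFactory);
    List<Future<Boolean>> futures = new ArrayList<>();
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
      for (final Path entry : entries) {
        futures.add(pool.submit(new Callable<Boolean>() {
          @Override
          public Boolean call() {
            return trashOrDelete(entry);
          }
        }));
      }
    }
    pool.shutdown(); // accept no new tasks; already-submitted deletions keep running
    boolean result = true;
    for (Future<Boolean> future : futures) {
      try {
        result &= future.get();
      } catch (InterruptedException | ExecutionException e) {
        pool.shutdownNow(); // abort remaining work and surface the failure, as the patch does
        throw new IOException(e);
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical usage: the path and pool size stand in for the partition directory
    // and HIVE_MOVE_FILES_THREAD_COUNT in the real code.
    System.out.println(deleteFilesUnderDir(Paths.get("/tmp/hive-delete-example"), 25));
  }
}

Daemon threads plus pool.shutdown() let already-submitted deletions drain without ever blocking JVM
exit, while shutdownNow() on the first failed Future stops queuing further work before the
IOException is rethrown.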
