rbalamohan commented on a change in pull request #1934:
URL: https://github.com/apache/hive/pull/1934#discussion_r568426369
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -5011,29 +5019,80 @@ public void cleanUpOneDirectoryForReplace(Path path,
FileSystem fs,
if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
recycleDirToCmPath(path, purge);
}
- FileStatus[] statuses = fs.listStatus(path, pathFilter);
- if (statuses == null || statuses.length == 0) {
- return;
+
+ if (!trashDirectoryContent(fs, path, pathFilter, conf, purge)) {
+ throw new HiveException("Old path " + path + " has not been cleaned
up.");
}
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- String s = "Deleting files under " + path + " for replace: ";
- for (FileStatus file : statuses) {
- s += file.getPath().getName() + ", ";
+ }
+
+ private static ExecutorService createDeleteProcessorPool(final Configuration
conf) {
+ final ExecutorService pool =
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
25),
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build())
: null;
+ return pool;
+ }
+
+ /**
+ * Trashes or deletes files in given directory. Deletion happens in a
+ * separate thread pool if {@link ConfVars.HIVE_MOVE_FILES_THREAD_COUNT}
+ * is set accordingly
+ * @param fs FileSystem to use
+ * @param path directory to be cleaned up
+ * @param pathFilter filter to be applied
+ * @param conf hive configuration
+ * @param purge skip trash if true
+ * @return true if deletion successful
+ * @throws IOException
+ */
+ public static boolean trashDirectoryContent(final FileSystem fs, final Path
path, PathFilter pathFilter,
+ final Configuration conf, final boolean
purge)
+ throws IOException {
+ boolean result = true;
+
+ final List<Future<Boolean>> futures = new LinkedList<>();
+ final ExecutorService pool = createDeleteProcessorPool(conf);
+ final SessionState parentSession = SessionState.get();
+ RemoteIterator<FileStatus> remoteIterator = fs.listStatusIterator(path);
+ while (remoteIterator.hasNext()){
+ FileStatus status = remoteIterator.next();
+ if (!pathFilter.accept(status.getPath())) {
+ continue;
+ }
+ if (null == pool) {
Review comment:
Minor comment.
This logic largely overlaps with trashFiles(). Should the shared code be
refactored into one place instead of being duplicated in 2 places?
Rest LGTM.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]