difin commented on code in PR #3795:
URL: https://github.com/apache/hive/pull/3795#discussion_r1042233921
##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:
##########
@@ -5208,55 +5208,94 @@ private static void moveAcidFiles(String deltaFileType,
PathFilter pathFilter, F
}
LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+ List<Future<Void>> futures = new LinkedList<>();
+ final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+ Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build()) : null;
+
for (FileStatus deltaStat : deltaStats) {
- Path deltaPath = deltaStat.getPath();
- // Create the delta directory. Don't worry if it already exists,
- // as that likely means another task got to it first. Then move each of the buckets.
- // it would be more efficient to try to move the delta with it's buckets but that is
- // harder to make race condition proof.
- Path deltaDest = new Path(dst, deltaPath.getName());
- try {
- if (!createdDeltaDirs.contains(deltaDest)) {
- try {
- if(fs.mkdirs(deltaDest)) {
- try {
-
fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
- AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
- } catch (FileNotFoundException fnf) {
- // There might be no side file. Skip in this case.
- }
+
+ if (null == pool) {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+ } else {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws HiveException {
+ try {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+ } catch (Exception e) {
+ final String poolMsg =
+ "Unable to move source " + deltaStat.getPath().getName()
+ " to destination " + dst.getName();
+ throw getHiveException(e, poolMsg);
}
- createdDeltaDirs.add(deltaDest);
- } catch (IOException swallowIt) {
- // Don't worry about this, as it likely just means it's already
been created.
- LOG.info("Unable to create " + deltaFileType + " directory " +
deltaDest +
- ", assuming it already exists: " + swallowIt.getMessage());
+ return null;
}
+ }));
+ }
+ }
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]