difin commented on code in PR #3795:
URL: https://github.com/apache/hive/pull/3795#discussion_r1042234444
##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:
##########
@@ -5208,55 +5208,94 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F
}
LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+ List<Future<Void>> futures = new LinkedList<>();
+ final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+ Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build()) : null;
+
for (FileStatus deltaStat : deltaStats) {
- Path deltaPath = deltaStat.getPath();
- // Create the delta directory. Don't worry if it already exists,
- // as that likely means another task got to it first. Then move each of the buckets.
- // it would be more efficient to try to move the delta with it's buckets but that is
- // harder to make race condition proof.
- Path deltaDest = new Path(dst, deltaPath.getName());
- try {
- if (!createdDeltaDirs.contains(deltaDest)) {
- try {
- if(fs.mkdirs(deltaDest)) {
- try {
- fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
- AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
- } catch (FileNotFoundException fnf) {
- // There might be no side file. Skip in this case.
- }
+
+ if (null == pool) {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+ } else {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws HiveException {
+ try {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat);
+ } catch (Exception e) {
+ final String poolMsg =
+ "Unable to move source " + deltaStat.getPath().getName()
+ " to destination " + dst.getName();
+ throw getHiveException(e, poolMsg);
}
- createdDeltaDirs.add(deltaDest);
- } catch (IOException swallowIt) {
- // Don't worry about this, as it likely just means it's already been created.
- LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
- ", assuming it already exists: " + swallowIt.getMessage());
+ return null;
}
+ }));
+ }
+ }
+
+ if (null != pool) {
+ pool.shutdown();
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw handlePoolException(pool, e);
}
- FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
- LOG.debug("Acid move found " + bucketStats.length + " bucket files");
- for (FileStatus bucketStat : bucketStats) {
- Path bucketSrc = bucketStat.getPath();
- Path bucketDest = new Path(deltaDest, bucketSrc.getName());
- final String msg = "Unable to move source " + bucketSrc + " to destination " +
- bucketDest;
- LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
- bucketDest.toUri().toString());
- try {
- fs.rename(bucketSrc, bucketDest);
- if (newFiles != null) {
- newFiles.add(bucketDest);
+ }
+ }
+ }
+
+ private static void moveAcidFilesForDelta(String deltaFileType, FileSystem fs,
+ Path dst, Set<Path> createdDeltaDirs,
+ List<Path> newFiles, FileStatus deltaStat) throws HiveException {
+
+ Path deltaPath = deltaStat.getPath();
+ // Create the delta directory. Don't worry if it already exists,
+ // as that likely means another task got to it first. Then move each of the buckets.
+ // it would be more efficient to try to move the delta with it's buckets but that is
+ // harder to make race condition proof.
+ Path deltaDest = new Path(dst, deltaPath.getName());
+ try {
+ if (!createdDeltaDirs.contains(deltaDest)) {
+ try {
+ if(fs.mkdirs(deltaDest)) {
+ try {
+ fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
+ AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
+ } catch (FileNotFoundException fnf) {
+ // There might be no side file. Skip in this case.
}
- } catch (Exception e) {
- throw getHiveException(e, msg);
}
+ createdDeltaDirs.add(deltaDest);
+ } catch (IOException swallowIt) {
+ // Don't worry about this, as it likely just means it's already been created.
+ LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
+ ", assuming it already exists: " + swallowIt.getMessage());
}
- } catch (IOException e) {
- throw new HiveException("Error moving acid files " + e.getMessage(), e);
}
+ FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+ LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+ for (FileStatus bucketStat : bucketStats) {
+ Path bucketSrc = bucketStat.getPath();
+ Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+ final String msg = "Unable to move source " + bucketSrc + " to destination " +
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]