[
https://issues.apache.org/jira/browse/HIVE-23559?focusedWorklogId=831201&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-831201
]
ASF GitHub Bot logged work on HIVE-23559:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/22 21:51
Start Date: 05/Dec/22 21:51
Worklog Time Spent: 10m
Work Description: ramesh0201 commented on code in PR #3795:
URL: https://github.com/apache/hive/pull/3795#discussion_r1040134949
##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:
##########
@@ -5208,55 +5208,94 @@ private static void moveAcidFiles(String deltaFileType,
PathFilter pathFilter, F
}
LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + "
files");
+ List<Future<Void>> futures = new LinkedList<>();
+ final ExecutorService pool =
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
25),
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build())
: null;
+
for (FileStatus deltaStat : deltaStats) {
- Path deltaPath = deltaStat.getPath();
- // Create the delta directory. Don't worry if it already exists,
- // as that likely means another task got to it first. Then move each of
the buckets.
- // it would be more efficient to try to move the delta with it's buckets
but that is
- // harder to make race condition proof.
- Path deltaDest = new Path(dst, deltaPath.getName());
- try {
- if (!createdDeltaDirs.contains(deltaDest)) {
- try {
- if(fs.mkdirs(deltaDest)) {
- try {
-
fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
- AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
- } catch (FileNotFoundException fnf) {
- // There might be no side file. Skip in this case.
- }
+
+ if (null == pool) {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs,
newFiles, deltaStat);
+ } else {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws HiveException {
+ try {
+ moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs,
newFiles, deltaStat);
+ } catch (Exception e) {
+ final String poolMsg =
+ "Unable to move source " + deltaStat.getPath().getName()
+ " to destination " + dst.getName();
+ throw getHiveException(e, poolMsg);
}
- createdDeltaDirs.add(deltaDest);
- } catch (IOException swallowIt) {
- // Don't worry about this, as it likely just means it's already
been created.
- LOG.info("Unable to create " + deltaFileType + " directory " +
deltaDest +
- ", assuming it already exists: " + swallowIt.getMessage());
+ return null;
}
+ }));
+ }
+ }
+
+ if (null != pool) {
+ pool.shutdown();
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw handlePoolException(pool, e);
}
- FileStatus[] bucketStats = fs.listStatus(deltaPath,
AcidUtils.bucketFileFilter);
- LOG.debug("Acid move found " + bucketStats.length + " bucket files");
- for (FileStatus bucketStat : bucketStats) {
- Path bucketSrc = bucketStat.getPath();
- Path bucketDest = new Path(deltaDest, bucketSrc.getName());
- final String msg = "Unable to move source " + bucketSrc + " to
destination " +
- bucketDest;
- LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
- bucketDest.toUri().toString());
- try {
- fs.rename(bucketSrc, bucketDest);
- if (newFiles != null) {
- newFiles.add(bucketDest);
+ }
+ }
+ }
+
+ private static void moveAcidFilesForDelta(String deltaFileType, FileSystem
fs,
+ Path dst, Set<Path>
createdDeltaDirs,
+ List<Path> newFiles, FileStatus
deltaStat) throws HiveException {
+
+ Path deltaPath = deltaStat.getPath();
+ // Create the delta directory. Don't worry if it already exists,
+ // as that likely means another task got to it first. Then move each of
the buckets.
+ // it would be more efficient to try to move the delta with it's buckets
but that is
+ // harder to make race condition proof.
+ Path deltaDest = new Path(dst, deltaPath.getName());
+ try {
+ if (!createdDeltaDirs.contains(deltaDest)) {
+ try {
+ if(fs.mkdirs(deltaDest)) {
+ try {
+
fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
+ AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
+ } catch (FileNotFoundException fnf) {
+ // There might be no side file. Skip in this case.
}
- } catch (Exception e) {
- throw getHiveException(e, msg);
}
+ createdDeltaDirs.add(deltaDest);
+ } catch (IOException swallowIt) {
+ // Don't worry about this, as it likely just means it's already been
created.
+ LOG.info("Unable to create " + deltaFileType + " directory " +
deltaDest +
+ ", assuming it already exists: " + swallowIt.getMessage());
}
- } catch (IOException e) {
- throw new HiveException("Error moving acid files " + e.getMessage(),
e);
}
+ FileStatus[] bucketStats = fs.listStatus(deltaPath,
AcidUtils.bucketFileFilter);
+ LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+ for (FileStatus bucketStat : bucketStats) {
+ Path bucketSrc = bucketStat.getPath();
+ Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+ final String msg = "Unable to move source " + bucketSrc + " to
destination " +
Review Comment:
Minor comment: can we move this `msg` string construction into the catch block?
Issue Time Tracking
-------------------
Worklog Id: (was: 831201)
Time Spent: 40m (was: 0.5h)
> Optimise Hive::moveAcidFiles for cloud storage
> ----------------------------------------------
>
> Key: HIVE-23559
> URL: https://issues.apache.org/jira/browse/HIVE-23559
> Project: Hive
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Assignee: Dmitriy Fingerman
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L4752]
> It ends up transferring the DELTA, DELETE_DELTA, and BASE prefixes sequentially
> from the staging location to the final location.
> This causes delays even for simple update statements that update only a small
> number of records in cloud storage.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)