mnpoonia commented on code in PR #6483:
URL: https://github.com/apache/hbase/pull/6483#discussion_r1851367892
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java:
##########
@@ -437,33 +440,53 @@ private static List<File> resolveAndArchive(FileSystem
fs, Path baseArchiveDir,
LOG.trace("Created archive directory {}", baseArchiveDir);
}
- List<File> failures = new ArrayList<>();
+ List<File> failures = Collections.synchronizedList(new ArrayList<>());
String startTime = Long.toString(start);
+ List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
- // if its a file archive it
try {
- LOG.trace("Archiving {}", file);
- if (file.isFile()) {
- // attempt to archive the file
- if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
- LOG.warn("Couldn't archive " + file + " into backup directory: " +
baseArchiveDir);
- failures.add(file);
- }
- } else {
- // otherwise its a directory and we need to archive all files
+ if (!file.isFile()) {
+ // if its a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
- failures.addAll(resolveAndArchive(fs, parentArchiveDir, children,
start));
+ failures.addAll(resolveAndArchive(conf, fs, parentArchiveDir,
children, start));
+ } else {
+ filesOnly.add(file);
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
}
+ Map<File, Future<Boolean>> futures = new HashMap<>();
+ // In current baseDir all files will be processed concurrently
+ for (File file : filesOnly) {
+ LOG.trace("Archiving {}", file);
+ Future<Boolean> archiveTask = getArchiveExecutor(conf)
+ .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
+ futures.put(file, archiveTask);
+ }
+
+ for (Map.Entry<File, Future<Boolean>> fileFutureEntry :
futures.entrySet()) {
+ try {
+ boolean fileCleaned = fileFutureEntry.getValue().get();
+ if (!fileCleaned) {
+ LOG.warn("Couldn't archive %s into backup directory: %s"
+ .formatted(fileFutureEntry.getKey(), baseArchiveDir));
+ failures.add(fileFutureEntry.getKey());
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("HFileArchive Cleanup thread was interrupted");
Review Comment:
Thanks for pointing that out. Added it now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]