gvprathyusha6 commented on code in PR #6616:
URL: https://github.com/apache/hbase/pull/6616#discussion_r1928854910
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java:
##########
@@ -419,52 +423,97 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
   * @return the list of failed to archive files.
   * @throws IOException if an unexpected file operation exception occurred
   */
-  private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
-    Collection<File> toArchive, long start) throws IOException {
-    // short circuit if no files to move
+  private static List<File> resolveAndArchive(Configuration conf, FileSystem fs,
+    Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
+    // Early exit if no files to archive
     if (toArchive.isEmpty()) {
+      LOG.trace("No files to archive, returning an empty list.");
       return Collections.emptyList();
     }
-    LOG.trace("Moving files to the archive directory {}", baseArchiveDir);
+    LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);
-    // make sure the archive directory exists
-    if (!fs.exists(baseArchiveDir)) {
-      if (!fs.mkdirs(baseArchiveDir)) {
-        throw new IOException("Failed to create the archive directory:" + baseArchiveDir
-          + ", quitting archive attempt.");
-      }
-      LOG.trace("Created archive directory {}", baseArchiveDir);
-    }
+    // Ensure the archive directory exists
+    ensureArchiveDirectoryExists(fs, baseArchiveDir);
-    List<File> failures = new ArrayList<>();
+    // Thread-safe collection for storing failures
+    Queue<File> failures = new ConcurrentLinkedQueue<>();
     String startTime = Long.toString(start);
+
+    // Separate files and directories for processing
+    List<File> filesOnly = new ArrayList<>();
     for (File file : toArchive) {
-      // if its a file archive it
-      try {
-        LOG.trace("Archiving {}", file);
-        if (file.isFile()) {
-          // attempt to archive the file
-          if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
-            LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
+      if (file.isFile()) {
+        filesOnly.add(file);
+      } else {
+        handleDirectory(conf, fs, baseArchiveDir, failures, file, start);
+      }
+    }
+
+    // Archive files concurrently
+    archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime);
+
+    return new ArrayList<>(failures); // Convert to a List for the return value
+  }
+
+  private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)
+    throws IOException {
+    if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) {
+      throw new IOException("Failed to create the archive directory: " + baseArchiveDir);
+    }
+    LOG.trace("Archive directory ready: {}", baseArchiveDir);
+  }
+
+  private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir,
+    Queue<File> failures, File directory, long start) {
+    LOG.trace("Processing directory: {}, archiving its children.", directory);
+    Path subArchiveDir = new Path(baseArchiveDir, directory.getName());
+
+    try {
+      Collection<File> children = directory.getChildren();
+      failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start));
+    } catch (IOException e) {
+      LOG.warn("Failed to archive directory: {}", directory, e);
+      failures.add(directory);
+    }
+  }
+
+  private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir,
+    List<File> files, Queue<File> failures, String startTime) {
+    LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir);
+    Map<File, Future<Boolean>> futureMap = new HashMap<>();
+    // Submit file archiving tasks
+    // default is 16 which comes equal hbase.hstore.blockingStoreFiles default value
+    int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16);
Review Comment:
> On a large system, we have 10s of thousands of regions, so this could still be a lot of threads,

I think this could be an issue for the Delete table scenario on a large table, where it actually archives all the regions at [once in a loop](https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java#L189); we might end up creating 16 X noOfRegions threads.
@stoty @mnpoonia
How about keeping a flag on archiveFiles(boolean needsConcurrent), so that archival done as part of SplitProc runs concurrently while DeleteProc can still use the existing sequential mode (since it's not critical/time-bound, unlike SplitProcedure, which can affect availability)? A rough sketch of the idea is below.
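Roughly the shape I have in mind, as a self-contained sketch (the `FileLike` interface, `archive()` method, and the standalone class are illustrative stand-ins for HFileArchiver.File / resolveAndArchiveFile, not the actual patch):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ArchiveFlagSketch {
  /** Stand-in for HFileArchiver.File; archive() stands in for resolveAndArchiveFile(...). */
  interface FileLike {
    boolean archive();
  }

  /**
   * needsConcurrent = true  -> SplitProcedure path: bounded pool, time-sensitive.
   * needsConcurrent = false -> DeleteTableProcedure path: the existing sequential loop,
   *                            so bulk deletes never fan out to 16 x noOfRegions threads.
   */
  static List<FileLike> archiveFiles(Collection<FileLike> toArchive, boolean needsConcurrent,
    int maxThreads) throws InterruptedException {
    Queue<FileLike> failures = new ConcurrentLinkedQueue<>();
    if (!needsConcurrent) {
      // Existing behavior: archive one file at a time.
      for (FileLike f : toArchive) {
        if (!f.archive()) {
          failures.add(f);
        }
      }
    } else {
      // Concurrent path: pool size bounded by the per-region config knob.
      ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
      try {
        for (FileLike f : toArchive) {
          pool.execute(() -> {
            if (!f.archive()) {
              failures.add(f);
            }
          });
        }
      } finally {
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
      }
    }
    return new ArrayList<>(failures);
  }
}
```

That way only the availability-sensitive split path pays for (and is bounded by) the thread pool, and the delete path keeps its current, predictable behavior.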
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]