Copilot commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r2908810858


##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
      Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())

Review Comment:
   Using `SynchronousQueue` with `CallerRunsPolicy` means once all 
`maxParallelDownloads` threads are busy, additional downloads will execute on 
the calling thread. That can exceed the configured cap (up to 
`maxParallelDownloads + 1` concurrent transfers) and also bypass the 
`MDCAwareThreadPoolExecutor` wrapping for those caller-run tasks. Consider a 
bounded queue/fixed pool or explicitly limiting in-flight submissions to 
enforce the configured parallelism.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
+    // Only use an executor for parallel uploads when parallelism > 1
+    // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+    int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+    ExecutorService executor =
+        maxParallelUploads > 1
+            ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                0,
+                maxParallelUploads,
+                60L,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new SolrNamedThreadFactory("IncrementalBackup"),
+                new ThreadPoolExecutor.CallerRunsPolicy())

Review Comment:
   With `SynchronousQueue` + `CallerRunsPolicy`, once `maxParallelUploads` 
threads are busy, additional uploads will run on the submitting thread. That 
can exceed the configured cap (up to `maxParallelUploads + 1` concurrent 
uploads) and bypass `MDCAwareThreadPoolExecutor` wrapping for caller-run tasks. 
Consider a bounded queue/fixed pool or explicitly bounding in-flight 
submissions to enforce the configured limit.
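   For comparison, the "fixed pool" remedy the comment mentions caps concurrency by construction: excess submissions wait in the queue rather than run on the caller. A rough, self-contained demonstration (JDK-only, names invented here):

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   // Sketch: measure observed concurrency under a fixed pool; it stays
   // <= nThreads because extra submissions queue instead of running inline.
   public class FixedPoolCap {
     public static int maxObservedConcurrency(int nThreads, int nTasks) throws InterruptedException {
       ExecutorService pool = Executors.newFixedThreadPool(nThreads);
       AtomicInteger inFlight = new AtomicInteger();
       AtomicInteger maxSeen = new AtomicInteger();
       for (int i = 0; i < nTasks; i++) {
         pool.submit(() -> {
           int now = inFlight.incrementAndGet();
           maxSeen.accumulateAndGet(now, Math::max);
           try {
             Thread.sleep(5); // stand-in for an upload
           } catch (InterruptedException ignored) {
             Thread.currentThread().interrupt();
           }
           inFlight.decrementAndGet();
         });
       }
       pool.shutdown();
       pool.awaitTermination(1, TimeUnit.MINUTES);
       return maxSeen.get();
     }
   }
   ```

   The trade-off versus `SynchronousQueue` + `CallerRunsPolicy` is an unbounded work queue, which is acceptable here since the file list is already enumerated.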



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
+    // Only use an executor for parallel uploads when parallelism > 1
+    // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+    int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+    ExecutorService executor =
+        maxParallelUploads > 1
+            ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                0,
+                maxParallelUploads,
+                60L,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new SolrNamedThreadFactory("IncrementalBackup"),
+                new ThreadPoolExecutor.CallerRunsPolicy())
+            : null;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {

Review Comment:
   This implementation queues a `Future` for every index file and holds them in 
`uploadFutures` until the end. For large indexes this can create significant 
memory overhead and delays error reporting. Consider processing completed tasks 
as they finish (e.g., `ExecutorCompletionService`) and/or limiting in-flight 
submissions to `maxParallelUploads`.
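   A hedged sketch of the `ExecutorCompletionService` approach (JDK types only; the class and method names are illustrative, not the PR's): completions are drained in finish order, so no per-file `Future` list accumulates and the first failure surfaces as soon as it is observed.

   ```java
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.CompletionService;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorCompletionService;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Sketch: submit tasks, then take() completions as they finish instead of
   // holding one Future per file until all submissions are done.
   public class CompletionDrain {
     public static int sumAll(int nThreads, List<Callable<Integer>> tasks)
         throws InterruptedException, ExecutionException {
       ExecutorService pool = Executors.newFixedThreadPool(nThreads);
       try {
         CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
         for (Callable<Integer> t : tasks) {
           cs.submit(t);
         }
         int sum = 0;
         for (int i = 0; i < tasks.size(); i++) {
           // take() blocks for the next finished task; get() rethrows its failure
           sum += cs.take().get();
         }
         return sum;
       } finally {
         pool.shutdownNow();
       }
     }
   }
   ```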



##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
      Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())
+              : null;
+
+      List<Future<?>> downloadFutures = new ArrayList<>();
+
+      try {
+        // Move all files from backupDir to restoreIndexDir
+        for (String filename : repository.listAllFiles()) {
+          checkInterrupted();
+
+          // Capture variables for lambda
+          final String filenameFinal = filename;
+          final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+          Runnable downloadTask =
+              () -> {
+                try {
+                  if (fileExistsLocally) {
+                    Checksum cs = repository.checksum(filenameFinal);
+                    IndexFetcher.CompareResult compareResult;
+                    if (cs == null) {
+                      compareResult = new IndexFetcher.CompareResult();
+                      compareResult.equal = false;
+                    } else {
+                      compareResult =
+                          IndexFetcher.compareFile(
+                              finalIndexDir, filenameFinal, cs.size, cs.checksum);
+                    }
+                    if (!compareResult.equal
+                        || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                            filenameFinal, cs.size, compareResult))) {
+                      repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                    } else {
+                      // prefer local copy
+                      repository.localCopy(finalIndexDir, filenameFinal, finalRestoreIndexDir);
+                    }
+                  } else {
+                    repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                  }
+                } catch (Exception e) {
+                  log.warn("Exception while restoring the backup index ", e);
+                  throw new RuntimeException(
+                      "Exception while restoring the backup index for file: " + filenameFinal, e);
+                }
+              };
+
+          if (executor != null) {
+            downloadFutures.add(executor.submit(downloadTask));
+          } else {
+            // Run synchronously when parallelism is 1
+            try {
+              downloadTask.run();
+            } catch (RuntimeException e) {
+              if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+              }
+              throw e;
+            }
+          }
+        }
+
+        // Wait for all downloads to complete and collect any errors (only if using executor)
+        if (executor != null) {
+          // We need to wait for ALL futures to ensure all files are processed
+          Throwable firstError = null;
+          for (Future<?> future : downloadFutures) {
+            try {
+              future.get();
+            } catch (ExecutionException e) {
+              if (firstError == null) {
+                Throwable cause = e.getCause();
+                // Unwrap RuntimeExceptions that wrap the original IOException
+                if (cause instanceof RuntimeException && cause.getCause() != null) {
+                  firstError = cause.getCause();
+                } else {

Review Comment:
   In the parallel path, the `RuntimeException` unwrapping (`firstError = 
cause.getCause()`) can discard the wrapper message that contains the filename. 
Prefer preserving that message (or re-wrapping the cause with filename context) 
when propagating the first failure.
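   One possible shape for the fix, shown with plain JDK exceptions (the class name here is invented for illustration): re-wrap the root cause so the filename-bearing wrapper message survives.

   ```java
   import java.io.IOException;
   import java.util.concurrent.ExecutionException;

   // Sketch: unwrap an ExecutionException without losing the wrapper's
   // per-file message.
   public class ErrorUnwrap {
     public static Throwable firstError(ExecutionException e) {
       Throwable cause = e.getCause();
       if (cause instanceof RuntimeException && cause.getCause() instanceof IOException) {
         // Keep the wrapper's message (which names the file) while restoring
         // the original IOException type for callers that catch IOException.
         return new IOException(cause.getMessage(), cause.getCause());
       }
       return cause;
     }
   }
   ```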



##########
changelog/unreleased/parallelizebackups.yml:
##########
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Parallelize Backup and Restore File Operations
+type: changed
+authors:
+  - name: Samuel Verstraete
+    github: elangelo

Review Comment:
   The changelog author metadata uses a `github` field, but this repository’s 
changelog format documentation uses `nick` (optionally with `url`) under 
`authors`. Using an unexpected key may fail changelog validation or omit author 
info; please switch `github: elangelo` to `nick: elangelo` (and add `url` if 
desired).
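   If the changelog format docs do expect `nick`, the corrected entry would look roughly like this (author details copied from the PR; not verified against the changelog schema):

   ```yaml
   # See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
   title: Parallelize Backup and Restore File Operations
   type: changed
   authors:
     - name: Samuel Verstraete
       nick: elangelo
   ```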



##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
      Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())
+              : null;
+
+      List<Future<?>> downloadFutures = new ArrayList<>();
+
+      try {
+        // Move all files from backupDir to restoreIndexDir
+        for (String filename : repository.listAllFiles()) {
+          checkInterrupted();
+
+          // Capture variables for lambda
+          final String filenameFinal = filename;
+          final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+          Runnable downloadTask =
+              () -> {
+                try {
+                  if (fileExistsLocally) {
+                    Checksum cs = repository.checksum(filenameFinal);
+                    IndexFetcher.CompareResult compareResult;
+                    if (cs == null) {
+                      compareResult = new IndexFetcher.CompareResult();
+                      compareResult.equal = false;
+                    } else {
+                      compareResult =
+                          IndexFetcher.compareFile(
+                              finalIndexDir, filenameFinal, cs.size, cs.checksum);
+                    }
+                    if (!compareResult.equal
+                        || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                            filenameFinal, cs.size, compareResult))) {
+                      repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                    } else {
+                      // prefer local copy
+                      repository.localCopy(finalIndexDir, filenameFinal, finalRestoreIndexDir);
+                    }
+                  } else {
+                    repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                  }
+                } catch (Exception e) {
+                  log.warn("Exception while restoring the backup index ", e);
+                  throw new RuntimeException(
+                      "Exception while restoring the backup index for file: " + filenameFinal, e);
+                }
+              };
+
+          if (executor != null) {
+            downloadFutures.add(executor.submit(downloadTask));
+          } else {
+            // Run synchronously when parallelism is 1
+            try {
+              downloadTask.run();
+            } catch (RuntimeException e) {
+              if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+              }
+              throw e;

Review Comment:
   In the synchronous path, unwrapping and rethrowing only `e.getCause()` drops 
the wrapper message that includes per-file context (filename). Preserve the 
wrapper message (or re-wrap the cause including the filename) so restore 
failures remain actionable.



##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
      Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())
+              : null;
+
+      List<Future<?>> downloadFutures = new ArrayList<>();
+

Review Comment:
   This submits one task per index file and retains every `Future` in 
`downloadFutures` until the end. For large collections with many segment files, 
that can add substantial memory/GC overhead and delays surfacing failures until 
all tasks are submitted. Consider processing completions incrementally (e.g., 
`ExecutorCompletionService`) and/or bounding the number of in-flight tasks to 
`maxParallelDownloads`.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
+    // Only use an executor for parallel uploads when parallelism > 1
+    // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+    int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+    ExecutorService executor =
+        maxParallelUploads > 1
+            ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                0,
+                maxParallelUploads,
+                60L,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new SolrNamedThreadFactory("IncrementalBackup"),
+                new ThreadPoolExecutor.CallerRunsPolicy())
+            : null;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + fileNameFinal, e);
+              }
+            };
+
+        if (executor != null) {
+          uploadFutures.add(executor.submit(uploadTask));
+        } else {
+          // Run synchronously when parallelism is 1
+          try {
+            uploadTask.run();
+          } catch (RuntimeException e) {
+            if (e.getCause() instanceof IOException) {
+              throw (IOException) e.getCause();
+            }
+            throw e;

Review Comment:
   In the synchronous path, rethrowing only `e.getCause()` (when it’s an 
`IOException`) loses the wrapper message that includes the filename ("Failed to 
process file: ..."). Preserve that per-file context when propagating errors so 
backup failures are diagnosable.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
+    // Only use an executor for parallel uploads when parallelism > 1
+    // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+    int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+    ExecutorService executor =
+        maxParallelUploads > 1
+            ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                0,
+                maxParallelUploads,
+                60L,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new SolrNamedThreadFactory("IncrementalBackup"),
+                new ThreadPoolExecutor.CallerRunsPolicy())
+            : null;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + fileNameFinal, e);
+              }
+            };
+
+        if (executor != null) {
+          uploadFutures.add(executor.submit(uploadTask));
+        } else {
+          // Run synchronously when parallelism is 1
+          try {
+            uploadTask.run();
+          } catch (RuntimeException e) {
+            if (e.getCause() instanceof IOException) {
+              throw (IOException) e.getCause();
+            }
+            throw e;
+          }
         }
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for all uploads to complete and collect any errors (only if using executor)
+      if (executor != null) {
+        // We need to wait for ALL futures before throwing, otherwise we might exit
+        // before all successfully uploaded files are added to currentBackupPoint
+        Throwable firstError = null;
+        for (Future<?> future : uploadFutures) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            if (firstError == null) {
+              Throwable cause = e.getCause();
+              // Unwrap RuntimeExceptions that wrap the original IOException
+              if (cause instanceof RuntimeException && cause.getCause() != null) {
+                firstError = cause.getCause();
+              } else {

Review Comment:
   In the parallel join logic, unwrapping `RuntimeException` to 
`cause.getCause()` can discard the wrapper message that includes the filename. 
Preserve the wrapper message (or re-wrap the underlying `IOException` with file 
context) when surfacing the first failure from `future.get()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

