mlbiscoc commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r3197423594


##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> 
indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in 
previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = 
opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+              }
+            };
+
+        uploadFutures.add(executor.submit(uploadTask));
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for ALL futures before throwing - currentBackupPoint must 
reflect every
+      // successfully uploaded file before it is written, even when an error 
occurs.
+      Throwable firstError = null;
+      for (Future<?> future : uploadFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (firstError == null) {
+            firstError = e.getCause();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();

Review Comment:
   If any of them fail, you need to cancel the existing futures here with 
`future.cancel(True)` to stop executing the rest of the jobs in the queue. I 
believe all this does it interrupt the calling thread.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> 
indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in 
previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = 
opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+              }
+            };
+
+        uploadFutures.add(executor.submit(uploadTask));
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for ALL futures before throwing - currentBackupPoint must 
reflect every
+      // successfully uploaded file before it is written, even when an error 
occurs.
+      Throwable firstError = null;
+      for (Future<?> future : uploadFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (firstError == null) {
+            firstError = e.getCause();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          if (firstError == null) {
+            firstError = e;
+          }
+        }
+      }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, 
originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+      if (firstError != null) {
+        if (firstError instanceof Error) {
+          // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+          throw (Error) firstError;
+        } else if (firstError instanceof IOException) {
+          throw (IOException) firstError;
+        } else if (firstError instanceof RuntimeException) {
+          throw (RuntimeException) firstError;
+        } else if (firstError instanceof InterruptedException) {
+          throw new IOException("Backup interrupted", firstError);
+        } else {
+          throw new IOException("Error during parallel backup upload", 
firstError);
+        }
+      }
+    } finally {

Review Comment:
   Just remove this finally since it does nothing now.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -52,6 +62,21 @@
  */
 public class IncrementalShardBackup {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Maximum number of files to upload in parallel during backup. Can be 
configured via the system
+   * property {@code solr.backup.maxparalleluploads} or environment variable 
{@code
+   * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+   */
+  private static final int DEFAULT_MAX_PARALLEL_UPLOADS =

Review Comment:
   This should just be `MAX_PARALLEL_UPLOADS` and drop the `default` prefix



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> 
indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;

Review Comment:
   Just use `BACKUP_EXECUTOR` instead of doing this.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> 
indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in 
previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = 
opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+              }
+            };
+
+        uploadFutures.add(executor.submit(uploadTask));
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for ALL futures before throwing - currentBackupPoint must 
reflect every
+      // successfully uploaded file before it is written, even when an error 
occurs.
+      Throwable firstError = null;
+      for (Future<?> future : uploadFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (firstError == null) {
+            firstError = e.getCause();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          if (firstError == null) {
+            firstError = e;
+          }
+        }
+      }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, 
originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+      if (firstError != null) {
+        if (firstError instanceof Error) {
+          // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+          throw (Error) firstError;
+        } else if (firstError instanceof IOException) {
+          throw (IOException) firstError;
+        } else if (firstError instanceof RuntimeException) {
+          throw (RuntimeException) firstError;
+        } else if (firstError instanceof InterruptedException) {
+          throw new IOException("Backup interrupted", firstError);
+        } else {
+          throw new IOException("Error during parallel backup upload", 
firstError);
+        }

Review Comment:
   In Java 21 you can use a switch case here



##########
solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc:
##########
@@ -396,6 +396,39 @@ Any children under the `<repository>` tag are passed as 
additional configuration
 
 Information on each of the repository implementations provided with Solr is 
provided below.
 
+=== Parallel File Transfers
+
+Backup and restore operations can transfer multiple index files in parallel to 
improve throughput, especially when using cloud storage repositories like S3 or 
GCS where latency is higher.
+The parallelism is controlled via system properties or environment variables:
+
+`solr.backup.maxparalleluploads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to upload in parallel during backup operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELUPLOADS` environment variable.
+Increasing this value can significantly improve backup throughput when using 
cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth 
pressure on your cluster.
+Start small and increase based on observed throughput and available resources.
+
+`solr.backup.maxparalleldownloads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to download in parallel during restore 
operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELDOWNLOADS` environment 
variable.
+Increasing this value can significantly improve restore throughput when using 
cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth 
pressure on your cluster.
+Start small and increase based on observed throughput and available resources.
+
+TIP: Both settings share a single global thread pool per property, so the 
configured limit applies across all concurrent backup or restore operations on 
the node.

Review Comment:
   I misread this the first time as both backup restore share a single pool 
which contradicted the settings. Can you rewrite this to be clearly state that 
these are 2 separate thread pools



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> 
indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in 
previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = 
opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+              }
+            };
+
+        uploadFutures.add(executor.submit(uploadTask));
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for ALL futures before throwing - currentBackupPoint must 
reflect every
+      // successfully uploaded file before it is written, even when an error 
occurs.
+      Throwable firstError = null;
+      for (Future<?> future : uploadFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (firstError == null) {
+            firstError = e.getCause();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          if (firstError == null) {
+            firstError = e;
+          }
+        }
+      }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, 
originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+      if (firstError != null) {
+        if (firstError instanceof Error) {
+          // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+          throw (Error) firstError;
+        } else if (firstError instanceof IOException) {
+          throw (IOException) firstError;
+        } else if (firstError instanceof RuntimeException) {
+          throw (RuntimeException) firstError;
+        } else if (firstError instanceof InterruptedException) {
+          throw new IOException("Backup interrupted", firstError);
+        } else {
+          throw new IOException("Error during parallel backup upload", 
firstError);

Review Comment:
   You threw a `SolrException` in RestoreCore but throw `IOException` here.



##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
       Set<String> indexDirFiles = new 
HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, 
cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues 
with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())

Review Comment:
   Thanks! I see you dropped `CallerRunsPolicy`. I kind of liked it so that the 
calling thread can handle some of the backpressure but I can see an argument 
for not keeping it as it is harder to control. Also when this gets released, 
since we set the default to 1, backup/restore will be slow if someone upgrades 
without knowing this change was implemented and the calling thread isn't doing 
the backup anymore. @dsmiley wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to