This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git

commit e2edd26b12c9e97d32c5b3383b7da5f9579d2665
Author: Samuel Verstraete <[email protected]>
AuthorDate: Wed Jun 10 20:12:20 2026 +0200

    SOLR-17208: Parallelize backup and restore file transfers (#4023)
    
    Co-authored-by: samuel <[email protected]>
    Co-authored-by: David Smiley <[email protected]>
    (cherry picked from commit 74974b1be7e5b4a58dbe93468a359aca9621a647)
---
 .../unreleased/SOLR-1092-parallelizebackups.yml    |  10 ++
 .../solr/handler/IncrementalShardBackup.java       | 133 ++++++++++++++++-----
 .../java/org/apache/solr/handler/RestoreCore.java  | 115 ++++++++++++++----
 .../apache/solr/gcs/GCSIncrementalBackupTest.java  |   3 +
 .../apache/solr/s3/S3IncrementalBackupTest.java    |   3 +
 .../deployment-guide/pages/backup-restore.adoc     |  32 +++++
 .../org/apache/solr/common/util/ObjectCache.java   |   7 +-
 7 files changed, 245 insertions(+), 58 deletions(-)

diff --git a/changelog/unreleased/SOLR-1092-parallelizebackups.yml 
b/changelog/unreleased/SOLR-1092-parallelizebackups.yml
new file mode 100644
index 00000000000..84853f8bc39
--- /dev/null
+++ b/changelog/unreleased/SOLR-1092-parallelizebackups.yml
@@ -0,0 +1,10 @@
+title: Parallelize Backup and Restore File Operations
+type: changed
+authors:
+  - name: Samuel Verstraete
+    nick: elangelo
+  - name: David Smiley
+    nick: dsmiley
+links:
+  - name: SOLR-17208
+    url: https://issues.apache.org/jira/browse/SOLR-17208
diff --git 
a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java 
b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
index 0e07ac0ca28..71ea48eac82 100644
--- a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
+++ b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
@@ -23,15 +23,25 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.math3.util.Precision;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.api.model.SolrJerseyResponse;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
@@ -52,6 +62,15 @@ import org.slf4j.LoggerFactory;
  */
 public class IncrementalShardBackup {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Maximum number of files to upload in parallel during backup. Can be 
configured via the system
+   * property {@code solr.backup.maxparalleluploads} or environment variable 
{@code
+   * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+   */
+  private static final int MAX_PARALLEL_UPLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);
+
   private SolrCore solrCore;
 
   private BackupFilePaths incBackupFiles;
@@ -154,8 +173,8 @@ public class IncrementalShardBackup {
                 solrCore.getSolrConfig().indexConfig.lockType);
     try {
       BackupStats stats = incrementalCopy(files, dir);
-      details.indexFileCount = stats.fileCount;
-      details.uploadedIndexFileCount = stats.uploadedFileCount;
+      details.indexFileCount = stats.fileCount.get();
+      details.uploadedIndexFileCount = stats.uploadedFileCount.get();
       details.indexSizeMB = stats.getIndexSizeMB();
       details.uploadedIndexFileMB = stats.getTotalUploadedMB();
     } finally {
@@ -191,25 +210,77 @@ public class IncrementalShardBackup {
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
+    var executor =
+        solrCore
+            .getCoreContainer()
+            .getObjectCache()
+            .computeIfAbsent(
+                "BackupUploadExecutor",
+                ExecutorService.class,
+                s ->
+                    ExecutorUtil.newMDCAwareCachedThreadPool(
+                        MAX_PARALLEL_UPLOADS,
+                        Integer.MAX_VALUE,
+                        new SolrNamedThreadFactory("BackupUploadExecutor")));
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
     for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
-      }
-
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Capture variable for lambda
+      final String fileNameFinal = fileName;
+
+      Runnable uploadTask =
+          () -> {
+            try {
+              // Calculate checksum and check if file already exists in 
previous backup
+              Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                  oldBackupPoint.getFile(fileNameFinal);
+              Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+              if (opBackedFile.isPresent()) {
+                ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
+                Checksum existedFileCS = backedFile.fileChecksum;
+                if (existedFileCS.equals(originalFileCS)) {
+                  synchronized (currentBackupPoint) {
+                    currentBackupPoint.addBackedFile(opBackedFile.get());
+                  }
+                  backupStats.skippedUploadingFile(existedFileCS);
+                  return;
+                }
+              }
+
+              // File doesn't exist or has changed - upload it
+              String backedFileName = UUID.randomUUID().toString();
+              backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+              synchronized (currentBackupPoint) {
+                currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+              }
+              backupStats.uploadedFile(originalFileCS);
+            } catch (IOException e) {
+              throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+            }
+          };
+
+      uploadFutures.add(executor.submit(uploadTask));
+    }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, 
originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+    try {
+      for (Future<?> future : uploadFutures) {
+        future.get();
+      }
+    } catch (ExecutionException e) {
+      uploadFutures.forEach(f -> f.cancel(true));
+      Throwable cause = e.getCause();
+      switch (cause) {
+        case Error err -> throw err;
+        case IOException ioe -> throw ioe;
+        case RuntimeException re -> throw re;
+        default -> throw new SolrException(
+            SolrException.ErrorCode.UNKNOWN, "Error during parallel backup 
upload", cause);
+      }
+    } catch (InterruptedException e) {
+      uploadFutures.forEach(f -> f.cancel(true));
+      throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Backup 
interrupted", e);
     }
 
     currentBackupPoint.store(backupRepo, 
incBackupFiles.getShardBackupMetadataDir(), shardBackupId);
@@ -217,29 +288,29 @@ public class IncrementalShardBackup {
   }
 
   private static class BackupStats {
-    private int fileCount;
-    private int uploadedFileCount;
-    private long indexSize;
-    private long totalUploadedBytes;
+    private final AtomicInteger fileCount = new AtomicInteger();
+    private final AtomicInteger uploadedFileCount = new AtomicInteger();
+    private final AtomicLong indexSize = new AtomicLong();
+    private final AtomicLong totalUploadedBytes = new AtomicLong();
 
     public void uploadedFile(Checksum file) {
-      fileCount++;
-      uploadedFileCount++;
-      indexSize += file.size;
-      totalUploadedBytes += file.size;
+      fileCount.incrementAndGet();
+      uploadedFileCount.incrementAndGet();
+      indexSize.addAndGet(file.size);
+      totalUploadedBytes.addAndGet(file.size);
     }
 
     public void skippedUploadingFile(Checksum existedFile) {
-      fileCount++;
-      indexSize += existedFile.size;
+      fileCount.incrementAndGet();
+      indexSize.addAndGet(existedFile.size);
     }
 
     public double getIndexSizeMB() {
-      return Precision.round(indexSize / (1024.0 * 1024), 3);
+      return Precision.round(indexSize.get() / (1024.0 * 1024), 3);
     }
 
     public double getTotalUploadedMB() {
-      return Precision.round(totalUploadedBytes / (1024.0 * 1024), 3);
+      return Precision.round(totalUploadedBytes.get() / (1024.0 * 1024), 3);
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java 
b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
index 7f4abc18ffd..244a50e7827 100644
--- a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
+++ b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
@@ -21,19 +21,26 @@ import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Array;
 import java.net.URI;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.backup.BackupFilePaths;
@@ -48,6 +55,14 @@ public class RestoreCore implements Callable<Boolean> {
 
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  /**
+   * Maximum number of files to download in parallel during restore. Can be 
configured via the
+   * system property {@code solr.backup.maxparalleldownloads} or environment 
variable {@code
+   * SOLR_BACKUP_MAXPARALLELDOWNLOADS}.
+   */
+  private static final int MAX_PARALLEL_DOWNLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleldownloads", 1);
+
   private final SolrCore core;
   private RestoreRepository repository;
 
@@ -107,35 +122,85 @@ public class RestoreCore implements Callable<Boolean> {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
       Set<String> indexDirFiles = new 
HashSet<>(Arrays.asList(indexDir.listAll()));
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      List<Future<?>> downloadFutures = new ArrayList<>();
+
+      var executor =
+          core.getCoreContainer()
+              .getObjectCache()
+              .computeIfAbsent(
+                  "RestoreDownloadExecutor",
+                  ExecutorService.class,
+                  s ->
+                      ExecutorUtil.newMDCAwareCachedThreadPool(
+                          MAX_PARALLEL_DOWNLOADS,
+                          Integer.MAX_VALUE,
+                          new 
SolrNamedThreadFactory("RestoreDownloadExecutor")));
+
       // Move all files from backupDir to restoreIndexDir
       for (String filename : repository.listAllFiles()) {
         checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, 
cs.size, cs.checksum);
-            }
-            if (!compareResult.equal
-                || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
-                    filename, cs.size, compareResult))) {
-              repository.repoCopy(filename, restoreIndexDir);
-            } else {
-              // prefer local copy
-              repository.localCopy(indexDir, filename, restoreIndexDir);
-            }
-          } else {
-            repository.repoCopy(filename, restoreIndexDir);
-          }
-        } catch (Exception e) {
-          log.warn("Exception while restoring the backup index ", e);
-          throw new SolrException(
-              SolrException.ErrorCode.UNKNOWN, "Exception while restoring the 
backup index", e);
+
+        // Capture variables for lambda
+        final String filenameFinal = filename;
+        final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+        Runnable downloadTask =
+            () -> {
+              try {
+                if (fileExistsLocally) {
+                  Checksum cs = repository.checksum(filenameFinal);
+                  IndexFetcher.CompareResult compareResult;
+                  if (cs == null) {
+                    compareResult = new IndexFetcher.CompareResult();
+                    compareResult.equal = false;
+                  } else {
+                    compareResult =
+                        IndexFetcher.compareFile(
+                            finalIndexDir, filenameFinal, cs.size, 
cs.checksum);
+                  }
+                  if (!compareResult.equal
+                      || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                          filenameFinal, cs.size, compareResult))) {
+                    repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                  } else {
+                    // prefer local copy
+                    repository.localCopy(finalIndexDir, filenameFinal, 
finalRestoreIndexDir);
+                  }
+                } else {
+                  repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                }
+              } catch (Exception e) {
+                log.warn("Exception while restoring the backup index ", e);
+                throw new RuntimeException(
+                    "Exception while restoring the backup index for file: " + 
filenameFinal, e);
+              }
+            };
+
+        downloadFutures.add(executor.submit(downloadTask));
+      }
+
+      try {
+        for (Future<?> future : downloadFutures) {
+          future.get();
+        }
+      } catch (ExecutionException e) {
+        downloadFutures.forEach(f -> f.cancel(true));
+        Throwable cause = e.getCause();
+        switch (cause) {
+          case Error err -> throw err;
+          case IOException ioe -> throw ioe;
+          case RuntimeException re -> throw re;
+          default -> throw new SolrException(
+              SolrException.ErrorCode.UNKNOWN, "Error during parallel restore 
download", cause);
         }
+      } catch (InterruptedException e) {
+        downloadFutures.forEach(f -> f.cancel(true));
+        throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Restore 
interrupted", e);
       }
       log.debug("Switching directories");
       core.modifyIndexProps(restoreIndexName);
diff --git 
a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
 
b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
index d955da11e1e..62b9b8e7717 100644
--- 
a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
+++ 
b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
@@ -76,6 +76,9 @@ public class GCSIncrementalBackupTest extends 
AbstractIncrementalBackupTest {
 
   @BeforeClass
   public static void setupClass() throws Exception {
+    // Enable parallel backup/restore for cloud storage tests
+    System.setProperty("solr.backup.maxparalleluploads", "2");
+    System.setProperty("solr.backup.maxparalleldownloads", "2");
 
     configureCluster(NUM_NODES) // nodes
         .addConfig("conf1", getFile("conf/solrconfig.xml").getParent())
diff --git 
a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
 
b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
index c35dbc17ab0..8cebe4a80a5 100644
--- 
a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
+++ 
b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
@@ -97,6 +97,9 @@ public class S3IncrementalBackupTest extends 
AbstractIncrementalBackupTest {
   public static void setupClass() throws Exception {
     System.setProperty("aws.accessKeyId", "foo");
     System.setProperty("aws.secretAccessKey", "bar");
+    // Enable parallel backup/restore for cloud storage tests
+    System.setProperty("solr.backup.maxparalleluploads", "2");
+    System.setProperty("solr.backup.maxparalleldownloads", "2");
     String retryMode;
     switch (random().nextInt(3)) {
       case 0:
diff --git 
a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc 
b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
index 12297153e9c..7e9cdd35bf2 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
@@ -396,6 +396,38 @@ Any children under the `<repository>` tag are passed as 
additional configuration
 
 Information on each of the repository implementations provided with Solr is 
provided below.
 
+=== Parallel File Transfers
+
+Backup and restore operations can transfer multiple index files in parallel to improve throughput, especially when using cloud storage repositories such as S3 or GCS, where per-request latency is higher.
+The parallelism is controlled via system properties or environment variables:
+
+`solr.backup.maxparalleluploads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to upload in parallel during backup operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELUPLOADS` environment variable.
+
+`solr.backup.maxparalleldownloads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to download in parallel during restore operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELDOWNLOADS` environment variable.
+
+TIP: These are two independent thread pools — one for backup uploads and one for restore downloads.
+Increasing either value can significantly improve throughput when using cloud storage (S3, GCS), but setting it too high increases IOPS and network bandwidth pressure on your cluster.
+Start small and increase based on observed throughput and available resources.
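+Each limit is shared by all concurrent backup (or restore) operations on a node rather than applied per operation; the values are read once by the JVM, so changes take effect only after restarting the Solr node.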
+
+=== Checksum Verification
+
 By default, all the repository implementations verify the integrity of the 
index files before they are copied to the destination. However, it is possible 
to disable this integrity check by setting the optional configuration property 
`verifyChecksum`.
 
 `verifyChecksum`::
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java 
b/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java
index 2bbb6529c0f..096998b76c8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java
@@ -19,6 +19,7 @@ package org.apache.solr.common.util;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.solr.common.SolrCloseable;
@@ -91,8 +92,10 @@ public class ObjectCache extends MapBackedCache<String, 
Object> implements SolrC
       // owns this ObjectCache, which is useful for plugins to register objects
       // which should be closed before being garbage-collected.
       for (Object value : map.values()) {
-        if (value instanceof Closeable) {
-          ((Closeable) value).close();
+        if (value instanceof Closeable closeable) {
+          closeable.close();
+        } else if (value instanceof ExecutorService executor) {
+          ExecutorUtil.shutdownAndAwaitTermination(executor);
         }
       }
       map.clear();

Reply via email to