This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 74974b1be7e SOLR-17208: Parallelize backup and restore file transfers (#4023)
74974b1be7e is described below
commit 74974b1be7e5b4a58dbe93468a359aca9621a647
Author: Samuel Verstraete <[email protected]>
AuthorDate: Wed Jun 10 20:12:20 2026 +0200
SOLR-17208: Parallelize backup and restore file transfers (#4023)
Co-authored-by: samuel <[email protected]>
Co-authored-by: David Smiley <[email protected]>
---
.../unreleased/SOLR-1092-parallelizebackups.yml | 10 ++
.../solr/handler/IncrementalShardBackup.java | 133 ++++++++++++++++-----
.../java/org/apache/solr/handler/RestoreCore.java | 115 ++++++++++++++----
.../apache/solr/gcs/GCSIncrementalBackupTest.java | 3 +
.../apache/solr/s3/S3IncrementalBackupTest.java | 3 +
.../deployment-guide/pages/backup-restore.adoc | 32 +++++
.../org/apache/solr/common/util/ObjectCache.java | 7 +-
7 files changed, 245 insertions(+), 58 deletions(-)
diff --git a/changelog/unreleased/SOLR-1092-parallelizebackups.yml b/changelog/unreleased/SOLR-1092-parallelizebackups.yml
new file mode 100644
index 00000000000..84853f8bc39
--- /dev/null
+++ b/changelog/unreleased/SOLR-1092-parallelizebackups.yml
@@ -0,0 +1,10 @@
+title: Parallelize Backup and Restore File Operations
+type: changed
+authors:
+ - name: Samuel Verstraete
+ nick: elangelo
+ - name: David Smiley
+ nick: dsmiley
+links:
+ - name: SOLR-1092
+ url: https://issues.apache.org/jira/browse/SOLR-1092
diff --git a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
index 0e07ac0ca28..71ea48eac82 100644
--- a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
+++ b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
@@ -23,15 +23,25 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.math3.util.Precision;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.api.model.SolrJerseyResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
@@ -52,6 +62,15 @@ import org.slf4j.LoggerFactory;
*/
public class IncrementalShardBackup {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * Maximum number of files to upload in parallel during backup. Can be
configured via the system
+ * property {@code solr.backup.maxparalleluploads} or environment variable
{@code
+ * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+ */
+ private static final int MAX_PARALLEL_UPLOADS =
+ EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);
+
private SolrCore solrCore;
private BackupFilePaths incBackupFiles;
@@ -154,8 +173,8 @@ public class IncrementalShardBackup {
solrCore.getSolrConfig().indexConfig.lockType);
try {
BackupStats stats = incrementalCopy(files, dir);
- details.indexFileCount = stats.fileCount;
- details.uploadedIndexFileCount = stats.uploadedFileCount;
+ details.indexFileCount = stats.fileCount.get();
+ details.uploadedIndexFileCount = stats.uploadedFileCount.get();
details.indexSizeMB = stats.getIndexSizeMB();
details.uploadedIndexFileMB = stats.getTotalUploadedMB();
} finally {
@@ -191,25 +210,77 @@ public class IncrementalShardBackup {
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
+ var executor =
+ solrCore
+ .getCoreContainer()
+ .getObjectCache()
+ .computeIfAbsent(
+ "BackupUploadExecutor",
+ ExecutorService.class,
+ s ->
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ MAX_PARALLEL_UPLOADS,
+ Integer.MAX_VALUE,
+ new SolrNamedThreadFactory("BackupUploadExecutor")));
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
- }
- }
-
- String backedFileName = UUID.randomUUID().toString();
- backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ uploadFutures.add(executor.submit(uploadTask));
+ }
- currentBackupPoint.addBackedFile(backedFileName, fileName,
originalFileCS);
- backupStats.uploadedFile(originalFileCS);
+ try {
+ for (Future<?> future : uploadFutures) {
+ future.get();
+ }
+ } catch (ExecutionException e) {
+ uploadFutures.forEach(f -> f.cancel(true));
+ Throwable cause = e.getCause();
+ switch (cause) {
+ case Error err -> throw err;
+ case IOException ioe -> throw ioe;
+ case RuntimeException re -> throw re;
+ default -> throw new SolrException(
+ SolrException.ErrorCode.UNKNOWN, "Error during parallel backup
upload", cause);
+ }
+ } catch (InterruptedException e) {
+ uploadFutures.forEach(f -> f.cancel(true));
+ throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Backup
interrupted", e);
}
currentBackupPoint.store(backupRepo,
incBackupFiles.getShardBackupMetadataDir(), shardBackupId);
@@ -217,29 +288,29 @@ public class IncrementalShardBackup {
}
private static class BackupStats {
- private int fileCount;
- private int uploadedFileCount;
- private long indexSize;
- private long totalUploadedBytes;
+ private final AtomicInteger fileCount = new AtomicInteger();
+ private final AtomicInteger uploadedFileCount = new AtomicInteger();
+ private final AtomicLong indexSize = new AtomicLong();
+ private final AtomicLong totalUploadedBytes = new AtomicLong();
public void uploadedFile(Checksum file) {
- fileCount++;
- uploadedFileCount++;
- indexSize += file.size;
- totalUploadedBytes += file.size;
+ fileCount.incrementAndGet();
+ uploadedFileCount.incrementAndGet();
+ indexSize.addAndGet(file.size);
+ totalUploadedBytes.addAndGet(file.size);
}
public void skippedUploadingFile(Checksum existedFile) {
- fileCount++;
- indexSize += existedFile.size;
+ fileCount.incrementAndGet();
+ indexSize.addAndGet(existedFile.size);
}
public double getIndexSizeMB() {
- return Precision.round(indexSize / (1024.0 * 1024), 3);
+ return Precision.round(indexSize.get() / (1024.0 * 1024), 3);
}
public double getTotalUploadedMB() {
- return Precision.round(totalUploadedBytes / (1024.0 * 1024), 3);
+ return Precision.round(totalUploadedBytes.get() / (1024.0 * 1024), 3);
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
index 7f4abc18ffd..244a50e7827 100644
--- a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
+++ b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
@@ -21,19 +21,26 @@ import java.lang.invoke.MethodHandles;
import java.lang.reflect.Array;
import java.net.URI;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.backup.BackupFilePaths;
@@ -48,6 +55,14 @@ public class RestoreCore implements Callable<Boolean> {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ /**
+ * Maximum number of files to download in parallel during restore. Can be
configured via the
+ * system property {@code solr.backup.maxparalleldownloads} or environment
variable {@code
+ * SOLR_BACKUP_MAXPARALLELDOWNLOADS}.
+ */
+ private static final int MAX_PARALLEL_DOWNLOADS =
+ EnvUtils.getPropertyAsInteger("solr.backup.maxparalleldownloads", 1);
+
private final SolrCore core;
private RestoreRepository repository;
@@ -107,35 +122,85 @@ public class RestoreCore implements Callable<Boolean> {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new
HashSet<>(Arrays.asList(indexDir.listAll()));
+
+ // Capture directories as final for lambda access
+ final Directory finalIndexDir = indexDir;
+ final Directory finalRestoreIndexDir = restoreIndexDir;
+
+ List<Future<?>> downloadFutures = new ArrayList<>();
+
+ var executor =
+ core.getCoreContainer()
+ .getObjectCache()
+ .computeIfAbsent(
+ "RestoreDownloadExecutor",
+ ExecutorService.class,
+ s ->
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ MAX_PARALLEL_DOWNLOADS,
+ Integer.MAX_VALUE,
+ new
SolrNamedThreadFactory("RestoreDownloadExecutor")));
+
// Move all files from backupDir to restoreIndexDir
for (String filename : repository.listAllFiles()) {
checkInterrupted();
- try {
- if (indexDirFiles.contains(filename)) {
- Checksum cs = repository.checksum(filename);
- IndexFetcher.CompareResult compareResult;
- if (cs == null) {
- compareResult = new IndexFetcher.CompareResult();
- compareResult.equal = false;
- } else {
- compareResult = IndexFetcher.compareFile(indexDir, filename,
cs.size, cs.checksum);
- }
- if (!compareResult.equal
- || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
- filename, cs.size, compareResult))) {
- repository.repoCopy(filename, restoreIndexDir);
- } else {
- // prefer local copy
- repository.localCopy(indexDir, filename, restoreIndexDir);
- }
- } else {
- repository.repoCopy(filename, restoreIndexDir);
- }
- } catch (Exception e) {
- log.warn("Exception while restoring the backup index ", e);
- throw new SolrException(
- SolrException.ErrorCode.UNKNOWN, "Exception while restoring the
backup index", e);
+
+ // Capture variables for lambda
+ final String filenameFinal = filename;
+ final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+ Runnable downloadTask =
+ () -> {
+ try {
+ if (fileExistsLocally) {
+ Checksum cs = repository.checksum(filenameFinal);
+ IndexFetcher.CompareResult compareResult;
+ if (cs == null) {
+ compareResult = new IndexFetcher.CompareResult();
+ compareResult.equal = false;
+ } else {
+ compareResult =
+ IndexFetcher.compareFile(
+ finalIndexDir, filenameFinal, cs.size,
cs.checksum);
+ }
+ if (!compareResult.equal
+ || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+ filenameFinal, cs.size, compareResult))) {
+ repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+ } else {
+ // prefer local copy
+ repository.localCopy(finalIndexDir, filenameFinal,
finalRestoreIndexDir);
+ }
+ } else {
+ repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+ }
+ } catch (Exception e) {
+ log.warn("Exception while restoring the backup index ", e);
+ throw new RuntimeException(
+ "Exception while restoring the backup index for file: " +
filenameFinal, e);
+ }
+ };
+
+ downloadFutures.add(executor.submit(downloadTask));
+ }
+
+ try {
+ for (Future<?> future : downloadFutures) {
+ future.get();
+ }
+ } catch (ExecutionException e) {
+ downloadFutures.forEach(f -> f.cancel(true));
+ Throwable cause = e.getCause();
+ switch (cause) {
+ case Error err -> throw err;
+ case IOException ioe -> throw ioe;
+ case RuntimeException re -> throw re;
+ default -> throw new SolrException(
+ SolrException.ErrorCode.UNKNOWN, "Error during parallel restore
download", cause);
}
+ } catch (InterruptedException e) {
+ downloadFutures.forEach(f -> f.cancel(true));
+ throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Restore
interrupted", e);
}
log.debug("Switching directories");
core.modifyIndexProps(restoreIndexName);
diff --git a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
index d955da11e1e..62b9b8e7717 100644
--- a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
+++ b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
@@ -76,6 +76,9 @@ public class GCSIncrementalBackupTest extends AbstractIncrementalBackupTest {
@BeforeClass
public static void setupClass() throws Exception {
+ // Enable parallel backup/restore for cloud storage tests
+ System.setProperty("solr.backup.maxparalleluploads", "2");
+ System.setProperty("solr.backup.maxparalleldownloads", "2");
configureCluster(NUM_NODES) // nodes
.addConfig("conf1", getFile("conf/solrconfig.xml").getParent())
diff --git a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
index c35dbc17ab0..8cebe4a80a5 100644
--- a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
+++ b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
@@ -97,6 +97,9 @@ public class S3IncrementalBackupTest extends AbstractIncrementalBackupTest {
public static void setupClass() throws Exception {
System.setProperty("aws.accessKeyId", "foo");
System.setProperty("aws.secretAccessKey", "bar");
+ // Enable parallel backup/restore for cloud storage tests
+ System.setProperty("solr.backup.maxparalleluploads", "2");
+ System.setProperty("solr.backup.maxparalleldownloads", "2");
String retryMode;
switch (random().nextInt(3)) {
case 0:
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
index 12297153e9c..7e9cdd35bf2 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
@@ -396,6 +396,38 @@ Any children under the `<repository>` tag are passed as additional configuration
Information on each of the repository implementations provided with Solr is provided below.
+=== Parallel File Transfers
+
+Backup and restore operations can transfer multiple index files in parallel to improve throughput, especially when using cloud storage repositories like S3 or GCS where latency is higher.
+The parallelism is controlled via system properties or environment variables:
+
+`solr.backup.maxparalleluploads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to upload in parallel during backup operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELUPLOADS` environment variable.
+
+`solr.backup.maxparalleldownloads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to download in parallel during restore operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELDOWNLOADS` environment variable.
+
+TIP: These are two independent thread pools — one for backup uploads and one for restore downloads.
+Increasing either value can significantly improve throughput when using cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth pressure on your cluster.
+Start small and increase based on observed throughput and available resources.
+The configured limit applies across all concurrent backup or restore operations on the node.
+
+=== Checksum Verification
+
By default, all the repository implementations verify the integrity of the index files before they are copied to the destination. However, it is possible to disable this integrity check by setting the optional configuration property `verifyChecksum`.
`verifyChecksum`::
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java b/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java
index 2bbb6529c0f..096998b76c8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java
@@ -19,6 +19,7 @@ package org.apache.solr.common.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.solr.common.SolrCloseable;
@@ -91,8 +92,10 @@ public class ObjectCache extends MapBackedCache<String,
Object> implements SolrC
// owns this ObjectCache, which is useful for plugins to register objects
// which should be closed before being garbage-collected.
for (Object value : map.values()) {
- if (value instanceof Closeable) {
- ((Closeable) value).close();
+ if (value instanceof Closeable closeable) {
+ closeable.close();
+ } else if (value instanceof ExecutorService executor) {
+ ExecutorUtil.shutdownAndAwaitTermination(executor);
}
}
map.clear();