elangelo commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r2917286043
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
+ // Only use an executor for parallel uploads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ ExecutorService executor =
+ maxParallelUploads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelUploads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("IncrementalBackup"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
Review Comment:
This really doesn't hold: we need to wait for all of the futures anyway, so
storing them in a list is exactly what we need to do here.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
+ // Only use an executor for parallel uploads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ ExecutorService executor =
+ maxParallelUploads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelUploads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("IncrementalBackup"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ if (executor != null) {
+ uploadFutures.add(executor.submit(uploadTask));
+ } else {
+ // Run synchronously when parallelism is 1
+ try {
+ uploadTask.run();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw e;
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]