This is an automated email from the ASF dual-hosted git repository.
mlbiscoc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 2041d1d0e6a SOLR-17756: Parallelize calculation of index fingerprint
across segments (#3352)
2041d1d0e6a is described below
commit 2041d1d0e6abfa3383980bf9f1ae8f543313c5e9
Author: Matthew Biscocho <[email protected]>
AuthorDate: Wed May 21 23:36:30 2025 -0400
SOLR-17756: Parallelize calculation of index fingerprint across segments
(#3352)
Parallelize index fingerprint computation across segments via a dedicated
thread pool
---
solr/CHANGES.txt | 2 +
.../java/org/apache/solr/core/CoreContainer.java | 15 ++++
.../org/apache/solr/search/SolrIndexSearcher.java | 32 ++++----
.../org/apache/solr/update/IndexFingerprint.java | 26 +++++++
.../solr/update/SolrIndexFingerprintTest.java | 86 ++++++++++++++++++++++
5 files changed, 142 insertions(+), 19 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 539543cb257..48c1e2e2fcf 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -250,6 +250,8 @@ Optimizations
* SOLR-17669: Reduced memory usage in SolrJ getBeans() method when handling
dynamic fields with wildcards. (Martin Anzinger)
+* SOLR-17756: Parallelize index fingerprint computation across segments via a
dedicated thread pool (Matthew Biscocho, Luke Kot-Zaniewski)
+
Bug Fixes
---------------------
* SOLR-17629: If SQLHandler failed to open the underlying stream (e.g. Solr
returns an error; could be user/syntax problem),
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index bf0d56495d5..10d00987e6f 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -26,6 +26,7 @@ import static
org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
import static org.apache.solr.common.params.CommonParams.ZK_PATH;
import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
+import static
org.apache.solr.search.SolrIndexSearcher.EXECUTOR_MAX_CPU_THREADS;
import static
org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
import com.github.benmanes.caffeine.cache.Interner;
@@ -187,6 +188,10 @@ public class CoreContainer {
return indexSearcherExecutor;
}
+  /**
+   * Returns the executor on which {@code SolrIndexSearcher#getIndexFingerprint(long)} computes
+   * per-segment fingerprints. It is released with {@code
+   * ExecutorUtil.shutdownNowAndAwaitTermination} alongside the container's other executors; the
+   * constructor that leaves the container's executors unset also leaves this one {@code null}.
+   */
+  public ExecutorService getIndexFingerprintExecutor() {
+    return this.indexFingerprintExecutor;
+  }
+
public static class CoreLoadFailure {
public final CoreDescriptor cd;
@@ -292,6 +297,8 @@ public class CoreContainer {
private final ExecutorService indexSearcherExecutor;
+ private final ExecutorService indexFingerprintExecutor;
+
private final ClusterSingletons clusterSingletons =
new ClusterSingletons(
() ->
@@ -449,6 +456,12 @@ public class CoreContainer {
this.allowListUrlChecker = AllowListUrlChecker.create(config);
this.indexSearcherExecutor = SolrIndexSearcher.initCollectorExecutor(cfg);
+
+ this.indexFingerprintExecutor =
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ EXECUTOR_MAX_CPU_THREADS,
+ Integer.MAX_VALUE,
+ new SolrNamedThreadFactory("IndexFingerprintPool"));
}
@SuppressWarnings({"unchecked"})
@@ -678,6 +691,7 @@ public class CoreContainer {
allowPaths = null;
allowListUrlChecker = null;
indexSearcherExecutor = null;
+ indexFingerprintExecutor = null;
}
public static CoreContainer createAndLoad(Path solrHome) {
@@ -1293,6 +1307,7 @@ public class CoreContainer {
ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor);
ExecutorUtil.shutdownAndAwaitTermination(indexSearcherExecutor);
+ ExecutorUtil.shutdownNowAndAwaitTermination(indexFingerprintExecutor);
ExecutorService customThreadPool =
ExecutorUtil.newMDCAwareCachedThreadPool(new
SolrNamedThreadFactory("closeThreadPool"));
diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
index e57a37a0593..a2009aff65d 100644
--- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
+++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -32,11 +32,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -96,6 +96,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.ExecutorUtil.MDCAwareThreadPoolExecutor;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -2546,24 +2547,17 @@ public class SolrIndexSearcher extends IndexSearcher
implements Closeable, SolrI
*/
public IndexFingerprint getIndexFingerprint(long maxVersion) throws
IOException {
final SolrIndexSearcher searcher = this;
- final AtomicReference<IOException> exception = new AtomicReference<>();
- try {
- return searcher.getTopReaderContext().leaves().stream()
- .map(
- ctx -> {
- try {
- return searcher.getCore().getIndexFingerprint(searcher, ctx,
maxVersion);
- } catch (IOException e) {
- exception.set(e);
- return null;
- }
- })
- .filter(java.util.Objects::nonNull)
- .reduce(new IndexFingerprint(maxVersion), IndexFingerprint::reduce);
-
- } finally {
- if (exception.get() != null) throw exception.get();
- }
+    // One task per leaf (segment) of this searcher; each task asks the core for that single
+    // segment's fingerprint bounded by maxVersion.
+ List<Callable<IndexFingerprint>> tasks =
+ searcher.getTopReaderContext().leaves().stream()
+ .map(
+ ctx ->
+ (Callable<IndexFingerprint>)
+ () -> searcher.getCore().getIndexFingerprint(searcher,
ctx, maxVersion))
+ .collect(Collectors.toList());
+    // Submit every task to the CoreContainer's fingerprint executor, wait for all of them, then
+    // fold the per-segment results into a fresh fingerprint seeded with maxVersion.
+    // Per the helper's name, failures from individual tasks are aggregated rather than the first
+    // one aborting the rest -- NOTE(review): confirm which exception type callers receive.
+    // NOTE(review): the executor is created with ExecutorUtil.newMDCAwareCachedThreadPool(
+    // EXECUTOR_MAX_CPU_THREADS, Integer.MAX_VALUE, ...). If that helper uses a core size of 0 with
+    // a queue bounded at Integer.MAX_VALUE, ThreadPoolExecutor starts a worker only when none
+    // exist and queues everything else, so segments would be fingerprinted one at a time --
+    // verify the pool really runs these tasks concurrently.
+    // NOTE(review): getIndexFingerprintExecutor() can be null for containers built by the
+    // constructor that nulls the executors; presumably searchers never exist there -- confirm.
+ return ExecutorUtil.submitAllAndAwaitAggregatingExceptions(
+ core.getCoreContainer().getIndexFingerprintExecutor(), tasks)
+ .stream()
+ .reduce(new IndexFingerprint(maxVersion), IndexFingerprint::reduce);
}
/////////////////////////////////////////////////////////////////////
diff --git a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
index be252ba3ac2..8a62ce299b1 100644
--- a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Objects;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
@@ -219,4 +220,29 @@ public class IndexFingerprint implements MapSerializable {
public String toString() {
return toMap(new LinkedHashMap<>()).toString();
}
+
+  /**
+   * Compares every statistic this fingerprint records: {@code maxVersionSpecified}, {@code
+   * maxVersionEncountered}, {@code maxInHash}, {@code versionsHash}, {@code numVersions}, {@code
+   * numDocs} and {@code maxDoc}. Kept in sync with {@link #hashCode()}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof IndexFingerprint that) {
+      return this.maxVersionSpecified == that.maxVersionSpecified
+          && this.maxVersionEncountered == that.maxVersionEncountered
+          && this.maxInHash == that.maxInHash
+          && this.versionsHash == that.versionsHash
+          && this.numVersions == that.numVersions
+          && this.numDocs == that.numDocs
+          && this.maxDoc == that.maxDoc;
+    }
+    return false;
+  }
+
+  /**
+   * Hashes exactly the fields compared by {@link #equals(Object)}, in the same order, so equal
+   * fingerprints always produce equal hash codes.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        this.maxVersionSpecified,
+        this.maxVersionEncountered,
+        this.maxInHash,
+        this.versionsHash,
+        this.numVersions,
+        this.numDocs,
+        this.maxDoc);
+  }
}
diff --git
a/solr/core/src/test/org/apache/solr/update/SolrIndexFingerprintTest.java
b/solr/core/src/test/org/apache/solr/update/SolrIndexFingerprintTest.java
new file mode 100644
index 00000000000..a2c7897e82e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/SolrIndexFingerprintTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update;
+
+import java.io.IOException;
+import org.apache.lucene.index.FilterLeafReader;
+import org.apache.lucene.index.LeafReader;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.core.SolrCore;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SolrIndexFingerprintTest extends SolrTestCaseJ4 {
+
+  @BeforeClass
+  public static void beforeTests() throws Exception {
+    initCore("solrconfig.xml", "schema.xml");
+  }
+
+  /**
+   * Checks that {@code SolrIndexSearcher#getIndexFingerprint(long)}, which computes per-segment
+   * fingerprints on an executor, yields the same fingerprint as folding the per-segment
+   * fingerprints one after another on the test thread.
+   */
+  @Test
+  public void testSequentialVsParallelFingerprint() throws Exception {
+    long maxVersion = Long.MAX_VALUE;
+    SolrCore core = h.getCore();
+
+    // Create a set of 3 segments (one commit per batch of three documents)
+    assertU(adoc("id", "101"));
+    assertU(adoc("id", "102"));
+    assertU(adoc("id", "103"));
+    assertU(commit());
+
+    assertU(adoc("id", "104"));
+    assertU(adoc("id", "105"));
+    assertU(adoc("id", "106"));
+    assertU(commit());
+
+    assertU(adoc("id", "107"));
+    assertU(adoc("id", "108"));
+    assertU(adoc("id", "109"));
+    assertU(commit());
+
+    // withSearcher() borrows the core's current searcher and releases the reference afterwards.
+    // Using try-with-resources on core.getSearcher().get() instead would close the searcher
+    // itself (it is Closeable) while the core still holds it, and would never release the
+    // reference obtained from core.getSearcher().
+    core.withSearcher(
+        searcher -> {
+          // Compute fingerprint sequentially to compare with parallel computation
+          IndexFingerprint expectedFingerprint =
+              searcher.getTopReaderContext().leaves().stream()
+                  .map(
+                      ctx -> {
+                        try {
+                          return core.getIndexFingerprint(
+                              searcher, withoutCacheHelpers(ctx.reader()).getContext(), maxVersion);
+                        } catch (IOException e) {
+                          throw new RuntimeException(e);
+                        }
+                      })
+                  .reduce(new IndexFingerprint(maxVersion), IndexFingerprint::reduce);
+
+          IndexFingerprint actualFingerprint = searcher.getIndexFingerprint(maxVersion);
+          assertEquals(expectedFingerprint, actualFingerprint);
+          return null;
+        });
+  }
+
+  /**
+   * Wraps {@code in} so that it reports neither a reader nor a core cache helper. The intent is
+   * that the expected fingerprint is computed from scratch instead of being served from a cache
+   * keyed on those helpers -- NOTE(review): presumably SolrCore's per-segment fingerprint cache;
+   * confirm against SolrCore#getIndexFingerprint.
+   */
+  private static LeafReader withoutCacheHelpers(LeafReader in) {
+    return new FilterLeafReader(in) {
+      @Override
+      public CacheHelper getReaderCacheHelper() {
+        return null;
+      }
+
+      @Override
+      public CacheHelper getCoreCacheHelper() {
+        return null;
+      }
+    };
+  }
+}