dsmiley commented on code in PR #3851:
URL: https://github.com/apache/solr/pull/3851#discussion_r2508365394
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -117,7 +119,12 @@ public abstract class CloudSolrClient extends SolrClient {
// UpdateParams.ROLLBACK
);
- protected volatile Object[] locks = objectList(3);
+ private final ConcurrentHashMap<String, CompletableFuture<DocCollection>>
collectionRefreshes =
+ new ConcurrentHashMap<>();
+ private final Object stateRefreshExecutorLock = new Object();
+ private volatile int stateRefreshParallelism =
DEFAULT_STATE_REFRESH_PARALLELISM;
+ private volatile ExecutorService stateRefreshExecutor;
Review Comment:
I find it disappointing to see yet another ExecutorService (ThreadPool) for
this use-case, so close to a place where we can call
HttpSolrClientBase.requestAsync, which uses an existing thread pool and
non-blocking IO. The parallelism can be controlled with a semaphore.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -117,7 +119,12 @@ public abstract class CloudSolrClient extends SolrClient {
// UpdateParams.ROLLBACK
);
- protected volatile Object[] locks = objectList(3);
+ private final ConcurrentHashMap<String, CompletableFuture<DocCollection>>
collectionRefreshes =
Review Comment:
Instead of creating an additional cache, I think it would be better to
integrate into StateCache, which is already collection-keyed, already has
eviction of old stuff.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -902,16 +975,56 @@ protected NamedList<Object> requestWithRetryOnStaleState(
}
}
- if (requestedCollections != null) {
- requestedCollections.clear(); // done with this
- }
-
// if the state was stale, then we retry the request once with new state
pulled from Zk
if (stateWasStale) {
log.warn(
"Re-trying request to collection(s) {} after stale state error
from server.",
inputCollections);
- resp = requestWithRetryOnStaleState(request, retryCount + 1,
inputCollections);
+
+ Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor =
pendingRefreshes;
+ if (!waitedForRefresh && (pendingRefreshes == null ||
pendingRefreshes.isEmpty())) {
+ refreshesToWaitFor = new HashMap<>();
+ for (DocCollection ext : requestedCollections) {
+ refreshesToWaitFor.put(ext.getName(),
triggerCollectionRefresh(ext.getName()));
+ }
+ }
+
+ // First retry without sending state versions to avoid needless waits
when stale state is
+ // still usable.
+ if (!skipStateVersion && !waitedForRefresh) {
+ resp =
+ requestWithRetryOnStaleState(
+ request,
+ retryCount + 1,
+ inputCollections,
+ /*skipStateVersion*/ true,
+ refreshesToWaitFor,
+ waitedForRefresh);
Review Comment:
I'd prefer like you do elsewhere: `/*waitedForRefresh*/ true`
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -902,16 +975,56 @@ protected NamedList<Object> requestWithRetryOnStaleState(
}
}
- if (requestedCollections != null) {
- requestedCollections.clear(); // done with this
- }
-
// if the state was stale, then we retry the request once with new state
pulled from Zk
if (stateWasStale) {
log.warn(
"Re-trying request to collection(s) {} after stale state error
from server.",
inputCollections);
- resp = requestWithRetryOnStaleState(request, retryCount + 1,
inputCollections);
+
+ Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor =
pendingRefreshes;
+ if (!waitedForRefresh && (pendingRefreshes == null ||
pendingRefreshes.isEmpty())) {
+ refreshesToWaitFor = new HashMap<>();
+ for (DocCollection ext : requestedCollections) {
+ refreshesToWaitFor.put(ext.getName(),
triggerCollectionRefresh(ext.getName()));
+ }
+ }
+
+ // First retry without sending state versions to avoid needless waits
when stale state is
Review Comment:
Could you elaborate why "sending state versions" (I assume `\_stateVer\_`)
is a problem?
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -195,14 +206,39 @@ boolean shouldRetry() {
void setRetriedAt() {
retriedAtNano = System.nanoTime();
}
+
+ boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {
Review Comment:
A bit of javadoc would be helpful, especially to document the meaning of
the return value.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1135,51 +1252,129 @@ public boolean isDirectUpdatesToLeadersOnly() {
return directUpdatesToLeadersOnly;
}
- protected static Object[] objectList(int n) {
- Object[] l = new Object[n];
- for (int i = 0; i < n; i++) {
- l[i] = new Object();
+ protected DocCollection getDocCollection(String collection, Integer
expectedVersion)
+ throws SolrException {
+ if (expectedVersion == null) {
+ expectedVersion = -1;
}
- return l;
+ if (collection == null) {
+ return null;
+ }
+
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
+ if (cacheEntry != null &&
cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
+ collectionStateCache.remove(collection, cacheEntry);
+ cacheEntry = null;
+ }
+
+ DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+
+ if (cacheEntry != null && cacheEntry.shouldRetry()) {
+ triggerCollectionRefresh(collection);
+ }
+
+ if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
+ return cached;
+ }
+
+ CompletableFuture<DocCollection> refreshFuture =
triggerCollectionRefresh(collection);
+ return waitForCollectionRefresh(collection, refreshFuture);
}
- protected DocCollection getDocCollection(String collection, Integer
expectedVersion)
- throws SolrException {
- if (expectedVersion == null) expectedVersion = -1;
- if (collection == null) return null;
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.get(collection);
- DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
- if (col != null) {
- if (expectedVersion <= col.getZNodeVersion() &&
!cacheEntry.shouldRetry()) return col;
+ private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
+ if (closed) {
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
+ DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+ return CompletableFuture.completedFuture(cached);
}
+ return collectionRefreshes.computeIfAbsent(
+ collection,
+ key -> {
+ ExecutorService executor = stateRefreshExecutor;
+ CompletableFuture<DocCollection> future;
+ if (executor == null || ExecutorUtil.isShutdown(executor)) {
+ future = new CompletableFuture<>();
+ try {
+ future.complete(loadDocCollection(key));
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ } else {
+ future = CompletableFuture.supplyAsync(() ->
loadDocCollection(key), executor);
+ }
+ future.whenComplete(
Review Comment:
It could happen that `whenComplete` is invoked on the caller thread if the
future has already completed, I think. The lambda you pass in will manipulate
`collectionRefreshes`, which violates the rules of ConcurrentHashMap's
compute methods.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]