This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 5c9cab2901a SOLR-17982: setForcedDistrib & SearchHandler refactor
(#3931)
5c9cab2901a is described below
commit 5c9cab2901a88385a09ca2a17821e41f062731c3
Author: David Smiley <[email protected]>
AuthorDate: Tue Dec 16 23:56:34 2025 -0500
SOLR-17982: setForcedDistrib & SearchHandler refactor (#3931)
Provide an internal API to force distributed search (even when there is only one shard).
Also, refactor/reorganize SearchHandler for clarity & extensibility.
---
...LR-17982-refactorSearchHandlerForcedDistrib.yml | 9 +
.../solr/handler/component/HttpShardHandler.java | 8 +-
.../solr/handler/component/ResponseBuilder.java | 10 +
.../solr/handler/component/SearchHandler.java | 623 +++++++++++----------
.../java/org/apache/solr/servlet/HttpSolrCall.java | 14 +
.../apache/solr/BaseDistributedSearchTestCase.java | 11 +-
6 files changed, 372 insertions(+), 303 deletions(-)
diff --git
a/changelog/unreleased/SOLR-17982-refactorSearchHandlerForcedDistrib.yml
b/changelog/unreleased/SOLR-17982-refactorSearchHandlerForcedDistrib.yml
new file mode 100644
index 00000000000..dd330bd13f4
--- /dev/null
+++ b/changelog/unreleased/SOLR-17982-refactorSearchHandlerForcedDistrib.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Provide an internal API to force distributed search (even when one
shard). Also, refactor/reorganize SearchHandler for clarity & extensibility.
+type: other # added, changed, fixed, deprecated, removed, dependency_update,
security, other
+authors:
+ - name: David Smiley
+ - name: Sonu Sharma
+links:
+ - name: SOLR-17982
+ url: https://issues.apache.org/jira/browse/SOLR-17982
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 788d985a79e..df0f39ccea5 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -405,7 +405,7 @@ public class HttpShardHandler extends ShardHandler {
public void prepDistributed(ResponseBuilder rb) {
final SolrQueryRequest req = rb.req;
final SolrParams params = req.getParams();
- final String shards = params.get(ShardParams.SHARDS);
+ String shards = params.get(ShardParams.SHARDS);
CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
CloudDescriptor cloudDescriptor = req.getCloudDescriptor();
@@ -444,7 +444,7 @@ public class HttpShardHandler extends ShardHandler {
.build();
rb.slices = replicaSource.getSliceNames().toArray(new
String[replicaSource.getSliceCount()]);
- if (canShortCircuit(rb.slices, onlyNrt, params, cloudDescriptor)) {
+ if (!rb.isForcedDistrib() && canShortCircuit(rb.slices, onlyNrt, params,
cloudDescriptor)) {
rb.isDistrib = false;
rb.shortCircuitedURL =
ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(),
coreDescriptor.getName());
@@ -476,6 +476,9 @@ public class HttpShardHandler extends ShardHandler {
}
}
} else {
+ if (shards == null) {
+ shards = req.getHttpSolrCall().getThisNodeUrl() + "/" +
req.getCore().getName();
+ }
replicaSource =
new StandaloneReplicaSource.Builder()
.allowListUrlChecker(urlChecker)
@@ -503,6 +506,7 @@ public class HttpShardHandler extends ShardHandler {
return String.join("|", shardUrls);
}
+ /** Can we avoid distributed search / coordinator? */
private boolean canShortCircuit(
String[] slices,
boolean onlyNrtReplicas,
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
b/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
index 8cf3f4b9310..596e9edddd0 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
@@ -168,6 +168,16 @@ public class ResponseBuilder {
public List<ShardRequest> outgoing; // requests to be sent
public List<ShardRequest> finished; // requests that have received responses
from all shards
public String shortCircuitedURL;
+ private boolean forcedDistrib = false;
+
+ public boolean isForcedDistrib() {
+ return forcedDistrib;
+ }
+
+ /** Force distributed-search / coordinator logic when not a sub-shard
request. */
+ public void setForcedDistrib(boolean forcedDistrib) {
+ this.forcedDistrib = forcedDistrib;
+ }
/** This function will return true if this was a distributed search request.
*/
public boolean isDistributed() {
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
index 9c978be34b4..9811ba618db 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
@@ -241,6 +241,30 @@ public class SearchHandler extends RequestHandlerBase
}
}
+ @Override
+ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
throws Exception {
+ List<SearchComponent> components = getComponents();
+ ResponseBuilder rb = newResponseBuilder(req, rsp, components);
+ if (rb.requestInfo != null) {
+ rb.requestInfo.setResponseBuilder(rb);
+ }
+
+ rb.isDistrib = isDistrib(req, rb); // can change later nonetheless
+ tagRequestWithRequestId(rb);
+
+ boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
+ rb.setDebug(dbg);
+ if (dbg == false) { // if it's true, we are doing everything anyway.
+
SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG),
rb);
+ }
+
+ if (checkCircuitBreakers(req, rsp, rb)) {
+ return; // Circuit breaker tripped, return immediately
+ }
+
+ processComponents(req, rsp, rb, components);
+ }
+
@SuppressWarnings({"unchecked"})
private void initComponents() {
Object declaredComponents = initArgs.get(INIT_COMPONENTS);
@@ -304,9 +328,9 @@ public class SearchHandler extends RequestHandlerBase
return result;
}
- private boolean isDistrib(SolrQueryRequest req) {
- boolean isZkAware = req.getCoreContainer().isZooKeeperAware();
- boolean isDistrib = req.getParams().getBool(DISTRIB, isZkAware);
+ protected boolean isDistrib(SolrQueryRequest req, ResponseBuilder rb) {
+ boolean theDefault = req.getCoreContainer().isZooKeeperAware() ||
rb.isForcedDistrib();
+ boolean isDistrib = req.getParams().getBool(DISTRIB, theDefault);
if (!isDistrib) {
// for back compat, a shards param with URLs like localhost:8983/solr
will mean that this
// search is distributed.
@@ -316,7 +340,7 @@ public class SearchHandler extends RequestHandlerBase
return isDistrib;
}
- public ShardHandler getAndPrepShardHandler(SolrQueryRequest req,
ResponseBuilder rb) {
+ protected ShardHandler getAndPrepShardHandler(SolrQueryRequest req,
ResponseBuilder rb) {
ShardHandler shardHandler = null;
CoreContainer cc = req.getCoreContainer();
@@ -395,354 +419,363 @@ public class SearchHandler extends RequestHandlerBase
return false;
}
- @Override
- public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
throws Exception {
- List<SearchComponent> components = getComponents();
- ResponseBuilder rb = newResponseBuilder(req, rsp, components);
- if (rb.requestInfo != null) {
- rb.requestInfo.setResponseBuilder(rb);
- }
-
- rb.isDistrib = isDistrib(req);
- tagRequestWithRequestId(rb);
+ protected void processComponents(
+ SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ ResponseBuilder rb,
+ List<SearchComponent> components)
+ throws IOException {
+ final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null;
+ // return a ShardHandler only if doing distributed search (equivalent to
rb.isDistrib)
+ final ShardHandler shardHandler = getAndPrepShardHandler(req, rb);
- boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
- rb.setDebug(dbg);
- if (dbg == false) { // if it's true, we are doing everything anyway.
-
SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG),
rb);
- }
+ if (!prepareComponents(req, rb, timer, components)) return;
- final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null;
+ postPrepareComponents(rb);
- if (checkCircuitBreakers(req, rsp, rb)) {
- return; // Circuit breaker tripped, return immediately
+ if (shardHandler == null) {
+ processComponentsLocal(req, rsp, rb, timer, components);
+ } else {
+ processComponentsDistrib(req, rsp, rb, timer, components, shardHandler);
}
+ }
- processComponents(req, rsp, rb, timer, components);
-
- // SOLR-5550: still provide shards.info if requested even for a
short-circuited distrib request
- if (!rb.isDistrib
- && req.getParams().getBool(ShardParams.SHARDS_INFO, false)
- && rb.shortCircuitedURL != null) {
- NamedList<Object> shardInfo = new SimpleOrderedMap<>();
- SimpleOrderedMap<Object> nl = new SimpleOrderedMap<>();
- if (rsp.getException() != null) {
- Throwable cause = rsp.getException();
- if (cause instanceof SolrServerException) {
- cause = ((SolrServerException) cause).getRootCause();
- } else {
- if (cause.getCause() != null) {
- cause = cause.getCause();
- }
+ private static boolean prepareComponents(
+ SolrQueryRequest req, ResponseBuilder rb, RTimerTree timer,
List<SearchComponent> components)
+ throws IOException {
+ if (timer == null) {
+ // non-debugging prepare phase
+ for (SearchComponent component : components) {
+ if (checkLimitsBefore(component, "prepare", rb.req, rb.rsp,
components)) {
+ shortCircuitedResults(req, rb);
+ return false;
}
- nl.add("error", cause.toString());
- if (!core.getCoreContainer().hideStackTrace()) {
- StringWriter trace = new StringWriter();
- cause.printStackTrace(new PrintWriter(trace));
- nl.add("trace", trace.toString());
+ component.prepare(rb);
+ }
+ } else {
+ // debugging prepare phase
+ RTimerTree subt = timer.sub("prepare");
+ for (SearchComponent c : components) {
+ if (checkLimitsBefore(c, "prepare debug", rb.req, rb.rsp, components))
{
+ shortCircuitedResults(req, rb);
+ return false;
}
- } else if (rb.getResults() != null) {
- nl.add("numFound", rb.getResults().docList.matches());
- nl.add(
- "numFoundExact",
- rb.getResults().docList.hitCountRelation() ==
TotalHits.Relation.EQUAL_TO);
- nl.add("maxScore", rb.getResults().docList.maxScore());
+ rb.setTimer(subt.sub(c.getName()));
+ c.prepare(rb);
+ rb.getTimer().stop();
}
- nl.add("shardAddress", rb.shortCircuitedURL);
- nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this
request so far
+ subt.stop();
+ }
+ return true;
+ }
- int pos = rb.shortCircuitedURL.indexOf("://");
- String shardInfoName =
- pos != -1 ? rb.shortCircuitedURL.substring(pos + 3) :
rb.shortCircuitedURL;
- shardInfo.add(shardInfoName, nl);
- rsp.getValues().add(ShardParams.SHARDS_INFO, shardInfo);
+ /**
+ * Called after {@link #prepareComponents(SolrQueryRequest, ResponseBuilder,
RTimerTree, List)}
+ */
+ protected void postPrepareComponents(ResponseBuilder rb) {
+ // Once all of our components have been prepared, check if this request
involves a SortSpec.
+ // If it does, and if our request includes a cursorMark param, then parse
& init the
+ // CursorMark state (This must happen after the prepare() of all
components, because any
+ // component may have modified the SortSpec)
+ final SortSpec spec = rb.getSortSpec();
+ final String cursorStr =
rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM);
+ if (null != spec && null != cursorStr) {
+ final CursorMark cursorMark = new CursorMark(rb.req.getSchema(), spec);
+ cursorMark.parseSerializedTotem(cursorStr);
+ rb.setCursorMark(cursorMark);
}
}
- private void processComponents(
+ /** Local request processing (not distributed). */
+ protected void processComponentsLocal(
SolrQueryRequest req,
SolrQueryResponse rsp,
ResponseBuilder rb,
RTimerTree timer,
List<SearchComponent> components)
throws IOException {
- // creates a ShardHandler object only if it's needed
- final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb);
-
- if (!prepareComponents(req, rb, timer, components)) return;
-
- { // Once all of our components have been prepared, check if this request
involves a SortSpec.
- // If it does, and if our request includes a cursorMark param, then
parse & init the
- // CursorMark state (This must happen after the prepare() of all
components, because any
- // component may have modified the SortSpec)
- final SortSpec spec = rb.getSortSpec();
- final String cursorStr =
rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM);
- if (null != spec && null != cursorStr) {
- final CursorMark cursorMark = new CursorMark(rb.req.getSchema(), spec);
- cursorMark.parseSerializedTotem(cursorStr);
- rb.setCursorMark(cursorMark);
- }
- }
-
- if (!rb.isDistrib) {
- // a normal non-distributed request
- try {
- // The semantics of debugging vs not debugging are different enough
that
- // it makes sense to have two control loops
- if (!rb.isDebug()) {
- // Process
- for (SearchComponent c : components) {
- if (checkLimitsBefore(c, "process", rb.req, rb.rsp, components)) {
- shortCircuitedResults(req, rb);
- return;
- }
- c.process(rb);
+ // a normal non-distributed request
+ assert !rb.isDistrib;
+ try {
+ // The semantics of debugging vs not debugging are different enough that
+ // it makes sense to have two control loops
+ if (!rb.isDebug()) {
+ // Process
+ for (SearchComponent c : components) {
+ if (checkLimitsBefore(c, "process", rb.req, rb.rsp, components)) {
+ shortCircuitedResults(req, rb);
+ return;
}
- } else {
- // Process
- RTimerTree subt = timer.sub("process");
- for (SearchComponent c : components) {
- if (checkLimitsBefore(c, "process debug", rb.req, rb.rsp,
components)) {
- shortCircuitedResults(req, rb);
- return;
- }
- rb.setTimer(subt.sub(c.getName()));
- c.process(rb);
- rb.getTimer().stop();
+ c.process(rb);
+ }
+ } else {
+ // Process
+ RTimerTree subt = timer.sub("process");
+ for (SearchComponent c : components) {
+ if (checkLimitsBefore(c, "process debug", rb.req, rb.rsp,
components)) {
+ shortCircuitedResults(req, rb);
+ return;
}
- subt.stop();
+ rb.setTimer(subt.sub(c.getName()));
+ c.process(rb);
+ rb.getTimer().stop();
+ }
+ subt.stop();
- // add the timing info
- if (rb.isDebugTimings()) {
- rb.addDebugInfo("timing", timer.asNamedList());
- }
+ // add the timing info
+ if (rb.isDebugTimings()) {
+ rb.addDebugInfo("timing", timer.asNamedList());
}
- } catch (ExitableDirectoryReader.ExitingReaderException ex) {
- log.warn("Query terminated: {}; ", req.getParamString(), ex);
- shortCircuitedResults(req, rb);
}
- } else {
- // a distributed request
+ } catch (ExitableDirectoryReader.ExitingReaderException ex) {
+ log.warn("Query terminated: {}; ", req.getParamString(), ex);
+ shortCircuitedResults(req, rb);
+ }
- if (rb.outgoing == null) {
- rb.outgoing = new ArrayList<>();
- }
- rb.finished = new ArrayList<>();
+ fillShardsInfoShortCircuited(req, rsp, rb);
+ }
+
+ /** Distributed request processing (AKA coordinator). */
+ protected void processComponentsDistrib(
+ SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ ResponseBuilder rb,
+ RTimerTree timer,
+ List<SearchComponent> components,
+ ShardHandler shardHandler)
+ throws IOException {
+ assert rb.isDistrib;
+ if (rb.outgoing == null) {
+ rb.outgoing = new ArrayList<>();
+ }
+ rb.finished = new ArrayList<>();
- int nextStage = 0;
- long totalShardCpuTime = 0L;
- do {
- rb.setStage(nextStage);
- nextStage = ResponseBuilder.STAGE_DONE;
+ int nextStage = 0;
+ long totalShardCpuTime = 0L;
+ do {
+ rb.setStage(nextStage);
+ nextStage = ResponseBuilder.STAGE_DONE;
- // call all components
- for (SearchComponent c : components) {
- if (checkLimitsBefore(c, "distrib", rb.req, rb.rsp, components)) {
- shortCircuitedResults(req, rb);
- return;
- } // the next stage is the minimum of what all components report
- nextStage = Math.min(nextStage, c.distributedProcess(rb));
- }
+ // call all components
+ for (SearchComponent c : components) {
+ if (checkLimitsBefore(c, "distrib", rb.req, rb.rsp, components)) {
+ shortCircuitedResults(req, rb);
+ return;
+ } // the next stage is the minimum of what all components report
+ nextStage = Math.min(nextStage, c.distributedProcess(rb));
+ }
- // check the outgoing queue and send requests
- while (rb.outgoing.size() > 0) {
+ // check the outgoing queue and send requests
+ while (rb.outgoing.size() > 0) {
- // submit all current request tasks at once
- while (rb.outgoing.size() > 0) {
- ShardRequest sreq = rb.outgoing.remove(0);
- sreq.actualShards = sreq.shards;
- if (sreq.actualShards == ShardRequest.ALL_SHARDS) {
- sreq.actualShards = rb.shards;
- }
- // presume we'll get a response from each shard we send to
- sreq.responses = new ArrayList<>(sreq.actualShards.length);
+ // submit all current request tasks at once
+ while (rb.outgoing.size() > 0) {
+ ShardRequest sreq = rb.outgoing.remove(0);
+ sreq.actualShards = sreq.shards;
+ if (sreq.actualShards == ShardRequest.ALL_SHARDS) {
+ sreq.actualShards = rb.shards;
+ }
+ // presume we'll get a response from each shard we send to
+ sreq.responses = new ArrayList<>(sreq.actualShards.length);
- QueryLimits queryLimits = QueryLimits.getCurrentLimits();
+ QueryLimits queryLimits = QueryLimits.getCurrentLimits();
- // TODO: map from shard to address[]
- for (String shard : sreq.actualShards) {
- ModifiableSolrParams params = new
ModifiableSolrParams(sreq.params);
- ShardHandler.setShardAttributesToParams(params, sreq.purpose);
+ // TODO: map from shard to address[]
+ for (String shard : sreq.actualShards) {
+ ModifiableSolrParams params = new
ModifiableSolrParams(sreq.params);
+ ShardHandler.setShardAttributesToParams(params, sreq.purpose);
- // Distributed request -- need to send queryID as a part of the
distributed request
- params.setNonNull(ShardParams.QUERY_ID, rb.queryID);
- if (rb.requestInfo != null) {
- // we could try and detect when this is needed, but it could
be tricky
- params.set("NOW",
Long.toString(rb.requestInfo.getNOW().getTime()));
- }
- String shardQt = params.get(ShardParams.SHARDS_QT);
- if (shardQt != null) {
- params.set(CommonParams.QT, shardQt);
- } else {
- // for distributed queries that don't include shards.qt, use
the original path
- // as the default but operators need to update their
luceneMatchVersion to enable
- // this behavior since it did not work this way prior to 5.1
- String reqPath = (String) req.getContext().get(PATH);
- if (!"/select".equals(reqPath)) {
- params.set(CommonParams.QT, reqPath);
- } // else if path is /select, then the qt gets passed through
if set
- }
- if (queryLimits.isLimitsEnabled()) {
- if (queryLimits.adjustShardRequestLimits(sreq, shard, params,
rb)) {
- // Skip this shard since one or more limits will be tripped
- if (log.isDebugEnabled()) {
- log.debug(
- "Skipping request to shard '{}' due to query limits,
params {}",
- shard,
- params);
- }
- continue;
+ // Distributed request -- need to send queryID as a part of the
distributed request
+ params.setNonNull(ShardParams.QUERY_ID, rb.queryID);
+ if (rb.requestInfo != null) {
+ // we could try and detect when this is needed, but it could be
tricky
+ params.set("NOW",
Long.toString(rb.requestInfo.getNOW().getTime()));
+ }
+ String shardQt = params.get(ShardParams.SHARDS_QT);
+ if (shardQt != null) {
+ params.set(CommonParams.QT, shardQt);
+ } else {
+ // for distributed queries that don't include shards.qt, use the
original path
+ // as the default but operators need to update their
luceneMatchVersion to enable
+ // this behavior since it did not work this way prior to 5.1
+ String reqPath = (String) req.getContext().get(PATH);
+ if (!"/select".equals(reqPath)) {
+ params.set(CommonParams.QT, reqPath);
+ } // else if path is /select, then the qt gets passed through if
set
+ }
+ if (queryLimits.isLimitsEnabled()) {
+ if (queryLimits.adjustShardRequestLimits(sreq, shard, params,
rb)) {
+ // Skip this shard since one or more limits will be tripped
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Skipping request to shard '{}' due to query limits,
params {}",
+ shard,
+ params);
}
+ continue;
}
- shardHandler1.submit(sreq, shard, params);
}
+ shardHandler.submit(sreq, shard, params);
}
+ }
- // now wait for replies, but if anyone puts more requests on
- // the outgoing queue, send them out immediately (by exiting
- // this loop)
- boolean tolerant = HttpShardHandler.getShardsTolerantAsBool(rb.req);
- while (rb.outgoing.size() == 0) {
- ShardResponse srsp =
- tolerant
- ? shardHandler1.takeCompletedIncludingErrors()
- : shardHandler1.takeCompletedOrError();
- if (srsp == null) break; // no more requests to wait for
- AtomicReference<Object> detailMesg =
- new AtomicReference<>(); // or perhaps new Object[1] ?
-
- boolean anyResponsesPartial =
- srsp.getShardRequest().responses.stream()
- .anyMatch(
- response -> {
- NamedList<Object> resp =
response.getSolrResponse().getResponse();
- if (resp == null) {
- return false;
- }
- Object recursive =
- resp._get(List.of("responseHeader",
"partialResults"), null);
- if (recursive != null) {
- Object message =
- "[Shard:"
- + response.getShardAddress()
- + "]"
- + resp._get(
- List.of(
- "responseHeader",
-
RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY),
- null);
- detailMesg.compareAndSet(null, message); // first
one, ingore rest
- }
- return recursive != null;
- });
- if (anyResponsesPartial) {
- rb.rsp.addPartialResponseDetail(detailMesg.get());
- rsp.setPartialResults(rb.req);
- }
- // Was there an exception?
- // In the case of tolerant search, we need to check all responses
to see if there was an
- // exception.
- Optional<Throwable> shardException =
- srsp.getShardRequest().responses.stream()
- .map(ShardResponse::getException)
- .filter(Objects::nonNull)
- .findFirst();
- if (shardException.isPresent()) {
- // If things are not tolerant, abort everything and rethrow
- if (!tolerant) {
+ // now wait for replies, but if anyone puts more requests on
+ // the outgoing queue, send them out immediately (by exiting
+ // this loop)
+ boolean tolerant = HttpShardHandler.getShardsTolerantAsBool(rb.req);
+ while (rb.outgoing.size() == 0) {
+ ShardResponse srsp =
+ tolerant
+ ? shardHandler.takeCompletedIncludingErrors()
+ : shardHandler.takeCompletedOrError();
+ if (srsp == null) break; // no more requests to wait for
+ AtomicReference<Object> detailMesg =
+ new AtomicReference<>(); // or perhaps new Object[1] ?
+
+ boolean anyResponsesPartial =
+ srsp.getShardRequest().responses.stream()
+ .anyMatch(
+ response -> {
+ NamedList<Object> resp =
response.getSolrResponse().getResponse();
+ if (resp == null) {
+ return false;
+ }
+ Object recursive =
+ resp._get(List.of("responseHeader",
"partialResults"), null);
+ if (recursive != null) {
+ Object message =
+ "[Shard:"
+ + response.getShardAddress()
+ + "]"
+ + resp._get(
+ List.of(
+ "responseHeader",
+
RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY),
+ null);
+ detailMesg.compareAndSet(null, message); // first
one, ignore rest
+ }
+ return recursive != null;
+ });
+ if (anyResponsesPartial) {
+ rb.rsp.addPartialResponseDetail(detailMesg.get());
+ rsp.setPartialResults(rb.req);
+ }
+ // Was there an exception?
+ // In the case of tolerant search, we need to check all responses to
see if there was an
+ // exception.
+ Optional<Throwable> shardException =
+ srsp.getShardRequest().responses.stream()
+ .map(ShardResponse::getException)
+ .filter(Objects::nonNull)
+ .findFirst();
+ if (shardException.isPresent()) {
+ // If things are not tolerant, abort everything and rethrow
+ if (!tolerant) {
+ throwSolrException(shardException.get());
+ } else {
+ // Check if the purpose includes 'PURPOSE_GET_TOP_IDS'
+ boolean includesTopIdsPurpose =
+ (srsp.getShardRequest().purpose &
ShardRequest.PURPOSE_GET_TOP_IDS) != 0;
+ // Check if all responses have exceptions
+ boolean allResponsesHaveExceptions =
+ srsp.getShardRequest().responses.stream()
+ .allMatch(response -> response.getException() != null);
+ // Check if all shards have failed for PURPOSE_GET_TOP_IDS
+ boolean allShardsFailed = includesTopIdsPurpose &&
allResponsesHaveExceptions;
+ // if all shards fail, fail the request despite shards.tolerant
+ if (allShardsFailed) {
throwSolrException(shardException.get());
} else {
- // Check if the purpose includes 'PURPOSE_GET_TOP_IDS'
- boolean includesTopIdsPurpose =
- (srsp.getShardRequest().purpose &
ShardRequest.PURPOSE_GET_TOP_IDS) != 0;
- // Check if all responses have exceptions
- boolean allResponsesHaveExceptions =
- srsp.getShardRequest().responses.stream()
- .allMatch(response -> response.getException() != null);
- // Check if all shards have failed for PURPOSE_GET_TOP_IDS
- boolean allShardsFailed = includesTopIdsPurpose &&
allResponsesHaveExceptions;
- // if all shards fail, fail the request despite shards.tolerant
- if (allShardsFailed) {
- throwSolrException(shardException.get());
- } else {
- rsp.setPartialResults(rb.req);
- if (publishCpuTime) {
- totalShardCpuTime +=
computeShardCpuTime(srsp.getShardRequest().responses);
- rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME,
totalShardCpuTime);
- rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
- }
+ rsp.setPartialResults(rb.req);
+ if (publishCpuTime) {
+ totalShardCpuTime +=
computeShardCpuTime(srsp.getShardRequest().responses);
+ rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME,
totalShardCpuTime);
+ rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
}
}
}
+ }
- rb.finished.add(srsp.getShardRequest());
-
- // let the components see the responses to the request
- for (SearchComponent c : components) {
- if (checkLimitsBefore(
- c,
- "handleResponses next stage:" + stageToString(nextStage),
- rb.req,
- rb.rsp,
- components)) {
- shortCircuitedResults(req, rb);
- return;
- }
- c.handleResponses(rb, srsp.getShardRequest());
- }
+ rb.finished.add(srsp.getShardRequest());
- // Compute total CpuTime used by all shards.
- if (publishCpuTime) {
- totalShardCpuTime +=
computeShardCpuTime(srsp.getShardRequest().responses);
+ // let the components see the responses to the request
+ for (SearchComponent c : components) {
+ if (checkLimitsBefore(
+ c,
+ "handleResponses next stage:" + stageToString(nextStage),
+ rb.req,
+ rb.rsp,
+ components)) {
+ shortCircuitedResults(req, rb);
+ return;
}
+ c.handleResponses(rb, srsp.getShardRequest());
}
- }
- for (SearchComponent c : components) {
- if (checkLimitsBefore(
- c, "finishStage stage:" + stageToString(nextStage), rb.req,
rb.rsp, components)) {
- return;
+ // Compute total CpuTime used by all shards.
+ if (publishCpuTime) {
+ totalShardCpuTime +=
computeShardCpuTime(srsp.getShardRequest().responses);
}
- c.finishStage(rb);
}
+ }
- // we are done when the next stage is MAX_VALUE
- } while (nextStage != Integer.MAX_VALUE);
-
- if (publishCpuTime) {
- rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME,
totalShardCpuTime);
- rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
+ for (SearchComponent c : components) {
+ if (checkLimitsBefore(
+ c, "finishStage stage:" + stageToString(nextStage), rb.req,
rb.rsp, components)) {
+ return;
+ }
+ c.finishStage(rb);
}
+
+ // we are done when the next stage is MAX_VALUE
+ } while (nextStage != Integer.MAX_VALUE);
+
+ if (publishCpuTime) {
+ rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
+ rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
}
}
- private static boolean prepareComponents(
- SolrQueryRequest req, ResponseBuilder rb, RTimerTree timer,
List<SearchComponent> components)
- throws IOException {
- if (timer == null) {
- // non-debugging prepare phase
- for (SearchComponent component : components) {
- if (checkLimitsBefore(component, "prepare", rb.req, rb.rsp,
components)) {
- shortCircuitedResults(req, rb);
- return false;
+ private void fillShardsInfoShortCircuited(
+ SolrQueryRequest req, SolrQueryResponse rsp, ResponseBuilder rb) {
+ // SOLR-5550: still provide shards.info if requested even for a
short-circuited distrib request
+ if (!req.getParams().getBool(ShardParams.SHARDS_INFO, false) ||
rb.shortCircuitedURL == null) {
+ return;
+ }
+ assert rb.isDistrib == false;
+
+ NamedList<Object> shardInfo = new SimpleOrderedMap<>();
+ SimpleOrderedMap<Object> nl = new SimpleOrderedMap<>();
+ if (rsp.getException() != null) {
+ Throwable cause = rsp.getException();
+ if (cause instanceof SolrServerException) {
+ cause = ((SolrServerException) cause).getRootCause();
+ } else {
+ if (cause.getCause() != null) {
+ cause = cause.getCause();
}
- component.prepare(rb);
}
- } else {
- // debugging prepare phase
- RTimerTree subt = timer.sub("prepare");
- for (SearchComponent c : components) {
- if (checkLimitsBefore(c, "prepare debug", rb.req, rb.rsp, components))
{
- shortCircuitedResults(req, rb);
- return false;
- }
- rb.setTimer(subt.sub(c.getName()));
- c.prepare(rb);
- rb.getTimer().stop();
+ nl.add("error", cause.toString());
+ if (!core.getCoreContainer().hideStackTrace()) {
+ StringWriter trace = new StringWriter();
+ cause.printStackTrace(new PrintWriter(trace));
+ nl.add("trace", trace.toString());
}
- subt.stop();
+ } else if (rb.getResults() != null) {
+ nl.add("numFound", rb.getResults().docList.matches());
+ nl.add(
+ "numFoundExact",
+ rb.getResults().docList.hitCountRelation() ==
TotalHits.Relation.EQUAL_TO);
+ nl.add("maxScore", rb.getResults().docList.maxScore());
}
- return true;
+ nl.add("shardAddress", rb.shortCircuitedURL);
+ nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this
request so far
+
+ int pos = rb.shortCircuitedURL.indexOf("://");
+ String shardInfoName =
+ pos != -1 ? rb.shortCircuitedURL.substring(pos + 3) :
rb.shortCircuitedURL;
+ shardInfo.add(shardInfoName, nl);
+ rsp.getValues().add(ShardParams.SHARDS_INFO, shardInfo);
}
protected String stageToString(int stage) {
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index d213ee52ccd..0e3642501c5 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -53,6 +53,7 @@ import org.apache.solr.api.ApiBag;
import org.apache.solr.api.V2HttpCall;
import org.apache.solr.client.api.util.SolrVersion;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
@@ -1112,6 +1113,19 @@ public class HttpSolrCall {
return Collections.emptyMap();
}
+ /**
+ * The base URL of this node, e.g. {@code http://localhost:8983/solr}.
+ *
+ * @see ZkController#getBaseUrl()
+ */
+ public String getThisNodeUrl() {
+ String scheme = getReq().getScheme();
+ String host = getReq().getServerName();
+ int port = getReq().getServerPort();
+ String context = getReq().getContextPath();
+ return String.format(Locale.ROOT, "%s://%s:%d%s", scheme, host, port,
context);
+ }
+
/** A faster method for randomly picking items when you do not need to
consume all items. */
private static class RandomIterator<E> implements Iterator<E> {
private Random rand;
diff --git
a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
index cc4ed5b8369..27876850858 100644
---
a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
+++
b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
@@ -624,19 +624,18 @@ public abstract class BaseDistributedSearchTestCase
extends SolrTestCaseJ4 {
/** Returns the distributed QueryResponse */
protected QueryResponse query(boolean setDistribParams, SolrParams p) throws
Exception {
+ if (p.get("distrib") != null) {
+ throw new IllegalArgumentException("don't pass distrib param");
+ }
- final ModifiableSolrParams params = new ModifiableSolrParams(p);
-
- // TODO: look into why passing true causes fails
- params.set("distrib", "false");
- final QueryResponse controlRsp = controlClient.query(params);
+ final QueryResponse controlRsp = controlClient.query(p);
validateControlData(controlRsp);
if (shardCount == 0) { // mostly for temp debugging
return controlRsp;
}
- params.remove("distrib");
+ final ModifiableSolrParams params = new ModifiableSolrParams(p);
if (setDistribParams) setDistributedParams(params);
QueryResponse rsp = queryRandomShard(params);