This is an automated email from the ASF dual-hosted git repository. dsmiley pushed a commit to branch branch_9x in repository https://gitbox.apache.org/repos/asf/solr.git
commit 48524c8b3066d07a356f94c46789d28dc0910201 Author: David Smiley <[email protected]> AuthorDate: Wed Dec 17 20:49:08 2025 -0500 SOLR-17982: SearchHandler refactor Did this manually, imitating similar actions as done on main (10x). This helps us see the differences in the commit to follow. --- .../solr/handler/component/SearchHandler.java | 587 +++++++++++---------- 1 file changed, 310 insertions(+), 277 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java index 61424556517..0bf03caab5e 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java @@ -240,6 +240,36 @@ public class SearchHandler extends RequestHandlerBase } } + @Override + public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + if (req.getParams().getBool(ShardParams.IS_SHARD, false)) { + int purpose = req.getParams().getInt(ShardParams.SHARDS_PURPOSE, 0); + SolrPluginUtils.forEachRequestPurpose( + purpose, n -> shardPurposes.computeIfAbsent(n, name -> new Counter()).inc()); + } + + List<SearchComponent> components = getComponents(); + ResponseBuilder rb = newResponseBuilder(req, rsp, components); + if (rb.requestInfo != null) { + rb.requestInfo.setResponseBuilder(rb); + } + + rb.isDistrib = isDistrib(req, rb); // can change later nonetheless + tagRequestWithRequestId(rb); + + boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false); + rb.setDebug(dbg); + if (dbg == false) { // if it's true, we are doing everything anyway. + SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb); + } + + if (checkCircuitBreakers(req, rsp, rb)) { + return; // Circuit breaker tripped, return immediately + } + + processComponents(req, rsp, rb, components); + } + @SuppressWarnings({"unchecked"}) private void initComponents() { Object declaredComponents = initArgs.get(INIT_COMPONENTS); @@ -303,7 +333,7 @@ public class SearchHandler extends RequestHandlerBase return result; } - private boolean isDistrib(SolrQueryRequest req) { + protected boolean isDistrib(SolrQueryRequest req, ResponseBuilder rb) { boolean isZkAware = req.getCoreContainer().isZooKeeperAware(); boolean isDistrib = req.getParams().getBool(DISTRIB, isZkAware); if (!isDistrib) { @@ -396,330 +426,333 @@ public class SearchHandler extends RequestHandlerBase return false; } - @Override - public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { - if (req.getParams().getBool(ShardParams.IS_SHARD, false)) { - int purpose = req.getParams().getInt(ShardParams.SHARDS_PURPOSE, 0); - SolrPluginUtils.forEachRequestPurpose( - purpose, n -> shardPurposes.computeIfAbsent(n, name -> new Counter()).inc()); - } - - List<SearchComponent> components = getComponents(); - ResponseBuilder rb = newResponseBuilder(req, rsp, components); - if (rb.requestInfo != null) { - rb.requestInfo.setResponseBuilder(rb); - } - - rb.isDistrib = isDistrib(req); - tagRequestWithRequestId(rb); + protected void processComponents( + SolrQueryRequest req, + SolrQueryResponse rsp, + ResponseBuilder rb, + List<SearchComponent> components) + throws IOException { + final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null; + // return a ShardHandler only if doing distributed search (equivalent to rb.isDistrib) + final ShardHandler shardHandler = getAndPrepShardHandler(req, rb); - boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false); - rb.setDebug(dbg); - if (dbg == false) { // if it's true, we are doing everything anyway. - SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb); - } + if (!prepareComponents(req, rb, timer, components)) return; - final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null; + postPrepareComponents(rb); - if (checkCircuitBreakers(req, rsp, rb)) { - return; // Circuit breaker tripped, return immediately + if (shardHandler == null) { + processComponentsLocal(req, rsp, rb, timer, components); + } else { + processComponentsDistrib(req, rsp, rb, timer, components, shardHandler); } + } - processComponents(req, rsp, rb, timer, components); - - // SOLR-5550: still provide shards.info if requested even for a short-circuited distrib request - if (!rb.isDistrib - && req.getParams().getBool(ShardParams.SHARDS_INFO, false) - && rb.shortCircuitedURL != null) { - NamedList<Object> shardInfo = new SimpleOrderedMap<>(); - SimpleOrderedMap<Object> nl = new SimpleOrderedMap<>(); - if (rsp.getException() != null) { - Throwable cause = rsp.getException(); - if (cause instanceof SolrServerException) { - cause = ((SolrServerException) cause).getRootCause(); - } else { - if (cause.getCause() != null) { - cause = cause.getCause(); - } + private static boolean prepareComponents( + SolrQueryRequest req, ResponseBuilder rb, RTimerTree timer, List<SearchComponent> components) + throws IOException { + if (timer == null) { + // non-debugging prepare phase + for (SearchComponent component : components) { + if (checkLimitsBefore(component, "prepare", rb.req, rb.rsp, components)) { + shortCircuitedResults(req, rb); + return false; } - nl.add("error", cause.toString()); - if (!core.getCoreContainer().hideStackTrace()) { - StringWriter trace = new StringWriter(); - cause.printStackTrace(new PrintWriter(trace)); - nl.add("trace", trace.toString()); + component.prepare(rb); + } + } else { + // debugging prepare phase + RTimerTree subt = timer.sub("prepare"); + for (SearchComponent c : components) { + if (checkLimitsBefore(c, "prepare debug", rb.req, rb.rsp, components)) { + shortCircuitedResults(req, rb); + return false; } - } else if (rb.getResults() != null) { - nl.add("numFound", rb.getResults().docList.matches()); - nl.add( - "numFoundExact", - rb.getResults().docList.hitCountRelation() == TotalHits.Relation.EQUAL_TO); - nl.add("maxScore", rb.getResults().docList.maxScore()); + rb.setTimer(subt.sub(c.getName())); + c.prepare(rb); + rb.getTimer().stop(); } - nl.add("shardAddress", rb.shortCircuitedURL); - nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far - - int pos = rb.shortCircuitedURL.indexOf("://"); - String shardInfoName = - pos != -1 ? rb.shortCircuitedURL.substring(pos + 3) : rb.shortCircuitedURL; - shardInfo.add(shardInfoName, nl); - rsp.getValues().add(ShardParams.SHARDS_INFO, shardInfo); + subt.stop(); } + return true; } - private void processComponents( + /** + * Called after {@link #prepareComponents(SolrQueryRequest, ResponseBuilder, RTimerTree, List)} + */ + protected void postPrepareComponents(ResponseBuilder rb) { + // Once all of our components have been prepared, check if this request involves a SortSpec. + // If it does, and if our request includes a cursorMark param, then parse & init the + // CursorMark state (This must happen after the prepare() of all components, because any + // component may have modified the SortSpec) + final SortSpec spec = rb.getSortSpec(); + final String cursorStr = rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM); + if (null != spec && null != cursorStr) { + final CursorMark cursorMark = new CursorMark(rb.req.getSchema(), spec); + cursorMark.parseSerializedTotem(cursorStr); + rb.setCursorMark(cursorMark); + } + } + + /** Local request processing (not distributed). */ + protected void processComponentsLocal( SolrQueryRequest req, SolrQueryResponse rsp, ResponseBuilder rb, RTimerTree timer, List<SearchComponent> components) throws IOException { - // creates a ShardHandler object only if it's needed - final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); - - if (!prepareComponents(req, rb, timer, components)) return; - - { // Once all of our components have been prepared, check if this request involves a SortSpec. - // If it does, and if our request includes a cursorMark param, then parse & init the - // CursorMark state (This must happen after the prepare() of all components, because any - // component may have modified the SortSpec) - final SortSpec spec = rb.getSortSpec(); - final String cursorStr = rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM); - if (null != spec && null != cursorStr) { - final CursorMark cursorMark = new CursorMark(rb.req.getSchema(), spec); - cursorMark.parseSerializedTotem(cursorStr); - rb.setCursorMark(cursorMark); - } - } - - if (!rb.isDistrib) { - // a normal non-distributed request - try { - // The semantics of debugging vs not debugging are different enough that - // it makes sense to have two control loops - if (!rb.isDebug()) { - // Process - for (SearchComponent c : components) { - if (checkLimitsBefore(c, "process", rb.req, rb.rsp, components)) { - shortCircuitedResults(req, rb); - return; - } - c.process(rb); + // a normal non-distributed request + assert !rb.isDistrib; + try { + // The semantics of debugging vs not debugging are different enough that + // it makes sense to have two control loops + if (!rb.isDebug()) { + // Process + for (SearchComponent c : components) { + if (checkLimitsBefore(c, "process", rb.req, rb.rsp, components)) { + shortCircuitedResults(req, rb); + return; } - } else { - // Process - RTimerTree subt = timer.sub("process"); - for (SearchComponent c : components) { - if (checkLimitsBefore(c, "process debug", rb.req, rb.rsp, components)) { - shortCircuitedResults(req, rb); - return; - } - rb.setTimer(subt.sub(c.getName())); - c.process(rb); - rb.getTimer().stop(); + c.process(rb); + } + } else { + // Process + RTimerTree subt = timer.sub("process"); + for (SearchComponent c : components) { + if (checkLimitsBefore(c, "process debug", rb.req, rb.rsp, components)) { + shortCircuitedResults(req, rb); + return; } - subt.stop(); + rb.setTimer(subt.sub(c.getName())); + c.process(rb); + rb.getTimer().stop(); + } + subt.stop(); - // add the timing info - if (rb.isDebugTimings()) { - rb.addDebugInfo("timing", timer.asNamedList()); - } + // add the timing info + if (rb.isDebugTimings()) { + rb.addDebugInfo("timing", timer.asNamedList()); } - } catch (ExitableDirectoryReader.ExitingReaderException ex) { - log.warn("Query: {}; ", req.getParamString(), ex); - shortCircuitedResults(req, rb); } - } else { - // a distributed request + } catch (ExitableDirectoryReader.ExitingReaderException ex) { + log.warn("Query: {}; ", req.getParamString(), ex); + shortCircuitedResults(req, rb); + } - if (rb.outgoing == null) { - rb.outgoing = new ArrayList<>(); - } - rb.finished = new ArrayList<>(); + fillShardsInfoShortCircuited(req, rsp, rb); + } - int nextStage = 0; - do { - rb.setStage(nextStage); - nextStage = ResponseBuilder.STAGE_DONE; + /** Distributed request processing (AKA coordinator). */ + protected void processComponentsDistrib( + SolrQueryRequest req, + SolrQueryResponse rsp, + ResponseBuilder rb, + RTimerTree timer, + List<SearchComponent> components, + ShardHandler shardHandler) + throws IOException { + assert rb.isDistrib; + if (rb.outgoing == null) { + rb.outgoing = new ArrayList<>(); + } + rb.finished = new ArrayList<>(); - // call all components - for (SearchComponent c : components) { - if (checkLimitsBefore(c, "distrib", rb.req, rb.rsp, components)) { - shortCircuitedResults(req, rb); - return; - } // the next stage is the minimum of what all components report - nextStage = Math.min(nextStage, c.distributedProcess(rb)); - } + int nextStage = 0; + do { + rb.setStage(nextStage); + nextStage = ResponseBuilder.STAGE_DONE; - // check the outgoing queue and send requests - while (rb.outgoing.size() > 0) { + // call all components + for (SearchComponent c : components) { + if (checkLimitsBefore(c, "distrib", rb.req, rb.rsp, components)) { + shortCircuitedResults(req, rb); + return; + } // the next stage is the minimum of what all components report + nextStage = Math.min(nextStage, c.distributedProcess(rb)); + } - // submit all current request tasks at once - while (rb.outgoing.size() > 0) { - ShardRequest sreq = rb.outgoing.remove(0); - sreq.actualShards = sreq.shards; - if (sreq.actualShards == ShardRequest.ALL_SHARDS) { - sreq.actualShards = rb.shards; + // check the outgoing queue and send requests + while (rb.outgoing.size() > 0) { + + // submit all current request tasks at once + while (rb.outgoing.size() > 0) { + ShardRequest sreq = rb.outgoing.remove(0); + sreq.actualShards = sreq.shards; + if (sreq.actualShards == ShardRequest.ALL_SHARDS) { + sreq.actualShards = rb.shards; + } + // presume we'll get a response from each shard we send to + sreq.responses = new ArrayList<>(sreq.actualShards.length); + + // TODO: map from shard to address[] + for (String shard : sreq.actualShards) { + ModifiableSolrParams params = new ModifiableSolrParams(sreq.params); + ShardHandler.setShardAttributesToParams(params, sreq.purpose); + + // Distributed request -- need to send queryID as a part of the distributed request + params.setNonNull(ShardParams.QUERY_ID, rb.queryID); + if (rb.requestInfo != null) { + // we could try and detect when this is needed, but it could be tricky + params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime())); } - // presume we'll get a response from each shard we send to - sreq.responses = new ArrayList<>(sreq.actualShards.length); - - // TODO: map from shard to address[] - for (String shard : sreq.actualShards) { - ModifiableSolrParams params = new ModifiableSolrParams(sreq.params); - ShardHandler.setShardAttributesToParams(params, sreq.purpose); - - // Distributed request -- need to send queryID as a part of the distributed request - params.setNonNull(ShardParams.QUERY_ID, rb.queryID); - if (rb.requestInfo != null) { - // we could try and detect when this is needed, but it could be tricky - params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime())); - } - String shardQt = params.get(ShardParams.SHARDS_QT); - if (shardQt != null) { - params.set(CommonParams.QT, shardQt); - } else { - // for distributed queries that don't include shards.qt, use the original path - // as the default but operators need to update their luceneMatchVersion to enable - // this behavior since it did not work this way prior to 5.1 - String reqPath = (String) req.getContext().get(PATH); - if (!"/select".equals(reqPath)) { - params.set(CommonParams.QT, reqPath); - } // else if path is /select, then the qt gets passed thru if set - } - shardHandler1.submit(sreq, shard, params); + String shardQt = params.get(ShardParams.SHARDS_QT); + if (shardQt != null) { + params.set(CommonParams.QT, shardQt); + } else { + // for distributed queries that don't include shards.qt, use the original path + // as the default but operators need to update their luceneMatchVersion to enable + // this behavior since it did not work this way prior to 5.1 + String reqPath = (String) req.getContext().get(PATH); + if (!"/select".equals(reqPath)) { + params.set(CommonParams.QT, reqPath); + } // else if path is /select, then the qt gets passed thru if set } + shardHandler.submit(sreq, shard, params); } + } - // now wait for replies, but if anyone puts more requests on - // the outgoing queue, send them out immediately (by exiting - // this loop) - boolean tolerant = HttpShardHandler.getShardsTolerantAsBool(rb.req); - while (rb.outgoing.size() == 0) { - ShardResponse srsp = - tolerant - ? shardHandler1.takeCompletedIncludingErrors() - : shardHandler1.takeCompletedOrError(); - if (srsp == null) break; // no more requests to wait for - AtomicReference<Object> detailMesg = - new AtomicReference<>(); // or perhaps new Object[1] ? - - boolean anyResponsesPartial = - srsp.getShardRequest().responses.stream() - .anyMatch( - response -> { - NamedList<Object> resp = response.getSolrResponse().getResponse(); - if (resp == null) { - return false; - } - Object recursive = - resp._get(List.of("responseHeader", "partialResults"), null); - if (recursive != null) { - Object message = - "[Shard:" - + response.getShardAddress() - + "]" - + resp._get( - List.of( - "responseHeader", - RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY), - null); - detailMesg.compareAndSet(null, message); // first one, ingore rest - } - return recursive != null; - }); - if (anyResponsesPartial) { - rb.rsp.addPartialResponseDetail(detailMesg.get()); - rsp.setPartialResults(rb.req); - } - // Was there an exception? - // In the case of tolerant search, we need to check all responses to see if there was an - // exception. - Optional<Throwable> shardException = - srsp.getShardRequest().responses.stream() - .map(ShardResponse::getException) - .filter(Objects::nonNull) - .findFirst(); - if (shardException.isPresent()) { - // If things are not tolerant, abort everything and rethrow - if (!tolerant) { + // now wait for replies, but if anyone puts more requests on + // the outgoing queue, send them out immediately (by exiting + // this loop) + boolean tolerant = HttpShardHandler.getShardsTolerantAsBool(rb.req); + while (rb.outgoing.size() == 0) { + ShardResponse srsp = + tolerant + ? shardHandler.takeCompletedIncludingErrors() + : shardHandler.takeCompletedOrError(); + if (srsp == null) break; // no more requests to wait for + AtomicReference<Object> detailMesg = + new AtomicReference<>(); // or perhaps new Object[1] ? + + boolean anyResponsesPartial = + srsp.getShardRequest().responses.stream() + .anyMatch( + response -> { + NamedList<Object> resp = response.getSolrResponse().getResponse(); + if (resp == null) { + return false; + } + Object recursive = + resp._get(List.of("responseHeader", "partialResults"), null); + if (recursive != null) { + Object message = + "[Shard:" + + response.getShardAddress() + + "]" + + resp._get( + List.of( + "responseHeader", + RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY), + null); + detailMesg.compareAndSet(null, message); // first one, ingore rest + } + return recursive != null; + }); + if (anyResponsesPartial) { + rb.rsp.addPartialResponseDetail(detailMesg.get()); + rsp.setPartialResults(rb.req); + } + // Was there an exception? + // In the case of tolerant search, we need to check all responses to see if there was an + // exception. + Optional<Throwable> shardException = + srsp.getShardRequest().responses.stream() + .map(ShardResponse::getException) + .filter(Objects::nonNull) + .findFirst(); + if (shardException.isPresent()) { + // If things are not tolerant, abort everything and rethrow + if (!tolerant) { + throwSolrException(shardException.get()); + } else { + // Check if the purpose includes 'PURPOSE_GET_TOP_IDS' + boolean includesTopIdsPurpose = + (srsp.getShardRequest().purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0; + // Check if all responses have exceptions + boolean allResponsesHaveExceptions = + srsp.getShardRequest().responses.stream() + .allMatch(response -> response.getException() != null); + // Check if all shards have failed for PURPOSE_GET_TOP_IDS + boolean allShardsFailed = includesTopIdsPurpose && allResponsesHaveExceptions; + // if all shards fail, fail the request despite shards.tolerant + if (allShardsFailed) { throwSolrException(shardException.get()); } else { - // Check if the purpose includes 'PURPOSE_GET_TOP_IDS' - boolean includesTopIdsPurpose = - (srsp.getShardRequest().purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0; - // Check if all responses have exceptions - boolean allResponsesHaveExceptions = - srsp.getShardRequest().responses.stream() - .allMatch(response -> response.getException() != null); - // Check if all shards have failed for PURPOSE_GET_TOP_IDS - boolean allShardsFailed = includesTopIdsPurpose && allResponsesHaveExceptions; - // if all shards fail, fail the request despite shards.tolerant - if (allShardsFailed) { - throwSolrException(shardException.get()); - } else { - rsp.setPartialResults(rb.req); - } + rsp.setPartialResults(rb.req); } } + } - rb.finished.add(srsp.getShardRequest()); - - // let the components see the responses to the request - for (SearchComponent c : components) { - if (checkLimitsBefore( - c, - "handleResponses next stage:" + stageToString(nextStage), - rb.req, - rb.rsp, - components)) { - shortCircuitedResults(req, rb); - return; - } - c.handleResponses(rb, srsp.getShardRequest()); + rb.finished.add(srsp.getShardRequest()); + + // let the components see the responses to the request + for (SearchComponent c : components) { + if (checkLimitsBefore( + c, + "handleResponses next stage:" + stageToString(nextStage), + rb.req, + rb.rsp, + components)) { + shortCircuitedResults(req, rb); + return; } + c.handleResponses(rb, srsp.getShardRequest()); } } + } - for (SearchComponent c : components) { - if (checkLimitsBefore( - c, "finishStage stage:" + stageToString(nextStage), rb.req, rb.rsp, components)) { - return; - } - c.finishStage(rb); + for (SearchComponent c : components) { + if (checkLimitsBefore( + c, "finishStage stage:" + stageToString(nextStage), rb.req, rb.rsp, components)) { + return; } + c.finishStage(rb); + } - // we are done when the next stage is MAX_VALUE - } while (nextStage != Integer.MAX_VALUE); - } + // we are done when the next stage is MAX_VALUE + } while (nextStage != Integer.MAX_VALUE); } - private static boolean prepareComponents( - SolrQueryRequest req, ResponseBuilder rb, RTimerTree timer, List<SearchComponent> components) - throws IOException { - if (timer == null) { - // non-debugging prepare phase - for (SearchComponent component : components) { - if (checkLimitsBefore(component, "prepare", rb.req, rb.rsp, components)) { - shortCircuitedResults(req, rb); - return false; + private void fillShardsInfoShortCircuited( + SolrQueryRequest req, SolrQueryResponse rsp, ResponseBuilder rb) { + // SOLR-5550: still provide shards.info if requested even for a short-circuited distrib request + if (!req.getParams().getBool(ShardParams.SHARDS_INFO, false) || rb.shortCircuitedURL == null) { + return; + } + assert rb.isDistrib == false; + + NamedList<Object> shardInfo = new SimpleOrderedMap<>(); + SimpleOrderedMap<Object> nl = new SimpleOrderedMap<>(); + if (rsp.getException() != null) { + Throwable cause = rsp.getException(); + if (cause instanceof SolrServerException) { + cause = ((SolrServerException) cause).getRootCause(); + } else { + if (cause.getCause() != null) { + cause = cause.getCause(); } - component.prepare(rb); } - } else { - // debugging prepare phase - RTimerTree subt = timer.sub("prepare"); - for (SearchComponent c : components) { - if (checkLimitsBefore(c, "prepare debug", rb.req, rb.rsp, components)) { - shortCircuitedResults(req, rb); - return false; - } - rb.setTimer(subt.sub(c.getName())); - c.prepare(rb); - rb.getTimer().stop(); + nl.add("error", cause.toString()); + if (!core.getCoreContainer().hideStackTrace()) { + StringWriter trace = new StringWriter(); + cause.printStackTrace(new PrintWriter(trace)); + nl.add("trace", trace.toString()); } - subt.stop(); + } else if (rb.getResults() != null) { + nl.add("numFound", rb.getResults().docList.matches()); + nl.add( + "numFoundExact", + rb.getResults().docList.hitCountRelation() == TotalHits.Relation.EQUAL_TO); + nl.add("maxScore", rb.getResults().docList.maxScore()); } - return true; + nl.add("shardAddress", rb.shortCircuitedURL); + nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far + + int pos = rb.shortCircuitedURL.indexOf("://"); + String shardInfoName = + pos != -1 ? rb.shortCircuitedURL.substring(pos + 3) : rb.shortCircuitedURL; + shardInfo.add(shardInfoName, nl); + rsp.getValues().add(ShardParams.SHARDS_INFO, shardInfo); } protected String stageToString(int stage) {
