kotman12 commented on code in PR #4472:
URL: https://github.com/apache/solr/pull/4472#discussion_r3313628122
##########
solr/core/src/java/org/apache/solr/handler/admin/LukeRequestHandler.java:
##########
@@ -189,8 +240,411 @@ public void handleRequestBody(SolrQueryRequest req,
SolrQueryResponse rsp) throw
info.add(
"NOTE",
"Document Frequency (df) is not updated when a document is marked for
deletion. df values include deleted documents.");
- rsp.add("info", info);
+ rsp.add(RSP_INFO, info);
+ rsp.setHttpCaching(false);
+ }
+
+ /**
+ * Field-level response keys, declared in the order they appear in the local
(non-distributed)
+ * response. EnumMap iteration follows declaration order, giving
deterministic output.
+ */
+ enum FieldDataKey {
+ TYPE(KEY_TYPE),
+ SCHEMA(KEY_SCHEMA_FLAGS),
+ DYNAMIC_BASE(KEY_DYNAMIC_BASE),
+ INDEX(KEY_INDEX_FLAGS),
+ DOCS(KEY_DOCS);
+
+ final String responseKey;
+
+ FieldDataKey(String responseKey) {
+ this.responseKey = responseKey;
+ }
+ }
+
+ /** Per-field accumulation state across shards: aggregated response data and
field validation. */
+ private static class AggregatedFieldData {
+ final EnumMap<FieldDataKey, Object> properties = new
EnumMap<>(FieldDataKey.class);
+ final String originalShardAddr;
+ final LukeResponse.FieldInfo originalFieldInfo;
+ private String indexFlagsShardAddr;
+
+ AggregatedFieldData(String shardAddr, LukeResponse.FieldInfo fieldInfo) {
+ this.originalShardAddr = shardAddr;
+ this.originalFieldInfo = fieldInfo;
+ properties.put(FieldDataKey.TYPE, fieldInfo.getType());
+ properties.put(FieldDataKey.SCHEMA, fieldInfo.getSchema());
+ Object dynBase = fieldInfo.getExtras().get(KEY_DYNAMIC_BASE);
+ if (dynBase != null) {
+ properties.put(FieldDataKey.DYNAMIC_BASE, dynBase);
+ }
+ Object indexFlags = fieldInfo.getExtras().get(KEY_INDEX_FLAGS);
+ if (indexFlags != null) {
+ properties.put(FieldDataKey.INDEX, indexFlags);
+ this.indexFlagsShardAddr = shardAddr;
+ }
+ }
+
+ SimpleOrderedMap<Object> toResponse() {
+ SimpleOrderedMap<Object> result = new SimpleOrderedMap<>();
+ for (Map.Entry<FieldDataKey, Object> entry : properties.entrySet()) {
+ result.add(entry.getKey().responseKey, entry.getValue());
+ }
+ return result;
+ }
+ }
+
+ private static class ShardData {
+ final String shardAddr; // key in "shards" response map
+ final Map<String, LukeResponse.FieldInfo> shardFieldInfo; // keyed by
field name
+ private NamedList<Object> indexInfo; // value for "index" key in per-shard
entry
+ private SimpleOrderedMap<Object> detailedFields; // keyed by field name
+
+ ShardData(String shardAddr, Map<String, LukeResponse.FieldInfo>
shardFieldInfo) {
+ this.shardAddr = shardAddr;
+ this.shardFieldInfo = shardFieldInfo;
+ }
+
+ void setIndexInfo(NamedList<Object> indexInfo) {
+ this.indexInfo = indexInfo;
+ }
+
+ void addDetailedFieldInfo(String fieldName, SimpleOrderedMap<Object>
fieldStats) {
+ if (detailedFields == null) {
+ detailedFields = new SimpleOrderedMap<>();
+ }
+ detailedFields.add(fieldName, fieldStats);
+ }
+
+ SimpleOrderedMap<Object> toResponseEntry() {
+ SimpleOrderedMap<Object> entry = new SimpleOrderedMap<>();
+ if (indexInfo != null) {
+ entry.add(RSP_INDEX, indexInfo);
+ }
+ if (detailedFields != null) {
+ entry.add(RSP_FIELDS, detailedFields);
+ }
+ return entry;
+ }
+ }
+
+ /**
+ * @return true if the request was handled in distributed mode, false if
prepDistributed
+ * short-circuited (e.g. single-shard collection) and the caller should
fall through to local
+ * logic.
+ */
+ private boolean handleDistributed(SolrQueryRequest req, SolrQueryResponse
rsp) {
+ SolrParams reqParams = req.getParams();
+
+ // docId is a Lucene-internal integer, not meaningful across shards
+ if (reqParams.getInt(DOC_ID) != null) {
+ throw new SolrException(
+ ErrorCode.BAD_REQUEST,
+ "docId parameter is not supported in distributed mode."
+ + " Use the id parameter to look up documents by their Solr
unique key.");
+ }
+
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ResponseBuilder rb = new ResponseBuilder(req, rsp, List.of());
+ shardHandler.prepDistributed(rb);
+
+ String[] shards = rb.shards;
+ if (shards == null || shards.length == 0) {
+ return false;
+ }
+
+ ShardRequest sreq = new ShardRequest();
+ sreq.shards = shards;
+ sreq.actualShards = shards;
+ sreq.responses = new ArrayList<>(shards.length);
+
+ String reqPath = (String) req.getContext().get(PATH);
+
+ for (String shard : shards) {
+ ModifiableSolrParams params = new ModifiableSolrParams(reqParams);
+ params.set(CommonParams.QT, reqPath);
+ ShardHandler.setShardAttributesToParams(params, sreq.purpose);
+ shardHandler.submit(sreq, shard, params);
+ }
+
+ ShardResponse lastSrsp = shardHandler.takeCompletedOrError();
+ if (lastSrsp == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "No responses received
from shards");
+ }
+ List<ShardResponse> responses = sreq.responses;
+ for (ShardResponse srsp : responses) {
+ if (srsp.getException() != null) {
+ shardHandler.cancelAll();
+ if (srsp.getException() instanceof SolrException) {
+ throw (SolrException) srsp.getException();
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, srsp.getException());
+ }
+ }
+
+ aggregateDistributedResponses(req, rsp, responses);
rsp.setHttpCaching(false);
+ return true;
+ }
+
+ private static String shardAddress(ShardResponse srsp) {
+ return srsp.getShardAddress() != null ? srsp.getShardAddress() :
srsp.getShard();
+ }
+
+ private void aggregateDistributedResponses(
+ SolrQueryRequest req, SolrQueryResponse rsp, List<ShardResponse>
responses) {
+
+ if (!responses.isEmpty()) {
+ ShardResponse firstRsp = responses.get(0);
+ NamedList<Object> firstShardRsp =
firstRsp.getSolrResponse().getResponse();
+ if (firstShardRsp == null) {
+ throw new SolrException(
+ ErrorCode.SERVER_ERROR,
+ "Unexpected empty response from shard: " + shardAddress(firstRsp));
+ }
+ Object schema = firstShardRsp.get(RSP_SCHEMA);
+ if (schema != null) {
+ rsp.add(RSP_SCHEMA, schema);
+ }
+ }
+
+ long totalNumDocs = 0;
+ int totalMaxDoc = 0;
+ long totalDeletedDocs = 0;
+ int totalSegmentCount = 0;
+ Map<String, AggregatedFieldData> aggregatedFields = new TreeMap<>();
+ String firstDocShard = null;
+ Object firstDoc = null;
+ List<ShardData> shardDataList = new ArrayList<>();
+
+ for (ShardResponse srsp : responses) {
+ NamedList<Object> shardRsp = srsp.getSolrResponse().getResponse();
+ LukeResponse lukeRsp = new LukeResponse();
+ lukeRsp.setResponse(shardRsp);
+ // Only process field info if the shard explicitly included it in its
response.
+ // LukeResponse.getFieldInfo() falls back to schema.fields which has
incomplete data.
+ Map<String, LukeResponse.FieldInfo> fieldInfo =
+ shardRsp.get(RSP_FIELDS) != null ? lukeRsp.getFieldInfo() : null;
+ ShardData shardData = new ShardData(shardAddress(srsp), fieldInfo);
+
+ NamedList<Object> shardIndex = lukeRsp.getIndexInfo();
+ if (shardIndex != null) {
+ totalNumDocs += Optional.ofNullable(lukeRsp.getNumDocs()).orElse(0L);
+ totalMaxDoc = Math.max(totalMaxDoc,
Optional.ofNullable(lukeRsp.getMaxDoc()).orElse(0));
+ totalDeletedDocs +=
Optional.ofNullable(lukeRsp.getDeletedDocs()).orElse(0L);
+ Number segCount = (Number) shardIndex.get(KEY_SEGMENT_COUNT);
+ totalSegmentCount += segCount != null ? segCount.intValue() : 0;
+
+ shardData.setIndexInfo(shardIndex);
+ }
+
+ processShardFields(shardData, aggregatedFields);
+ Object doc = shardRsp.get(RSP_DOC);
+ if (doc != null) {
+ if (firstDoc != null) {
+ throw new SolrException(
+ ErrorCode.SERVER_ERROR,
+ "Solr Id of document "
+ + firstDoc
+ + " found on multiple shards ("
+ + firstDocShard
+ + " and "
+ + shardAddress(srsp)
+ + "). The index is corrupt: unique key constraint
violated.");
+ }
+ firstDoc = doc;
+ firstDocShard = shardAddress(srsp);
+ }
+ shardDataList.add(shardData);
+ }
+
+ shardDataList.sort(Comparator.comparing(sd -> sd.shardAddr));
+ SimpleOrderedMap<Object> shardsInfo = new SimpleOrderedMap<>();
+ for (ShardData sd : shardDataList) {
+ SimpleOrderedMap<Object> entry = sd.toResponseEntry();
+ if (!entry.isEmpty()) {
+ shardsInfo.add(sd.shardAddr, entry);
+ }
+ }
+
+ SimpleOrderedMap<Object> aggregatedIndex = new SimpleOrderedMap<>();
+ aggregatedIndex.add(KEY_NUM_DOCS, totalNumDocs);
+ aggregatedIndex.add(KEY_MAX_DOC, totalMaxDoc);
+ aggregatedIndex.add(KEY_DELETED_DOCS, totalDeletedDocs);
+ aggregatedIndex.add(KEY_SEGMENT_COUNT, totalSegmentCount);
+ rsp.add(RSP_INDEX, aggregatedIndex);
+
+ if (firstDoc != null) {
+ rsp.add(RSP_DOC, firstDoc);
+ }
+ boolean narrowLongs = shouldNarrowLongsForOldClient(req);
+ if (narrowLongs) {
+ narrowLongToInt(aggregatedIndex, KEY_NUM_DOCS);
+ narrowLongToInt(aggregatedIndex, KEY_DELETED_DOCS);
+ }
+ if (!aggregatedFields.isEmpty()) {
+ SimpleOrderedMap<Object> aggregatedFieldsNL = new SimpleOrderedMap<>();
+ for (Map.Entry<String, AggregatedFieldData> entry :
aggregatedFields.entrySet()) {
+ SimpleOrderedMap<Object> fieldResponse = entry.getValue().toResponse();
+ if (narrowLongs) {
+ narrowLongToInt(fieldResponse, KEY_DOCS);
+ }
+ aggregatedFieldsNL.add(entry.getKey(), fieldResponse);
+ }
+ rsp.add(RSP_FIELDS, aggregatedFieldsNL);
+ }
+
+ // Add info section last (before shards), matching the local-mode key
order.
+ if (!responses.isEmpty()) {
+ NamedList<Object> firstShardRsp =
responses.get(0).getSolrResponse().getResponse();
+ Object info = firstShardRsp == null ? null : firstShardRsp.get(RSP_INFO);
+ if (info != null) {
+ rsp.add(RSP_INFO, info);
+ }
+ }
+
+ if (req.getParams().getBool(ShardParams.SHARDS_INFO, false)) {
+ rsp.add(RSP_SHARDS, shardsInfo);
+ }
+ }
+
+ private void processShardFields(
+ ShardData shardData, Map<String, AggregatedFieldData> aggregatedFields) {
+ if (shardData.shardFieldInfo == null) {
+ return;
+ }
+ for (Map.Entry<String, LukeResponse.FieldInfo> entry :
shardData.shardFieldInfo.entrySet()) {
+ String fieldName = entry.getKey();
+ LukeResponse.FieldInfo fi = entry.getValue();
+
+ aggregateShardField(shardData.shardAddr, fi, aggregatedFields);
+
+ // Detailed stats — kept per-shard, not aggregated
+ NamedList<Integer> topTerms = fi.getTopTerms();
+ if (topTerms != null) {
+ SimpleOrderedMap<Object> detailedFieldInfo = new SimpleOrderedMap<>();
+ detailedFieldInfo.add(KEY_TOP_TERMS, topTerms);
+ detailedFieldInfo.add(KEY_HISTOGRAM,
fi.getExtras().get(KEY_HISTOGRAM));
+ detailedFieldInfo.add(KEY_DISTINCT, fi.getDistinct());
+ shardData.addDetailedFieldInfo(fieldName, detailedFieldInfo);
+ }
+ }
+ }
+
+ private void aggregateShardField(
+ String shardAddr,
+ LukeResponse.FieldInfo fi,
+ Map<String, AggregatedFieldData> aggregatedFields) {
+
+ String fieldName = fi.getName();
+
+ AggregatedFieldData fieldData = aggregatedFields.get(fieldName);
+ if (fieldData == null) {
+ fieldData = new AggregatedFieldData(shardAddr, fi);
+ aggregatedFields.put(fieldName, fieldData);
+ } else {
+ // Subsequent shards: validate that type, schema, and dynamicBase match
+ validateFieldAttr(
+ fieldName,
+ KEY_TYPE,
+ fi.getType(),
+ fieldData.originalFieldInfo.getType(),
+ shardAddr,
+ fieldData.originalShardAddr);
+ validateFieldAttr(
+ fieldName,
+ KEY_SCHEMA_FLAGS,
+ fi.getSchema(),
+ fieldData.originalFieldInfo.getSchema(),
+ shardAddr,
+ fieldData.originalShardAddr);
+ validateFieldAttr(
+ fieldName,
+ KEY_DYNAMIC_BASE,
+ fi.getExtras().get(KEY_DYNAMIC_BASE),
+ fieldData.originalFieldInfo.getExtras().get(KEY_DYNAMIC_BASE),
+ shardAddr,
+ fieldData.originalShardAddr);
+
+ Object indexFlags = fi.getExtras().get(KEY_INDEX_FLAGS);
+ if (indexFlags != null) {
+ Object existing = fieldData.properties.get(FieldDataKey.INDEX);
+ if (existing == null) {
+ fieldData.properties.put(FieldDataKey.INDEX, indexFlags);
+ fieldData.indexFlagsShardAddr = shardAddr;
+ } else {
+ validateFieldAttr(
+ fieldName,
+ KEY_INDEX_FLAGS,
+ indexFlags,
+ existing,
+ shardAddr,
+ fieldData.indexFlagsShardAddr);
+ }
+ }
+ }
+
+ // Sum per-shard doc counts
+ fieldData.properties.merge(FieldDataKey.DOCS, fi.getDocs(), (a, b) ->
(long) a + (long) b);
+ }
+
+ /**
+ * Minimum client version that understands Long values in distributed Luke
responses. Distributed
+ * Luke aggregates counts across shards, which can overflow Integer. Older
clients cast these
+ * values to Integer and would fail with a ClassCastException.
+ */
+ private static final SolrVersion DISTRIB_LONG_COUNTS_MIN_VERSION =
+ SolrVersion.forIntegers(9, 11, 0);
+
+ private static boolean shouldNarrowLongsForOldClient(SolrQueryRequest req) {
+ HttpSolrCall call = req.getHttpSolrCall();
+ if (call == null) return false;
+ SolrVersion clientVersion = call.getUserAgentSolrVersion();
+ return clientVersion != null &&
clientVersion.lessThan(DISTRIB_LONG_COUNTS_MIN_VERSION);
Review Comment:
One issue here is that this narrowing only works for `javabin` clients: a client
requesting `wt=xml` may still get back an unexpected type, e.g. `<long
name="docs">10000</long>` instead of `<int name="docs">10000</int>`. You can, of
course, go back to the old, non-distributed behavior via `distrib=false`. By the way, I
found it amusing that you can hijack the SolrJ version in the User-Agent header with `wt=xml`,
so that the narrowing is applied even when writing XML instead of javabin, e.g.:
`curl -s -H 'User-Agent:
Solr[org.apache.solr.client.solrj.impl.Http2SolrClient] 9.10.0' \
'http://localhost:8983/solr/luke_test/admin/luke?wt=xml&shards.info=true'`
vs
`curl -s -H 'User-Agent:
Solr[org.apache.solr.client.solrj.impl.Http2SolrClient] 9.11.0' \
'http://localhost:8983/solr/luke_test/admin/luke?wt=xml&shards.info=true'`
But obviously that is an odd thing to recommend to users, so I'd stick to
recommending reverting to the old behavior via `distrib=false`, as is documented.
Perhaps it is worth rewriting the documentation more explicitly:
> To revert to the old, pre-distributed behavior, just pass `distrib=false`.
I just worry about backwards compatibility and breaking users upgrading from the popular
and stable 9.10 release.
##########
solr/solr-ref-guide/modules/indexing-guide/pages/luke-request-handler.adoc:
##########
@@ -118,3 +130,43 @@ Alternatively, to work through the Lucene native id:
http://localhost:8983/solr/techproducts/admin/luke?fl=manu&docId=0
From SolrJ, you can access /luke using the
{solr-javadocs}/solrj/org/apache/solr/client/solrj/request/LukeRequest.html[`LukeRequest`]
object.
+
+== Distributed Mode (multiple shards)
+
+When running in SolrCloud, the Luke handler automatically distributes requests
across all shards in the collection, the same as search requests.
+To inspect only the receiving shard's index set `distrib=false`.
Review Comment:
As mentioned previously, perhaps it is worth rewriting this more explicitly:
> To revert to the old, pre-distributed behavior, just pass `distrib=false`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]