kotman12 commented on code in PR #4472:
URL: https://github.com/apache/solr/pull/4472#discussion_r3313628122


##########
solr/core/src/java/org/apache/solr/handler/admin/LukeRequestHandler.java:
##########
@@ -189,8 +240,411 @@ public void handleRequestBody(SolrQueryRequest req, 
SolrQueryResponse rsp) throw
     info.add(
         "NOTE",
         "Document Frequency (df) is not updated when a document is marked for 
deletion.  df values include deleted documents.");
-    rsp.add("info", info);
+    rsp.add(RSP_INFO, info);
+    rsp.setHttpCaching(false);
+  }
+
+  /**
+   * Field-level response keys, declared in the order they appear in the local 
(non-distributed)
+   * response. EnumMap iteration follows declaration order, giving 
deterministic output.
+   */
+  enum FieldDataKey {
+    TYPE(KEY_TYPE),
+    SCHEMA(KEY_SCHEMA_FLAGS),
+    DYNAMIC_BASE(KEY_DYNAMIC_BASE),
+    INDEX(KEY_INDEX_FLAGS),
+    DOCS(KEY_DOCS);
+
+    final String responseKey;
+
+    FieldDataKey(String responseKey) {
+      this.responseKey = responseKey;
+    }
+  }
+
+  /** Per-field accumulation state across shards: aggregated response data and 
field validation. */
+  private static class AggregatedFieldData {
+    final EnumMap<FieldDataKey, Object> properties = new 
EnumMap<>(FieldDataKey.class);
+    final String originalShardAddr;
+    final LukeResponse.FieldInfo originalFieldInfo;
+    private String indexFlagsShardAddr;
+
+    AggregatedFieldData(String shardAddr, LukeResponse.FieldInfo fieldInfo) {
+      this.originalShardAddr = shardAddr;
+      this.originalFieldInfo = fieldInfo;
+      properties.put(FieldDataKey.TYPE, fieldInfo.getType());
+      properties.put(FieldDataKey.SCHEMA, fieldInfo.getSchema());
+      Object dynBase = fieldInfo.getExtras().get(KEY_DYNAMIC_BASE);
+      if (dynBase != null) {
+        properties.put(FieldDataKey.DYNAMIC_BASE, dynBase);
+      }
+      Object indexFlags = fieldInfo.getExtras().get(KEY_INDEX_FLAGS);
+      if (indexFlags != null) {
+        properties.put(FieldDataKey.INDEX, indexFlags);
+        this.indexFlagsShardAddr = shardAddr;
+      }
+    }
+
+    SimpleOrderedMap<Object> toResponse() {
+      SimpleOrderedMap<Object> result = new SimpleOrderedMap<>();
+      for (Map.Entry<FieldDataKey, Object> entry : properties.entrySet()) {
+        result.add(entry.getKey().responseKey, entry.getValue());
+      }
+      return result;
+    }
+  }
+
+  private static class ShardData {
+    final String shardAddr; // key in "shards" response map
+    final Map<String, LukeResponse.FieldInfo> shardFieldInfo; // keyed by 
field name
+    private NamedList<Object> indexInfo; // value for "index" key in per-shard 
entry
+    private SimpleOrderedMap<Object> detailedFields; // keyed by field name
+
+    ShardData(String shardAddr, Map<String, LukeResponse.FieldInfo> 
shardFieldInfo) {
+      this.shardAddr = shardAddr;
+      this.shardFieldInfo = shardFieldInfo;
+    }
+
+    void setIndexInfo(NamedList<Object> indexInfo) {
+      this.indexInfo = indexInfo;
+    }
+
+    void addDetailedFieldInfo(String fieldName, SimpleOrderedMap<Object> 
fieldStats) {
+      if (detailedFields == null) {
+        detailedFields = new SimpleOrderedMap<>();
+      }
+      detailedFields.add(fieldName, fieldStats);
+    }
+
+    SimpleOrderedMap<Object> toResponseEntry() {
+      SimpleOrderedMap<Object> entry = new SimpleOrderedMap<>();
+      if (indexInfo != null) {
+        entry.add(RSP_INDEX, indexInfo);
+      }
+      if (detailedFields != null) {
+        entry.add(RSP_FIELDS, detailedFields);
+      }
+      return entry;
+    }
+  }
+
+  /**
+   * @return true if the request was handled in distributed mode, false if 
prepDistributed
+   *     short-circuited (e.g. single-shard collection) and the caller should 
fall through to local
+   *     logic.
+   */
+  private boolean handleDistributed(SolrQueryRequest req, SolrQueryResponse 
rsp) {
+    SolrParams reqParams = req.getParams();
+
+    // docId is a Lucene-internal integer, not meaningful across shards
+    if (reqParams.getInt(DOC_ID) != null) {
+      throw new SolrException(
+          ErrorCode.BAD_REQUEST,
+          "docId parameter is not supported in distributed mode."
+              + " Use the id parameter to look up documents by their Solr 
unique key.");
+    }
+
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ResponseBuilder rb = new ResponseBuilder(req, rsp, List.of());
+    shardHandler.prepDistributed(rb);
+
+    String[] shards = rb.shards;
+    if (shards == null || shards.length == 0) {
+      return false;
+    }
+
+    ShardRequest sreq = new ShardRequest();
+    sreq.shards = shards;
+    sreq.actualShards = shards;
+    sreq.responses = new ArrayList<>(shards.length);
+
+    String reqPath = (String) req.getContext().get(PATH);
+
+    for (String shard : shards) {
+      ModifiableSolrParams params = new ModifiableSolrParams(reqParams);
+      params.set(CommonParams.QT, reqPath);
+      ShardHandler.setShardAttributesToParams(params, sreq.purpose);
+      shardHandler.submit(sreq, shard, params);
+    }
+
+    ShardResponse lastSrsp = shardHandler.takeCompletedOrError();
+    if (lastSrsp == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "No responses received 
from shards");
+    }
+    List<ShardResponse> responses = sreq.responses;
+    for (ShardResponse srsp : responses) {
+      if (srsp.getException() != null) {
+        shardHandler.cancelAll();
+        if (srsp.getException() instanceof SolrException) {
+          throw (SolrException) srsp.getException();
+        }
+        throw new SolrException(ErrorCode.SERVER_ERROR, srsp.getException());
+      }
+    }
+
+    aggregateDistributedResponses(req, rsp, responses);
     rsp.setHttpCaching(false);
+    return true;
+  }
+
+  private static String shardAddress(ShardResponse srsp) {
+    return srsp.getShardAddress() != null ? srsp.getShardAddress() : 
srsp.getShard();
+  }
+
+  private void aggregateDistributedResponses(
+      SolrQueryRequest req, SolrQueryResponse rsp, List<ShardResponse> 
responses) {
+
+    if (!responses.isEmpty()) {
+      ShardResponse firstRsp = responses.get(0);
+      NamedList<Object> firstShardRsp = 
firstRsp.getSolrResponse().getResponse();
+      if (firstShardRsp == null) {
+        throw new SolrException(
+            ErrorCode.SERVER_ERROR,
+            "Unexpected empty response from shard: " + shardAddress(firstRsp));
+      }
+      Object schema = firstShardRsp.get(RSP_SCHEMA);
+      if (schema != null) {
+        rsp.add(RSP_SCHEMA, schema);
+      }
+    }
+
+    long totalNumDocs = 0;
+    int totalMaxDoc = 0;
+    long totalDeletedDocs = 0;
+    int totalSegmentCount = 0;
+    Map<String, AggregatedFieldData> aggregatedFields = new TreeMap<>();
+    String firstDocShard = null;
+    Object firstDoc = null;
+    List<ShardData> shardDataList = new ArrayList<>();
+
+    for (ShardResponse srsp : responses) {
+      NamedList<Object> shardRsp = srsp.getSolrResponse().getResponse();
+      LukeResponse lukeRsp = new LukeResponse();
+      lukeRsp.setResponse(shardRsp);
+      // Only process field info if the shard explicitly included it in its 
response.
+      // LukeResponse.getFieldInfo() falls back to schema.fields which has 
incomplete data.
+      Map<String, LukeResponse.FieldInfo> fieldInfo =
+          shardRsp.get(RSP_FIELDS) != null ? lukeRsp.getFieldInfo() : null;
+      ShardData shardData = new ShardData(shardAddress(srsp), fieldInfo);
+
+      NamedList<Object> shardIndex = lukeRsp.getIndexInfo();
+      if (shardIndex != null) {
+        totalNumDocs += Optional.ofNullable(lukeRsp.getNumDocs()).orElse(0L);
+        totalMaxDoc = Math.max(totalMaxDoc, 
Optional.ofNullable(lukeRsp.getMaxDoc()).orElse(0));
+        totalDeletedDocs += 
Optional.ofNullable(lukeRsp.getDeletedDocs()).orElse(0L);
+        Number segCount = (Number) shardIndex.get(KEY_SEGMENT_COUNT);
+        totalSegmentCount += segCount != null ? segCount.intValue() : 0;
+
+        shardData.setIndexInfo(shardIndex);
+      }
+
+      processShardFields(shardData, aggregatedFields);
+      Object doc = shardRsp.get(RSP_DOC);
+      if (doc != null) {
+        if (firstDoc != null) {
+          throw new SolrException(
+              ErrorCode.SERVER_ERROR,
+              "Solr Id of document "
+                  + firstDoc
+                  + " found on multiple shards ("
+                  + firstDocShard
+                  + " and "
+                  + shardAddress(srsp)
+                  + "). The index is corrupt: unique key constraint 
violated.");
+        }
+        firstDoc = doc;
+        firstDocShard = shardAddress(srsp);
+      }
+      shardDataList.add(shardData);
+    }
+
+    shardDataList.sort(Comparator.comparing(sd -> sd.shardAddr));
+    SimpleOrderedMap<Object> shardsInfo = new SimpleOrderedMap<>();
+    for (ShardData sd : shardDataList) {
+      SimpleOrderedMap<Object> entry = sd.toResponseEntry();
+      if (!entry.isEmpty()) {
+        shardsInfo.add(sd.shardAddr, entry);
+      }
+    }
+
+    SimpleOrderedMap<Object> aggregatedIndex = new SimpleOrderedMap<>();
+    aggregatedIndex.add(KEY_NUM_DOCS, totalNumDocs);
+    aggregatedIndex.add(KEY_MAX_DOC, totalMaxDoc);
+    aggregatedIndex.add(KEY_DELETED_DOCS, totalDeletedDocs);
+    aggregatedIndex.add(KEY_SEGMENT_COUNT, totalSegmentCount);
+    rsp.add(RSP_INDEX, aggregatedIndex);
+
+    if (firstDoc != null) {
+      rsp.add(RSP_DOC, firstDoc);
+    }
+    boolean narrowLongs = shouldNarrowLongsForOldClient(req);
+    if (narrowLongs) {
+      narrowLongToInt(aggregatedIndex, KEY_NUM_DOCS);
+      narrowLongToInt(aggregatedIndex, KEY_DELETED_DOCS);
+    }
+    if (!aggregatedFields.isEmpty()) {
+      SimpleOrderedMap<Object> aggregatedFieldsNL = new SimpleOrderedMap<>();
+      for (Map.Entry<String, AggregatedFieldData> entry : 
aggregatedFields.entrySet()) {
+        SimpleOrderedMap<Object> fieldResponse = entry.getValue().toResponse();
+        if (narrowLongs) {
+          narrowLongToInt(fieldResponse, KEY_DOCS);
+        }
+        aggregatedFieldsNL.add(entry.getKey(), fieldResponse);
+      }
+      rsp.add(RSP_FIELDS, aggregatedFieldsNL);
+    }
+
+    // Add info section last (before shards), matching the local-mode key 
order.
+    if (!responses.isEmpty()) {
+      NamedList<Object> firstShardRsp = 
responses.get(0).getSolrResponse().getResponse();
+      Object info = firstShardRsp == null ? null : firstShardRsp.get(RSP_INFO);
+      if (info != null) {
+        rsp.add(RSP_INFO, info);
+      }
+    }
+
+    if (req.getParams().getBool(ShardParams.SHARDS_INFO, false)) {
+      rsp.add(RSP_SHARDS, shardsInfo);
+    }
+  }
+
+  private void processShardFields(
+      ShardData shardData, Map<String, AggregatedFieldData> aggregatedFields) {
+    if (shardData.shardFieldInfo == null) {
+      return;
+    }
+    for (Map.Entry<String, LukeResponse.FieldInfo> entry : 
shardData.shardFieldInfo.entrySet()) {
+      String fieldName = entry.getKey();
+      LukeResponse.FieldInfo fi = entry.getValue();
+
+      aggregateShardField(shardData.shardAddr, fi, aggregatedFields);
+
+      // Detailed stats — kept per-shard, not aggregated
+      NamedList<Integer> topTerms = fi.getTopTerms();
+      if (topTerms != null) {
+        SimpleOrderedMap<Object> detailedFieldInfo = new SimpleOrderedMap<>();
+        detailedFieldInfo.add(KEY_TOP_TERMS, topTerms);
+        detailedFieldInfo.add(KEY_HISTOGRAM, 
fi.getExtras().get(KEY_HISTOGRAM));
+        detailedFieldInfo.add(KEY_DISTINCT, fi.getDistinct());
+        shardData.addDetailedFieldInfo(fieldName, detailedFieldInfo);
+      }
+    }
+  }
+
+  private void aggregateShardField(
+      String shardAddr,
+      LukeResponse.FieldInfo fi,
+      Map<String, AggregatedFieldData> aggregatedFields) {
+
+    String fieldName = fi.getName();
+
+    AggregatedFieldData fieldData = aggregatedFields.get(fieldName);
+    if (fieldData == null) {
+      fieldData = new AggregatedFieldData(shardAddr, fi);
+      aggregatedFields.put(fieldName, fieldData);
+    } else {
+      // Subsequent shards: validate that type, schema, and dynamicBase match
+      validateFieldAttr(
+          fieldName,
+          KEY_TYPE,
+          fi.getType(),
+          fieldData.originalFieldInfo.getType(),
+          shardAddr,
+          fieldData.originalShardAddr);
+      validateFieldAttr(
+          fieldName,
+          KEY_SCHEMA_FLAGS,
+          fi.getSchema(),
+          fieldData.originalFieldInfo.getSchema(),
+          shardAddr,
+          fieldData.originalShardAddr);
+      validateFieldAttr(
+          fieldName,
+          KEY_DYNAMIC_BASE,
+          fi.getExtras().get(KEY_DYNAMIC_BASE),
+          fieldData.originalFieldInfo.getExtras().get(KEY_DYNAMIC_BASE),
+          shardAddr,
+          fieldData.originalShardAddr);
+
+      Object indexFlags = fi.getExtras().get(KEY_INDEX_FLAGS);
+      if (indexFlags != null) {
+        Object existing = fieldData.properties.get(FieldDataKey.INDEX);
+        if (existing == null) {
+          fieldData.properties.put(FieldDataKey.INDEX, indexFlags);
+          fieldData.indexFlagsShardAddr = shardAddr;
+        } else {
+          validateFieldAttr(
+              fieldName,
+              KEY_INDEX_FLAGS,
+              indexFlags,
+              existing,
+              shardAddr,
+              fieldData.indexFlagsShardAddr);
+        }
+      }
+    }
+
+    // Sum per-shard doc counts
+    fieldData.properties.merge(FieldDataKey.DOCS, fi.getDocs(), (a, b) -> 
(long) a + (long) b);
+  }
+
+  /**
+   * Minimum client version that understands Long values in distributed Luke 
responses. Distributed
+   * Luke aggregates counts across shards, which can overflow Integer. Older 
clients cast these
+   * values to Integer and would fail with a ClassCastException.
+   */
+  private static final SolrVersion DISTRIB_LONG_COUNTS_MIN_VERSION =
+      SolrVersion.forIntegers(9, 11, 0);
+
+  private static boolean shouldNarrowLongsForOldClient(SolrQueryRequest req) {
+    HttpSolrCall call = req.getHttpSolrCall();
+    if (call == null) return false;
+    SolrVersion clientVersion = call.getUserAgentSolrVersion();
+    return clientVersion != null && 
clientVersion.lessThan(DISTRIB_LONG_COUNTS_MIN_VERSION);

Review Comment:
   One issue here is that this only works for `javabin` clients, but a client 
requesting `wt=xml` may still get back an unexpected type, i.e. `<long 
name="docs">10000</long>` instead of `<int name="docs">10000</int>`. You can of 
course go back to the old, undistributed behavior via `distrib=false`. Btw I 
found it amusing that you can hijack the solrj-version headers for the xml wt 
so it respects the narrowing while writing out to xml instead of javabin, i.e.:
   
   `curl -s -H 'User-Agent: 
Solr[org.apache.solr.client.solrj.impl.Http2SolrClient] 9.10.0' \
     'http://localhost:8983/solr/luke_test/admin/luke?wt=xml&shards.info=true'`
   
   vs
   
   `curl -s -H 'User-Agent: 
Solr[org.apache.solr.client.solrj.impl.Http2SolrClient] 9.11.0' \
     'http://localhost:8983/solr/luke_test/admin/luke?wt=xml&shards.info=true'`
   
   But obviously that is an odd thing to recommend to users so I'd stick to 
recommending reverting to old behavior via `distrib=false` as is documented. 
Perhaps it is worth rewriting this more explicitly:
   
   > To revert to old, pre-distributed behavior just pass distrib=false
   
   I just worry about backwards compat and breaking people going from a popular 
and stable 9.10 version.



##########
solr/solr-ref-guide/modules/indexing-guide/pages/luke-request-handler.adoc:
##########
@@ -118,3 +130,43 @@ Alternatively, to work through the Lucene native id:
 http://localhost:8983/solr/techproducts/admin/luke?fl=manu&docId=0
 
 From SolrJ, you can access /luke using the 
{solr-javadocs}/solrj/org/apache/solr/client/solrj/request/LukeRequest.html[`LukeRequest`]
 object.
+
+== Distributed Mode (multiple shards)
+
+When running in SolrCloud, the Luke handler automatically distributes requests 
across all shards in the collection, the same as search requests.
+To inspect only the receiving shard's index set `distrib=false`.

Review Comment:
   As mentioned previously, perhaps it is worth rewriting this more explicitly:
   
   > To revert to old, pre-distributed behavior just pass distrib=false



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to