dsmiley commented on code in PR #4149:
URL: https://github.com/apache/solr/pull/4149#discussion_r2956981246


##########
solr/core/src/java/org/apache/solr/handler/admin/LukeRequestHandler.java:
##########
@@ -192,8 +238,346 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
     info.add(
         "NOTE",
         "Document Frequency (df) is not updated when a document is marked for deletion.  df values include deleted documents.");
-    rsp.add("info", info);
+    rsp.add(RSP_INFO, info);
+    rsp.setHttpCaching(false);
+  }
+
+  /** Per-field accumulation state across shards: aggregated response data and field validation. */
+  private static class AggregatedFieldData {
+    final SimpleOrderedMap<Object> aggregated = new SimpleOrderedMap<>();
+    final String originalShardAddr;
+    final LukeResponse.FieldInfo originalFieldInfo;
+    private Object indexFlags;
+    private String indexFlagsShardAddr;
+
+    AggregatedFieldData(String shardAddr, LukeResponse.FieldInfo fieldInfo) {
+      this.originalShardAddr = shardAddr;
+      this.originalFieldInfo = fieldInfo;
+      Object flags = fieldInfo.getExtras().get(KEY_INDEX_FLAGS);
+      if (flags != null) {
+        this.indexFlags = flags;
+        this.indexFlagsShardAddr = shardAddr;
+      }
+    }
+  }
+
+  private static class ShardData {
+    final String shardAddr;
+    final Map<String, LukeResponse.FieldInfo> shardFieldInfo;
+    private NamedList<Object> indexInfo;
+    private SimpleOrderedMap<Object> detailedFields;
+
+    ShardData(String shardAddr, Map<String, LukeResponse.FieldInfo> shardFieldInfo) {
+      this.shardAddr = shardAddr;
+      this.shardFieldInfo = shardFieldInfo;
+    }
+
+    void setIndexInfo(NamedList<Object> indexInfo) {
+      this.indexInfo = indexInfo;
+    }
+
+    void addDetailedFieldInfo(String fieldName, SimpleOrderedMap<Object> fieldStats) {
+      if (detailedFields == null) {
+        detailedFields = new SimpleOrderedMap<>();
+      }
+      detailedFields.add(fieldName, fieldStats);
+    }
+
+    SimpleOrderedMap<Object> toResponseEntry() {
+      SimpleOrderedMap<Object> entry = new SimpleOrderedMap<>();
+      if (indexInfo != null) {
+        entry.add(RSP_INDEX, indexInfo);
+      }
+      if (detailedFields != null) {
+        entry.add(RSP_FIELDS, detailedFields);
+      }
+      return entry;
+    }
+  }
+
+  /**
+   * @return true if the request was handled in distributed mode, false if prepDistributed
+   *     short-circuited (e.g. single-shard collection) and the caller should fall through to local
+   *     logic.
+   */
+  private boolean handleDistributed(SolrQueryRequest req, SolrQueryResponse rsp) {
+    SolrParams reqParams = req.getParams();
+
+    // docId is a Lucene-internal integer, not meaningful across shards
+    if (reqParams.getInt(DOC_ID) != null) {
+      throw new SolrException(
+          ErrorCode.BAD_REQUEST,
+          "docId parameter is not supported in distributed mode."
+              + " Use the id parameter to look up documents by their Solr unique key.");
+    }
+
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ResponseBuilder rb = new ResponseBuilder(req, rsp, Collections.emptyList());
+    shardHandler.prepDistributed(rb);
+
+    String[] shards = rb.shards;
+    if (shards == null || shards.length == 0) {
+      return false;
+    }
+
+    ShardRequest sreq = new ShardRequest();
+    sreq.shards = shards;
+    sreq.actualShards = shards;
+    sreq.responses = new ArrayList<>(shards.length);
+
+    String reqPath = (String) req.getContext().get(PATH);
+
+    for (String shard : shards) {
+      ModifiableSolrParams params = new ModifiableSolrParams(reqParams);
+      params.set(CommonParams.QT, reqPath);
+      ShardHandler.setShardAttributesToParams(params, sreq.purpose);
+      shardHandler.submit(sreq, shard, params);
+    }
+
+    ShardResponse lastSrsp = shardHandler.takeCompletedOrError();
+    if (lastSrsp == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "No responses received from shards");
+    }
+    List<ShardResponse> responses = sreq.responses;
+    for (ShardResponse srsp : responses) {
+      if (srsp.getException() != null) {
+        shardHandler.cancelAll();
+        if (srsp.getException() instanceof SolrException) {
+          throw (SolrException) srsp.getException();
+        }
+        throw new SolrException(ErrorCode.SERVER_ERROR, srsp.getException());
+      }
+    }
+
+    aggregateDistributedResponses(rsp, responses);
     rsp.setHttpCaching(false);
+    return true;
+  }
+
+  private static String shardAddress(ShardResponse srsp) {
+    return srsp.getShardAddress() != null ? srsp.getShardAddress() : srsp.getShard();
+  }
+
+  private void aggregateDistributedResponses(SolrQueryResponse rsp, List<ShardResponse> responses) {
+
+    if (!responses.isEmpty()) {
+      ShardResponse firstRsp = responses.getFirst();
+      NamedList<Object> firstShardRsp = firstRsp.getSolrResponse().getResponse();
+      if (firstShardRsp == null) {
+        throw new SolrException(
+            ErrorCode.SERVER_ERROR,
+            "Unexpected empty response from shard: " + shardAddress(firstRsp));
+      }
+      Object schema = firstShardRsp.get(RSP_SCHEMA);
+      if (schema != null) {
+        rsp.add(RSP_SCHEMA, schema);
+      }
+      Object info = firstShardRsp.get(RSP_INFO);
+      if (info != null) {
+        rsp.add(RSP_INFO, info);
+      }
+    }
+
+    long totalNumDocs = 0;
+    int totalMaxDoc = 0;
+    long totalDeletedDocs = 0;
+    int totalSegmentCount = 0;
+    Map<String, AggregatedFieldData> aggregatedFields = new HashMap<>();
+    String firstDocShard = null;
+    Object firstDoc = null;
+    List<ShardData> shardDataList = new ArrayList<>();
+
+    for (ShardResponse srsp : responses) {
+      NamedList<Object> shardRsp = srsp.getSolrResponse().getResponse();
+      LukeResponse lukeRsp = new LukeResponse();
+      lukeRsp.setResponse(shardRsp);
+      // Only process field info if the shard explicitly included it in its response.
+      // LukeResponse.getFieldInfo() falls back to schema.fields which has incomplete data.
+      Map<String, LukeResponse.FieldInfo> fieldInfo =
+          shardRsp.get(RSP_FIELDS) != null ? lukeRsp.getFieldInfo() : null;
+      ShardData shardData = new ShardData(shardAddress(srsp), fieldInfo);
+
+      NamedList<Object> shardIndex = lukeRsp.getIndexInfo();
+      if (shardIndex != null) {
+        totalNumDocs += Optional.ofNullable(lukeRsp.getNumDocsAsLong()).orElse(0L);
+        totalMaxDoc = Math.max(totalMaxDoc, Optional.ofNullable(lukeRsp.getMaxDoc()).orElse(0));
+        totalDeletedDocs += Optional.ofNullable(lukeRsp.getDeletedDocsAsLong()).orElse(0L);
+        Number segCount = (Number) shardIndex.get(KEY_SEGMENT_COUNT);
+        totalSegmentCount += segCount != null ? segCount.intValue() : 0;
+
+        shardData.setIndexInfo(shardIndex);
+      }
+
+      processShardFields(shardData, aggregatedFields);
+      Object doc = shardRsp.get(RSP_DOC);
+      if (doc != null) {
+        if (firstDoc != null) {
+          throw new SolrException(
+              ErrorCode.SERVER_ERROR,
+              "Solr Id of document "
+                  + firstDoc
+                  + " found on multiple shards ("
+                  + firstDocShard
+                  + " and "
+                  + shardAddress(srsp)
+                  + "). The index is corrupt: unique key constraint violated.");
+        }
+        firstDoc = doc;
+        firstDocShard = shardAddress(srsp);
+      }
+      shardDataList.add(shardData);
+    }
+
+    SimpleOrderedMap<Object> shardsInfo = new SimpleOrderedMap<>();
+    for (ShardData sd : shardDataList) {
+      SimpleOrderedMap<Object> entry = sd.toResponseEntry();
+      if (!entry.isEmpty()) {
+        shardsInfo.add(sd.shardAddr, entry);
+      }
+    }
+
+    SimpleOrderedMap<Object> aggregatedIndex = new SimpleOrderedMap<>();
+    aggregatedIndex.add(KEY_NUM_DOCS, totalNumDocs);
+    aggregatedIndex.add(KEY_MAX_DOC, totalMaxDoc);
+    aggregatedIndex.add(KEY_DELETED_DOCS, totalDeletedDocs);
+    aggregatedIndex.add(KEY_SEGMENT_COUNT, totalSegmentCount);
+    rsp.add(RSP_INDEX, aggregatedIndex);
+
+    if (firstDoc != null) {
+      rsp.add(RSP_DOC, firstDoc);
+    }
+    if (!aggregatedFields.isEmpty()) {
+      SimpleOrderedMap<Object> aggregatedFieldsNL = new SimpleOrderedMap<>();
+      for (Map.Entry<String, AggregatedFieldData> entry : aggregatedFields.entrySet()) {
+        aggregatedFieldsNL.add(entry.getKey(), entry.getValue().aggregated);
+      }
+      rsp.add(RSP_FIELDS, aggregatedFieldsNL);
+    }
+
+    rsp.add(RSP_SHARDS, shardsInfo);
+  }
+
+  private void processShardFields(
+      ShardData shardData, Map<String, AggregatedFieldData> aggregatedFields) {
+    if (shardData.shardFieldInfo == null) {
+      return;
+    }
+    for (Map.Entry<String, LukeResponse.FieldInfo> entry : shardData.shardFieldInfo.entrySet()) {
+      String fieldName = entry.getKey();
+      LukeResponse.FieldInfo fi = entry.getValue();
+
+      aggregateShardField(shardData.shardAddr, fi, aggregatedFields);
+
+      // Detailed stats — kept per-shard, not aggregated
+      NamedList<Integer> topTerms = fi.getTopTerms();
+      if (topTerms != null) {
+        SimpleOrderedMap<Object> detailedFieldInfo = new SimpleOrderedMap<>();
+        detailedFieldInfo.add(KEY_TOP_TERMS, topTerms);
+        detailedFieldInfo.add(KEY_HISTOGRAM, fi.getExtras().get(KEY_HISTOGRAM));
+        detailedFieldInfo.add(KEY_DISTINCT, fi.getDistinct());
+        shardData.addDetailedFieldInfo(fieldName, detailedFieldInfo);
+      }
+    }
+  }
+
+  private void aggregateShardField(
+      String shardAddr,
+      LukeResponse.FieldInfo fi,
+      Map<String, AggregatedFieldData> aggregatedFields) {
+
+    String fieldName = fi.getName();
+
+    AggregatedFieldData fieldData = aggregatedFields.get(fieldName);
+    if (fieldData == null) {
+      fieldData = new AggregatedFieldData(shardAddr, fi);
+      aggregatedFields.put(fieldName, fieldData);
+
+      // First shard to report this field: populate aggregated with schema-derived attrs
+      fieldData.aggregated.add(KEY_TYPE, fi.getType());
+      fieldData.aggregated.add(KEY_SCHEMA_FLAGS, fi.getSchema());
+      Object dynBase = fi.getExtras().get(KEY_DYNAMIC_BASE);
+      if (dynBase != null) {
+        fieldData.aggregated.add(KEY_DYNAMIC_BASE, dynBase);
+      }
+      if (fieldData.indexFlags != null) {
+        fieldData.aggregated.add(KEY_INDEX_FLAGS, fieldData.indexFlags);
+      }
+    } else {
+      // Subsequent shards: validate consistency
+      validateFieldAttr(
+          fieldName,
+          KEY_TYPE,
+          fi.getType(),
+          fieldData.originalFieldInfo.getType(),
+          shardAddr,
+          fieldData.originalShardAddr);
+      validateFieldAttr(
+          fieldName,
+          KEY_SCHEMA_FLAGS,
+          fi.getSchema(),
+          fieldData.originalFieldInfo.getSchema(),
+          shardAddr,
+          fieldData.originalShardAddr);
+      validateFieldAttr(
+          fieldName,
+          KEY_DYNAMIC_BASE,
+          fi.getExtras().get(KEY_DYNAMIC_BASE),
+          fieldData.originalFieldInfo.getExtras().get(KEY_DYNAMIC_BASE),
+          shardAddr,
+          fieldData.originalShardAddr);
+
+      Object indexFlags = fi.getExtras().get(KEY_INDEX_FLAGS);
+      if (indexFlags != null) {
+        if (fieldData.indexFlags == null) {
+          fieldData.indexFlags = indexFlags;
+          fieldData.indexFlagsShardAddr = shardAddr;
+          fieldData.aggregated.add(KEY_INDEX_FLAGS, indexFlags);
+        } else {
+          validateFieldAttr(
+              fieldName,
+              KEY_INDEX_FLAGS,
+              indexFlags,
+              fieldData.indexFlags,
+              shardAddr,
+              fieldData.indexFlagsShardAddr);
+        }
+      }
+    }
+
+    Long docsAsLong = fi.getDocsAsLong();
+    if (docsAsLong != null) {
+      fieldData.aggregated.compute(
+          KEY_DOCS_AS_LONG, (key, val) -> val == null ? docsAsLong : (Long) val + docsAsLong);
+    }
+  }
+
+  /** Validates that a field attribute value is identical across shards. */
+  private void validateFieldAttr(
+      String fieldName,
+      String attrName,
+      Object currentVal,
+      Object expectedVal,
+      String currentShardAddr,
+      String expectedShardAddr) {
+    String currentStr = currentVal != null ? currentVal.toString() : null;
+    String expectedStr = expectedVal != null ? expectedVal.toString() : null;
+    if (!Objects.equals(currentStr, expectedStr)) {
+      throw new SolrException(

Review Comment:
   I don't mean to suggest sweeping this problem under the rug.  But maybe add it 
as a header in all caps and log a warning as well.  On the other hand, what you 
said about using distrib=false seems like a perfectly fine workaround for a user 
in this case.  I'm fine either way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to