kotman12 commented on code in PR #4472:
URL: https://github.com/apache/solr/pull/4472#discussion_r3314556672
##########
solr/core/src/java/org/apache/solr/handler/admin/LukeRequestHandler.java:
##########
@@ -189,8 +240,411 @@ public void handleRequestBody(SolrQueryRequest req,
SolrQueryResponse rsp) throw
info.add(
"NOTE",
"Document Frequency (df) is not updated when a document is marked for
deletion. df values include deleted documents.");
- rsp.add("info", info);
+ rsp.add(RSP_INFO, info);
+ rsp.setHttpCaching(false);
+ }
+
+ /**
+ * Field-level response keys, declared in the order they appear in the local
(non-distributed)
+ * response. EnumMap iteration follows declaration order, giving
deterministic output.
+ */
+ enum FieldDataKey {
+ TYPE(KEY_TYPE),
+ SCHEMA(KEY_SCHEMA_FLAGS),
+ DYNAMIC_BASE(KEY_DYNAMIC_BASE),
+ INDEX(KEY_INDEX_FLAGS),
+ DOCS(KEY_DOCS);
+
+ final String responseKey;
+
+ FieldDataKey(String responseKey) {
+ this.responseKey = responseKey;
+ }
+ }
+
+ /** Per-field accumulation state across shards: aggregated response data and
field validation. */
+ private static class AggregatedFieldData {
+ final EnumMap<FieldDataKey, Object> properties = new
EnumMap<>(FieldDataKey.class);
+ final String originalShardAddr;
+ final LukeResponse.FieldInfo originalFieldInfo;
+ private String indexFlagsShardAddr;
+
+ AggregatedFieldData(String shardAddr, LukeResponse.FieldInfo fieldInfo) {
+ this.originalShardAddr = shardAddr;
+ this.originalFieldInfo = fieldInfo;
+ properties.put(FieldDataKey.TYPE, fieldInfo.getType());
+ properties.put(FieldDataKey.SCHEMA, fieldInfo.getSchema());
+ Object dynBase = fieldInfo.getExtras().get(KEY_DYNAMIC_BASE);
+ if (dynBase != null) {
+ properties.put(FieldDataKey.DYNAMIC_BASE, dynBase);
+ }
+ Object indexFlags = fieldInfo.getExtras().get(KEY_INDEX_FLAGS);
+ if (indexFlags != null) {
+ properties.put(FieldDataKey.INDEX, indexFlags);
+ this.indexFlagsShardAddr = shardAddr;
+ }
+ }
+
+ SimpleOrderedMap<Object> toResponse() {
+ SimpleOrderedMap<Object> result = new SimpleOrderedMap<>();
+ for (Map.Entry<FieldDataKey, Object> entry : properties.entrySet()) {
+ result.add(entry.getKey().responseKey, entry.getValue());
+ }
+ return result;
+ }
+ }
+
+ private static class ShardData {
+ final String shardAddr; // key in "shards" response map
+ final Map<String, LukeResponse.FieldInfo> shardFieldInfo; // keyed by
field name
+ private NamedList<Object> indexInfo; // value for "index" key in per-shard
entry
+ private SimpleOrderedMap<Object> detailedFields; // keyed by field name
+
+ ShardData(String shardAddr, Map<String, LukeResponse.FieldInfo>
shardFieldInfo) {
+ this.shardAddr = shardAddr;
+ this.shardFieldInfo = shardFieldInfo;
+ }
+
+ void setIndexInfo(NamedList<Object> indexInfo) {
+ this.indexInfo = indexInfo;
+ }
+
+ void addDetailedFieldInfo(String fieldName, SimpleOrderedMap<Object>
fieldStats) {
+ if (detailedFields == null) {
+ detailedFields = new SimpleOrderedMap<>();
+ }
+ detailedFields.add(fieldName, fieldStats);
+ }
+
+ SimpleOrderedMap<Object> toResponseEntry() {
+ SimpleOrderedMap<Object> entry = new SimpleOrderedMap<>();
+ if (indexInfo != null) {
+ entry.add(RSP_INDEX, indexInfo);
+ }
+ if (detailedFields != null) {
+ entry.add(RSP_FIELDS, detailedFields);
+ }
+ return entry;
+ }
+ }
+
+ /**
+ * @return true if the request was handled in distributed mode, false if
prepDistributed
+ * short-circuited (e.g. single-shard collection) and the caller should
fall through to local
+ * logic.
+ */
+ private boolean handleDistributed(SolrQueryRequest req, SolrQueryResponse
rsp) {
+ SolrParams reqParams = req.getParams();
+
+ // docId is a Lucene-internal integer, not meaningful across shards
+ if (reqParams.getInt(DOC_ID) != null) {
+ throw new SolrException(
+ ErrorCode.BAD_REQUEST,
+ "docId parameter is not supported in distributed mode."
+ + " Use the id parameter to look up documents by their Solr
unique key.");
+ }
+
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ResponseBuilder rb = new ResponseBuilder(req, rsp, List.of());
+ shardHandler.prepDistributed(rb);
+
+ String[] shards = rb.shards;
+ if (shards == null || shards.length == 0) {
+ return false;
+ }
+
+ ShardRequest sreq = new ShardRequest();
+ sreq.shards = shards;
+ sreq.actualShards = shards;
+ sreq.responses = new ArrayList<>(shards.length);
+
+ String reqPath = (String) req.getContext().get(PATH);
+
+ for (String shard : shards) {
+ ModifiableSolrParams params = new ModifiableSolrParams(reqParams);
+ params.set(CommonParams.QT, reqPath);
+ ShardHandler.setShardAttributesToParams(params, sreq.purpose);
+ shardHandler.submit(sreq, shard, params);
+ }
+
+ ShardResponse lastSrsp = shardHandler.takeCompletedOrError();
+ if (lastSrsp == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "No responses received
from shards");
+ }
+ List<ShardResponse> responses = sreq.responses;
+ for (ShardResponse srsp : responses) {
+ if (srsp.getException() != null) {
+ shardHandler.cancelAll();
+ if (srsp.getException() instanceof SolrException) {
+ throw (SolrException) srsp.getException();
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, srsp.getException());
+ }
+ }
+
+ aggregateDistributedResponses(req, rsp, responses);
rsp.setHttpCaching(false);
+ return true;
+ }
+
+ private static String shardAddress(ShardResponse srsp) {
+ return srsp.getShardAddress() != null ? srsp.getShardAddress() :
srsp.getShard();
+ }
+
+ private void aggregateDistributedResponses(
+ SolrQueryRequest req, SolrQueryResponse rsp, List<ShardResponse>
responses) {
+
+ if (!responses.isEmpty()) {
+ ShardResponse firstRsp = responses.get(0);
+ NamedList<Object> firstShardRsp =
firstRsp.getSolrResponse().getResponse();
+ if (firstShardRsp == null) {
+ throw new SolrException(
+ ErrorCode.SERVER_ERROR,
+ "Unexpected empty response from shard: " + shardAddress(firstRsp));
+ }
+ Object schema = firstShardRsp.get(RSP_SCHEMA);
+ if (schema != null) {
+ rsp.add(RSP_SCHEMA, schema);
+ }
+ }
+
+ long totalNumDocs = 0;
+ int totalMaxDoc = 0;
+ long totalDeletedDocs = 0;
+ int totalSegmentCount = 0;
+ Map<String, AggregatedFieldData> aggregatedFields = new TreeMap<>();
+ String firstDocShard = null;
+ Object firstDoc = null;
+ List<ShardData> shardDataList = new ArrayList<>();
+
+ for (ShardResponse srsp : responses) {
+ NamedList<Object> shardRsp = srsp.getSolrResponse().getResponse();
+ LukeResponse lukeRsp = new LukeResponse();
+ lukeRsp.setResponse(shardRsp);
+ // Only process field info if the shard explicitly included it in its
response.
+ // LukeResponse.getFieldInfo() falls back to schema.fields which has
incomplete data.
+ Map<String, LukeResponse.FieldInfo> fieldInfo =
+ shardRsp.get(RSP_FIELDS) != null ? lukeRsp.getFieldInfo() : null;
+ ShardData shardData = new ShardData(shardAddress(srsp), fieldInfo);
+
+ NamedList<Object> shardIndex = lukeRsp.getIndexInfo();
+ if (shardIndex != null) {
+ totalNumDocs += Optional.ofNullable(lukeRsp.getNumDocs()).orElse(0L);
+ totalMaxDoc = Math.max(totalMaxDoc,
Optional.ofNullable(lukeRsp.getMaxDoc()).orElse(0));
+ totalDeletedDocs +=
Optional.ofNullable(lukeRsp.getDeletedDocs()).orElse(0L);
+ Number segCount = (Number) shardIndex.get(KEY_SEGMENT_COUNT);
+ totalSegmentCount += segCount != null ? segCount.intValue() : 0;
+
+ shardData.setIndexInfo(shardIndex);
+ }
+
+ processShardFields(shardData, aggregatedFields);
+ Object doc = shardRsp.get(RSP_DOC);
+ if (doc != null) {
+ if (firstDoc != null) {
+ throw new SolrException(
+ ErrorCode.SERVER_ERROR,
+ "Solr Id of document "
+ + firstDoc
+ + " found on multiple shards ("
+ + firstDocShard
+ + " and "
+ + shardAddress(srsp)
+ + "). The index is corrupt: unique key constraint
violated.");
+ }
+ firstDoc = doc;
+ firstDocShard = shardAddress(srsp);
+ }
+ shardDataList.add(shardData);
+ }
+
+ shardDataList.sort(Comparator.comparing(sd -> sd.shardAddr));
+ SimpleOrderedMap<Object> shardsInfo = new SimpleOrderedMap<>();
+ for (ShardData sd : shardDataList) {
+ SimpleOrderedMap<Object> entry = sd.toResponseEntry();
+ if (!entry.isEmpty()) {
+ shardsInfo.add(sd.shardAddr, entry);
+ }
+ }
+
+ SimpleOrderedMap<Object> aggregatedIndex = new SimpleOrderedMap<>();
+ aggregatedIndex.add(KEY_NUM_DOCS, totalNumDocs);
+ aggregatedIndex.add(KEY_MAX_DOC, totalMaxDoc);
+ aggregatedIndex.add(KEY_DELETED_DOCS, totalDeletedDocs);
+ aggregatedIndex.add(KEY_SEGMENT_COUNT, totalSegmentCount);
+ rsp.add(RSP_INDEX, aggregatedIndex);
+
+ if (firstDoc != null) {
+ rsp.add(RSP_DOC, firstDoc);
+ }
+ boolean narrowLongs = shouldNarrowLongsForOldClient(req);
+ if (narrowLongs) {
+ narrowLongToInt(aggregatedIndex, KEY_NUM_DOCS);
+ narrowLongToInt(aggregatedIndex, KEY_DELETED_DOCS);
+ }
+ if (!aggregatedFields.isEmpty()) {
+ SimpleOrderedMap<Object> aggregatedFieldsNL = new SimpleOrderedMap<>();
+ for (Map.Entry<String, AggregatedFieldData> entry :
aggregatedFields.entrySet()) {
+ SimpleOrderedMap<Object> fieldResponse = entry.getValue().toResponse();
+ if (narrowLongs) {
+ narrowLongToInt(fieldResponse, KEY_DOCS);
+ }
+ aggregatedFieldsNL.add(entry.getKey(), fieldResponse);
+ }
+ rsp.add(RSP_FIELDS, aggregatedFieldsNL);
+ }
+
+ // Add info section last (before shards), matching the local-mode key
order.
+ if (!responses.isEmpty()) {
+ NamedList<Object> firstShardRsp =
responses.get(0).getSolrResponse().getResponse();
+ Object info = firstShardRsp == null ? null : firstShardRsp.get(RSP_INFO);
+ if (info != null) {
+ rsp.add(RSP_INFO, info);
+ }
+ }
+
+ if (req.getParams().getBool(ShardParams.SHARDS_INFO, false)) {
+ rsp.add(RSP_SHARDS, shardsInfo);
+ }
+ }
+
+ private void processShardFields(
+ ShardData shardData, Map<String, AggregatedFieldData> aggregatedFields) {
+ if (shardData.shardFieldInfo == null) {
+ return;
+ }
+ for (Map.Entry<String, LukeResponse.FieldInfo> entry :
shardData.shardFieldInfo.entrySet()) {
+ String fieldName = entry.getKey();
+ LukeResponse.FieldInfo fi = entry.getValue();
+
+ aggregateShardField(shardData.shardAddr, fi, aggregatedFields);
+
+ // Detailed stats — kept per-shard, not aggregated
+ NamedList<Integer> topTerms = fi.getTopTerms();
+ if (topTerms != null) {
+ SimpleOrderedMap<Object> detailedFieldInfo = new SimpleOrderedMap<>();
+ detailedFieldInfo.add(KEY_TOP_TERMS, topTerms);
+ detailedFieldInfo.add(KEY_HISTOGRAM,
fi.getExtras().get(KEY_HISTOGRAM));
+ detailedFieldInfo.add(KEY_DISTINCT, fi.getDistinct());
+ shardData.addDetailedFieldInfo(fieldName, detailedFieldInfo);
+ }
+ }
+ }
+
+ private void aggregateShardField(
+ String shardAddr,
+ LukeResponse.FieldInfo fi,
+ Map<String, AggregatedFieldData> aggregatedFields) {
+
+ String fieldName = fi.getName();
+
+ AggregatedFieldData fieldData = aggregatedFields.get(fieldName);
+ if (fieldData == null) {
+ fieldData = new AggregatedFieldData(shardAddr, fi);
+ aggregatedFields.put(fieldName, fieldData);
+ } else {
+ // Subsequent shards: validate that type, schema, and dynamicBase match
+ validateFieldAttr(
+ fieldName,
+ KEY_TYPE,
+ fi.getType(),
+ fieldData.originalFieldInfo.getType(),
+ shardAddr,
+ fieldData.originalShardAddr);
+ validateFieldAttr(
+ fieldName,
+ KEY_SCHEMA_FLAGS,
+ fi.getSchema(),
+ fieldData.originalFieldInfo.getSchema(),
+ shardAddr,
+ fieldData.originalShardAddr);
+ validateFieldAttr(
+ fieldName,
+ KEY_DYNAMIC_BASE,
+ fi.getExtras().get(KEY_DYNAMIC_BASE),
+ fieldData.originalFieldInfo.getExtras().get(KEY_DYNAMIC_BASE),
+ shardAddr,
+ fieldData.originalShardAddr);
+
+ Object indexFlags = fi.getExtras().get(KEY_INDEX_FLAGS);
+ if (indexFlags != null) {
+ Object existing = fieldData.properties.get(FieldDataKey.INDEX);
+ if (existing == null) {
+ fieldData.properties.put(FieldDataKey.INDEX, indexFlags);
+ fieldData.indexFlagsShardAddr = shardAddr;
+ } else {
+ validateFieldAttr(
+ fieldName,
+ KEY_INDEX_FLAGS,
+ indexFlags,
+ existing,
+ shardAddr,
+ fieldData.indexFlagsShardAddr);
+ }
+ }
+ }
+
+ // Sum per-shard doc counts
+ fieldData.properties.merge(FieldDataKey.DOCS, fi.getDocs(), (a, b) ->
(long) a + (long) b);
+ }
+
+ /**
+ * Minimum client version that understands Long values in distributed Luke
responses. Distributed
+ * Luke aggregates counts across shards, which can overflow Integer. Older
clients cast these
+ * values to Integer and would fail with a ClassCastException.
+ */
+ private static final SolrVersion DISTRIB_LONG_COUNTS_MIN_VERSION =
+ SolrVersion.forIntegers(9, 11, 0);
+
+ private static boolean shouldNarrowLongsForOldClient(SolrQueryRequest req) {
+ HttpSolrCall call = req.getHttpSolrCall();
+ if (call == null) return false;
+ SolrVersion clientVersion = call.getUserAgentSolrVersion();
+ return clientVersion != null &&
clientVersion.lessThan(DISTRIB_LONG_COUNTS_MIN_VERSION);
Review Comment:
I didn't mean it was an issue with the wt itself, just that to test the old
client narrowing behavior I hardcoded the java client version string into a
regular curl request header with wt=xml because I realized that the xml writer
response includes long/int datatype in the xml tag of the field. But this of
course can break callers requesting xml with a non-java client who expect docs
to be in an int tag and not a long tag. So I'm just calling this out as a
backwards incompatible flow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]