This is an automated email from the ASF dual-hosted git repository. dsmiley pushed a commit to branch branch_10x in repository https://gitbox.apache.org/repos/asf/solr.git
commit ef262aee258df45de9d48101ee8eb8828fb00bc6 Author: Khush Jain <[email protected]> AuthorDate: Tue Mar 17 20:57:16 2026 -0400 SOLR-17973: Fix `shards.preference` not respected for cross-collection join queries (#4218) --- ...fix-shards-preference-cross-collection-join.yml | 7 + .../search/join/CrossCollectionJoinQParser.java | 7 + .../solr/search/join/CrossCollectionJoinQuery.java | 22 +- .../search/join/CrossCollectionJoinQueryTest.java | 298 +++++++++++++++++++++ 4 files changed, 333 insertions(+), 1 deletion(-) diff --git a/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml b/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml new file mode 100644 index 00000000000..1620764ea06 --- /dev/null +++ b/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml @@ -0,0 +1,7 @@ +title: "SOLR-17973: Fix `shards.preference` not respected for cross-collection join queries" +type: fixed +authors: + - name: khushjain +links: + - name: SOLR-17973 + url: https://issues.apache.org/jira/browse/SOLR-17973 diff --git a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java index 1b78a0cb2e0..2e4675a880d 100644 --- a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java +++ b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Set; import org.apache.lucene.search.Query; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.search.QParser; @@ -102,6 +103,12 @@ public class CrossCollectionJoinQParser extends QParser { } } + // Propagate shards.preference from request-level params if not already set in localParams + String shardsPreference = req.getParams().get(ShardParams.SHARDS_PREFERENCE); + if (shardsPreference != null && otherParams.get(ShardParams.SHARDS_PREFERENCE) == null) { + otherParams.set(ShardParams.SHARDS_PREFERENCE, shardsPreference); + } + return new CrossCollectionJoinQuery( query, zkHost, solrUrl, collection, fromField, toField, routedByJoinKey, ttl, otherParams); } diff --git a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java index bb03022afdf..8381ed17dbb 100644 --- a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java +++ b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java @@ -48,11 +48,14 @@ import org.apache.solr.client.solrj.io.stream.TupleStream; import org.apache.solr.client.solrj.io.stream.UniqueStream; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator; import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; @@ -219,11 +222,13 @@ public class CrossCollectionJoinQuery extends Query implements SolrSearcherRequi } private TupleStream createCloudSolrStream(SolrClientCache solrClientCache) throws IOException { + ZkController zkController = searcher.getCore().getCoreContainer().getZkController(); + String streamZkHost; if (zkHost != null) { streamZkHost = zkHost; } else { - streamZkHost = searcher.getCore().getCoreContainer().getZkController().getZkServerAddress(); + streamZkHost = zkController.getZkServerAddress(); } ModifiableSolrParams params = new ModifiableSolrParams(otherParams); @@ -239,6 +244,21 @@ public class CrossCollectionJoinQuery extends Query implements SolrSearcherRequi StreamContext streamContext = new StreamContext(); streamContext.setSolrClientCache(solrClientCache); + streamContext.setRequestParams(new ModifiableSolrParams(otherParams)); + if (zkController != null) { + RequestReplicaListTransformerGenerator rltg = + new RequestReplicaListTransformerGenerator( + zkController + .getZkStateReader() + .getClusterProperties() + .getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "") + .toString(), + zkController.getNodeName(), + zkController.getBaseUrl(), + zkController.getHostName(), + zkController.getSysPropsCacher()); + streamContext.setRequestReplicaListTransformerGenerator(rltg); + } TupleStream cloudSolrStream = new CloudSolrStream(streamZkHost, collection, params); TupleStream uniqueStream = new UniqueStream(cloudSolrStream, new FieldEqualitor(fromField)); diff --git a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java index 91cbc3bbd8b..70d77363539 100644 --- a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java +++ b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Locale; +import org.apache.lucene.search.Query; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; @@ -30,8 +31,16 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.ShardParams; import org.apache.solr.embedded.JettySolrRunner; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.request.SolrQueryRequestBase; +import org.apache.solr.search.QueryParsing; +import org.apache.solr.util.SolrJMetricTestUtils; import org.junit.BeforeClass; import org.junit.Test; @@ -305,6 +314,295 @@ public class CrossCollectionJoinQueryTest extends SolrCloudTestCase { } } + @Test + public void testShardsPreferenceRequestParamPropagation() throws Exception { + // shards.preference set as a request-level param (not in localParams) should be + // propagated to the query's otherParams + ModifiableSolrParams requestParams = new ModifiableSolrParams(); + requestParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false"); + + ModifiableSolrParams localParams = new ModifiableSolrParams(); + localParams.set(QueryParsing.V, "*:*"); + localParams.set(CrossCollectionJoinQParser.FROM_INDEX, "products"); + localParams.set(CrossCollectionJoinQParser.FROM, "product_id_s"); + localParams.set(CrossCollectionJoinQParser.TO, "product_id_s"); + localParams.set(CrossCollectionJoinQParser.ROUTED_BY_JOIN_KEY, "false"); + + try (SolrQueryRequest req = new SolrQueryRequestBase(null, requestParams) {}) { + CrossCollectionJoinQParser parser = + new CrossCollectionJoinQParser( + null, localParams, requestParams, req, "product_id_s", null); + Query query = parser.parse(); + + CrossCollectionJoinQuery ccjQuery = (CrossCollectionJoinQuery) query; + assertEquals("replica.leader:false", ccjQuery.otherParams.get(ShardParams.SHARDS_PREFERENCE)); + } + } + + @Test + public void testShardsPreferenceLocalParamTakesPrecedence() throws Exception { + // When shards.preference is set in both localParams and request params, + // the localParams value should take precedence + ModifiableSolrParams requestParams = new ModifiableSolrParams(); + requestParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false"); + + ModifiableSolrParams localParams = new ModifiableSolrParams(); + localParams.set(QueryParsing.V, "*:*"); + localParams.set(CrossCollectionJoinQParser.FROM_INDEX, "products"); + localParams.set(CrossCollectionJoinQParser.FROM, "product_id_s"); + localParams.set(CrossCollectionJoinQParser.TO, "product_id_s"); + localParams.set(CrossCollectionJoinQParser.ROUTED_BY_JOIN_KEY, "false"); + localParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:true"); + + try (SolrQueryRequest req = new SolrQueryRequestBase(null, requestParams) {}) { + CrossCollectionJoinQParser parser = + new CrossCollectionJoinQParser( + null, localParams, requestParams, req, "product_id_s", null); + Query query = parser.parse(); + + CrossCollectionJoinQuery ccjQuery = (CrossCollectionJoinQuery) query; + // localParams value should take precedence over request-level param + assertEquals("replica.leader:true", ccjQuery.otherParams.get(ShardParams.SHARDS_PREFERENCE)); + } + } + + @Test + public void testShardsPreferenceWithCrossCollectionJoin() throws Exception { + // Use 1 shard with 2 replicas for the "from" collection so there is exactly + // 1 leader and 1 non-leader, each on a different node. This lets us verify + // via per-node /export metrics which replica actually served the stream request. + final String fromCollection = "products_pref_test"; + final String toCollection = "parts_pref_test"; + try { + CollectionAdminRequest.createCollection(fromCollection, "ccjoin", 1, 2) + .process(cluster.getSolrClient()); + CollectionAdminRequest.createCollection(toCollection, "ccjoin", 1, 1) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(fromCollection, 1, 2); + cluster.waitForActiveCollection(toCollection, 1, 1); + + // Index test data + List<SolrInputDocument> productDocs = new ArrayList<>(); + List<SolrInputDocument> partDocs = new ArrayList<>(); + for (int productId = 0; productId < NUM_PRODUCTS; ++productId) { + int sizeNum = productId % SIZES.length; + String size = SIZES[sizeNum]; + productDocs.add( + new SolrInputDocument( + "id", String.valueOf(productId), + "product_id_s", String.valueOf(productId), + "size_s", size)); + for (int partNum = 0; partNum <= sizeNum; partNum++) { + String partId = String.format(Locale.ROOT, "%d_%d", productId, partNum); + partDocs.add( + new SolrInputDocument("id", partId, "product_id_s", String.valueOf(productId))); + } + } + indexDocs(fromCollection, productDocs); + cluster.getSolrClient().commit(fromCollection); + indexDocs(toCollection, partDocs); + cluster.getSolrClient().commit(toCollection); + + // Identify leader and non-leader replicas for the "from" collection's single shard + DocCollection fromDocCollection = + cluster.getSolrClient().getClusterState().getCollection(fromCollection); + Slice shard = fromDocCollection.getSlices().iterator().next(); + Replica leader = shard.getLeader(); + assertNotNull("Leader should exist for shard", leader); + Replica nonLeader = + shard.getReplicas().stream() + .filter(r -> !r.getName().equals(leader.getName())) + .findFirst() + .orElseThrow(() -> new AssertionError("Expected a non-leader replica")); + + String leaderBaseUrl = leader.getBaseUrl(); + String nonLeaderBaseUrl = nonLeader.getBaseUrl(); + assertNotEquals( + "Leader and non-leader should be on different nodes for this test to be meaningful", + leaderBaseUrl, + nonLeaderBaseUrl); + + // --- Test 1: replica.leader:false should route /export to the non-leader --- + double leaderCountBefore = getNumExportRequests(leaderBaseUrl, fromCollection); + double nonLeaderCountBefore = getNumExportRequests(nonLeaderBaseUrl, fromCollection); + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set( + "q", + String.format( + Locale.ROOT, + "{!join method=crossCollection fromIndex=%s from=product_id_s to=product_id_s routed=false ttl=0}size_s:M", + fromCollection)); + params.set("rows", "0"); + params.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false"); + + QueryResponse resp = cluster.getSolrClient().query(toCollection, params); + assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound()); + + double leaderCountAfter = getNumExportRequests(leaderBaseUrl, fromCollection); + double nonLeaderCountAfter = getNumExportRequests(nonLeaderBaseUrl, fromCollection); + + assertTrue( + "Non-leader replica should have received the /export request" + + " (before=" + + nonLeaderCountBefore + + ", after=" + + nonLeaderCountAfter + + ")", + nonLeaderCountAfter > nonLeaderCountBefore); + assertEquals( + "Leader replica should NOT have received the /export request", + leaderCountBefore, + leaderCountAfter, + 0.0); + + // --- Test 2: replica.leader:true should route /export to the leader --- + leaderCountBefore = leaderCountAfter; + nonLeaderCountBefore = nonLeaderCountAfter; + + params.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:true"); + resp = cluster.getSolrClient().query(toCollection, params); + assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound()); + + leaderCountAfter = getNumExportRequests(leaderBaseUrl, fromCollection); + nonLeaderCountAfter = getNumExportRequests(nonLeaderBaseUrl, fromCollection); + + assertTrue( + "Leader replica should have received the /export request" + + " (before=" + + leaderCountBefore + + ", after=" + + leaderCountAfter + + ")", + leaderCountAfter > leaderCountBefore); + assertEquals( + "Non-leader replica should NOT have received the /export request", + nonLeaderCountBefore, + nonLeaderCountAfter, + 0.0); + } finally { + CollectionAdminRequest.deleteCollection(toCollection).process(cluster.getSolrClient()); + CollectionAdminRequest.deleteCollection(fromCollection).process(cluster.getSolrClient()); + } + } + + @Test + public void testShardsPreferenceLocationLocal() throws Exception { + // Test that replica.location:local routes the join's /export stream to a replica + // on the same node where the join query is processed. This validates that the + // RequestReplicaListTransformerGenerator is initialized with full node context + // (nodeName, baseUrl, hostName). + final String fromCollection = "products_local_test"; + final String toCollection = "parts_local_test"; + try { + // "from" collection: 1 shard, NUM_NODES replicas → one replica on every node + CollectionAdminRequest.createCollection(fromCollection, "ccjoin", 1, NUM_NODES) + .process(cluster.getSolrClient()); + // "to" collection: 1 shard, 1 replica → on exactly one node + CollectionAdminRequest.createCollection(toCollection, "ccjoin", 1, 1) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(fromCollection, 1, NUM_NODES); + cluster.waitForActiveCollection(toCollection, 1, 1); + + // Index test data + List<SolrInputDocument> productDocs = new ArrayList<>(); + List<SolrInputDocument> partDocs = new ArrayList<>(); + for (int productId = 0; productId < NUM_PRODUCTS; ++productId) { + int sizeNum = productId % SIZES.length; + String size = SIZES[sizeNum]; + productDocs.add( + new SolrInputDocument( + "id", String.valueOf(productId), + "product_id_s", String.valueOf(productId), + "size_s", size)); + for (int partNum = 0; partNum <= sizeNum; partNum++) { + String partId = String.format(Locale.ROOT, "%d_%d", productId, partNum); + partDocs.add( + new SolrInputDocument("id", partId, "product_id_s", String.valueOf(productId))); + } + } + indexDocs(fromCollection, productDocs); + cluster.getSolrClient().commit(fromCollection); + indexDocs(toCollection, partDocs); + cluster.getSolrClient().commit(toCollection); + + // Find the node hosting the "to" collection's shard. The join stream will execute + // on this node, so replica.location:local should prefer the "from" replica here. + DocCollection toDocCollection = + cluster.getSolrClient().getClusterState().getCollection(toCollection); + Slice toShard = toDocCollection.getSlices().iterator().next(); + String toNodeBaseUrl = toShard.getReplicas().iterator().next().getBaseUrl(); + + // Collect all "from" replica base URLs + DocCollection fromDocCollection = + cluster.getSolrClient().getClusterState().getCollection(fromCollection); + Slice fromShard = fromDocCollection.getSlices().iterator().next(); + List<String> fromBaseUrls = new ArrayList<>(); + for (Replica r : fromShard.getReplicas()) { + fromBaseUrls.add(r.getBaseUrl()); + } + assertTrue( + "The 'from' collection should have a replica on the same node as the 'to' collection", + fromBaseUrls.contains(toNodeBaseUrl)); + + // Get baseline /export request counts for the "from" collection on all nodes + double localCountBefore = getNumExportRequests(toNodeBaseUrl, fromCollection); + List<double[]> remoteCountsBefore = new ArrayList<>(); + for (String baseUrl : fromBaseUrls) { + if (!baseUrl.equals(toNodeBaseUrl)) { + remoteCountsBefore.add(new double[] {getNumExportRequests(baseUrl, fromCollection)}); + } + } + + // Execute join query with replica.location:local + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set( + "q", + String.format( + Locale.ROOT, + "{!join method=crossCollection fromIndex=%s from=product_id_s to=product_id_s routed=false ttl=0}size_s:M", + fromCollection)); + params.set("rows", "0"); + params.set(ShardParams.SHARDS_PREFERENCE, "replica.location:local"); + + QueryResponse resp = cluster.getSolrClient().query(toCollection, params); + assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound()); + + // Verify the local node's "from" replica received the /export request + double localCountAfter = getNumExportRequests(toNodeBaseUrl, fromCollection); + assertTrue( + "Local 'from' replica should have received the /export request" + + " (before=" + + localCountBefore + + ", after=" + + localCountAfter + + ")", + localCountAfter > localCountBefore); + + // Verify remote nodes did NOT receive /export requests + int remoteIdx = 0; + for (String baseUrl : fromBaseUrls) { + if (!baseUrl.equals(toNodeBaseUrl)) { + double remoteCountAfter = getNumExportRequests(baseUrl, fromCollection); + assertEquals( + "Remote 'from' replica on " + baseUrl + " should NOT have received /export request", + remoteCountsBefore.get(remoteIdx)[0], + remoteCountAfter, + 0.0); + remoteIdx++; + } + } + } finally { + CollectionAdminRequest.deleteCollection(toCollection).process(cluster.getSolrClient()); + CollectionAdminRequest.deleteCollection(fromCollection).process(cluster.getSolrClient()); + } + } + + private static double getNumExportRequests(String baseUrl, String collectionName) + throws SolrServerException, IOException { + return SolrJMetricTestUtils.getNumCoreRequests(baseUrl, collectionName, "QUERY", "/export"); + } + public void testCcJoinQuery(String query, boolean expectFullResults) throws Exception { assertResultCount("parts", query, NUM_PRODUCTS / 2, expectFullResults); }
