This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f41a7884ba SOLR-17973: Fix `shards.preference` not respected for cross-collection join queries (#4218)
6f41a7884ba is described below

commit 6f41a7884bac47be908635798351e636f603b7d9
Author: Khush Jain <[email protected]>
AuthorDate: Tue Mar 17 20:57:16 2026 -0400

    SOLR-17973: Fix `shards.preference` not respected for cross-collection join queries (#4218)
---
 ...fix-shards-preference-cross-collection-join.yml |   7 +
 .../search/join/CrossCollectionJoinQParser.java    |   7 +
 .../solr/search/join/CrossCollectionJoinQuery.java |  22 +-
 .../search/join/CrossCollectionJoinQueryTest.java  | 298 +++++++++++++++++++++
 4 files changed, 333 insertions(+), 1 deletion(-)

diff --git a/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml b/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml
new file mode 100644
index 00000000000..1620764ea06
--- /dev/null
+++ b/changelog/unreleased/SOLR-17973-fix-shards-preference-cross-collection-join.yml
@@ -0,0 +1,7 @@
+title: "SOLR-17973: Fix `shards.preference` not respected for cross-collection join queries"
+type: fixed
+authors:
+  - name: khushjain
+links:
+  - name: SOLR-17973
+    url: https://issues.apache.org/jira/browse/SOLR-17973
diff --git a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java
index 1b78a0cb2e0..2e4675a880d 100644
--- a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java
+++ b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQParser.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.Set;
 import org.apache.lucene.search.Query;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.search.QParser;
@@ -102,6 +103,12 @@ public class CrossCollectionJoinQParser extends QParser {
       }
     }
 
+    // Propagate shards.preference from request-level params if not already set in localParams
+    String shardsPreference = req.getParams().get(ShardParams.SHARDS_PREFERENCE);
+    if (shardsPreference != null && otherParams.get(ShardParams.SHARDS_PREFERENCE) == null) {
+      otherParams.set(ShardParams.SHARDS_PREFERENCE, shardsPreference);
+    }
+
     return new CrossCollectionJoinQuery(
         query, zkHost, solrUrl, collection, fromField, toField, routedByJoinKey, ttl, otherParams);
   }
diff --git a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java
index bb03022afdf..8381ed17dbb 100644
--- a/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java
+++ b/solr/core/src/java/org/apache/solr/search/join/CrossCollectionJoinQuery.java
@@ -48,11 +48,14 @@ import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.stream.UniqueStream;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -219,11 +222,13 @@ public class CrossCollectionJoinQuery extends Query implements SolrSearcherRequi
     }
 
     private TupleStream createCloudSolrStream(SolrClientCache solrClientCache) throws IOException {
+      ZkController zkController = searcher.getCore().getCoreContainer().getZkController();
+
       String streamZkHost;
       if (zkHost != null) {
         streamZkHost = zkHost;
       } else {
-        streamZkHost = searcher.getCore().getCoreContainer().getZkController().getZkServerAddress();
+        streamZkHost = zkController.getZkServerAddress();
       }
 
       ModifiableSolrParams params = new ModifiableSolrParams(otherParams);
@@ -239,6 +244,21 @@ public class CrossCollectionJoinQuery extends Query implements SolrSearcherRequi
 
       StreamContext streamContext = new StreamContext();
       streamContext.setSolrClientCache(solrClientCache);
+      streamContext.setRequestParams(new ModifiableSolrParams(otherParams));
+      if (zkController != null) {
+        RequestReplicaListTransformerGenerator rltg =
+            new RequestReplicaListTransformerGenerator(
+                zkController
+                    .getZkStateReader()
+                    .getClusterProperties()
+                    .getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
+                    .toString(),
+                zkController.getNodeName(),
+                zkController.getBaseUrl(),
+                zkController.getHostName(),
+                zkController.getSysPropsCacher());
+        streamContext.setRequestReplicaListTransformerGenerator(rltg);
+      }
 
       TupleStream cloudSolrStream = new CloudSolrStream(streamZkHost, collection, params);
       TupleStream uniqueStream = new UniqueStream(cloudSolrStream, new FieldEqualitor(fromField));
diff --git a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
index 91cbc3bbd8b..70d77363539 100644
--- a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
+++ b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import org.apache.lucene.search.Query;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -30,8 +31,16 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.embedded.JettySolrRunner;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequestBase;
+import org.apache.solr.search.QueryParsing;
+import org.apache.solr.util.SolrJMetricTestUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -305,6 +314,295 @@ public class CrossCollectionJoinQueryTest extends SolrCloudTestCase {
     }
   }
 
+  @Test
+  public void testShardsPreferenceRequestParamPropagation() throws Exception {
+    // shards.preference set as a request-level param (not in localParams) should be
+    // propagated to the query's otherParams
+    ModifiableSolrParams requestParams = new ModifiableSolrParams();
+    requestParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false");
+
+    ModifiableSolrParams localParams = new ModifiableSolrParams();
+    localParams.set(QueryParsing.V, "*:*");
+    localParams.set(CrossCollectionJoinQParser.FROM_INDEX, "products");
+    localParams.set(CrossCollectionJoinQParser.FROM, "product_id_s");
+    localParams.set(CrossCollectionJoinQParser.TO, "product_id_s");
+    localParams.set(CrossCollectionJoinQParser.ROUTED_BY_JOIN_KEY, "false");
+
+    try (SolrQueryRequest req = new SolrQueryRequestBase(null, requestParams) {}) {
+      CrossCollectionJoinQParser parser =
+          new CrossCollectionJoinQParser(
+              null, localParams, requestParams, req, "product_id_s", null);
+      Query query = parser.parse();
+
+      CrossCollectionJoinQuery ccjQuery = (CrossCollectionJoinQuery) query;
+      assertEquals("replica.leader:false", ccjQuery.otherParams.get(ShardParams.SHARDS_PREFERENCE));
+    }
+  }
+
+  @Test
+  public void testShardsPreferenceLocalParamTakesPrecedence() throws Exception {
+    // When shards.preference is set in both localParams and request params,
+    // the localParams value should take precedence
+    ModifiableSolrParams requestParams = new ModifiableSolrParams();
+    requestParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false");
+
+    ModifiableSolrParams localParams = new ModifiableSolrParams();
+    localParams.set(QueryParsing.V, "*:*");
+    localParams.set(CrossCollectionJoinQParser.FROM_INDEX, "products");
+    localParams.set(CrossCollectionJoinQParser.FROM, "product_id_s");
+    localParams.set(CrossCollectionJoinQParser.TO, "product_id_s");
+    localParams.set(CrossCollectionJoinQParser.ROUTED_BY_JOIN_KEY, "false");
+    localParams.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:true");
+
+    try (SolrQueryRequest req = new SolrQueryRequestBase(null, requestParams) {}) {
+      CrossCollectionJoinQParser parser =
+          new CrossCollectionJoinQParser(
+              null, localParams, requestParams, req, "product_id_s", null);
+      Query query = parser.parse();
+
+      CrossCollectionJoinQuery ccjQuery = (CrossCollectionJoinQuery) query;
+      // localParams value should take precedence over request-level param
+      assertEquals("replica.leader:true", ccjQuery.otherParams.get(ShardParams.SHARDS_PREFERENCE));
+    }
+  }
+
+  @Test
+  public void testShardsPreferenceWithCrossCollectionJoin() throws Exception {
+    // Use 1 shard with 2 replicas for the "from" collection so there is exactly
+    // 1 leader and 1 non-leader, each on a different node. This lets us verify
+    // via per-node /export metrics which replica actually served the stream request.
+    final String fromCollection = "products_pref_test";
+    final String toCollection = "parts_pref_test";
+    try {
+      CollectionAdminRequest.createCollection(fromCollection, "ccjoin", 1, 2)
+          .process(cluster.getSolrClient());
+      CollectionAdminRequest.createCollection(toCollection, "ccjoin", 1, 1)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(fromCollection, 1, 2);
+      cluster.waitForActiveCollection(toCollection, 1, 1);
+
+      // Index test data
+      List<SolrInputDocument> productDocs = new ArrayList<>();
+      List<SolrInputDocument> partDocs = new ArrayList<>();
+      for (int productId = 0; productId < NUM_PRODUCTS; ++productId) {
+        int sizeNum = productId % SIZES.length;
+        String size = SIZES[sizeNum];
+        productDocs.add(
+            new SolrInputDocument(
+                "id", String.valueOf(productId),
+                "product_id_s", String.valueOf(productId),
+                "size_s", size));
+        for (int partNum = 0; partNum <= sizeNum; partNum++) {
+          String partId = String.format(Locale.ROOT, "%d_%d", productId, partNum);
+          partDocs.add(
+              new SolrInputDocument("id", partId, "product_id_s", String.valueOf(productId)));
+        }
+      }
+      indexDocs(fromCollection, productDocs);
+      cluster.getSolrClient().commit(fromCollection);
+      indexDocs(toCollection, partDocs);
+      cluster.getSolrClient().commit(toCollection);
+
+      // Identify leader and non-leader replicas for the "from" collection's single shard
+      DocCollection fromDocCollection =
+          cluster.getSolrClient().getClusterState().getCollection(fromCollection);
+      Slice shard = fromDocCollection.getSlices().iterator().next();
+      Replica leader = shard.getLeader();
+      assertNotNull("Leader should exist for shard", leader);
+      Replica nonLeader =
+          shard.getReplicas().stream()
+              .filter(r -> !r.getName().equals(leader.getName()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected a non-leader replica"));
+
+      String leaderBaseUrl = leader.getBaseUrl();
+      String nonLeaderBaseUrl = nonLeader.getBaseUrl();
+      assertNotEquals(
+          "Leader and non-leader should be on different nodes for this test to be meaningful",
+          leaderBaseUrl,
+          nonLeaderBaseUrl);
+
+      // --- Test 1: replica.leader:false should route /export to the non-leader ---
+      double leaderCountBefore = getNumExportRequests(leaderBaseUrl, fromCollection);
+      double nonLeaderCountBefore = getNumExportRequests(nonLeaderBaseUrl, fromCollection);
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(
+          "q",
+          String.format(
+              Locale.ROOT,
+              "{!join method=crossCollection fromIndex=%s from=product_id_s to=product_id_s routed=false ttl=0}size_s:M",
+              fromCollection));
+      params.set("rows", "0");
+      params.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:false");
+
+      QueryResponse resp = cluster.getSolrClient().query(toCollection, params);
+      assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound());
+
+      double leaderCountAfter = getNumExportRequests(leaderBaseUrl, fromCollection);
+      double nonLeaderCountAfter = getNumExportRequests(nonLeaderBaseUrl, fromCollection);
+
+      assertTrue(
+          "Non-leader replica should have received the /export request"
+              + " (before="
+              + nonLeaderCountBefore
+              + ", after="
+              + nonLeaderCountAfter
+              + ")",
+          nonLeaderCountAfter > nonLeaderCountBefore);
+      assertEquals(
+          "Leader replica should NOT have received the /export request",
+          leaderCountBefore,
+          leaderCountAfter,
+          0.0);
+
+      // --- Test 2: replica.leader:true should route /export to the leader ---
+      leaderCountBefore = leaderCountAfter;
+      nonLeaderCountBefore = nonLeaderCountAfter;
+
+      params.set(ShardParams.SHARDS_PREFERENCE, "replica.leader:true");
+      resp = cluster.getSolrClient().query(toCollection, params);
+      assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound());
+
+      leaderCountAfter = getNumExportRequests(leaderBaseUrl, fromCollection);
+      nonLeaderCountAfter = getNumExportRequests(nonLeaderBaseUrl, fromCollection);
+
+      assertTrue(
+          "Leader replica should have received the /export request"
+              + " (before="
+              + leaderCountBefore
+              + ", after="
+              + leaderCountAfter
+              + ")",
+          leaderCountAfter > leaderCountBefore);
+      assertEquals(
+          "Non-leader replica should NOT have received the /export request",
+          nonLeaderCountBefore,
+          nonLeaderCountAfter,
+          0.0);
+    } finally {
+      CollectionAdminRequest.deleteCollection(toCollection).process(cluster.getSolrClient());
+      CollectionAdminRequest.deleteCollection(fromCollection).process(cluster.getSolrClient());
+    }
+  }
+
+  @Test
+  public void testShardsPreferenceLocationLocal() throws Exception {
+    // Test that replica.location:local routes the join's /export stream to a replica
+    // on the same node where the join query is processed. This validates that the
+    // RequestReplicaListTransformerGenerator is initialized with full node context
+    // (nodeName, baseUrl, hostName).
+    final String fromCollection = "products_local_test";
+    final String toCollection = "parts_local_test";
+    try {
+      // "from" collection: 1 shard, NUM_NODES replicas → one replica on every node
+      CollectionAdminRequest.createCollection(fromCollection, "ccjoin", 1, NUM_NODES)
+          .process(cluster.getSolrClient());
+      // "to" collection: 1 shard, 1 replica → on exactly one node
+      CollectionAdminRequest.createCollection(toCollection, "ccjoin", 1, 1)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(fromCollection, 1, NUM_NODES);
+      cluster.waitForActiveCollection(toCollection, 1, 1);
+
+      // Index test data
+      List<SolrInputDocument> productDocs = new ArrayList<>();
+      List<SolrInputDocument> partDocs = new ArrayList<>();
+      for (int productId = 0; productId < NUM_PRODUCTS; ++productId) {
+        int sizeNum = productId % SIZES.length;
+        String size = SIZES[sizeNum];
+        productDocs.add(
+            new SolrInputDocument(
+                "id", String.valueOf(productId),
+                "product_id_s", String.valueOf(productId),
+                "size_s", size));
+        for (int partNum = 0; partNum <= sizeNum; partNum++) {
+          String partId = String.format(Locale.ROOT, "%d_%d", productId, partNum);
+          partDocs.add(
+              new SolrInputDocument("id", partId, "product_id_s", String.valueOf(productId)));
+        }
+      }
+      indexDocs(fromCollection, productDocs);
+      cluster.getSolrClient().commit(fromCollection);
+      indexDocs(toCollection, partDocs);
+      cluster.getSolrClient().commit(toCollection);
+
+      // Find the node hosting the "to" collection's shard. The join stream will execute
+      // on this node, so replica.location:local should prefer the "from" replica here.
+      DocCollection toDocCollection =
+          cluster.getSolrClient().getClusterState().getCollection(toCollection);
+      Slice toShard = toDocCollection.getSlices().iterator().next();
+      String toNodeBaseUrl = toShard.getReplicas().iterator().next().getBaseUrl();
+
+      // Collect all "from" replica base URLs
+      DocCollection fromDocCollection =
+          cluster.getSolrClient().getClusterState().getCollection(fromCollection);
+      Slice fromShard = fromDocCollection.getSlices().iterator().next();
+      List<String> fromBaseUrls = new ArrayList<>();
+      for (Replica r : fromShard.getReplicas()) {
+        fromBaseUrls.add(r.getBaseUrl());
+      }
+      assertTrue(
+          "The 'from' collection should have a replica on the same node as the 'to' collection",
+          fromBaseUrls.contains(toNodeBaseUrl));
+
+      // Get baseline /export request counts for the "from" collection on all nodes
+      double localCountBefore = getNumExportRequests(toNodeBaseUrl, fromCollection);
+      List<double[]> remoteCountsBefore = new ArrayList<>();
+      for (String baseUrl : fromBaseUrls) {
+        if (!baseUrl.equals(toNodeBaseUrl)) {
+          remoteCountsBefore.add(new double[] {getNumExportRequests(baseUrl, fromCollection)});
+        }
+      }
+
+      // Execute join query with replica.location:local
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(
+          "q",
+          String.format(
+              Locale.ROOT,
+              "{!join method=crossCollection fromIndex=%s from=product_id_s to=product_id_s routed=false ttl=0}size_s:M",
+              fromCollection));
+      params.set("rows", "0");
+      params.set(ShardParams.SHARDS_PREFERENCE, "replica.location:local");
+
+      QueryResponse resp = cluster.getSolrClient().query(toCollection, params);
+      assertEquals(NUM_PRODUCTS / 2, resp.getResults().getNumFound());
+
+      // Verify the local node's "from" replica received the /export request
+      double localCountAfter = getNumExportRequests(toNodeBaseUrl, fromCollection);
+      assertTrue(
+          "Local 'from' replica should have received the /export request"
+              + " (before="
+              + localCountBefore
+              + ", after="
+              + localCountAfter
+              + ")",
+          localCountAfter > localCountBefore);
+
+      // Verify remote nodes did NOT receive /export requests
+      int remoteIdx = 0;
+      for (String baseUrl : fromBaseUrls) {
+        if (!baseUrl.equals(toNodeBaseUrl)) {
+          double remoteCountAfter = getNumExportRequests(baseUrl, fromCollection);
+          assertEquals(
+              "Remote 'from' replica on " + baseUrl + " should NOT have received /export request",
+              remoteCountsBefore.get(remoteIdx)[0],
+              remoteCountAfter,
+              0.0);
+          remoteIdx++;
+        }
+      }
+    } finally {
+      CollectionAdminRequest.deleteCollection(toCollection).process(cluster.getSolrClient());
+      CollectionAdminRequest.deleteCollection(fromCollection).process(cluster.getSolrClient());
+    }
+  }
+
+  private static double getNumExportRequests(String baseUrl, String collectionName)
+      throws SolrServerException, IOException {
+    return SolrJMetricTestUtils.getNumCoreRequests(baseUrl, collectionName, "QUERY", "/export");
+  }
+
   public void testCcJoinQuery(String query, boolean expectFullResults) throws Exception {
     assertResultCount("parts", query, NUM_PRODUCTS / 2, expectFullResults);
   }

Reply via email to