This is an automated email from the ASF dual-hosted git repository. hossman pushed a commit to branch jira/SOLR-6312 in repository https://gitbox.apache.org/repos/asf/solr.git
commit 345afd12fa2d03b795bcc6de0ee1639476e2e683 Author: Chris Hostetter <[email protected]> AuthorDate: Thu Jan 5 15:26:46 2023 -0700 Add IsUpdateRequest.isSendToLeaders() such that both it and client config must be true for shards.preference to be overridden --- .../solr/client/solrj/impl/CloudSolrClient.java | 24 +++- .../solrj/request/AbstractUpdateRequest.java | 11 ++ .../solr/client/solrj/request/IsUpdateRequest.java | 15 ++- .../impl/SendUpdatesToLeadersOverrideTest.java | 135 +++++++++++++++++++-- 4 files changed, 170 insertions(+), 15 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index c05fcf70b70..900aaa9108a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -1044,9 +1044,7 @@ public abstract class CloudSolrClient extends SolrClient { boolean sendToLeaders = false; if (request instanceof IsUpdateRequest) { - sendToLeaders = - // nocommit: also check request.isSendToLeaders() (still to be added) - this.isUpdatesToLeaders(); + sendToLeaders = ((IsUpdateRequest) request).isSendToLeaders() && this.isUpdatesToLeaders(); // Check if we can do a "directUpdate" ... if (sendToLeaders && request instanceof UpdateRequest) { @@ -1206,11 +1204,31 @@ public abstract class CloudSolrClient extends SolrClient { return uniqueNames; } + /** + * If true, this client has been configured such that it will generally prefer to send {@link + * IsUpdateRequest} requests to a shard leader, if and only if {@link + * IsUpdateRequest#isSendToLeaders} is also true. If false, then this client has been configured + * to obey normal routing preferences when dealing with {@link IsUpdateRequest} requests. 
+ * + * @see #isDirectUpdatesToLeadersOnly + */ public boolean isUpdatesToLeaders() { return updatesToLeaders; } /** + * If true, this client has been configured such that "direct updates" will <em>only</em> be sent + * to the current leader of the corresponding shard, and will not be retried with other replicas. + * This method has no effect if {@link #isUpdatesToLeaders()} or {@link + * IsUpdateRequest#isSendToLeaders} returns false. + * + * <p>A "direct update" is any update that can be sent directly to a single shard, and does not + * need to be broadcast to every shard. (Example: document updates or "delete by id" when using + * the default router; non-direct updates are things like commits and "delete by query"). + * + * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct updates" for different + * shards, this client may break the request up and merge the responses. + * * @return true if direct updates are sent to shard leaders only */ public boolean isDirectUpdatesToLeadersOnly() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java index 352fd290f59..733502353dd 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java @@ -159,4 +159,15 @@ public abstract class AbstractUpdateRequest extends SolrRequest<UpdateResponse> this.commitWithin = commitWithin; return this; } + + private boolean sendToLeaders = true; + + public boolean isSendToLeaders() { + return sendToLeaders; + } + + public AbstractUpdateRequest setSendToLeaders(final boolean sendToLeaders) { + this.sendToLeaders = sendToLeaders; + return this; + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java index 
5e8c8aff854..54f449e1358 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java @@ -16,5 +16,18 @@ */ package org.apache.solr.client.solrj.request; +import org.apache.solr.client.solrj.impl.CloudSolrClient; + /** Marker class so that we can determine which requests are updates. */ -public interface IsUpdateRequest {} +public interface IsUpdateRequest { + + /** + * Indicates if clients should make attempts to route this request to a shard leader, overriding + * typical client routing preferences for requests. Defaults to true. + * + * @see CloudSolrClient#isUpdatesToLeaders + */ + default boolean isSendToLeaders() { + return true; + } +} diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java index 0daaa76dcb0..044c24f339d 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.cloud.SolrCloudTestCase; @@ -165,13 +166,13 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase { } /** - * Given an {@link UpdateRequest} and a {@link SolrClient}, processes that request against that - * client while {@link TrackingUpdateProcessorFactory} is recording, does some basic validation, - * then passes the recorded <code>pre-distrib</code> and <code>post-distrib</code> coreNames to - * the 
specified validators + * Given an {@link AbstractUpdateRequest} and a {@link SolrClient}, processes that request against + * that client while {@link TrackingUpdateProcessorFactory} is recording, does some basic + * validation, then passes the recorded <code>pre-distrib</code> and <code>post-distrib</code> + * coreNames to the specified validators */ private static RecordingResults assertUpdateWithRecording( - final UpdateRequest req, final SolrClient client) throws Exception { + final AbstractUpdateRequest req, final SolrClient client) throws Exception { TrackingUpdateProcessorFactory.startRecording("pre-distrib"); TrackingUpdateProcessorFactory.startRecording("post-distrib"); @@ -194,10 +195,11 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase { } /** - * Since {@link UpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for setting - * <code>shards.preference=replica.type:PULL</code> on the input req, and then returning that req + * Since {@link AbstractUpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for + * setting <code>shards.preference=replica.type:PULL</code> on the input req, and then returning + * that req */ - private static UpdateRequest prefPull(final UpdateRequest req) { + private static AbstractUpdateRequest prefPull(final AbstractUpdateRequest req) { req.setParam("shards.preference", "replica.type:PULL"); return req; } @@ -205,17 +207,18 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase { // nocommit: - test CloudHttp2SolrClient as well // basic sanity check of expected default behavior - public void testUpdatesDefaultToLeaders() throws Exception { + public void testClientThatDefaultsToLeaders() throws Exception { try (CloudSolrClient client = new CloudLegacySolrClient.Builder( Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()) .sendUpdatesOnlyToShardLeaders() .build()) { checkUpdatesDefaultToLeaders(client); + 
+ checkUpdatesWithSendToLeadersFalse(client); } } - public void testUpdatesWithShardsPrefPull() throws Exception { + public void testClientThatDoesNotDefaultToLeaders() throws Exception { try (CloudSolrClient client = new CloudLegacySolrClient.Builder( Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()) @@ -224,6 +227,7 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase { .sendUpdatesToAllReplicasInShard() .build()) { checkUpdatesWithShardsPrefPull(client); + checkUpdatesWithSendToLeadersFalse(client); } } @@ -334,7 +338,7 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase { } /** - * Given a SolrClient, sends various updates using {#link #prefPull} and asserts expecations that + * Given a SolrClient, sends various updates using {@link #prefPull} and asserts expecations that * these requests will be initially sent to PULL replcias */ private void checkUpdatesWithShardsPrefPull(final CloudSolrClient client) throws Exception { @@ -441,6 +445,115 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase { } } + /** + * Given a SolrClient, sends various updates where {@link IsUpdateRequest#isSendToLeaders} returns + * false, and asserts expectations that requests using {@link #prefPull} are all sent to PULL + * replicas, regardless of how the client is configured. + */ + private void checkUpdatesWithSendToLeadersFalse(final CloudSolrClient client) throws Exception { + { // single doc add... 
+ final RecordingResults add = + assertUpdateWithRecording( + prefPull(new UpdateRequest().add(sdoc("id", "hoss"))).setSendToLeaders(false), + client); + + // ...should start on (some) PULL replica, since we asked nicely + assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1)); + assertThat( + "add pre-distrib must be PULL", + add.preDistribCores.keySet(), + everyItem(isIn(PULL_REPLICA_CORE_NAMES))); + assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1)); + assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1)); + + // ...then be routed to single leader for this id + assertThat("add post-distrib size", add.postDistribCores.keySet(), hasSize(1)); + assertThat( + "add post-distrib must be leader", + add.postDistribCores.keySet(), + everyItem(isIn(LEADER_CORE_NAMES))); + assertThat("add post-distrib size", add.postDistribRequests.keySet(), hasSize(1)); + assertThat("add post-distrib size", add.postDistribCommands, hasSize(1)); + + // A DBI should also start on (some) PULL replica, since we asked nicely. 
+ // + // then it should be distributed to whatever leader our add doc (for the same id) was sent to + final RecordingResults del = + assertUpdateWithRecording( + prefPull(new UpdateRequest().deleteById("hoss")).setSendToLeaders(false), client); + assertThat("del pre-distrib size", del.preDistribCores.keySet(), hasSize(1)); + assertThat( + "del pre-distrib must be PULL", + del.preDistribCores.keySet(), + everyItem(isIn(PULL_REPLICA_CORE_NAMES))); + assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1)); + assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1)); + + assertEquals( + "add and del should have same post-distrib leader", + add.postDistribCores.keySet(), + del.postDistribCores.keySet()); + assertThat("del post-distrib size", del.postDistribRequests.keySet(), hasSize(1)); + assertThat("del post-distrib size", del.postDistribCommands, hasSize(1)); + } + + { // DBQ start on (some) PULL replica, since we asked nicely, then be routed to all leaders + final RecordingResults record = + assertUpdateWithRecording( + prefPull(new UpdateRequest().deleteByQuery("*:*")).setSendToLeaders(false), client); + + assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1)); + assertThat( + "dbq pre-distrib must be PULL", + record.preDistribCores.keySet(), + everyItem(isIn(PULL_REPLICA_CORE_NAMES))); + assertThat("dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1)); + assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1)); + + assertEquals( + "dbq post-distrib must be all leaders", + LEADER_CORE_NAMES, + record.postDistribCores.keySet()); + assertThat( + "dbq post-distrib size", + record.postDistribRequests.keySet(), + hasSize(LEADER_CORE_NAMES.size())); + assertThat( + "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size())); + } + + { // When we sendToLeaders is disabled, a single UpdateRequest containing multiple adds + // should still only go to 
one replica for all the "pre" commands, then be forwarded + // to the respective leaders for the "post" commands + + final RecordingResults record = + assertUpdateWithRecording( + prefPull(createMultiDirectUpdates(100, 10)).setSendToLeaders(false), client); + + assertThat("multi pre-distrib size", record.preDistribCores.keySet(), hasSize(1)); + assertThat( + "multi pre-distrib must be PULL", + record.preDistribCores.keySet(), + everyItem(isIn(PULL_REPLICA_CORE_NAMES))); + assertThat("multi pre-distrib req size", record.preDistribRequests.keySet(), hasSize(1)); + assertThat("multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10)); + + assertEquals( + "multi post-distrib must be all leaders", + LEADER_CORE_NAMES, + record.postDistribCores.keySet()); + // NOTE: Don't assume our docIds are spread across multi-shards... + // + // We make no assertion about number of post-distrib requests + // (distrib proc may batch differently than what we send) + assertThat( + "multi post-distrib cores", + record.postDistribCores.keySet(), + everyItem(isIn(LEADER_CORE_NAMES))); + assertThat("multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10)); + } + } + private static UpdateRequest createMultiDirectUpdates(final int numAdds, final int numDel) { final UpdateRequest req = new UpdateRequest(); for (int i = 0; i < numAdds; i++) {
